xiaoming728

xiaoming728

Spring Data Elasticsearch 5.2

Spring Data Elasticsearch 5.2

一、配置依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

二、Elasticsearch配置

springboot对于elasticsearch活着的其它一些工具比如kafka,redis的配置都可以在resorces/application.properties中配置,新版的配置与旧版的不太一样,从ElasticsearchProperties.java中可以看到,前缀是spring.elasticsearch,主机配置项已经变成了uris的List,以前好像是spring.data.elasticsearch.cluster-nodes,因此我们在resorces/application.properties这样子配置

server:
  port: 8090
 
 
spring:
  redis:
    database: 0
    port: 6379
    host: 192.168.69.201
    password: wtl1992
 
  elasticsearch:
    password: 'XXXXXXXXXXXX'
    username: elastic
    # uris:是个以,分隔的数组
    uris: http://ljxwtl.cn:9200

三、配置实体类和Repository

  1. 在类前加上注解@Document(indexName = “discusspost”),indexName为索引名称,新版本好像移除了分片和副本的配置。

  1. 配置类的属性,和之前一样

@Document(indexName = "discusspost")
public class DiscussPost {
    @Id
    private int id;
    @Field(type = FieldType.Integer)
    private int userId;
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String title;
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String content;
    @Field(type = FieldType.Integer)
    private int type;
    @Field(type = FieldType.Integer)
    private int status;
    @Field(type = FieldType.Date)
    private Date createTime;
    @Field(type = FieldType.Integer)
    private int commentCount;
    @Field(type = FieldType.Double)
    private double score;
  1. 声明Repository接口,程序中注入这个接口,可以完成一些简单的操作,比如插入更新,全亮查询等

@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {

}

四、匹配查询

ElasticsearchTemplate已经过时

匹配查询的主要作用就是提供一个关键字和字段,将关键字分词后将ES中对应字段包含相应关键字的结果返回,主要运用在搜索功能中。

	//注入elasticsearchTemplate,用它进行匹配查询,Repository功能不支持
    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;
        @Test void testSearch(){

		//查询标准构建,匹配字段“content”和“title”中包含“你是谁啊”关键字的数据
		//在这里使用matches,而不是contains,contains必须包含完整的关键字,而matches不需要,如果要匹配更多字段使用or或者and。
        Criteria criteria = new Criteria("title").matches("你是谁啊").or(new Criteria("content").matches("你是谁啊"));
        //添加排序条件,下面写了两种排序条件的方式,其中第二种方式如果有多个排序条件,测试下来并不满足需求。
        //因此采用第一种并在最后构建匹配查询的时候依次添加条件。
        //yes
//        Sort sort = Sort.by(Sort.Direction.DESC,"type");
//        Sort sort2 = Sort.by(Sort.Direction.DESC,"createTime");
//        Sort sort3 = Sort.by(Sort.Direction.DESC,"createTime");
        //no
//        sort.and(Sort.by(Sort.Direction.ASC,"createTime"));
//        sort.and(Sort.sort(DiscussPost.class).by(DiscussPost::getCreateTime).ascending());
        //高亮查询
        List<HighlightField> highlightFieldList = new ArrayList<>();
        HighlightField highlightField = new HighlightField("title", HighlightFieldParameters.builder().withPreTags("<em>").withPostTags("</em>").build());
        highlightFieldList.add(highlightField);
        highlightField = new HighlightField("content", HighlightFieldParameters.builder().withPreTags("<em>").withPostTags("</em>").build());
        highlightFieldList.add(highlightField);

        Highlight highlight = new Highlight(highlightFieldList);
        HighlightQuery highlightQuery = new HighlightQuery(highlight,DiscussPost.class);
        //构建查询
        CriteriaQueryBuilder builder = new CriteriaQueryBuilder(criteria)
                .withSort(Sort.by(Sort.Direction.DESC,"type"))
                .withSort(Sort.by(Sort.Direction.DESC,"score"))
                .withSort(Sort.by(Sort.Direction.DESC,"createTime"))
                .withHighlightQuery(highlightQuery)
                .withPageable(PageRequest.of(0,10));
        CriteriaQuery query = new CriteriaQuery(builder);
        //通过elasticsearchTemplate查询
        SearchHits<DiscussPost> result = elasticsearchTemplate.search(query, DiscussPost.class);
        //处理结果
        List<SearchHit<DiscussPost>> searchHitList = result.getSearchHits();
        List<DiscussPost> discussPostList = new ArrayList<>();
        for(SearchHit<DiscussPost> hit:searchHitList){
            DiscussPost post = hit.getContent();
            //将高亮结果添加到返回的结果类中显示
            var titleHighlight = hit.getHighlightField("title");
            if(titleHighlight.size()!=0){
                post.setTitle(titleHighlight.get(0));
            }
            var contentHighlight = hit.getHighlightField("content");
            if(contentHighlight.size()!=0){
                post.setContent(contentHighlight.get(0));
            }
            discussPostList.add(post);

        }
        //构建Page对象
        Page<DiscussPost> page = new PageImpl<>(discussPostList,PageRequest.of(9,10), result.getTotalHits());
        for (DiscussPost post:page){
            System.out.println(post);
        }
    }

五、测试示例

package ljxwtl;
 
import ljxwtl.dao.PersonRepository;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.*;
import org.springframework.data.elasticsearch.core.clients.elasticsearch7.ElasticsearchAggregations;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * @author: wtl
 * @License: (C) Copyright 2022, wtl Corporation Limited.
 * @Contact: 1050100468@qq.com
 * @Date: 2022/4/10 7:43
 * @Version: 1.0
 * @Description:
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SpringBootApplicationMain.class)
public class ApplicationMainTest {
 
    @Resource
    private PersonRepository personRepository;
 
    @Resource
    private ElasticsearchRestTemplate elasticsearchRestTemplate;
 
    @Test
    public void test(){
        System.out.println(personRepository);
        System.out.println(elasticsearchRestTemplate);
    }
 
    @Test
    public void createIndex(){
        IndexCoordinates indexCoordinates = IndexCoordinates.of("person");
        IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(indexCoordinates);
        boolean created = indexOperations.create();
        System.out.println(created);
    }
 
    @Test
    public void setIndexMappings(){
        IndexCoordinates indexCoordinates = IndexCoordinates.of("person");
        IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(indexCoordinates);
        boolean created = indexOperations.putMapping(Person.class);
        System.out.println(created);
    }
 
    @Test
    public void createIndexWithSetSettings(){
        IndexCoordinates indexCoordinates = IndexCoordinates.of("person");
        IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(indexCoordinates);
        Map<String,Object> settings = new HashMap<>();
 
        //----------------------------------------静态设置开始----------------------------------------------
        // 静态设置:只能在索引创建时或者在状态为 closed index(闭合的索引)上设置
 
        //主分片数,默认为5.只能在创建索引时设置,不能修改
        settings.put("index.number_of_shards",2);
 
        //是否应在索引打开前检查分片是否损坏,当检查到分片损坏将禁止分片被打开
        settings.put("index.shard.check_on_startup","false");//默认值
//        settings.put("index.shard.check_on_startup","checksum");//检查物理损坏
//        settings.put("index.shard.check_on_startup","true");//检查物理和逻辑损坏,这将消耗大量内存和CPU
//        settings.put("index.shard.check_on_startup","fix");//检查物理和逻辑损坏。有损坏的分片将被集群自动删除,这可能导致数据丢失
 
        //自定义路由值可以转发的目的分片数。默认为 1,只能在索引创建时设置。此值必须小于index.number_of_shards
        settings.put("index.routing_partition_size",1);
 
        //默认使用LZ4压缩方式存储数据,也可以设置为 best_compression,它使用 DEFLATE 方式以牺牲字段存储性能为代价来获得更高的压缩比例。
        settings.put("index.codec","best_compression");
        //----------------------------------------静态设置结束----------------------------------------------
 
 
        //----------------------------------------动态设置开始----------------------------------------------
        //每个主分片的副本数。默认为 1。
        settings.put("index.number_of_replicas",0);
 
        //基于可用节点的数量自动分配副本数量,默认为 false(即禁用此功能)
        settings.put("index.auto_expand_replicas",false);
 
        //执行刷新操作的频率,这使得索引的最近更改可以被搜索。默认为 1s。可以设置为 -1 以禁用刷新。
        settings.put("index.refresh_interval","1s");
 
        //用于索引搜索的 from+size 的最大值。默认为 10000
        settings.put("index.max_result_window",10000);
 
        // 在搜索此索引中 rescore 的 window_size 的最大值
        settings.put("index.max_rescore_window",10000);
 
        //设置为 true 使索引和索引元数据为只读,false 为允许写入和元数据更改。
        settings.put("index.blocks.read_only",false);
 
        // 设置为 true 可禁用对索引的读取操作
        settings.put("index.blocks.read",false);
 
        //设置为 true 可禁用对索引的写入操作。
        settings.put("index.blocks.write",false);
 
        // 设置为 true 可禁用索引元数据的读取和写入。
        settings.put("index.blocks.metadata",false);
 
        //索引的每个分片上可用的最大刷新侦听器数
        settings.put("index.max_refresh_listeners",1000);
        //----------------------------------------动态设置结束----------------------------------------------
 
        boolean created = indexOperations.create(settings);
        System.out.println(created);
    }
 
    @Test
    public void deleteIndex(){
        IndexCoordinates indexCoordinates = IndexCoordinates.of("person");
        IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(indexCoordinates);
        boolean isDeleted = indexOperations.delete();
        System.out.println(isDeleted);
    }
 
    @Test
    public void saveOne(){
        Person person = new Person();
        person.setId("1");
        person.setName("王天龙");
        person.setAge(30);
        person.setSex("man");
        person.setTel("1111111");
        person.setCreateTime(new Date());
 
        Person savePerson = personRepository.save(person);
 
        System.out.println(savePerson);
    }
 
    @Test
    public void updateOne(){
        Document document = Document.create();
        document.setId("1");
        document.put("name","天龙战神");
 
        UpdateQuery.Builder builder = UpdateQuery.builder("1").withDocument(document).withScriptedUpsert(true);
 
        UpdateResponse updateResponse = elasticsearchRestTemplate.update(builder.build(), IndexCoordinates.of("person"));
 
        System.out.println(updateResponse.getResult());
    }
 
    @Test
    public void saveAll(){
        List<Person> personList = new ArrayList<>(3);
        Person person2 = new Person();
        person2.setId("2");
        person2.setName("王天祥");
        person2.setAge(26);
        person2.setSex("男");
        person2.setTel("222222222222");
        person2.setCreateTime(new Date());
        personList.add(person2);
 
        Person person3 = new Person();
        person3.setId("3");
        person3.setName("王杰");
        person3.setAge(31);
        person3.setSex("女");
        person3.setTel("3333333333");
        person3.setCreateTime(new Date());
        personList.add(person3);
        personRepository.saveAll(personList);
    }
 
    @Test
    public void searchMatchAll() throws IOException {
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder().must(new MatchAllQueryBuilder());
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder);
 
        NativeSearchQuery nativeSearchQuery = nativeSearchQueryBuilder.build();
        SearchHits<Person> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Person.class);
 
        searchHits.getSearchHits().forEach(personSearchHit -> {
            Person content = personSearchHit.getContent();
            System.out.println(content);
        });
    }
 
 
    @Test
    public void searchBoolMustWhere(){
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.must(QueryBuilders.matchQuery("name","王天龙"));
        boolQueryBuilder.must(QueryBuilders.matchQuery("age",30));
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder);
        NativeSearchQuery nativeSearchQuery = nativeSearchQueryBuilder.build();
        SearchHits<Person> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Person.class);
 
        searchHits.forEach(personSearchHit -> {
            System.out.println(personSearchHit.getContent());
        });
    }
 
    @Test
    public void searchShouldWhere(){
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.should(QueryBuilders.matchQuery("name","天龙"));
        boolQueryBuilder.should(QueryBuilders.matchQuery("age",26));
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder);
        NativeSearchQuery nativeSearchQuery = nativeSearchQueryBuilder.build();
        SearchHits<Person> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Person.class);
        searchHits.forEach(personSearchHit -> {
            System.out.println(personSearchHit.getContent());
        });
    }
 
    @Test
    public void searchRange(){
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("age").gte(26).lt(31);
        boolQueryBuilder.filter(rangeQueryBuilder);
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder);
        NativeSearchQuery nativeSearchQuery = nativeSearchQueryBuilder.build();
        SearchHits<Person> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Person.class);
        searchHits.forEach(personSearchHit -> {
            System.out.println(personSearchHit.getContent());
        });
    }
 
 
    /**
     * 聚合搜索
     * 聚合搜索,aggs,类似于group by,对age字段进行聚合,
     */
    @Test
    public void aggregations() {
        NativeSearchQuery nativeSearchQuery = new NativeSearchQueryBuilder()
                .withAggregations(AggregationBuilders.terms("count").field("sex"))
                .build();
 
        SearchHits<Person> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Person.class);
        //取出聚合结果
        ElasticsearchAggregations elasticsearchAggregations = (ElasticsearchAggregations) searchHits.getAggregations();
        Aggregations aggregations = elasticsearchAggregations.aggregations();
        Terms terms = (Terms) aggregations.asMap().get("count");
 
        for (Terms.Bucket bucket : terms.getBuckets()) {
            String keyAsString = bucket.getKeyAsString();   // 聚合字段列的值
            long docCount = bucket.getDocCount();           // 聚合字段对应的数量
            System.out.println(keyAsString + " " + docCount);
        }
    }
 
    /**
     * 分页实现
     */
    @Test
    public void searchWithPageable(){
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder().must(new MatchAllQueryBuilder());
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                //分页实现
                .withPageable(PageRequest.of(0,2));
 
        NativeSearchQuery nativeSearchQuery = nativeSearchQueryBuilder.build();
        SearchHits<Person> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Person.class);
 
        searchHits.getSearchHits().forEach(personSearchHit -> {
            Person content = personSearchHit.getContent();
            System.out.println(content);
        });
    }
 
    /**
     * 排序实现
     */
    @Test
    public void searchWithSort(){
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder().must(new MatchAllQueryBuilder());
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                //分页实现
                .withPageable(PageRequest.of(0,10))
                //排序
                .withSorts(SortBuilders.fieldSort("createTime").order(SortOrder.DESC),SortBuilders.fieldSort("age").order(SortOrder.DESC));
 
        NativeSearchQuery nativeSearchQuery = nativeSearchQueryBuilder.build();
        SearchHits<Person> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Person.class);
 
        searchHits.getSearchHits().forEach(personSearchHit -> {
            Person content = personSearchHit.getContent();
            System.out.println(content);
        });
    }
 
    //高并发场景下模拟ES分布式悲观锁的实现
    @Test
    public void highConcurrencyWithPessimisticLock() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
 
        AtomicInteger failedCount = new AtomicInteger(0);
 
        /**
         * 在testindex未创建的前提下,并发情况下,只能有一个成功,19个失败
         */
        for (int i = 0; i < 20; i++) {
            new Thread(new Task(countDownLatch,failedCount,elasticsearchRestTemplate)).start();
        }
 
        countDownLatch.countDown();
 
        TimeUnit.SECONDS.sleep(10);
 
        System.out.println(failedCount.get());
    }
 
    private class Task implements Runnable {
 
        private CountDownLatch countDownLatch;
        private AtomicInteger failedCount;
        private ElasticsearchRestTemplate elasticsearchRestTemplate;
 
        public Task(CountDownLatch countDownLatch,AtomicInteger failedCount,ElasticsearchRestTemplate elasticsearchRestTemplate){
            this.countDownLatch = countDownLatch;
            this.failedCount = failedCount;
            this.elasticsearchRestTemplate = elasticsearchRestTemplate;
        }
 
        @Override
        public void run() {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                IndexCoordinates indexCoordinates = IndexCoordinates.of("testindex");
                IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(indexCoordinates);
                boolean created = indexOperations.create();
                System.out.println(created);
            } catch (Exception e) {
                failedCount.getAndIncrement();
            }
        }
    }
 
}

参考资料

Elasticsearch8.2 版本的Spring Data Elasticsearch的API使用:https://blog.csdn.net/wtl1992/article/details/124072854

Springboot3.1+Elasticsearch8.x匹配查询: https://blog.csdn.net/weixin_43189060/article/details/131247199