这篇“springboot2+es7怎么使用RestHighLevelClient”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“springboot2+es7怎么使用RestHighLevelClient”文章吧。
由于spring和es的集成并不是特别友好,es的高低版本兼容问题、api更新频率高等问题,所以我选择是官网提供的原生Client(RestHighLevelClient),但又不想去关注es的配置类以及和spring的集成配置、jar包冲突等问题,所以使用spring-boot-starter-data-elasticsearch。
一、引入依赖jar
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>
二、application.properties配置
spring.elasticsearch.rest.uris=http://127.0.0.1:9200,http://127.0.0.1:9201,http://127.0.0.1:9202spring.elasticsearch.rest.connection-timeout=5sspring.elasticsearch.rest.read-timeout=30slogging.level.org.springframework.data.convert.CustomConversions=error
spring-boot-starter-data-elasticsearch中自动装配es的配置类:ElasticsearchRestClientAutoConfiguration、ElasticsearchRestClientProperties。
ElasticsearchRestClientAutoConfiguration:
@ConditionalOnClass({RestHighLevelClient.class})@ConditionalOnMissingBean({RestClient.class})@EnableConfigurationProperties({ElasticsearchRestClientProperties.class})public class ElasticsearchRestClientAutoConfiguration { @Configuration( proxyBeanMethods = false ) @ConditionalOnMissingBean({RestHighLevelClient.class}) static class RestHighLevelClientConfiguration { RestHighLevelClientConfiguration() { } @Bean RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) { return new RestHighLevelClient(restClientBuilder); } } @Configuration( proxyBeanMethods = false ) @ConditionalOnMissingBean({RestClientBuilder.class}) static class RestClientBuilderConfiguration { RestClientBuilderConfiguration() { } @Bean RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) { return new ElasticsearchRestClientAutoConfiguration.DefaultRestClientBuilderCustomizer(properties); } @Bean RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties, ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) { HttpHost[] hosts = (HttpHost[])properties.getUris().stream().map(this::createHttpHost).toArray((x$0) -> { return new HttpHost[x$0]; }); RestClientBuilder builder = RestClient.builder(hosts); builder.setHttpClientConfigCallback((httpClientBuilder) -> { builderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(httpClientBuilder); }); return httpClientBuilder; }); builder.setRequestConfigCallback((requestConfigBuilder) -> { builderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(requestConfigBuilder); }); return requestConfigBuilder; }); builderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(builder); }); return builder; } private HttpHost createHttpHost(String uri) { try { return this.createHttpHost(URI.create(uri)); } catch (IllegalArgumentException var3) { return HttpHost.create(uri); } } private HttpHost createHttpHost(URI uri) { if (!StringUtils.hasLength(uri.getUserInfo())) { return HttpHost.create(uri.toString()); } else { try { return HttpHost.create((new URI(uri.getScheme(), (String)null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())).toString()); } catch (URISyntaxException var3) { throw new IllegalStateException(var3); } } } }}
ElasticsearchRestClientProperties:
@ConfigurationProperties( prefix = "spring.elasticsearch.rest")public class ElasticsearchRestClientProperties { private List<String> uris = new ArrayList(Collections.singletonList("http://localhost:9200")); private String username; private String password; private Duration connectionTimeout = Duration.ofSeconds(1L); private Duration readTimeout = Duration.ofSeconds(30L); public ElasticsearchRestClientProperties() { } public List<String> getUris() { return this.uris; } public void setUris(List<String> uris) { this.uris = uris; } public String getUsername() { return this.username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return this.password; } public void setPassword(String password) { this.password = password; } public Duration getConnectionTimeout() { return this.connectionTimeout; } public void setConnectionTimeout(Duration connectionTimeout) { this.connectionTimeout = connectionTimeout; } public Duration getReadTimeout() { return this.readTimeout; } public void setReadTimeout(Duration readTimeout) { this.readTimeout = readTimeout; }}
三、使用
ES基本操作持久层
@Repository@Slf4jpublic class EsRepository { @Resource private RestHighLevelClient highLevelClient; public boolean existIndex(String index) { try { return highLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT); } catch (IOException e) { log.error("es持久层异常!index={}", index, e); } return Boolean.FALSE; } public boolean createIndex(String index, Map<String, Object> columnMap) { if (existIndex(index)) { return Boolean.FALSE; } CreateIndexRequest request = new CreateIndexRequest(index); if (columnMap != null && columnMap.size() > 0) { Map<String, Object> source = new HashMap<>(); source.put("properties", columnMap); request.mapping(source); } try { highLevelClient.indices().create(request, RequestOptions.DEFAULT); return Boolean.TRUE; } catch (IOException e) { log.error("es持久层异常!index={}, columnMap={}", index, columnMap, e); } return Boolean.FALSE; } public boolean deleteIndex(String index) { try { if (existIndex(index)) { AcknowledgedResponse response = highLevelClient.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT); return response.isAcknowledged(); } } catch (Exception e) { log.error("es持久层异常!index={}", index, e); } return Boolean.FALSE; } public boolean insert(String index, String jsonString) { IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(new Snowflake().nextIdStr()); indexRequest.source(jsonString, XContentType.JSON); try { log.info("indexRequest={}", indexRequest); IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT); log.info("indexResponse={}", indexResponse); return Boolean.TRUE; } catch (IOException e) { log.error("es持久层异常!index={}, jsonString={}", index, jsonString, e); } return Boolean.FALSE; } public boolean update(String index, Map<String, Object> dataMap) { UpdateRequest updateRequest = new UpdateRequest(index, dataMap.remove("id").toString()); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); updateRequest.doc(dataMap); try { highLevelClient.update(updateRequest, RequestOptions.DEFAULT); } catch (IOException e) { log.error("es持久层异常!index={}, dataMap={}", index, dataMap, e); return Boolean.FALSE; } return Boolean.TRUE; } public boolean delete(String index, String id) { DeleteRequest deleteRequest = new DeleteRequest(index, id); try { highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT); } catch (IOException e) { log.error("es持久层异常!index={}, id={}", index, id, e); return Boolean.FALSE; } return Boolean.TRUE; } }
ES查询持久层
@Repository@Slf4jpublic class EsSearchRepository { @Resource private RestHighLevelClient highLevelClient; public EsQueryRespPO<Map<String, Object>> searchPage(EsQueryReqPO queryPO) { // 默认分页参数设置 if (queryPO.getPageNum() == null) { queryPO.setPageNum(1); } if (queryPO.getPageSize() == null) { queryPO.setPageSize(10); } // 设置索引 SearchRequest searchRequest = new SearchRequest(queryPO.getIndex()); // 封装查询源对象 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); searchRequest.source(sourceBuilder); // 查询条件 sourceBuilder.query(queryPO.getQuery()); // 排序字段 if (StringUtils.isNotBlank(queryPO.getSortField()) && queryPO.getSort() != null) { FieldSortBuilder order = new FieldSortBuilder(queryPO.getSortField()).order(queryPO.getSort()); sourceBuilder.sort(order); } // 开始行数,默认0 sourceBuilder.from((queryPO.getPageNum() - 1) * queryPO.getPageSize()); // 页大小,默认10 sourceBuilder.size(queryPO.getPageSize()); // 查询结果 SearchResponse searchResponse = null; try { // log.info("es查询请求对象:searchRequest={}", searchRequest); log.info("es查询请求对象source:sourceBuilder={}", searchRequest.source()); // 执行搜索 searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT); log.info("es查询响应结果:searchResponse={}", searchResponse); } catch (IOException e) { log.error("es查询,IO异常!searchRequest={}", searchRequest, e); // 异常处理 return EsQueryRespPO.error("es查询,IO异常!"); } if (RestStatus.OK.equals(searchResponse.status())) { // 解析对象 SearchHit[] hits = searchResponse.getHits().getHits(); // 获取source List<Map<String, Object>> sourceList = Arrays.stream(hits).map(SearchHit::getSourceAsMap).collect(Collectors.toList()); long totalHits = searchResponse.getHits().getTotalHits().value; return EsQueryRespPO.success(sourceList, queryPO.getPageNum(), queryPO.getPageSize(), totalHits); } else { log.error("es查询返回的状态码异常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest); return EsQueryRespPO.error("es查询返回的状态码异常!"); } } public EsQueryRespPO<AggregationBucketPO> searchAggregation(EsQueryReqPO queryPO) { // 设置索引 SearchRequest searchRequest = new SearchRequest(queryPO.getIndex()); // 封装查询源对象 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); searchRequest.source(sourceBuilder); // 查询条件 sourceBuilder.query(queryPO.getQuery()); // 排序字段 if (StringUtils.isNotBlank(queryPO.getSortField()) && queryPO.getSort() != null) { FieldSortBuilder order = new FieldSortBuilder(queryPO.getSortField()).order(queryPO.getSort()); sourceBuilder.sort(order); } // 页大小0,只返回聚合结果 sourceBuilder.size(0); // 设置聚合查询,可以设置多个聚合查询条件,只要聚合查询命名不同就行 // 聚合分组条件, group by sourceBuilder.aggregation(queryPO.getTermsAggregation()); // 聚合统计条件, count分组后的数据,计算分组后的总大小 sourceBuilder.aggregation(queryPO.getCardinalityAggregation()); // 查询结果 SearchResponse searchResponse = null; try { // log.info("es查询请求对象:searchRequest={}", searchRequest); log.info("es查询请求对象source:sourceBuilder={}", searchRequest.source()); // 执行搜索 searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT); log.info("es查询响应结果:searchResponse={}", searchResponse); } catch (IOException e) { log.error("es查询,IO异常!searchRequest={}", searchRequest, e); return EsQueryRespPO.error("es查询,IO异常!"); } if (RestStatus.OK.equals(searchResponse.status())) { // 解析对象 Aggregations aggregations = searchResponse.getAggregations(); long docTotal = searchResponse.getHits().getTotalHits().value; // 遍历terms聚合结果 Terms terms = aggregations.get(queryPO.getTermsAggregation().getName()); List<AggregationBucketPO> bucketList = terms.getBuckets().stream().map(bucket -> { String key = bucket.getKeyAsString(); long docCount = bucket.getDocCount(); return new AggregationBucketPO(key, docCount, docTotal); }).collect(Collectors.toList()); // 总数量 Cardinality cardinality = aggregations.get(queryPO.getCardinalityAggregation().getName()); long totalHits = cardinality.getValue(); return EsQueryRespPO.success(bucketList, queryPO.getPageNum(), queryPO.getPageSize(), totalHits); } else { log.error("es查询返回的状态码异常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest); return EsQueryRespPO.error("es查询返回的状态码异常!"); } }}
其中,EsQueryReqPO、EsQueryRespPO、AggregationBucketPO等类如下:
@Datapublic class EsQueryReqPO { String[] index; QueryBuilder query; String sortField; SortOrder sort; private Integer pageNum; private Integer pageSize; private TermsAggregationBuilder termsAggregation; private CardinalityAggregationBuilder cardinalityAggregation;}@Data@NoArgsConstructor@AllArgsConstructorpublic class EsQueryRespPO<T> { private Boolean success; private String message; private Integer pageNum; private Integer pageSize; private Long totalSize; private List<T> sourceList; public static <T> EsQueryRespPO<T> success(List<T> sourceList, Integer pageNum, Integer pageSize, Long totalSize) { EsQueryRespPO<T> esQueryRespPO = new EsQueryRespPO<>(); esQueryRespPO.setSuccess(true); esQueryRespPO.setSourceList(sourceList); esQueryRespPO.setPageNum(pageNum); esQueryRespPO.setPageSize(pageSize); esQueryRespPO.setTotalSize(totalSize); return esQueryRespPO; } public static EsQueryRespPO error() { EsQueryRespPO esQueryRespPO = new EsQueryRespPO(); esQueryRespPO.setSuccess(false); esQueryRespPO.setMessage("es查询异常"); return esQueryRespPO; } public static EsQueryRespPO error(String message) { EsQueryRespPO esQueryRespPO = new EsQueryRespPO(); esQueryRespPO.setSuccess(false); esQueryRespPO.setMessage(message); return esQueryRespPO; } }
@Data@NoArgsConstructor@AllArgsConstructorpublic class AggregationBucketPO { private String key; private Long docCount; private Long docTotal; }
ES多级(二级)聚合分桶查询
import com.yy.armor.manager.common.exception.EsException;import com.yy.armor.manager.persist.es.po.AggregationBucketPO;import com.yy.armor.manager.persist.es.po.EsMultiAggregationReqPO; import java.io.IOException;import java.util.List;import java.util.stream.Collectors;import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j;import org.apache.commons.collections4.CollectionUtils;import org.apache.commons.lang3.StringUtils;import org.apache.pulsar.shade.org.apache.commons.compress.utils.Lists;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.rest.RestStatus;import org.elasticsearch.search.aggregations.AggregationBuilders;import org.elasticsearch.search.aggregations.bucket.terms.Terms;import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.elasticsearch.search.sort.FieldSortBuilder;import org.springframework.stereotype.Repository; @Repository@Slf4jpublic class EsSearchRepository { @Resource private RestHighLevelClient highLevelClient; public List<AggregationBucketPO> searchMultiAggregation(EsMultiAggregationReqPO reqPO) { // 设置索引 SearchRequest searchRequest = new SearchRequest(reqPO.getIndex()); // 封装查询源对象 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); searchRequest.source(sourceBuilder); // 查询条件 sourceBuilder.query(reqPO.getQuery()); // 排序字段 if (StringUtils.isNotBlank(reqPO.getSortField()) && reqPO.getSort() != null) { FieldSortBuilder order = new FieldSortBuilder(reqPO.getSortField()).order(reqPO.getSort()); sourceBuilder.sort(order); } // 页大小0,只返回聚合结果 sourceBuilder.size(0); // 聚合分桶。创建terms桶聚合,聚合名字=terms_by_XXX, 字段=XXX TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("terms_by_" + reqPO.getField()).field(reqPO.getField()); if (reqPO.getFieldSize() != null) { termsAggregationBuilder.size(reqPO.getFieldSize()); } // 二级聚合分桶 TermsAggregationBuilder subTermsAggregationBuilder = AggregationBuilders.terms("terms_by_" + reqPO.getSubField()).field(reqPO.getSubField()); if (reqPO.getSubFieldSize() != null) { subTermsAggregationBuilder.size(reqPO.getSubFieldSize()); } termsAggregationBuilder.subAggregation(subTermsAggregationBuilder); // 聚合分组条件 sourceBuilder.aggregation(termsAggregationBuilder); // 查询结果 SearchResponse searchResponse = null; try { log.info("es查询请求对象source:sourceBuilder={}", searchRequest.source()); // 执行搜索 searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT); log.info("es查询响应结果:searchResponse={}", searchResponse); } catch (IOException e) { log.error("es查询,IO异常!searchRequest={}", searchRequest, e); throw new EsException("es查询,IO异常!"); } if (RestStatus.OK.equals(searchResponse.status())) { // 遍历terms聚合结果 Terms terms = searchResponse.getAggregations().get(termsAggregationBuilder.getName()); List<AggregationBucketPO> bucketList = terms.getBuckets().stream().map(bucket -> { // 一级聚合分桶的数据 String key = bucket.getKeyAsString(); long docCount = bucket.getDocCount(); // 二级聚合分桶的数据 Terms subTerms = bucket.getAggregations().get(subTermsAggregationBuilder.getName()); List<AggregationBucketPO> subBucketList = convertTerms(subTerms); return new AggregationBucketPO(key, docCount, subBucketList); }).collect(Collectors.toList()); return bucketList; } else { log.error("es查询返回的状态码异常!searchResponse.status={}, searchRequest={}", searchResponse.status(), searchRequest); throw new EsException("es查询返回的状态码异常!"); } } private List<AggregationBucketPO> convertTerms(Terms terms) { if (CollectionUtils.isEmpty(terms.getBuckets())) { return Lists.newArrayList(); } return terms.getBuckets().stream().map(bucket -> { String key = bucket.getKeyAsString(); long docCount = bucket.getDocCount(); return new AggregationBucketPO(key, docCount); }).collect(Collectors.toList()); }}
其中,EsMultiAggregationReqPO、AggregationBucketPO类如下:
@Datapublic class EsMultiAggregationReqPO { String[] index; QueryBuilder query; private String field; private String subField; private Integer fieldSize; private Integer subFieldSize; String sortField; SortOrder sort;}
@Data@NoArgsConstructor@AllArgsConstructorpublic class AggregationBucketPO { private String key; private Long docCount; private List<AggregationBucketPO> subBucketList; public AggregationBucketPO(String key, Long docCount) { this.key = key; this.docCount = docCount; }}
二级聚合分桶测试代码
@PostConstruct private void init() { // 查询对象的封装 EsMultiAggregationReqPO reqPO = new EsMultiAggregationReqPO(); reqPO.setIndex(new String[]{"test_log"}); List<Long> ids = Lists.newArrayList(); ids.add(140L); ids.add(141L); ids.add(142L); QueryBuilder queryBuilder4 = QueryBuilders.termsQuery("eventId", ids); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder4); reqPO.setQuery(queryBuilder); reqPO.setField("eventId"); reqPO.setFieldSize(9999); reqPO.setSubField("riskFlag"); // 执行查询 List<AggregationBucketPO> esQueryRespPO = searchMultiAggregation(reqPO); System.out.println("esQueryRespPO=" + esQueryRespPO); }
其它
如果没有用spring-boot-starter-data-elasticsearch来自动注入es组件,那么需要自己做es client的注入,es配置类如下:
@Configurationpublic class EsClientConfig { @Value("${spring.elasticsearch.rest.uris}") private List<String> uris; @Bean public RestHighLevelClient restHighLevelClient() { List<HttpHost> httpHostList = uris.stream().map(HttpHost::create).collect(Collectors.toList()); HttpHost[] httpHost = new HttpHost[uris.size()]; httpHostList.toArray(httpHost); RestClientBuilder clientBuilder = RestClient.builder(httpHost); return new RestHighLevelClient(clientBuilder); } }
Snowflake是hutool包里的,导包:
<!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.14</version> </dependency>
聚合查询的测试用例:
@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTest(classes = StartApplication.class)public class EsTest { @Resource private EsSearchRepository esSearchRepository; @Test public void testSearchAggregation() { // 查询对象的封装 EsQueryReqPO queryPO = new EsQueryReqPO(); queryPO.setIndex(new String[]{"yzh2", "yzh3"}); queryPO.setPageNum(1); queryPO.setPageSize(10); // 时间戳范围 QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("timestamp") .from(System.currentTimeMillis() - 1000) .to(System.currentTimeMillis()); // 登录标识 QueryBuilder queryBuilder2 = QueryBuilders.termQuery("name", "yang"); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder1).must(queryBuilder2); queryPO.setQuery(queryBuilder); // 根据userName分组。创建terms桶聚合,聚合名字=terms_by_userName, 字段=payload.userName.keyword TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders .terms("terms_by_userName").field("payload.userName.keyword"); termsAggregationBuilder.size(queryPO.getPageSize() * queryPO.getPageNum()); termsAggregationBuilder.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", null) .from((queryPO.getPageNum() - 1) * queryPO.getPageSize()).size(queryPO.getPageSize())); queryPO.setTermsAggregation(termsAggregationBuilder); // 根据userName聚合count统计. cardinality名=count_userName, 字段=payload.userName.keyword CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders .cardinality("count_userName").field("payload.userName.keyword"); queryPO.setCardinalityAggregation(cardinalityAggregationBuilder); // 执行查询 EsQueryRespPO<AggregationBucketPO> esQueryRespPO = esSearchRepository.searchAggregation(queryPO); }}
以上就是关于“springboot2+es7怎么使用RestHighLevelClient”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。