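This article summarizes what I learned about elasticsearch-rest-high-level-client.
1. Elasticsearch ports
By default, Elasticsearch listens for external TCP traffic on two ports.
- Port 9200: used for all API calls made over HTTP, including search, aggregations, monitoring, and any other HTTP request. All client libraries use this port to talk to Elasticsearch.
- Port 9300: a custom binary protocol used for communication between the nodes of a cluster, for things such as cluster state changes, master election, nodes joining or leaving, and shard allocation.
In the past, port 9300 was also used by client libraries to connect. That kind of interaction has been deprecated in the official clients and is not supported elsewhere.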
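As a minimal sketch of the port split, the snippet below talks to port 9200 over plain HTTP using only the JDK. The class name `EsPortProbe`, the host `localhost`, and the choice of the `_cluster/health` endpoint are assumptions for illustration. Port 9300 is not touched, because it speaks the internal binary transport protocol rather than HTTP.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper, not part of the original article.
final class EsPortProbe {

    // Builds the HTTP URL of the cluster health API on the given HTTP port.
    static String clusterHealthUrl(String host, int httpPort) {
        return "http://" + host + ":" + httpPort + "/_cluster/health";
    }

    // Sends a GET request and returns the HTTP status code.
    static int probe(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(1000);
        conn.setReadTimeout(3000);
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Assumes a node listening on localhost:9200.
        String url = clusterHealthUrl("localhost", 9200);
        try {
            System.out.println(url + " -> HTTP " + probe(url));
        } catch (IOException e) {
            System.out.println("No node reachable at " + url + ": " + e.getMessage());
        }
    }
}
```

A secured cluster will typically answer such an unauthenticated request with a 401 status, which still confirms that the HTTP port is reachable.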
2. Elasticsearch Java clients
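Client | Pros | Cons | Notes |
Java Low Level REST Client | Independent of the ES version, so it can serve as a client for any ES version | | |
Java High Level REST Client | The most widely used client | The client version must match the ES version | Built on the Low Level REST Client and exposes more APIs. Note: deprecated as of 7.15 |
TransportClient | Communicates over the Transport protocol, can use some cluster-level features, best performance | The JAR version must match the ES cluster version; when the cluster is upgraded, the client must be upgraded to the same version | Obsolete: deprecated in 7.0 and removed in 8.0 |
Elasticsearch Java API Client | The newest ES client | Sparse documentation | |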
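For a detailed history of the Elasticsearch Java clients, see: https://blog.csdn.net/cloudbigdata/article/details/126296206
3. Introduction to RestHighLevelClient
The Java REST client comes in two flavors:
- Java Low Level REST Client: the official low-level client. It communicates with the Elasticsearch cluster over HTTP.
- Java High Level REST Client: the official high-level client. It is built on the low-level client above, also communicates with the ES cluster over HTTP, and exposes more APIs.
Note:
The version of the client JAR should preferably not be higher than the version of Elasticsearch itself. Otherwise the client may call APIs that the Elasticsearch server does not support.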
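The version rule above can be sketched as a small startup check. This is only an illustration: the class `EsVersionCheck` and its methods are hypothetical, and it assumes plain `major.minor.patch` version strings with no qualifiers such as `-SNAPSHOT`.

```java
// Hypothetical helper, not part of the original article.
final class EsVersionCheck {

    // Parses "major.minor.patch" into three integers; missing parts count as 0.
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        int[] numbers = new int[3];
        for (int i = 0; i < 3 && i < parts.length; i++) {
            numbers[i] = Integer.parseInt(parts[i]);
        }
        return numbers;
    }

    // Returns true when the client version is lower than or equal to the server version.
    static boolean clientNotNewerThanServer(String clientVersion, String serverVersion) {
        int[] client = parse(clientVersion);
        int[] server = parse(serverVersion);
        for (int i = 0; i < 3; i++) {
            if (client[i] != server[i]) {
                return client[i] < server[i];
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(clientNotNewerThanServer("6.8.2", "6.8.2"));
        System.out.println(clientNotNewerThanServer("7.1.0", "6.8.2"));
    }
}
```

The comparison is numeric per component, so `6.8.2` is treated as older than `6.8.10`, which a plain string comparison would get wrong.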
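4. Integrating RestHighLevelClient with Spring Boot
This section shows how a Spring Boot application can work with Elasticsearch through elasticsearch-rest-high-level-client. You can also use spring-data-elasticsearch for this, but this article only covers elasticsearch-rest-high-level-client.
A note here: use RestHighLevelClient where you can. Why not the Spring wrapper, spring-data-elasticsearch? The main reasons are flexibility and release cadence. Spring wraps Elasticsearch so heavily that it is hard to relate the code to the ES DSL query it produces. Elasticsearch also releases very quickly, while spring-data-elasticsearch lags behind. In addition, the Java API of spring-data-elasticsearch differs substantially between Elasticsearch 6.x and 7.x, so an upgrade takes some time to learn. Under the hood, spring-data-elasticsearch is itself built on the elasticsearch-rest-high-level-client API.
4.1. Maven dependencies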
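<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.8.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.8.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.8.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.28</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.6</version>
</dependency>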
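4.2. Elasticsearch configuration
4.2.1. application.properties configuration file
# ES cluster name
elasticsearch.clusterName=single-node-cluster
# ES user name
elasticsearch.userName=elastic
# ES password
elasticsearch.password=elastic
# ES host address(es) as host:port (cluster); this example runs in single-node mode
elasticsearch.hosts=43.142.243.124:9200
# Request scheme
elasticsearch.scheme=http
# Connection timeout (ms)
elasticsearch.connectTimeOut=1000
# Socket timeout (ms)
elasticsearch.socketTimeOut=30000
# Timeout for obtaining a connection from the connection pool (ms)
elasticsearch.connectionRequestTimeOut=500
# Maximum total number of connections
elasticsearch.maxConnectNum=100
# Maximum number of connections per route
elasticsearch.maxConnectNumPerRoute=100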
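The entries above use `key=value` syntax, which the JDK can read directly with `java.util.Properties`. The sketch below only illustrates how such keys are parsed. Spring Boot does this binding for you, and the class `EsPropertiesDemo`, its methods, and the sample address `127.0.0.1:9200` are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the original article.
final class EsPropertiesDemo {

    // Parses key=value text; lines starting with '#' are treated as comments.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reads an integer property, falling back to a default when the key is absent.
    static int intValue(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        String text = "# ES host address\n"
                + "elasticsearch.hosts=127.0.0.1:9200\n"
                + "# Connection timeout (ms)\n"
                + "elasticsearch.connectTimeOut=1000\n";
        Properties props = load(text);
        System.out.println(props.getProperty("elasticsearch.hosts"));
        System.out.println(intValue(props, "elasticsearch.connectTimeOut", 0));
    }
}
```

The `=` ends the key, so the `:` inside `127.0.0.1:9200` stays part of the value.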
4.2.2. Java connection configuration class
Write a Java configuration class that reads the settings from the application configuration file:
package com.example.demo.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Data
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticsearchConfig {

    // ES host address(es) as host:port (cluster)
    private String hosts;
    // ES user name
    private String userName;
    // ES password
    private String password;
    // Request scheme
    private String scheme;
    // ES cluster name
    private String clusterName;
    // Connection timeout (ms)
    private int connectTimeOut;
    // Socket timeout (ms)
    private int socketTimeOut;
    // Timeout for obtaining a connection from the connection pool (ms)
    private int connectionRequestTimeOut;
    // Maximum total number of connections
    private int maxConnectNum;
    // Maximum number of connections per route
    private int maxConnectNumPerRoute;

    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restHighLevelClient() {
        // Cluster variant: split the comma-separated address list
        // List<HttpHost> hostLists = new ArrayList<>();
        // String[] hostList = hosts.split(",");
        // for (String addr : hostList) {
        //     String host = addr.split(":")[0];
        //     String port = addr.split(":")[1];
        //     hostLists.add(new HttpHost(host, Integer.parseInt(port), scheme));
        // }
        // // Convert to an HttpHost array
        // HttpHost[] httpHost = hostLists.toArray(new HttpHost[]{});

        // A single-node ES is used here
        String host = hosts.split(":")[0];
        String port = hosts.split(":")[1];
        // Pass the configured scheme so that it is not silently ignored
        HttpHost httpHost = new HttpHost(host, Integer.parseInt(port), scheme);

        // Build the connection object
        RestClientBuilder builder = RestClient.builder(httpHost);

        // Set the user name and password
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(userName, password));

        // Timeout settings
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(connectTimeOut);
            requestConfigBuilder.setSocketTimeout(socketTimeOut);
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
            return requestConfigBuilder;
        });

        // Connection pool settings
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(maxConnectNum);
            httpClientBuilder.setMaxConnPerRoute(maxConnectNumPerRoute);
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            return httpClientBuilder;
        });

        return new RestHighLevelClient(builder);
    }
}
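The commented-out cluster branch in the configuration class splits a comma-separated `hosts` value into individual addresses. A JDK-only sketch of that parsing step is shown below. `EsHostParser` and `HostPort` are hypothetical names, and falling back to port 9200 when no port is given is an assumption of this sketch, not behavior of the original code. Each resulting pair could then be turned into an `HttpHost`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original article.
final class EsHostParser {

    // Simple value holder for one parsed address.
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Parses "host1:9200,host2:9201" into host/port pairs, skipping empty entries.
    static List<HostPort> parse(String hosts) {
        List<HostPort> result = new ArrayList<>();
        for (String addr : hosts.split(",")) {
            String trimmed = addr.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int idx = trimmed.lastIndexOf(':');
            if (idx < 0) {
                // Assumption: default to the standard HTTP port when none is given.
                result.add(new HostPort(trimmed, 9200));
            } else {
                result.add(new HostPort(trimmed.substring(0, idx),
                        Integer.parseInt(trimmed.substring(idx + 1))));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse("10.0.0.1:9200, 10.0.0.2:9201")) {
            System.out.println(hp.host + " on port " + hp.port);
        }
    }
}
```

Unlike indexing into `addr.split(":")`, this version does not throw an `ArrayIndexOutOfBoundsException` when an entry has no port. IPv6 literals are not handled.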
4.3. MyBatis configuration
package com.example.test.dao;

import com.example.test.beans.Goods;

import java.util.List;

public interface GoodsMapper {

    // Returns all rows of the goods table
    List<Goods> findAll();
}
SQL statement for findAll:
select `id`, `title`, `price`, `stock`, `saleNum`, `createTime`, `categoryName`, `brandName`, `status`, `spec`
from goods
4.4. Entity class
package com.example.test.beans;

import com.alibaba.fastjson.annotation.JSONField;

import java.math.BigDecimal;
import java.util.Date;

public class Goods {

    private Long id;
    private String title;
    private BigDecimal price;
    private Integer stock;
    private Integer saleNum;
    private String categoryName;
    private String brandName;
    private Integer status;
    private String spec;

    // Serialized in the same format as the date format of the index mapping
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;

    public Goods() {
    }

    public Goods(Long id, String title, BigDecimal price, Integer stock, Integer saleNum,
                 String categoryName, String brandName, Integer status, String spec, Date createTime) {
        this.id = id;
        this.title = title;
        this.price = price;
        this.stock = stock;
        this.saleNum = saleNum;
        this.categoryName = categoryName;
        this.brandName = brandName;
        this.status = status;
        this.spec = spec;
        this.createTime = createTime;
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public BigDecimal getPrice() { return price; }
    public void setPrice(BigDecimal price) { this.price = price; }

    public Integer getStock() { return stock; }
    public void setStock(Integer stock) { this.stock = stock; }

    public Integer getSaleNum() { return saleNum; }
    public void setSaleNum(Integer saleNum) { this.saleNum = saleNum; }

    public String getCategoryName() { return categoryName; }
    public void setCategoryName(String categoryName) { this.categoryName = categoryName; }

    public String getBrandName() { return brandName; }
    public void setBrandName(String brandName) { this.brandName = brandName; }

    public Integer getStatus() { return status; }
    public void setStatus(Integer status) { this.status = status; }

    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }

    public String getSpec() { return spec; }
    public void setSpec(String spec) { this.spec = spec; }

    @Override
    public String toString() {
        return "Goods{" +
                "id=" + id +
                ", title='" + title + '\'' +
                ", price=" + price +
                ", stock=" + stock +
                ", saleNum=" + saleNum +
                ", categoryName='" + categoryName + '\'' +
                ", brandName='" + brandName + '\'' +
                ", status=" + status +
                ", spec='" + spec + '\'' +
                ", createTime=" + createTime +
                '}';
    }
}
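The `createTime` field is serialized with the pattern `yyyy-MM-dd HH:mm:ss`, and the index mapping in the next section declares the same pattern for its `date` field, so the two must stay in sync. The sketch below shows what a value in that pattern looks like, using `java.time` instead of fastjson. The class `GoodsDateFormat` is hypothetical and only illustrates the format.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the original article.
final class GoodsDateFormat {

    // Same pattern as the @JSONField annotation on Goods.createTime.
    static final DateTimeFormatter ES_DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Formats a timestamp the way it is expected to appear in the indexed JSON.
    static String format(LocalDateTime time) {
        return ES_DATE.format(time);
    }

    // Parses a createTime string read back from a document.
    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, ES_DATE);
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.now()));
    }
}
```

If the serialized string does not match the mapping's format, Elasticsearch will generally reject the document with a date parsing error, so it is worth checking one sample value before a bulk import.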
5. Index operations service
IndexTestService:
package com.example.test.service.es;

import java.util.Map;

public interface IndexTestService {

    // Creates the goods index
    boolean indexCreate() throws Exception;

    // Returns the mapping of the given index
    Map<String, Object> getMapping(String indexName) throws Exception;

    // Deletes the given index
    boolean indexDelete(String indexName) throws Exception;

    // Checks whether the given index exists
    boolean indexExists(String indexName) throws Exception;
}
IndexTestServiceImpl:
package com.example.test.service.impl.es;

import com.example.test.service.es.IndexTestService;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
public class IndexTestServiceImpl implements IndexTestService {

    @Autowired
    RestHighLevelClient restHighLevelClient;

    @Override
    public boolean indexCreate() throws Exception {
        // 1. Create the create-index request; the argument is the index name
        CreateIndexRequest indexRequest = new CreateIndexRequest("goods");
        // 2. Index settings (not set in this example)
        // 3. Index mappings
        String mapping = "{\n"
                + "  \"properties\": {\n"
                + "    \"brandName\": {\"type\": \"keyword\"},\n"
                + "    \"categoryName\": {\"type\": \"keyword\"},\n"
                + "    \"createTime\": {\"type\": \"date\", \"format\": \"yyyy-MM-dd HH:mm:ss\"},\n"
                + "    \"id\": {\"type\": \"long\"},\n"
                + "    \"price\": {\"type\": \"double\"},\n"
                + "    \"saleNum\": {\"type\": \"integer\"},\n"
                + "    \"status\": {\"type\": \"integer\"},\n"
                + "    \"stock\": {\"type\": \"integer\"},\n"
                + "    \"spec\": {\"type\": \"text\", \"analyzer\": \"ik_max_word\", \"search_analyzer\": \"ik_smart\"},\n"
                + "    \"title\": {\"type\": \"text\", \"analyzer\": \"ik_max_word\", \"search_analyzer\": \"ik_smart\"}\n"
                + "  }\n"
                + "}";
        // 4. Index aliases (not set in this example)
        // 5. Send the request
        // 5.1 Send the request synchronously
        IndicesClient indicesClient = restHighLevelClient.indices();
        indexRequest.mapping(mapping, XContentType.JSON);
        // Call the server
        CreateIndexResponse response = indicesClient.create(indexRequest, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    }

    @Override
    public Map<String, Object> getMapping(String indexName) throws Exception {
        IndicesClient indicesClient = restHighLevelClient.indices();
        // Create the get request
        GetIndexRequest request = new GetIndexRequest(indexName);
        // Send the get request
        GetIndexResponse response = indicesClient.get(request, RequestOptions.DEFAULT);
        // Read the mapping (the "table structure") of the index
        Map<String, MappingMetaData> mappings = response.getMappings();
        Map<String, Object> sourceAsMap = mappings.get(indexName).getSourceAsMap();
        return sourceAsMap;
    }

    @Override
    public boolean indexDelete(String indexName) throws Exception {
        IndicesClient indicesClient = restHighLevelClient.indices();
        // Create the delete request
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
        // Send the delete request
        AcknowledgedResponse response = indicesClient.delete(deleteIndexRequest, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    }

    @Override
    public boolean indexExists(String indexName) throws Exception {
        IndicesClient indicesClient = restHighLevelClient.indices();
        // Create the get request
        GetIndexRequest request = new GetIndexRequest(indexName);
        // Check whether the index exists
        boolean result = indicesClient.exists(request, RequestOptions.DEFAULT);
        return result;
    }
}
Test code:
package com.example.test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.example.common.utils.java.StackTraceUtil;
import com.example.test.service.es.IndexTestService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Map;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ElasticsearchTest1 {

    @Autowired
    IndexTestService indexTestService;

    @Test
    public void indexCreate() {
        boolean flag = false;
        try {
            flag = indexTestService.indexCreate();
        } catch (Exception e) {
            log.error("Failed to create the index, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Index created: " + flag);
    }

    @Test
    public void getMapping() {
        try {
            Map<String, Object> indexMap = indexTestService.getMapping("goods");
            // Convert the map to a pretty-printed JSON string
            String pretty1 = JSON.toJSONString(indexMap, SerializerFeature.PrettyFormat,
                    SerializerFeature.WriteMapNullValue, SerializerFeature.WriteDateUseDateFormat);
            log.info("Index mapping: {}", pretty1);
        } catch (Exception e) {
            log.error("Failed to get the index mapping, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
    }

    @Test
    public void deleteIndex() {
        boolean flag = false;
        try {
            flag = indexTestService.indexDelete("goods");
        } catch (Exception e) {
            log.error("Failed to delete the index, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Index deleted: " + flag);
    }

    @Test
    public void indexExists() {
        boolean flag = false;
        try {
            flag = indexTestService.indexExists("goods");
        } catch (Exception e) {
            log.error("Failed to check whether the index exists, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Index exists: " + flag);
    }
}
6. Document operations service
Test data: https://pan.baidu.com/s/1A_ckKV7wsLJQJoeeALgkig?pwd=r68c
DocumentTestService:
package com.example.test.service.es;

import com.example.test.beans.Goods;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

public interface DocumentTestService {

    // Adds a document
    RestStatus addDocument(String indexName, String type, Goods goods) throws IOException;

    // Gets a document by id
    Goods getDocument(String indexName, String type, String id) throws Exception;

    // Updates a document
    RestStatus updateDocument(String indexName, String type, Goods goods) throws IOException;

    // Deletes a document by id
    RestStatus deleteDocument(String indexName, String type, String id) throws IOException;

    // Imports all goods rows from the database in bulk
    RestStatus batchImportGoodsData() throws IOException;
}
DocumentTestServiceImpl:
package com.example.test.service.impl.es;

import com.alibaba.fastjson.JSON;
import com.example.common.utils.ObjectUtil;
import com.example.common.utils.java.BeanMapUtils;
import com.example.test.beans.Goods;
import com.example.test.dao.GoodsMapper;
import com.example.test.service.es.DocumentTestService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
public class DocumentTestServiceImpl implements DocumentTestService {

    @Autowired
    RestHighLevelClient restHighLevelClient;

    @Resource
    GoodsMapper goodsMapper;

    @Override
    public RestStatus addDocument(String indexName, String type, Goods goods) throws IOException {
        // The default type is _doc
        type = ObjectUtil.isEmptyObject(type) ? "_doc" : type;
        // Convert the object to JSON
        String data = JSON.toJSONString(goods);
        // Create the index request
        IndexRequest indexRequest = new IndexRequest(indexName, type)
                .id(goods.getId() + "")
                .source(data, XContentType.JSON);
        // Add the document
        IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        RestStatus status = response.status();
        log.info("Create status: {}", status);
        return status;
    }

    @Override
    public Goods getDocument(String indexName, String type, String id) throws Exception {
        // The default type is _doc
        type = ObjectUtil.isEmptyObject(type) ? "_doc" : type;
        // Create the get request
        GetRequest getRequest = new GetRequest(indexName, type, id);
        GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        Map<String, Object> sourceAsMap = response.getSourceAsMap();
        Goods goods = BeanMapUtils.mapToBean(sourceAsMap, Goods.class);
        return goods;
    }

    @Override
    public RestStatus updateDocument(String indexName, String type, Goods goods) throws IOException {
        // The default type is _doc
        type = ObjectUtil.isEmptyObject(type) ? "_doc" : type;
        // Convert the object to JSON
        String data = JSON.toJSONString(goods);
        // Create the update request
        UpdateRequest updateRequest = new UpdateRequest(indexName, type, String.valueOf(goods.getId()));
        // Set the new document content
        updateRequest.doc(data, XContentType.JSON);
        // Update the document
        UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        RestStatus status = response.status();
        log.info("Update document response status: {}", status);
        return status;
    }

    @Override
    public RestStatus deleteDocument(String indexName, String type, String id) throws IOException {
        // The default type is _doc
        type = ObjectUtil.isEmptyObject(type) ? "_doc" : type;
        // Create the delete request
        DeleteRequest deleteRequest = new DeleteRequest(indexName, type, id);
        // Delete the document
        DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        RestStatus status = response.status();
        log.info("Delete document response status: {}", status);
        return status;
    }

    @Override
    public RestStatus batchImportGoodsData() throws IOException {
        // 1. Load all rows from MySQL
        List<Goods> goodsList = goodsMapper.findAll();
        // 2. Import them with a bulk request
        BulkRequest bulkRequest = new BulkRequest();
        // 2.1 Loop over goodsList and add one IndexRequest per row
        for (Goods goods : goodsList) {
            // Convert the goods object to a JSON string
            String data = JSON.toJSONString(goods);
            IndexRequest indexRequest = new IndexRequest("goods", "_doc");
            indexRequest.id(goods.getId() + "").source(data, XContentType.JSON);
            bulkRequest.add(indexRequest);
        }
        BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        return response.status();
    }
}
Test code:
package com.example.test;

import com.example.common.utils.java.StackTraceUtil;
import com.example.test.beans.Goods;
import com.example.test.service.es.DocumentTestService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.rest.RestStatus;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.math.BigDecimal;
import java.util.Date;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ElasticsearchTest1 {

    @Autowired
    DocumentTestService documentTestService;

    @Test
    public void addDocument() {
        // Build the goods record
        Goods goods = new Goods();
        goods.setId(1L);
        goods.setTitle("Apple iPhone 13 Pro (A2639) 256GB 远峰蓝色 支持移动联通电信5G 双卡双待手机");
        goods.setPrice(new BigDecimal("8799.00"));
        goods.setStock(1000);
        goods.setSaleNum(599);
        goods.setCategoryName("手机");
        goods.setBrandName("Apple");
        goods.setStatus(0);
        goods.setCreateTime(new Date());
        // Response status
        RestStatus restStatus = null;
        try {
            restStatus = documentTestService.addDocument("goods", "_doc", goods);
        } catch (Exception e) {
            log.error("Failed to add the document, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Add document response status: " + restStatus);
    }

    @Test
    public void getDocument() {
        // Returned document
        Goods goods = null;
        try {
            goods = documentTestService.getDocument("goods", "_doc", "1");
        } catch (Exception e) {
            log.error("Failed to get the document, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Document found: " + goods);
    }

    @Test
    public void updateDocument() {
        // Build the goods record
        Goods goods = new Goods();
        goods.setTitle("Apple iPhone 13 Pro Max (A2644) 256GB 远峰蓝色 支持移动联通电信5G 双卡双待手机");
        goods.setPrice(new BigDecimal("9999"));
        goods.setId(1L);
        // Response status
        RestStatus restStatus = null;
        try {
            restStatus = documentTestService.updateDocument("goods", "_doc", goods);
        } catch (Exception e) {
            log.error("Failed to update the document, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Update document response status: " + restStatus);
    }

    @Test
    public void deleteDocument() {
        // Response status
        RestStatus restStatus = null;
        try {
            restStatus = documentTestService.deleteDocument("goods", "_doc", "1");
        } catch (Exception e) {
            log.error("Failed to delete the document, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Delete document response status: " + restStatus);
    }

    @Test
    public void importDocument() {
        // Response status
        RestStatus restStatus = null;
        try {
            restStatus = documentTestService.batchImportGoodsData();
        } catch (Exception e) {
            log.error("Failed to import data in bulk, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Bulk import response status: " + restStatus);
    }
}
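The `batchImportGoodsData` method in this section adds every row returned by `goodsMapper.findAll()` to a single `BulkRequest`. For a large table it may be safer to send several smaller bulk requests instead. The sketch below shows only the batching step with the JDK. `BulkBatcher`, `partition`, and the batch size are hypothetical and not part of the original code; each batch would then be filled into its own `BulkRequest`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original article.
final class BulkBatcher {

    // Splits a list into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
        for (List<Integer> batch : partition(ids, 2)) {
            // In the real service, each batch would become one bulk request.
            System.out.println("One bulk request for ids " + batch);
        }
    }
}
```

A suitable batch size depends on document size and cluster capacity, so it is best chosen by measurement rather than fixed in advance.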
7. Advanced DSL queries
EsQueryDataService:
package com.example.test.service.es;

import java.util.List;

public interface EsQueryDataService {

    // Exact match on a single value
    <T> List<T> termQuery(String indexName, String columnName, Object value, Class<T> classz);

    // Exact match on any of several values of one field
    <T> List<T> termsQuery(String indexName, String columnName, Object[] dataArgs, Class<T> classz);

    // Match-all (or match on one field) with paging and sorting
    <T> List<T> matchAllQuery(String indexName, Class<T> classz, int startIndex, int pageSize,
                              List<String> orderList, String columnName, Object value);

    // Phrase match
    <T> List<T> matchPhraseQuery(String indexName, Class<T> classz, String columnName, Object value);

    // Match one text against several fields
    <T> List<T> matchMultiQuery(String indexName, Class<T> classz, String[] fields, Object text);

    // Wildcard query
    <T> List<T> wildcardQuery(String indexName, Class<T> classz, String field, String text);

    // Fuzzy query
    <T> List<T> fuzzyQuery(String indexName, Class<T> classz, String field, String text);

    // Bool query
    <T> List<T> boolQuery(String indexName, Class<T> beanClass);

    // Metric aggregation
    void metricQuery(String indexName);

    // Bucket aggregation
    void bucketQuery(String indexName, String bucketField, String bucketFieldAlias);

    // Bucket aggregation with an average sub-aggregation
    void subBucketQuery(String indexName, String bucketField, String bucketFieldAlias,
                        String avgFiled, String avgFiledAlias);

    // Nested sub-aggregations
    void subSubAgg(String indexName);
}
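The `orderList` parameter of `matchAllQuery` follows a small convention that the implementation relies on: a field name prefixed with `-` means descending order, and a plain field name means ascending order. A JDK-only sketch of that parsing rule is shown below. The class `SortSpec` is hypothetical; the implementation in this article maps the result onto `SortOrder.DESC` or `SortOrder.ASC` directly.

```java
// Hypothetical helper, not part of the original article.
final class SortSpec {

    final String field;
    final boolean descending;

    private SortSpec(String field, boolean descending) {
        this.field = field;
        this.descending = descending;
    }

    // "-price" means sort by price descending, "price" means ascending.
    static SortSpec parse(String order) {
        boolean desc = order.startsWith("-");
        return new SortSpec(desc ? order.substring(1) : order, desc);
    }

    public static void main(String[] args) {
        for (String order : new String[]{"-price", "createTime"}) {
            SortSpec spec = parse(order);
            System.out.println(spec.field + " " + (spec.descending ? "DESC" : "ASC"));
        }
    }
}
```

With this convention a caller can pass something like `Arrays.asList("-price", "createTime")` to sort by price from high to low and then by creation time from old to new.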
EsQueryDataServiceImpl:
package com.example.test.service.impl.es;

import com.alibaba.fastjson.JSON;
import com.example.common.exception.myexception.MyBusinessException;
import com.example.common.utils.ObjectUtil;
import com.example.common.utils.java.StackTraceUtil;
import com.example.test.beans.Goods;
import com.example.test.service.es.EsQueryDataService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.index.query.*;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.ParsedAvg;
import org.elasticsearch.search.aggregations.metrics.max.ParsedMax;
import org.elasticsearch.search.aggregations.metrics.min.ParsedMin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

@Slf4j
@Service
public class EsQueryDataServiceImpl implements EsQueryDataService {

    @Autowired
    RestHighLevelClient restHighLevelClient;

    @Override
    public <T> List<T> termQuery(String indexName, String field, Object value, Class<T> beanClass) {
        // Result list
        List<T> list = new ArrayList<>();
        try {
            // Build the query (termQuery accepts several value types such as boolean, int, double, and String)
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.termQuery(field, value));
            // Run the query against ES
            queryEsData(indexName, beanClass, list, searchSourceBuilder);
        } catch (IOException e) {
            log.error("Term query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Term query failed");
        }
        return list;
    }

    @Override
    public <T> List<T> termsQuery(String indexName, String field, Object[] dataArgs, Class<T> beanClass) {
        // Result list
        List<T> list = new ArrayList<>();
        try {
            // Build the query (termsQuery accepts several value types such as boolean, int, double, and String)
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.termsQuery(field, dataArgs));
            // Return up to 100 hits; the default is only 10
            searchSourceBuilder.size(100);
            // Run the query against ES
            queryEsData(indexName, beanClass, list, searchSourceBuilder);
        } catch (IOException e) {
            log.error("Terms query (one field, several values) failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Terms query (one field, several values) failed");
        }
        return list;
    }

    @Override
    public <T> List<T> matchAllQuery(String indexName, Class<T> beanClass, int startIndex, int pageSize,
                                     List<String> orderList, String field, Object value) {
        // Result list
        List<T> list = new ArrayList<>();
        try {
            // Create the search source builder
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            // Build the query: match on one field if given, otherwise match all
            if (!ObjectUtil.isEmptyObject(field) && !ObjectUtil.isEmptyObject(value)) {
                MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery(field, value);
                searchSourceBuilder.query(matchQueryBuilder);
            } else {
                MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
                searchSourceBuilder.query(matchAllQueryBuilder);
            }
            // Paging
            searchSourceBuilder.from(startIndex);
            searchSourceBuilder.size(pageSize);
            // Sorting
            if (orderList != null) {
                for (String order : orderList) {
                    // A leading "-" means descending order
                    boolean flag = order.startsWith("-");
                    SortOrder sort = flag ? SortOrder.DESC : SortOrder.ASC;
                    order = flag ? order.substring(1) : order;
                    searchSourceBuilder.sort(order, sort);
                }
            }
            // Run the query against ES
            queryEsData(indexName, beanClass, list, searchSourceBuilder);
        } catch (IOException e) {
            log.error("Match-all query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Match-all query failed");
        }
        return list;
    }

    @Override
    public <T> List<T> matchPhraseQuery(String indexName, Class<T> beanClass, String field, Object value) {
        // Result list
        List<T> list = new ArrayList<>();
        try {
            // Build the query
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.matchPhraseQuery(field, value));
            // Run the query against ES
            queryEsData(indexName, beanClass, list, searchSourceBuilder);
        } catch (IOException e) {
            log.error("Phrase match query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Phrase match query failed");
        }
        return list;
    }

    @Override
    public <T> List<T> matchMultiQuery(String indexName, Class<T> beanClass, String[] fields, Object text) {
        // Result list
        List<T> list = new ArrayList<>();
        try {
            // Build the query
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.multiMatchQuery(text, fields));
            // Run the query against ES
            queryEsData(indexName, beanClass, list, searchSourceBuilder);
        } catch (IOException e) {
            log.error("Multi-field match query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Multi-field match query failed");
        }
        return list;
    }

    @Override
    public <T> List<T> wildcardQuery(String indexName, Class<T> beanClass, String field, String text) {
        // Result list
        List<T> list = new ArrayList<>();
        try {
            // Build the query
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.wildcardQuery(field, text));
            // Run the query against ES
            queryEsData(indexName, beanClass, list, searchSourceBuilder);
        } catch (IOException e) {
            log.error("Wildcard query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Wildcard query failed");
        }
        return list;
    }

    @Override
    public <T> List<T> fuzzyQuery(String indexName, Class<T> beanClass, String field, String text) {
        // Result list
        List<T> list = new ArrayList<>();
        try {
            // Build the query
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.fuzzyQuery(field, text).fuzziness(Fuzziness.AUTO));
            // Run the query against ES
            queryEsData(indexName, beanClass, list, searchSourceBuilder);
        } catch (IOException e) {
            log.error("Fuzzy query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Fuzzy query failed");
        }
        return list;
    }

    @Override
    public <T> List<T> boolQuery(String indexName, Class<T> beanClass) {
        // Result list
        List<T> list = new ArrayList<>();
        try {
            // Create the bool query builder
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
            // Build the query conditions
            boolQueryBuilder.must(QueryBuilders.matchQuery("title", "三星")); // title
            boolQueryBuilder.must(QueryBuilders.matchQuery("spec", "联通3G")); // specification
            boolQueryBuilder.filter().add(QueryBuilders.rangeQuery("createTime").format("yyyy").gte("2018").lte("2022")); // creation time
            // Create the search source builder
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(boolQueryBuilder);
            searchSourceBuilder.size(100);
            // Set the returned fields
            // If only a few fields are needed, list them in includes and leave excludes empty
            // If only a few fields are to be excluded, list them in excludes and leave includes empty
            String[] includes = {"title", "categoryName", "price"};
            String[] excludes = {};
            searchSourceBuilder.fetchSource(includes, excludes);
            // Highlighting
            // Three things to set: field (the field to highlight), preTags (prefix), postTags (suffix)
            HighlightBuilder highlightBuilder = new HighlightBuilder().field("title").preTags("").postTags("");
            highlightBuilder.field("spec").preTags("").postTags("");
            searchSourceBuilder.highlighter(highlightBuilder);
            // Create the search request and attach the search source
            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(searchSourceBuilder);
            // Run the query and process the response
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            // Use the status and the hit count to check that data was returned
            if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits() > 0) {
                SearchHits hits = searchResponse.getHits();
                for (SearchHit hit : hits) {
                    // Convert the JSON source to an object
                    T bean = JSON.parseObject(hit.getSourceAsString(), beanClass);
                    // Read the highlighted title; guard against hits without a highlight
                    HighlightField highlightField = hit.getHighlightFields().get("title");
                    if (highlightField != null) {
                        Text[] fragments = highlightField.getFragments();
                        if (fragments != null && fragments.length > 0) {
                            System.out.println("Highlighted title: " + fragments[0].string());
                            // Replace the original title with the highlighted fragments
                            StringBuilder title = new StringBuilder();
                            for (Text fragment : fragments) {
                                title.append(fragment);
                            }
                            // Look up the setter by name and parameter types, then invoke it on the bean
                            Method setTitle = beanClass.getMethod("setTitle", String.class);
                            setTitle.invoke(bean, title.toString());
                        }
                    }
                    list.add(bean);
                }
            }
        } catch (Exception e) {
            log.error("Bool query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Bool query failed");
        }
        return list;
    }

    @Override
    public void metricQuery(String indexName) {
        try {
            // Build the query
            MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
            // Create the search source builder
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(matchAllQueryBuilder);
            // Highest price
            AggregationBuilder maxPrice = AggregationBuilders.max("maxPrice").field("price");
            searchSourceBuilder.aggregation(maxPrice);
            // Lowest price
            AggregationBuilder minPrice = AggregationBuilders.min("minPrice").field("price");
            searchSourceBuilder.aggregation(minPrice);
            // Create the search request and attach the search source
            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(searchSourceBuilder);
            // Run the query and process the response
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            Aggregations aggregations = searchResponse.getAggregations();
            ParsedMax max = aggregations.get("maxPrice");
            log.info("Highest price: " + max.getValue());
            ParsedMin min = aggregations.get("minPrice");
            log.info("Lowest price: " + min.getValue());
        } catch (Exception e) {
            log.error("Metric aggregation query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Metric aggregation query failed");
        }
    }

    @Override
    public void bucketQuery(String indexName, String bucketField, String bucketFieldAlias) {
        try {
            // Build the query
            MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
            // Create the search source builder
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(matchAllQueryBuilder);
            // Group by bucketField; the aggregation is registered under bucketFieldAlias
            TermsAggregationBuilder aggBrandName = AggregationBuilders.terms(bucketFieldAlias).field(bucketField);
            searchSourceBuilder.aggregation(aggBrandName);
            // Create the search request and attach the search source
            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(searchSourceBuilder);
            // Run the query and process the response
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            Aggregations aggregations = searchResponse.getAggregations();
            // Look the aggregation up by the name it was registered under (the alias, not the field)
            ParsedStringTerms aggBrandName1 = aggregations.get(bucketFieldAlias);
            // Grouped results
            for (Terms.Bucket bucket : aggBrandName1.getBuckets()) {
                log.info(bucket.getKeyAsString() + "====" + bucket.getDocCount());
            }
        } catch (IOException e) {
            log.error("Bucket aggregation query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Bucket aggregation query failed");
        }
    }

    @Override
    public void subBucketQuery(String indexName, String bucketField, String bucketFieldAlias,
                               String avgFiled, String avgFiledAlias) {
        try {
            // Build the query
            MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
            // Create the search source builder
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(matchAllQueryBuilder);
            // Group by bucketField and compute the average of the given field within each group
            TermsAggregationBuilder subAggregation = AggregationBuilders.terms(bucketFieldAlias).field(bucketField)
                    .subAggregation(AggregationBuilders.avg(avgFiledAlias).field(avgFiled));
            searchSourceBuilder.aggregation(subAggregation);
            // Create the search request and attach the search source
            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(searchSourceBuilder);
            // Run the query and process the response
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            Aggregations aggregations = searchResponse.getAggregations();
            ParsedStringTerms aggBrandName1 = aggregations.get(bucketFieldAlias);
            for (Terms.Bucket bucket : aggBrandName1.getBuckets()) {
                // Read the per-group average; note that the result is a ParsedAvg, not a generic Aggregation
                ParsedAvg avgPrice = bucket.getAggregations().get(avgFiledAlias);
                log.info(bucket.getKeyAsString() + "====" + avgPrice.getValueAsString());
            }
        } catch (IOException e) {
            log.error("Bucket aggregation query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
            throw new MyBusinessException("99999", "Bucket aggregation query failed");
        }
    }

    @Override
    public void subSubAgg(String indexName) {
        try {
            // Build the query
            MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
            // Create the search source builder
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(matchAllQueryBuilder);
            // Be careful where each sub-aggregation is attached: one misplaced parenthesis changes the nesting
            TermsAggregationBuilder subAggregation = AggregationBuilders.terms("categoryNameAgg").field("categoryName")
                    .subAggregation(AggregationBuilders.avg("categoryNameAvgPrice").field("price"))
                    .subAggregation(AggregationBuilders.terms("brandNameAgg").field("brandName")
                            .subAggregation(AggregationBuilders.avg("brandNameAvgPrice").field("price")));
            searchSourceBuilder.aggregation(subAggregation);
            // Create the search request and attach the search source
            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(searchSourceBuilder);
            // Run the query and process the response
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); //获取总记录数 log.info("totalHits = " + searchResponse.getHits().getTotalHits()); // 获取聚合信息 Aggregations aggregations = searchResponse.getAggregations(); ParsedStringTerms categoryNameAgg = aggregations.get("categoryNameAgg"); //获取值返回 for (Terms.Bucket bucket : categoryNameAgg.getBuckets()) { // 获取聚合后的分类名称 String categoryName = bucket.getKeyAsString(); // 获取聚合命中的文档数量 long docCount = bucket.getDocCount(); // 获取聚合后的分类的平均价格,注意返回值不是Aggregation对象,而是指定的ParsedAvg对象 ParsedAvg avgPrice = bucket.getAggregations().get("categoryNameAvgPrice"); System.out.println(categoryName + "======平均价:" + avgPrice.getValue() + "======数量:" + docCount); ParsedStringTerms brandNameAgg = bucket.getAggregations().get("brandNameAgg"); for (Terms.Bucket brandeNameAggBucket : brandNameAgg.getBuckets()) { // 获取聚合后的品牌名称 String brandName = brandeNameAggBucket.getKeyAsString(); // 获取聚合后的品牌的平均价格,注意返回值不是Aggregation对象,而是指定的ParsedAvg对象 ParsedAvg brandNameAvgPrice = brandeNameAggBucket.getAggregations().get("brandNameAvgPrice"); log.info(" " + brandName + "======" + brandNameAvgPrice.getValue()); } } } catch (IOException e) { log.error("综合聚合查询失败,错误信息:" + StackTraceUtil.getStackTraceAsString(e)); throw new MyBusinessException("99999", "综合聚合查询失败"); } } private void queryEsData(String indexName, Class beanClass, List list, SearchSourceBuilder searchSourceBuilder) throws IOException { // 创建查询请求对象,将查询对象配置到其中 SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.source(searchSourceBuilder); // 执行查询,然后处理响应结果 SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); // 根据状态和数据条数验证是否返回了数据 if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits() > 0) { SearchHits hits = searchResponse.getHits(); for (SearchHit hit : hits) { // 将 JSON 转换成对象 Goods userInfo = JSON.parseObject(hit.getSourceAsString(), Goods.class); // 将 JSON 转换成对象 T 
bean = JSON.parseObject(hit.getSourceAsString(), beanClass); list.add(bean); } } }}
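The nesting in subSubAgg is easy to get wrong, so it can help to see what the aggregation computes on a plain in-memory list. The following is only a sketch using JDK streams, not Elasticsearch code: `Product`, `NestedAggSketch` and the sample rows are hypothetical names and data made up for illustration. The first method plays the role of categoryNameAgg with categoryNameAvgPrice, the second the role of categoryNameAgg with brandNameAgg and brandNameAvgPrice nested inside it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for one document of the goods index.
class Product {
    final String categoryName;
    final String brandName;
    final double price;

    Product(String categoryName, String brandName, double price) {
        this.categoryName = categoryName;
        this.brandName = brandName;
        this.price = price;
    }
}

// Hypothetical helper that mirrors the shape of the nested aggregation.
class NestedAggSketch {

    // Like terms(categoryName) with an avg(price) sub-aggregation.
    static Map<String, Double> avgPriceByCategory(List<Product> products) {
        return products.stream().collect(Collectors.groupingBy(
                p -> p.categoryName,
                TreeMap::new,
                Collectors.averagingDouble(p -> p.price)));
    }

    // Like terms(categoryName) -> terms(brandName) -> avg(price).
    static Map<String, Map<String, Double>> avgPriceByCategoryAndBrand(List<Product> products) {
        return products.stream().collect(Collectors.groupingBy(
                p -> p.categoryName,
                TreeMap::new,
                Collectors.groupingBy(
                        p -> p.brandName,
                        TreeMap::new,
                        Collectors.averagingDouble(p -> p.price))));
    }

    public static void main(String[] args) {
        // Made-up sample rows, not data from the article.
        List<Product> products = Arrays.asList(
                new Product("phone", "A", 100),
                new Product("phone", "B", 300),
                new Product("tv", "A", 500));
        System.out.println(avgPriceByCategory(products));
        System.out.println(avgPriceByCategoryAndBrand(products));
    }
}
```

The category average and the per-brand breakdown are computed independently from the same category group, which is why the two sub-aggregations are attached as siblings to categoryNameAgg rather than chained one inside the other.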
Test code:
package com.example.test;

import com.example.common.utils.java.StackTraceUtil;
import com.example.common.utils.java.UtilMisc;
import com.example.test.beans.Goods;
import com.example.test.service.es.EsQueryDataService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ElasticsearchTest1 {

    @Autowired
    EsQueryDataService esQueryDataService;

    @Test
    public void termQuery() {
        // Returned data
        List<Goods> goodsList = null;
        try {
            goodsList = esQueryDataService.termQuery("goods", "title", "华为", Goods.class);
        } catch (Exception e) {
            log.error("Single-field term query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Single-field term query result: " + goodsList);
    }

    @Test
    public void termsQuery() {
        // Returned data
        List<Goods> goodsList = null;
        try {
            String[] args = {"华为", "OPPO", "TCL"};
            goodsList = esQueryDataService.termsQuery("goods", "title", args, Goods.class);
        } catch (Exception e) {
            log.error("Single-field multi-value term query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Single-field multi-value term query result: " + goodsList);
    }

    @Test
    public void matchQuery() {
        // Returned data
        List<Goods> goodsList = null;
        try {
            List<String> orderList = UtilMisc.toList("-price", "-saleNum");
            goodsList = esQueryDataService.matchAllQuery("goods", Goods.class, 0, 3, orderList, "title", "华为");
        } catch (Exception e) {
            log.error("Match query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Match query result: " + goodsList);
    }

    @Test
    public void matchPhraseQuery() {
        // Returned data
        List<Goods> goodsList = null;
        try {
            goodsList = esQueryDataService.matchPhraseQuery("goods", Goods.class, "title", "华为");
        } catch (Exception e) {
            log.error("Phrase match query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Phrase match query result: " + goodsList);
    }

    @Test
    public void matchMultiQuery() {
        // Returned data
        List<Goods> goodsList = null;
        try {
            String[] fields = {"title", "categoryName"};
            goodsList = esQueryDataService.matchMultiQuery("goods", Goods.class, fields, "手机");
        } catch (Exception e) {
            log.error("Multi-field match query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Multi-field match query result: " + goodsList);
    }

    @Test
    public void wildcardQuery() {
        // Returned data
        List<Goods> goodsList = null;
        try {
            goodsList = esQueryDataService.wildcardQuery("goods", Goods.class, "title", "*三");
        } catch (Exception e) {
            log.error("Wildcard query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Wildcard query result: " + goodsList);
    }

    @Test
    public void fuzzyQuery() {
        // Returned data
        List<Goods> goodsList = null;
        try {
            goodsList = esQueryDataService.fuzzyQuery("goods", Goods.class, "title", "三");
        } catch (Exception e) {
            log.error("Fuzzy query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Fuzzy query result: " + goodsList);
    }

    @Test
    public void boolQuery() {
        // Returned data
        List<Goods> goodsList = null;
        try {
            goodsList = esQueryDataService.boolQuery("goods", Goods.class);
        } catch (Exception e) {
            log.error("Bool query failed, error: " + StackTraceUtil.getStackTraceAsString(e));
        }
        System.out.println("Bool query result: " + goodsList);
    }

    @Test
    public void metricQuery() {
        esQueryDataService.metricQuery("goods");
    }

    @Test
    public void bucketQuery() {
        esQueryDataService.bucketQuery("goods", "brandName", "brandNameName");
    }

    @Test
    public void subBucketQuery() {
        esQueryDataService.subBucketQuery("goods", "brandName", "brandNameName", "price", "avgPrice");
    }

    @Test
    public void subSubAgg() {
        esQueryDataService.subSubAgg("goods");
    }
}
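The matchQuery test passes its sort fields as "-price" and "-saleNum". The code that consumes that list is not shown in this part of the article, so the following is only a guess at the convention: a leading "-" presumably means descending order, anything else ascending. `SortSpec`, `parse` and `parseAll` are hypothetical names introduced for this sketch and use only the JDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type: one sort field plus its direction.
class SortSpec {
    final String field;
    final boolean descending;

    SortSpec(String field, boolean descending) {
        this.field = field;
        this.descending = descending;
    }

    // ASSUMPTION: "-price" means price descending; "price" or "+price" means ascending.
    static SortSpec parse(String token) {
        String t = token.trim();
        if (t.startsWith("-")) {
            return new SortSpec(t.substring(1), true);
        }
        if (t.startsWith("+")) {
            return new SortSpec(t.substring(1), false);
        }
        return new SortSpec(t, false);
    }

    // Parses every token, keeping the original order so earlier fields sort first.
    static List<SortSpec> parseAll(List<String> tokens) {
        List<SortSpec> specs = new ArrayList<>();
        for (String token : tokens) {
            specs.add(parse(token));
        }
        return specs;
    }
}
```

Under that assumption, each parsed entry could be handed to `searchSourceBuilder.sort(spec.field, spec.descending ? SortOrder.DESC : SortOrder.ASC)` before the request is built.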
Reference article: SpringBoot整合RestHighLevelClient案例 (a worked example of integrating RestHighLevelClient with Spring Boot)
Source: https://blog.csdn.net/weixin_40482816/article/details/126955661