文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

使用java怎么对elasticsearch进行操作

2023-05-30 16:48

关注

这期内容当中小编将会给大家带来有关使用java怎么对elasticsearch进行操作,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

Java操作es集群步骤1:配置集群对象信息;2:创建客户端;3:查看集群信息

集群名称

默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错。

嗅探功能

通过client.transport.sniff启动嗅探功能,这样只需要指定集群中的某一个节点(不一定是主节点),然后会加载集群中的其他节点,这样只要程序不停即使此节点宕机仍然可以连接到其他节点。

查询类型SearchType.QUERY_THEN_FETCH

es 查询共有4种查询类型

QUERY_AND_FETCH:

主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。

这种查询方式存在数据量和排序问题,主节点会汇总所有分片返回的数据这样数据量会比较大,二是各个分片上的规则可能不一致。

QUERY_THEN_FETCH:

主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。

这种方式解决了数据量问题但是排序问题依然存在而且是es的默认查询方式

DEF_QUERY_AND_FETCH: 和 DFS_QUERY_THEN_FETCH:

将各个分片的规则统一起来进行打分。解决了排序问题但是DFS_QUERY_AND_FETCH仍然存在数据量问题,DFS_QUERY_THEN_FETCH两种噢乖你问题都解决但是效率是最差的。

1, 获取client, 两种方式获取

@Before public void before() throws Exception {  Map<String, String> map = new HashMap<String, String>();   map.put("cluster.name", "elasticsearch_wenbronk");   Settings.Builder settings = Settings.builder().put(map);   client = TransportClient.builder().settings(settings).build()       .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300")));  }
@Before public void before11() throws Exception {  // 创建客户端, 使用的默认集群名, "elasticSearch"//  client = TransportClient.builder().build()//    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));  // 通过setting对象指定集群配置信息, 配置的集群名  Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch_wenbronk") // 设置集群名//    .put("client.transport.sniff", true) // 开启嗅探 , 开启后会一直连接不上, 原因未知//    .put("network.host", "192.168.50.37")    .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上//    .put("client.transport.nodes_sampler_interval", 5) //报错,//    .put("client.transport.ping_timeout", 5) // 报错, ping等待时间,    .build();   client = TransportClient.builder().settings(settings).build()     .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));   // 默认5s   // 多久打开连接, 默认5s   System.out.println("success connect"); }

PS: 官网给的2种方式都不能用, 需要合起来才能用, 浪费老子一下午...

其他参数的意义:

使用java怎么对elasticsearch进行操作

代码:

package com.wenbronk.javaes;import java.net.InetAddress;import java.net.InetSocketAddress;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.TimeUnit;import org.elasticsearch.action.bulk.BackoffPolicy;import org.elasticsearch.action.bulk.BulkProcessor;import org.elasticsearch.action.bulk.BulkProcessor.Listener;import org.elasticsearch.action.bulk.BulkRequest;import org.elasticsearch.action.bulk.BulkRequestBuilder;import org.elasticsearch.action.bulk.BulkResponse;import org.elasticsearch.action.delete.DeleteRequest;import org.elasticsearch.action.delete.DeleteResponse;import org.elasticsearch.action.get.GetResponse;import org.elasticsearch.action.get.MultiGetItemResponse;import org.elasticsearch.action.get.MultiGetResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.action.update.UpdateRequest;import org.elasticsearch.action.update.UpdateResponse;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.cluster.node.DiscoveryNode;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.transport.InetSocketTransportAddress;import org.elasticsearch.common.unit.ByteSizeUnit;import org.elasticsearch.common.unit.ByteSizeValue;import org.elasticsearch.common.unit.TimeValue;import org.elasticsearch.common.xcontent.XContentBuilder;import org.elasticsearch.common.xcontent.XContentFactory;import org.elasticsearch.script.Script;import org.junit.Before;import org.junit.Test;import com.alibaba.fastjson.JSONObject;public class JavaESTest { private TransportClient client; private IndexRequest source;  // @Before public void before() throws Exception {  Map<String, String> map = new HashMap<String, String>();   map.put("cluster.name", "elasticsearch_wenbronk");   Settings.Builder settings = Settings.builder().put(map);   client = TransportClient.builder().settings(settings).build()       .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300")));  } @Test public void testInfo() {  List<DiscoveryNode> nodes = client.connectedNodes();  for (DiscoveryNode node : nodes) {   System.out.println(node.getHostAddress());  } }   public String createJson1() {  String json = "{" +    "\"user\":\"kimchy\"," +    "\"postDate\":\"2013-01-30\"," +    "\"message\":\"trying out Elasticsearch\"" +   "}";  return json; }   public Map<String, Object> createJson2() {  Map<String,Object> json = new HashMap<String, Object>();  json.put("user", "kimchy");  json.put("postDate", new Date());  json.put("message", "trying out elasticsearch");  return json; }   public JSONObject createJson3() {  JSONObject json = new JSONObject();  json.put("user", "kimchy");  json.put("postDate", new Date());  json.put("message", "trying out elasticsearch");  return json; }   public XContentBuilder createJson4() throws Exception {  // 创建json对象, 其中一个创建json的方式  XContentBuilder source = XContentFactory.jsonBuilder()   .startObject()    .field("user", "kimchy")    .field("postDate", new Date())    .field("message", "trying to out ElasticSearch")   .endObject();  return source; }   @Test public void test1() throws Exception {  XContentBuilder source = createJson4();  // 存json入索引中  IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get();//  // 结果获取  String index = response.getIndex();  String type = response.getType();  String id = response.getId();  long version = response.getVersion();  boolean created = response.isCreated();  System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created); }  @Test public void testGet() {//  GetResponse response = client.prepareGet("twitter", "tweet", "1")//        .get();  GetResponse response = client.prepareGet("twitter", "tweet", "1")    .setOperationThreaded(false) // 线程安全    .get();  System.out.println(response.getSourceAsString()); }   @Test public void testDelete() {  DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")    .get();  String index = response.getIndex();  String type = response.getType();  String id = response.getId();  long version = response.getVersion();  System.out.println(index + " : " + type + ": " + id + ": " + version); }   @Test public void testUpdate() throws Exception {  UpdateRequest updateRequest = new UpdateRequest();  updateRequest.index("twitter");  updateRequest.type("tweet");  updateRequest.id("1");  updateRequest.doc(XContentFactory.jsonBuilder()    .startObject()    // 对没有的字段添加, 对已有的字段替换     .field("gender", "male")     .field("message", "hello")    .endObject());  UpdateResponse response = client.update(updateRequest).get();    // 打印  String index = response.getIndex();  String type = response.getType();  String id = response.getId();  long version = response.getVersion();  System.out.println(index + " : " + type + ": " + id + ": " + version); }   @Test public void testUpdate2() throws Exception {  // 使用Script对象进行更新//  UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")//    .setScript(new Script("hits._source.gender = \"male\""))//    .get();    // 使用XContFactory.jsonBuilder() 进行更新//  UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")//    .setDoc(XContentFactory.jsonBuilder()//      .startObject()//       .field("gender", "malelelele")//      .endObject()).get();    // 使用updateRequest对象及script//  UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")//    .script(new Script("ctx._source.gender=\"male\""));//  UpdateResponse response = client.update(updateRequest).get();    // 使用updateRequest对象及documents进行更新  UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1")    .doc(XContentFactory.jsonBuilder()      .startObject()       .field("gender", "male")      .endObject()     )).get();  System.out.println(response.getIndex()); }   @Test public void testUpdate3() throws InterruptedException, Exception {  UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")   .script(new Script("ctx._source.gender=\"male\""));  UpdateResponse response = client.update(updateRequest).get(); }   @Test public void testUpsert() throws Exception {  // 设置查询条件, 查找不到则添加生效  IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2")   .source(XContentFactory.jsonBuilder()    .startObject()     .field("name", "214")     .field("gender", "gfrerq")    .endObject());  // 设置更新, 查找到更新下面的设置  UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2")   .doc(XContentFactory.jsonBuilder()     .startObject()      .field("user", "wenbronk")     .endObject())   .upsert(indexRequest);    client.update(upsert).get(); }   @Test public void testMultiGet() {  MultiGetResponse multiGetResponse = client.prepareMultiGet()  .add("twitter", "tweet", "1")  .add("twitter", "tweet", "2", "3", "4")  .add("anothoer", "type", "foo")  .get();    for (MultiGetItemResponse itemResponse : multiGetResponse) {   GetResponse response = itemResponse.getResponse();   if (response.isExists()) {    String sourceAsString = response.getSourceAsString();    System.out.println(sourceAsString);   }  } }   @Test public void testBulk() throws Exception {  BulkRequestBuilder bulkRequest = client.prepareBulk();  bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")    .setSource(XContentFactory.jsonBuilder()      .startObject()       .field("user", "kimchy")       .field("postDate", new Date())       .field("message", "trying out Elasticsearch")      .endObject()));  bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")    .setSource(XContentFactory.jsonBuilder()      .startObject()       .field("user", "kimchy")       .field("postDate", new Date())       .field("message", "another post")      .endObject()));  BulkResponse response = bulkRequest.get();  System.out.println(response.getHeaders()); }   @Test public void testBulkProcessor() throws Exception {  // 创建BulkPorcessor对象  BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {   public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {    // TODO Auto-generated method stub   }      // 执行出错时执行   public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {    // TODO Auto-generated method stub   }      public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {    // TODO Auto-generated method stub   }  })  // 1w次请求执行一次bulk  .setBulkActions(10000)  // 1gb的数据刷新一次bulk  .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))  // 固定5s必须刷新一次  .setFlushInterval(TimeValue.timeValueSeconds(5))  // 并发请求数量, 0不并发, 1并发允许执行  .setConcurrentRequests(1)  // 设置退避, 100ms后执行, 最大请求3次  .setBackoffPolicy(    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))  .build();    // 添加单次请求  bulkProcessor.add(new IndexRequest("twitter", "tweet", "1"));  bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));    // 关闭  bulkProcessor.awaitClose(10, TimeUnit.MINUTES);  // 或者  bulkProcessor.close(); }}

tes2代码:

package com.wenbronk.javaes;import java.net.InetSocketAddress;import org.apache.lucene.queryparser.xml.FilterBuilderFactory;import org.elasticsearch.action.search.MultiSearchResponse;import org.elasticsearch.action.search.SearchRequestBuilder;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.action.search.SearchType;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.settings.Settings.Builder;import org.elasticsearch.common.transport.InetSocketTransportAddress;import org.elasticsearch.common.unit.TimeValue;import org.elasticsearch.index.query.QueryBuilder;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.aggregations.Aggregation;import org.elasticsearch.search.aggregations.AggregationBuilders;import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;import org.elasticsearch.search.sort.SortOrder;import org.elasticsearch.search.sort.SortParseElement;import org.junit.Before;import org.junit.Test;public class JavaESTest2 { private TransportClient client;  @Before public void testBefore() {  Builder builder = Settings.settingsBuilder();  builder.put("cluster.name", "wenbronk_escluster");//    .put("client.transport.ignore_cluster_name", true);  Settings settings = builder.build();    org.elasticsearch.client.transport.TransportClient.Builder transportBuild = TransportClient.builder();  TransportClient client1 = transportBuild.settings(settings).build();  client = client1.addTransportAddress((new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300))));  System.out.println("success connect to escluster");   }   @Test public void testSearch() {//  SearchRequestBuilder searchRequestBuilder = client.prepareSearch("twitter", "tweet", "1");//  SearchResponse response = searchRequestBuilder.setTypes("type1", "type2")//       .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//       .setQuery(QueryBuilders.termQuery("user", "test"))//       .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1))//       .setFrom(0).setSize(2).setExplain(true)//       .execute().actionGet();  SearchResponse response = client.prepareSearch()    .execute().actionGet();//  SearchHits hits = response.getHits();//  for (SearchHit searchHit : hits) {//   for(Iterator<SearchHitField> iterator = searchHit.iterator(); iterator.hasNext(); ) {//    SearchHitField next = iterator.next();//    System.out.println(next.getValues());//   }//  }  System.out.println(response); }   @Test public void testScrolls() {  QueryBuilder queryBuilder = QueryBuilders.termQuery("twitter", "tweet");    SearchResponse response = client.prepareSearch("twitter")  .addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)  .setScroll(new TimeValue(60000))  .setQuery(queryBuilder)  .setSize(100).execute().actionGet();    while(true) {   for (SearchHit hit : response.getHits().getHits()) {    System.out.println("i am coming");   }   SearchResponse response2 = client.prepareSearchScroll(response.getScrollId())    .setScroll(new TimeValue(60000)).execute().actionGet();   if (response2.getHits().getHits().length == 0) {    System.out.println("oh no=====");    break;   }  }   }   @Test public void testMultiSearch() {  QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");  SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);    QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");  SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);    MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2)    .execute().actionGet();  long nbHits = 0;  for (MultiSearchResponse.Item item : multiResponse.getResponses()) {   SearchResponse response = item.getResponse();   nbHits = response.getHits().getTotalHits();   SearchHit[] hits = response.getHits().getHits();   System.out.println(nbHits);  }   }   @Test public void testAggregation() {  SearchResponse response = client.prepareSearch()    .setQuery(QueryBuilders.matchAllQuery()) // 先使用query过滤掉一部分    .addAggregation(AggregationBuilders.terms("term").field("user"))    .addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth")     .interval(DateHistogramInterval.YEAR))    .execute().actionGet();  Aggregation aggregation2 = response.getAggregations().get("term");  Aggregation aggregation = response.getAggregations().get("agg2");//  SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet(); }   @Test public void testTerminateAfter() {  SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();  if (response.isTerminatedEarly()) {   System.out.println("ternimate");  } }   @Test public void testFilter() {  SearchResponse response = client.prepareSearch("twitter")     .setTypes("")     .setQuery(QueryBuilders.matchAllQuery()) //查询所有     .setSearchType(SearchType.QUERY_THEN_FETCH) //    .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19) //      .includeLower(true).includeUpper(true)) //    .setPostFilter(FilterBuilderFactory .rangeFilter("age").gte(18).lte(22))     .setExplain(true) //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面     .get();  }   @Test public void testGroupBy() {  client.prepareSearch("twitter").setTypes("tweet")  .setQuery(QueryBuilders.matchAllQuery())  .setSearchType(SearchType.QUERY_THEN_FETCH)  .addAggregation(AggregationBuilders.terms("user")    .field("user").size(0)  // 根据user进行分组           // size(0) 也是10  ).get(); } }

上述就是小编为大家分享的使用java怎么对elasticsearch进行操作了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯