文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何在java项目中使用elasticsearch与logstash

2023-06-06 11:49

关注

这篇文章将为大家详细讲解有关如何在java项目中使用elasticsearch与logstash,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

Java的特点有哪些

Java的特点有哪些1.Java语言作为静态面向对象编程语言的代表,实现了面向对象理论,允许程序员以优雅的思维方式进行复杂的编程。2.Java具有简单性、面向对象、分布式、安全性、平台独立与可移植性、动态性等特点。3.使用Java可以编写桌面应用程序、Web应用程序、分布式系统和嵌入式系统应用程序等。

环境准备

1.1 创建普通用户

#创建用户useradd querylog#设置密码passwd queylog#授权sudo权限查找sudoers文件位置 whereis sudoers#修改文件为可编辑 chmod -v u+w /etc/sudoers#编辑文件vi /etc/sudoers#收回权限chmod -v u-w /etc/sudoers#第一次使用sudo会有提示We trust you have received the usual lecture from the local SystemAdministrator. It usually boils down to these three things: #1) Respect the privacy of others. #2) Think before you type. #3) With great power comes great responsibility.用户创建完成。

1.2 安装jdk

su queylogcd /home/queylog#解压jdk-8u191-linux-x64.tar.gz tar -zxvf jdk-8u191-linux-x64.tar.gz sudo mv jdk1.8.0_191 /opt/jdk1.8#编辑/ect/profilevi /ect/profileexport JAVA_HOME=/opt/jdk1.8export JRE_HOME=$JAVA_HOME/jreexport CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATHexport PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH#刷新配置文件source /ect/profile#查看jdk版本java -verion

1.3 防火墙设置

#放行指定IPfirewall-cmd --permanent --add-rich-rule="rule family="ipv4" source address="172.16.110.55" accept" #重新载入firewall-cmd --reload

2、安装elasticsearch

2.1 elasticsearch配置

注意:elasticsearch要使用普通用户启动要不然会报错

su queylogcd /home/queylog#解压elasticsearch-6.5.4.tar.gztar -zxvf elasticsearch-6.5.4.tar.gzsudo mv elasticsearch-6.5.4 /opt/elasticsearch#编辑es配置文件vi /opt/elasticsearch/config/elasticsearch.yml # 配置es的集群名称cluster.name: elastic# 修改服务地址network.host: 192.168.8.224# 修改服务端口http.port: 9200#切换root用户su root#修改/etc/security/limits.conf 追加以下内容vi /etc/security/limits.conf* hard nofile 655360* soft nofile 131072* hard nproc 4096* soft nproc 2048#编辑 /etc/sysctl.conf,追加以下内容:vi /etc/sysctl.confvm.max_map_count=655360fs.file-max=655360#保存后,重新加载:sysctl -p#切换回普通用户su queylog#启动elasticsearch./opt/elasticsearch/bin/elasticsearch#测试curl http://192.168.8.224:9200#控制台会打印{ "name" : "L_dA6oi", "cluster_name" : "elasticsearch", "cluster_uuid" : "eS7yP6fVTvC8KMhLutOz6w", "version" : { "number" : "6.5.4", "build_flavor" : "default", "build_type" : "tar", "build_hash" : "d2ef93d", "build_date" : "2018-12-17T21:17:40.758843Z", "build_snapshot" : false, "lucene_version" : "7.5.0", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search"}

2.2 把elasticsearch作为服务进行管理

#切换root用户su root#编写服务配置文件vi /usr/lib/systemd/system/elasticsearch.service[unit]Description=ElasticsearchDocumentation=http://www.elastic.coWants=network-online.targetAfter=network-online.target[Service]Environment=ES_HOME=/opt/elasticsearchEnvironment=ES_PATH_CONF=/opt/elasticsearch/configEnvironment=PID_DIR=/opt/elasticsearch/configEnvironmentFile=/etc/sysconfig/elasticsearchWorkingDirectory=/opt/elasticsearchUser=queylogGroup=queylogExecStart=/opt/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid# StandardOutput is configured to redirect to journalctl since# some error messages may be logged in standard output before# elasticsearch logging system is initialized. Elasticsearch# stores its logs in /var/log/elasticsearch and does not use# journalctl by default. If you also want to enable journalctl# logging, you can simply remove the "quiet" option from ExecStart.StandardOutput=journalStandardError=inherit# Specifies the maximum file descriptor number that can be opened by this processLimitNOFILE=65536# Specifies the maximum number of processLimitNPROC=4096# Specifies the maximum size of virtual memoryLimitAS=infinity# Specifies the maximum file sizeLimitFSIZE=infinity# Disable timeout logic and wait until process is stoppedTimeoutStopSec=0# SIGTERM signal is used to stop the Java processKillSignal=SIGTERM# Send the signal only to the JVM rather than its control groupKillMode=process# Java process is never killedSendSIGKILL=no# When a JVM receives a SIGTERM signal it exits with code 143SuccessExitStatus=143 [Install]WantedBy=multi-user.targetvi /etc/sysconfig/elasticsearchelasticsearch ######################### Elasticsearch home directoryES_HOME=/opt/elasticsearch# Elasticsearch Java pathJAVA_HOME=/home/liyijie/jdk1.8CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOMR/jre/lib# Elasticsearch configuration directoryES_PATH_CONF=/opt/elasticsearch/config# Elasticsearch PID directoryPID_DIR=/opt/elasticsearch/config############################## Elasticsearch Service ############################### SysV init.d# The number of seconds to wait before checking if elasticsearch started successfully as a daemon processES_STARTUP_SLEEP_TIME=5################################# Elasticsearch Properties ################################## Specifies the maximum file descriptor number that can be opened by this process# When using Systemd,this setting is ignored and the LimitNOFILE defined in# /usr/lib/systemd/system/elasticsearch.service takes precedence#MAX_OPEN_FILES=65536# The maximum number of bytes of memory that may be locked into RAM# Set to "unlimited" if you use the 'bootstrap.memory_lock: true' option# in elasticsearch.yml.# When using Systemd,LimitMEMLOCK must be set in a unit file such as# /etc/systemd/system/elasticsearch.service.d/override.conf.#MAX_LOCKED_MEMORY=unlimited# Maximum number of VMA(Virtual Memory Areas) a process can own# When using Systemd,this setting is ignored and the 'vm.max_map_count'# property is set at boot time in /usr/lib/sysctl.d/elasticsearch.conf#MAX_MAP_COUNT=262144# 重新加载服务systemctl daemon-reload#切换普通用户su queylog#启动elasticsearchsudo systemctl start elasticsearch#设置开机自启动sudo systemctl enable elasticsearch

3、安装logstash

3.1、logstash配置

su queylogcd /home/queylog#解压 logstash-6.5.4.tar.gztar -zxvf logstash-6.5.4.tar.gzsudo mv logstash-6.5.4 /opt/logstash#编辑es配置文件vi /opt/logstash/config/logstash.yml xpack.monitoring.enabled: truexpack.monitoring.elasticsearch.username: elasticxpack.monitoring.elasticsearch.password: changemexpack.monitoring.elasticsearch.url: ["http://192.168.8.224:9200"]#在bin目录下创建logstash.confvi /opt/logstash/bin/logstash.confinput {# 以文件作为来源file {# 日志文件路径path => "/opt/tomcat/logs/catalina.out"start_position => "beginning" # (end, beginning)type=> "isp"}}#filter { #定义数据的格式,正则解析日志(根据实际需要对日志日志过滤、收集)#grok {  # match => { "message" => "%{IPV4:clientIP}|%{GREEDYDATA:request}|%{NUMBER:duration}"}#}   #根据需要对数据的类型转换  #mutate { convert => { "duration" => "integer" }}#}# 定义输出 output {elasticsearch {hosts => "192.168.43.211:9200" #Elasticsearch 默认端口index => "ind"document_type => "isp"} } #给该用户授权chown queylog:queylog /opt/logstash#启动logstash./opt/logstash/bin/logstash -f logstash.conf # 安装并配置启动logstash后查看es索引是否创建完成curl http://192.168.8.224:9200/_cat/indices

4、java代码部分

之前在SpringBoot整合ElasticSearch与Redis的异常解决

查阅资料,这个归纳的原因比较合理。
原因分析:程序的其他地方使用了Netty,这里指redis。这影响在实例化传输客户端之前初始化处理器的数量。 实例化传输客户端时,我们尝试初始化处理器的数量。 由于在其他地方使用Netty,因此已经初始化并且Netty会对此进行防范,因此首次实例化会因看到的非法状态异常而失败。

解决方案

在SpringBoot启动类中加入:

System.setProperty("es.set.netty.runtime.available.processors", "false");

4.1、引入pom依赖

<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-data-elasticsearch</artifactId>  </dependency>

4.2、修改配置文件

spring.data.elasticsearch.cluster-name=elastic# restapi使用9200# java程序使用9300spring.data.elasticsearch.cluster-nodes=192.168.43.211:9300

4.3、对应的接口以及实现类

import org.springframework.data.elasticsearch.annotations.Document;import org.springframework.data.elasticsearch.annotations.Field;@Document(indexName = "ind", type = "isp")public class Bean { @Field private String message; public String getMessage() {  return message; } public void setMessage(String message) {  this.message = message; } @Override public String toString() {  return "Tomcat{" +    ", message='" + message + '\'' +    '}'; }}
import java.util.Map;public interface IElasticSearchService {  Map<String, Object> search(String keywords, Integer currentPage, Integer pageSize) throws Exception ;//特殊字符转义  default String escape( String s) {  StringBuilder sb = new StringBuilder();  for(int i = 0; i < s.length(); ++i) {   char c = s.charAt(i);   if (c == '\\' || c == '+' || c == '-' || c == '!' || c == '(' || c == ')' || c == ':' || c == '^' || c == '[' || c == ']' || c == '"' || c == '{' || c == '}' || c == '~' || c == '*' || c == '?' || c == '|' || c == '&' || c == '/') {    sb.append('\\');   }   sb.append(c);  }  return sb.toString(); }}
import org.elasticsearch.index.query.BoolQueryBuilder;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.domain.PageRequest;import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;@Servicepublic class ElasticSearchServiceImpl implements IElasticSearchService { Logger log = LoggerFactory.getLogger(ElasticSearchServiceImpl.class); @Autowired ElasticsearchTemplate elasticsearchTemplate; @Resource HighlightResultHelper highlightResultHelper; @Override public Map<String, Object> search(String keywords, Integer currentPage, Integer pageSize) {  keywords= escape(keywords);   currentPage = Math.max(currentPage - 1, 0);  List<HighlightBuilder.Field> highlightFields = new ArrayList<>(); //设置高亮 把查询到的关键字进行高亮  HighlightBuilder.Field message = new HighlightBuilder.Field("message").fragmentOffset(80000).numOfFragments(0).requireFieldMatch(false).preTags("<span style='color:red'>").postTags("</span>");  highlightFields.add(message);  HighlightBuilder.Field[] highlightFieldsAry = highlightFields.toArray(new HighlightBuilder    .Field[highlightFields.size()]);    //创建查询构造器  NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();  //过滤 按字段权重进行搜索 查询内容不为空按关键字、摘要、其他属性权重  BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();  queryBuilder.withPageable(PageRequest.of(currentPage, pageSize));  if (!MyStringUtils.isEmpty(keywords)){   boolQueryBuilder.must(QueryBuilders.queryStringQuery(keywords).field("message"));  }  queryBuilder.withQuery(boolQueryBuilder);  queryBuilder.withHighlightFields(highlightFieldsAry);  log.info("查询语句:{}", queryBuilder.build().getQuery().toString());  //查询  AggregatedPage<Bean> result = elasticsearchTemplate.queryForPage(queryBuilder.build(), Bean    .class,highlightResultHelper);  //解析结果  long total = result.getTotalElements();  int totalPage = result.getTotalPages();  List<Bean> blogList = result.getContent();  Map<String, Object> map = new HashMap<>();  map.put("total", total);  map.put("totalPage", totalPage);  map.put("pageSize", pageSize);  map.put("currentPage", currentPage + 1);  map.put("blogList", blogList);  return map; }
import com.alibaba.fastjson.JSONObject;import org.apache.commons.beanutils.PropertyUtils;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.common.text.Text;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.data.domain.Pageable;import org.springframework.data.elasticsearch.core.SearchResultMapper;import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;import org.springframework.data.elasticsearch.core.aggregation.impl.AggregatedPageImpl;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;import java.lang.reflect.InvocationTargetException;import java.util.ArrayList;import java.util.List;@Componentpublic class HighlightResultHelper implements SearchResultMapper { Logger log = LoggerFactory.getLogger(HighlightResultHelper.class); @Override public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {  List<T> results = new ArrayList<>();  for (SearchHit hit : response.getHits()) {   if (hit != null) {    T result = null;    if (StringUtils.hasText(hit.getSourceAsString())) {     result = JSONObject.parseObject(hit.getSourceAsString(), clazz);    }    // 高亮查询    for (HighlightField field : hit.getHighlightFields().values()) {     try {      PropertyUtils.setProperty(result, field.getName(), concat(field.fragments()));     } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {      log.error("设置高亮字段异常:{}", e.getMessage(), e);     }    }    results.add(result);   }  }  return new AggregatedPageImpl<T>(results, pageable, response.getHits().getTotalHits(), response    .getAggregations(), response.getScrollId()); } public <T> T mapSearchHit(SearchHit searchHit, Class<T> clazz) {  List<T> results = new ArrayList<>();  for (HighlightField field : searchHit.getHighlightFields().values()) {   T result = null;   if (StringUtils.hasText(searchHit.getSourceAsString())) {    result = JSONObject.parseObject(searchHit.getSourceAsString(), clazz);   }   try {    PropertyUtils.setProperty(result, field.getName(), concat(field.fragments()));   } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {    log.error("设置高亮字段异常:{}", e.getMessage(), e);   }   results.add(result);  }  return null; } private String concat(Text[] texts) {  StringBuffer sb = new StringBuffer();  for (Text text : texts) {   sb.append(text.toString());  }  return sb.toString(); }}
import org.junit.Test;import org.junit.runner.RunWith;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTest(classes = CbeiIspApplication.class)public class ElasticSearchServiceTest {w private static Logger logger= LoggerFactory.getLogger(EncodePhoneAndCardTest.class);  @Autowired private IElasticSearchService elasticSearchService; @Test public ResponseVO getLog(){  try {  Map<String, Object> search = elasticSearchService.search("Exception", 1, 10);   logger.info( JSON.toJSONString(search));  } catch (Exception e) {   e.printStackTrace();  }}

关于如何在java项目中使用elasticsearch与logstash就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯