文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一不小心成了知名开源项目的贡献者?!

2024-11-29 22:48

关注

在这个文章中我提到了这么一个内容,官方自带的 ES8 Adapter 同步类不支持 ES8 的 TLS 认证,所以导致我们在部署 ES8 集群的时候需要关闭这个安全功能。

但是作为技术人员就是见不得功能被阉割,所以就拉取了源码,在原有的基础上进行改造支持了 TLS 认证。

图片

改完过后本地重新打包实现了功能,本着独乐乐不如众乐乐,就顺手提了一个 PR,结果万万没想到,最近发现这个 PR 被合并到主干了!!!

图片

就这样一不小心成了一个几万星的知名开源项目贡献者,大佬还在 PR 下面回复了一个 tks,突然发现自己和大佬也可以靠的这么近。

图片

问题描述

在没有修复的时候,启动了 canal 适配器过后,在进行 MySQL 数据同步到 ES8 的时候,出现下面的错误,这个错误的原因是因为 ES8 默认开启了安全认证,并且自带了签名证书。Canal Adapter 在适配 ES8 的时候并没有支持这个功能,因此报错了。

2024-04-13 20:55:39.368 [pool-3-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
java.lang.RuntimeException: ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:112)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:60)
 at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:104)
 at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:83)
 at com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter.sync(ProxyOuterAdapter.java:42)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97)
 at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.ElasticsearchException: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2695)
 at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
 at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2154)
 at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2118)
 at org.elasticsearch.client.IndicesClient.getMapping(IndicesClient.java:538)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.getMapping(ESConnection.java:132)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getEsType(ES8xTemplate.java:392)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getValFromData(ES8xTemplate.java:269)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getESDataFromDmlData(ES8xTemplate.java:324)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.singleTableSimpleFiledUpdate(ESSyncService.java:814)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.update(ESSyncService.java:208)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:97)
 ... 12 common frames omitted
Caused by: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:257)
 at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:244)
 at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:75)
 at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2692)
 ... 23 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1431)
 at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
 at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
 at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
 at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doWrap(SSLIOSession.java:270)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:316)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.isAppInputReady(SSLIOSession.java:537)
 at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:120)
 at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
 at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
 at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
 ... 1 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
 at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
 at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
 at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
 at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1509)
 at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
 at sun.security.ssl.Handshaker.processLoop(Handshaker.java:979)
 at sun.security.ssl.Handshaker$1.run(Handshaker.java:919)
 at sun.security.ssl.Handshaker$1.run(Handshaker.java:916)
 at java.security.AccessController.doPrivileged(Native Method)
 at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1369)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doRunTask(SSLIOSession.java:288)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:356)
 ... 9 common frames omitted
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
 at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
 at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
 at sun.security.validator.Validator.validate(Validator.java:260)
 at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
 at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:281)
 at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
 at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1496)
 ... 17 common frames omitted
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
 at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
 at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
 at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
 at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:382)
 ... 23 common frames omitted
2024-04-13 20:55:39.370 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed!  Error sync and rollback, execute times: 13

解决方案

解决方案有两个:

  1. 部署搭建 ES 集群的时候,关闭这个安全证书的功能,对应 ES 的配置是在 elasticsearch.yml 里面的 xpack.security.enabled 为 false,docker 部署的 ES 需要进入的容器里面去进行修改,或者在容器启动的时候就配置。
  2. 修改 canal adapter 的源码,兼容证书;

这里主要讲一下方案 2,因为对于方案 1 需要取消 ES8 的安全功能,不推荐。

修改源码,兼容 ES8 安全配置

拷贝证书

在使用 docker 安装和部署 ES8 的时候,默认已经创建好了一个证书,我们需要将证书从容器中拷贝出来,命令如下

docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .

这里的 es01 是容器名称,根据自己的进行替换即可,拷贝出来的路径可以自行替换,记住在哪就行,后面会用到。

修改代码

在 canal adapter 的源码中,找到下面这类,com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection#ESConnection

图片

将其中的构造方法改成下面这段

public ESConnection(String[] hosts, Map properties) throws UnknownHostException {
    String caPath = properties.get("security.ca.path");
    if (StringUtils.isNotEmpty(caPath)) {
        connectEsWithCa(hosts, properties, caPath);
    } else {
        connectEsWithoutCa(hosts, properties);
    }
}
private void connectEsWithCa(String[] hosts, Map properties, String caPath) {
    Path caCertificatePath = Paths.get(caPath);
    try (InputStream is = Files.newInputStream(caCertificatePath)) {
        CertificateFactory factory = CertificateFactory.getInstance("X.509");
        Certificate trustedCa = factory.generateCertificate(is);
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        SSLContextBuilder sslContextBuilder = SSLContexts.custom()
        .loadTrustMaterial(trustStore, null);
        final SSLContext sslContext = sslContextBuilder.build();

        HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        String nameAndPwd = properties.get("security.auth");
        if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
            String[] nameAndPwdArr = nameAndPwd.split(":");
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                                               new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                return httpClientBuilder.setSSLContext(sslContext);
                });
            }
            restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build();
        } catch (Exception e) {
            throw new RuntimeException(e);
    }
}

private void connectEsWithoutCa(String[] hosts, Map properties) {
    HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
    RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
    String nameAndPwd = properties.get("security.auth");
    if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
        String[] nameAndPwdArr = nameAndPwd.split(":");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
        restClientBuilder.setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    }
    restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true)
            .build();
}

简单说明

  1. 其中 connectEsWithoutCa 方法为原来的构造方法的实现;
  2. connectEsWithCa 方法为兼容了安全认证的方法构造方法实现;
  3. 这两个方法的使用根据是否配置了 security.ca.path 属性来判断;
  4. 而 security.ca.path 这个配置是在启动器的 outerAdapters 的 ES8 的 properties 下,与 security.auth 同级;

代码修改到这里就结束了,下面看下如何使用

重新打包

修改好了代码过后,通过 maven 重新打包,打包出对应的 es8 的 jar 包即可。

图片

将编译打包后的 jar 重新复制到 canal 适配器的 plugin 目录下面,并且修改一下对应的名称跟下载下来的版本一致即可,比如我这边之前下载的 1.1.7 版本。

图片

其中 client-adapter.es8x-1.1.7-jar-with-dependencies.jar.7 是原来下载下来携带的 jar,client-adapter.es8x-1.1.7-jar-with-dependencies.jar 是我重新打包编译后的 jar。

修改启动器的配置

前面讲到兼容代码的时候,我们使用了一个叫 security.ca.path 的配置,所以我们需要将前面拷贝的 ca 证书路径,配置在这个属性上,即 security.ca.path: /opt/canal/http_ca.crt

完整的配置如下所示

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/database?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
        - name: es8
          key: es-key
          hosts: https://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
          properties:
            mode: rest # transport or rest
            security.auth: elastic:password
            security.ca.path: /opt/canal/http_ca.crt
            cluster.name: docker-cluster
        - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase

#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:

配置好了证书路径过后,就可以正常启动和同步数据了,具体的实操也可以看对应的公众号文章 18 张图手把手教你使用 Canal Adapter 同步 MySQL 数据到 ES8,建议收藏!,这里就不重复演示了。

总结

以前一直想着要参与一下开源项目,没想到这次也算是实现了一个小小的目标,其实这次纯属是一个意外之喜,原本只是在自己学习和研究 canal 的数据同步,然后发现了这个问题,最后就修复了一下,顺手提了一个 PR,没想到还真的被合并了,想想还是很激动的。

这个事情告诉我们只要真正的去参与和使用并了解一个开源项目了过后,还是有机会贡献自己的代码的,哪怕只是一个很小的一部分,也算是为开源项目贡献了一份自己的绵薄之力。

另外最近也发现了另一个开源项目的一些小 bug,回头再提交一下 PR,向着开源的道路继续前行。

来源:Java极客技术内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯