文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RedisLettuce连接redis集群实现过程详细讲解

2023-01-17 18:00

关注

前言

Lettuce连接redis集群使用的都是集群专用类,像RedisClusterClient、StatefulRedisClusterConnection、RedisAdvancedClusterCommands、StatefulRedisClusterPubSubConnection等等;

Lettuce对rediscluster的支持:

启动时只需至少一个可以连接的集群节点就可以,能够自动拓扑出集群全部节点;也可以使用ReadFrom设置读取数据来源,跟主从模式一样;

虽然redis本身的多键命令要求key必须都在同一个槽位,但Lettuce对一部分命令多了优化,可以对多键命令进行跨槽位执行,通过将对不同槽位键的操作命令分解为多条命令,单个命令以fork/join方式并发运行,最后将结果合并返回;

可以跨槽位的命令有

提供跨槽位命令的api:RedisAdvancedClusterCommands、RedisAdvancedClusterAsyncCommands、RedisAdvancedClusterReactiveCommands;

可以在多个集群节点上执行的命令有

关于发布订阅

普通用户空间的发布订阅,redis集群会发送到每个节点,发布者和订阅者不需要在同一个节点,普通订阅发布消息可以在集群拓扑改变时重新连接。对于键空间事件,只会发到自己的节点,不会扩散到其他节点,要订阅键空间事件可以去适当的多个节点上订阅,或者使用RedisClusterClient消息传播和NodeSelectionAPI获得一个托管连接集合;

注意:由于主从同步,键会被复制到多个从节点上,特别是键过期事件,会在主从节点上都产生过期事件,如果订阅从节点,可能会收到多条相同的过期事件;订阅是通过NodeSelectionAPI或者单个节点调用subscribe(…)发出的,订阅对于新增的节点无效;

测试Demo:(redis版本7.0.2,Lettuce版本6.1.8)

集群节点:虚拟机 192.168.1.31,端口 9001-9006,集群节点已设置notify-keyspace-events AK;

package testlettuce;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.lettuce.core.ClientOptions.DisconnectedBehavior;
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.KeyValue;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.SslOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.sync.Executions;
import io.lettuce.core.cluster.api.sync.NodeSelection;
import io.lettuce.core.cluster.api.sync.NodeSelectionCommands;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands;
import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection;
import io.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands;
import io.lettuce.core.protocol.DecodeBufferPolicies;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
public class TestLettuceCluster {
	
	public static void main(String[] args) {
		List<RedisURI> nodeList = new ArrayList<>();
		nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9001).withAuthentication("default", "123456").build());
		nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9002).withAuthentication("default", "123456").build());
		nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9003).withAuthentication("default", "123456").build());
		nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9004).withAuthentication("default", "123456").build());
		nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9005).withAuthentication("default", "123456").build());
		nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9006).withAuthentication("default", "123456").build());
		RedisClusterClient clusterClient = RedisClusterClient.create(nodeList);
		ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
	            .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5L))//设置自适应拓扑刷新超时,每次超时刷新一次,默认30s;
	            .closeStaleConnections(false)//刷新拓扑时是否关闭失效连接,默认true,isPeriodicRefreshEnabled()为true时生效;
	            .dynamicRefreshSources(true)//从拓扑中发现新节点,并将新节点也作为拓扑的源节点,动态刷新可以发现全部节点并计算每个客户端的数量,设置false则只有初始节点为源和计算客户端数量;
	            .enableAllAdaptiveRefreshTriggers()//启用全部触发器自适应刷新拓扑,默认关闭;
	            .enablePeriodicRefresh(Duration.ofSeconds(5L))//开启定时拓扑刷新并设置周期;
	            .refreshTriggersReconnectAttempts(3)//长连接重新连接尝试n次才拓扑刷新
	            .build();
		ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
				.autoReconnect(true)//在连接丢失时开启或关闭自动重连,默认true;
				.cancelCommandsOnReconnectFailure(true)//允许在重连失败取消排队命令,默认false;
				.decodeBufferPolicy(DecodeBufferPolicies.always())//设置丢弃解码缓冲区的策略,以回收内存;always:解码后丢弃,最大内存效率;alwaysSome:解码后丢弃一部分;ratio(n)基于比率丢弃,n/(1+n),通常用1-10对应50%-90%;
				.disconnectedBehavior(DisconnectedBehavior.DEFAULT)//设置连接断开时命令的调用行为,默认启用重连;DEFAULT:启用时重连中接收命令,禁用时重连中拒绝命令;ACCEPT_COMMANDS:重连中接收命令;REJECT_COMMANDS:重连中拒绝命令;
//				.maxRedirects(5)//当键从一个节点迁移到另一个节点,集群重定向次数,默认5;
//				.nodeFilter(nodeFilter)//设置节点过滤器
//				.pingBeforeActivateConnection(true)//激活连接前设置PING,默认true;
//				.protocolVersion(ProtocolVersion.RESP3)//设置协议版本,默认RESP3;
//				.publishOnScheduler(false)//使用专用的调度器发出响应信号,默认false,启用时数据信号将使用服务的多线程发出;
//				.requestQueueSize(requestQueueSize)//设置每个连接请求队列大小;
//				.scriptCharset(scriptCharset)//设置Lua脚本编码为byte[]的字符集,默认StandardCharsets.UTF_8;
//				.socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).keepAlive(true).tcpNoDelay(true).build())//设置低级套接字的属性
//				.sslOptions(SslOptions.builder().build())//设置ssl属性
//				.suspendReconnectOnProtocolFailure(false)//当重新连接遇到协议失败时暂停重新连接(SSL验证,连接失败前PING),默认值为false;
//				.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))//设置超时来取消和终止命令;
				.topologyRefreshOptions(clusterTopologyRefreshOptions)//设置拓扑更新设置
				.validateClusterNodeMembership(true)//在允许连接到集群节点之前,验证集群节点成员关系,默认值为true;
				.build();
		clusterClient.setDefaultTimeout(Duration.ofSeconds(5L));
		clusterClient.setOptions(clusterClientOptions);
		StatefulRedisClusterConnection<String, String> clusterConn = clusterClient.connect();
		clusterConn.setReadFrom(ReadFrom.ANY);//设置从哪些节点读取数据;
		RedisAdvancedClusterCommands<String, String> clusterCmd = clusterConn.sync();
		clusterCmd.set("a", "A");
		clusterCmd.set("b", "B");
		clusterCmd.set("c", "C");
		clusterCmd.set("d", "D"); 
		System.out.println("get a=" + clusterCmd.get("a"));
		System.out.println("get b=" + clusterCmd.get("b"));
		System.out.println("get c=" + clusterCmd.get("c"));
		System.out.println("get d=" + clusterCmd.get("d"));
		//跨槽位命令
		Map<String, String> kvmap = new HashMap<>();
		kvmap.put("a", "AA");
		kvmap.put("b", "BB");
		kvmap.put("c", "CC");
		kvmap.put("d", "DD");
		clusterCmd.mset(kvmap);//Lettuce做了优化,支持一些命令的跨槽位命令;
		System.out.println("Lettuce mget:" + clusterCmd.mget("a", "b", "c", "d"));
		//选定部分节点操作
		NodeSelection<String, String> replicas = clusterCmd.replicas();
		NodeSelectionCommands<String, String> replicaseCmd = replicas.commands();
		Executions<KeyScanCursor<String>> executions = replicaseCmd.scan(ScanCursor.INITIAL);
		executions.forEach(s -> {System.out.println(s.getKeys());});
		//订阅发布消息
		StatefulRedisClusterPubSubConnection<String, String> pubSubConn = clusterClient.connectPubSub();
		pubSubConn.addListener(new RedisPubSubListener<String, String>() {
			@Override
			public void message(String channel, String message) {
				System.out.println("[message]ch:" + channel + ",msg:" + message);
			}
			@Override
			public void message(String pattern, String channel, String message) {
			}
			@Override
			public void subscribed(String channel, long count) {
				System.out.println("[subscribed]ch:" + channel);
			}
			@Override
			public void psubscribed(String pattern, long count) {
			}
			@Override
			public void unsubscribed(String channel, long count) {
			}
			@Override
			public void punsubscribed(String pattern, long count) {
			}
		});
		pubSubConn.sync().subscribe("TEST_Ch");//(回调内部使用阻塞调用或者lettuce同步api调用,需使用异步订阅)
		clusterCmd.publish("TEST_Ch", "MSGMSGMSG");
		//响应式订阅,可以监听ChannelMessage和PatternMessage,使用链式过滤处理计算等操作
		RedisClusterPubSubReactiveCommands<String, String> pubsubReactive = pubSubConn.reactive();
		pubsubReactive.subscribe("TEST_Ch2").subscribe();
		pubsubReactive.observeChannels()
			.filter(chmsg -> {return chmsg.getMessage().contains("tom");})
			.doOnNext(chmsg -> {System.out.println("<tom>" + chmsg.getChannel() + ">>" + chmsg.getMessage());})
			.subscribe();
		clusterCmd.publish("TEST_Ch2", "send to jerry");
		clusterCmd.publish("TEST_Ch", "tom MSG");
		clusterCmd.publish("TEST_Ch2", "this is tom");
		//keySpaceEvent事件
		StatefulRedisClusterPubSubConnection<String, String> clusterPubSubConn = clusterClient.connectPubSub();
		clusterPubSubConn.setNodeMessagePropagation(true);//启用禁用节点消息传播到该listener,例如只能在本节点通知的键事件通知;
		RedisPubSubListener<String, String> listener  = new RedisPubSubListener<String, String>() {
			@Override
			public void unsubscribed(String channel, long count) {
				System.out.println("unsubscribed_ch:" + channel);
			}
			@Override
			public void subscribed(String channel, long count) {
				System.out.println("subscribed_ch:" + channel);
			}
			@Override
			public void punsubscribed(String pattern, long count) {
				System.out.println("punsubscribed_pattern:" + pattern);
			}
			@Override
			public void psubscribed(String pattern, long count) {
				System.out.println("psubscribed_pattern:" + pattern);
			}
			@Override
			public void message(String pattern, String channel, String message) {
				System.out.println("message_pattern:" + pattern + " ch:" + channel + " msg:" + message);
			}
			@Override
			public void message(String channel, String message) {
				System.out.println("message_ch:" + channel + " msg:" + message);
			}
		};
		clusterPubSubConn.addListener(listener);
		PubSubAsyncNodeSelection<String, String> allPubSubAsyncNodeSelection = clusterPubSubConn.async().all();
		NodeSelectionPubSubAsyncCommands<String, String> pubsubAsyncCmd = allPubSubAsyncNodeSelection.commands();
		clusterCmd.setex("a", 1, "A");
		pubsubAsyncCmd.psubscribe("__keyspace@0__:*");
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("end");
	}
}

运行结果:

另外,还有一个cluster专用的Listener:RedisClusterPubSubListener,可以从listener里获得发布消息的节点信息:

RedisClusterPubSubListener<String, String> clusterListener = new RedisClusterPubSubListener<String, String>() {
			@Override
			public void message(RedisClusterNode node, String channel, String message) {
			}
			@Override
			public void message(RedisClusterNode node, String pattern, String channel, String message) {
			}
			@Override
			public void subscribed(RedisClusterNode node, String channel, long count) {
			}
			@Override
			public void psubscribed(RedisClusterNode node, String pattern, long count) {
			}
			@Override
			public void unsubscribed(RedisClusterNode node, String channel, long count) {
			}
			@Override
			public void punsubscribed(RedisClusterNode node, String pattern, long count) {
			}
		};

到此这篇关于Redis Lettuce连接redis集群实现过程详细讲解的文章就介绍到这了,更多相关Redis Lettuce连接redis集群内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯