增加netty依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.28.Final</version>
</dependency>
创建netty init 类
public class WsServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //websocket基于http协议,所以需要http编解码器 pipeline.addLast(new HttpServerCodec()); //添加对于读写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //对httpMessage进行聚合 pipeline.addLast(new HttpObjectAggregator(1024*64)); // ================= 上述是用于支持http协议的 ============== //websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址 //比如处理一些握手动作(ping,pong) 等同于访问路劲 pipeline.addLast(new WebSocketServerProtocolHandler("/heart/api/v1")); //自定义handler pipeline.addLast(new ChatHandler()); }}
netty 启动类
@Component@Slf4jpublic class WsServer { public static class SingletonServer{ static final WsServer INSTANCE = new WsServer(); } public static WsServer getInstance(){ return SingletonServer.INSTANCE; } private EventLoopGroup mainGroup ; private EventLoopGroup subGroup; private ServerBootstrap server; private ChannelFuture future; public WsServer(){ mainGroup = new NioEventLoopGroup(); subGroup = new NioEventLoopGroup(); //添加自定义初始化处理器 server = new ServerBootstrap(); server.group(mainGroup, subGroup) .channel(NioServerSocketChannel.class) .childHandler(new WsServerInitializer()); } public void start(Integer port){ future = this.server.bind(port); }}
3.启动netty
@SpringBootApplicationpublic class Application{ public static void main(String[] args){ SpringApplication.run(Application.class,args); //7888 是netty 占用的端口 WsServer.getInstance().start(7888); }}
4.接收代码
@Component@Slf4jpublic class ChatHandler extends SimpleChannelInboundHandler { private static ChannelGroup CLIENTS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { CLIENTS.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { CLIENTS.remove(ctx.channel()); } @Transactional(rollbackFor = Exception.class) @Override public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //接收客户端发送的信息 System.out.println(msg.text()); //发送客户端的消息 ctx.channel().writeAndFlush(new TextWebSocketFrame("发送心跳")); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("心跳异常:",cause); CLIENTS.remove(ctx.channel()); }}
netty 自带定时器(这里只举个例子):定时任务不在 Spring 管理中,演示读取到消息后的定时发送
@Component@Slf4jpublic class ChatHandler extends SimpleChannelInboundHandler { private static ChannelGroup CLIENTS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired private RestTemplate restTemplate; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { CLIENTS.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { CLIENTS.remove(ctx.channel()); } @Transactional(rollbackFor = Exception.class) @Override public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //接收客户端发送的信息 System.out.println(msg.text()); ctx.channel().eventLoop().scheduleWithFixedDelay(new Runnable() { ChannelHandlerContext ctx; RestTemplate restTemplate; @Override public void run() {}//对自身属性进行赋值 public Runnable accept(ChannelHandlerContext c, RestTemplate restTemplate) { this.ctx = c; this.restTemplate= restTemplate; return this; //0 是延迟多少秒执行 后面两个参数时多长时间执行一次 }}.accept(ctx,restTemplate),0,1, TimeUnit.MINUTES); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("心跳异常:",cause); CLIENTS.remove(ctx.channel()); }}
来源地址:https://blog.csdn.net/weixin_56576835/article/details/129992578