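The previous articles covered the basics of Netty. This one looks at the encoders and decoders used during communication. That may remind you of a spy drama: the sender, afraid the intelligence will leak, encrypts it before passing it on, and the receiver decrypts it to recover the message. Are the encoders and decoders discussed here similar to passing intelligence? Read on to find out.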
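I. Codecs
1.1 What is a codec
During network transmission, data always travels as a byte stream. When the client sends data to the server, it converts the application's other data types into bytes; this is called encoding. The server receives a byte stream and converts it back into the original format; this is called decoding. Together the two are called a codec.
A codec has two parts, an encoder and a decoder: the encoder handles outbound data and the decoder handles inbound data.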
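As a minimal sketch of the idea, the following plain-Java example (no Netty involved) turns an int into bytes and back. The class name IntCodecSketch and its methods are hypothetical names chosen for illustration.

```java
import java.nio.ByteBuffer;

class IntCodecSketch {
    // Encode: application value -> bytes (the job of an outbound encoder).
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decode: bytes -> application value (the job of an inbound decoder).
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] wire = encode(123);
        System.out.println(wire.length + " bytes on the wire");
        System.out.println("decoded: " + decode(wire));
    }
}
```

The sender only ever hands bytes to the network, and the receiver needs to know the same layout (here, a 4-byte big-endian int) to get the value back.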
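1.2 Decoders
1.2.1 Overview
Because a decoder handles inbound operations, it must implement the ChannelInboundHandler interface, so a decoder is essentially a ChannelHandler. To write a custom decoder, we only need to extend ByteToMessageDecoder (an abstract class provided by Netty that extends ChannelInboundHandlerAdapter) and implement decode(). Netty also ships some commonly used decoder implementations that work out of the box:
- RedisDecoder: decoder for the Redis protocol
- XmlDecoder: decoder for the XML format
- JsonObjectDecoder: decoder for JSON data
- HttpObjectDecoder: decoder for the HTTP protocol
Netty also provides MessageToMessageDecoder, a decoder that converts one message format into another, along with some implementations:
- StringDecoder: converts a received ByteBuf into a string
- ByteArrayDecoder: converts a received ByteBuf into a byte array
- Base64Decoder: decodes Base64 data carried in a ByteBuf or US-ASCII string into a ByteBuf
1.2.2 Example: converting a byte stream into Integer values
1. Byte decoder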
- package com.haopt.netty.codec;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
-
- import java.util.List;
- public class ByteToIntegerDecoder extends ByteToMessageDecoder {
-
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- if (in.readableBytes() >= 4) { // an int occupies 4 bytes, so check that 4 bytes are available before reading
- out.add(in.readInt()); // read the int and add it to the output, completing the type conversion
- }
- }
- }
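The readableBytes() check matters because a TCP read may deliver only part of an int; ByteToMessageDecoder keeps the unread bytes and offers them again on the next read. The following plain-Java sketch imitates that accumulation under simplified assumptions. It is a stand-in for illustration, not Netty's implementation, and IntFrameAccumulator and feed are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class IntFrameAccumulator {
    private byte[] pending = new byte[0]; // bytes received but not yet decoded

    // Feed one network fragment; returns every complete int that can now be decoded.
    List<Integer> feed(byte[] fragment) {
        byte[] all = new byte[pending.length + fragment.length];
        System.arraycopy(pending, 0, all, 0, pending.length);
        System.arraycopy(fragment, 0, all, pending.length, fragment.length);

        ByteBuffer buf = ByteBuffer.wrap(all);
        List<Integer> out = new ArrayList<>();
        while (buf.remaining() >= Integer.BYTES) { // same guard as readableBytes() >= 4
            out.add(buf.getInt());
        }
        pending = new byte[buf.remaining()]; // keep the leftover for the next fragment
        buf.get(pending);
        return out;
    }

    public static void main(String[] args) {
        IntFrameAccumulator acc = new IntFrameAccumulator();
        System.out.println(acc.feed(new byte[] {0, 0}));               // too short: nothing decoded
        System.out.println(acc.feed(new byte[] {0, 123, 0, 0, 1, 0})); // completes 123 and 256
    }
}
```

Without the length guard, reading an int from a 2-byte fragment would fail instead of waiting for the rest of the data.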
2. Handler
- package com.haopt.netty.codec;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- public class ServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- Integer i = (Integer) msg; // the message is already an Integer here
- System.out.println("Message received by the server: " + i);
- }
- }
3. Add the decoder to the pipeline
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ByteToIntegerDecoder())
- .addLast(new ServerHandler());
- }
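You can copy the code into IDEA and run it to see the result.
1.3 Encoders
1.3.1 Overview
An encoder converts the original format into bytes. To write a custom encoder, we only need to extend MessageToByteEncoder (which implements the ChannelOutboundHandler interface), so an encoder is essentially a ChannelHandler too. Some of the encoders implemented in Netty:
- ObjectEncoder: encodes an object (which must implement the Serializable interface) into a byte stream
- SocksMessageEncoder: encodes a SocksMessage into a byte stream
- HAProxyMessageEncoder: encodes an HAProxyMessage into a byte stream
Netty also provides MessageToMessageEncoder, an encoder that converts one message format into another, along with some implementations:
- RedisEncoder: encodes Redis protocol objects
- StringEncoder: encodes a string
- Base64Encoder: encodes a ByteBuf into Base64
1.3.2 Example: encoding an Integer into bytes for transmission
1. Custom encoder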
- package com.haopt.netty.codec.client;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {
- @Override
- protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
- out.writeInt(msg);
- }
- }
2. Handler
- package com.haopt.netty.codec.client;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println("Message received from the server: " +
- msg.toString(CharsetUtil.UTF_8));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ctx.writeAndFlush(123);
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
3. pipeline
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new IntegerToByteEncoder());
- ch.pipeline().addLast(new ClientHandler());
- }
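The call ctx.writeAndFlush(123) in ClientHandler ends up as bytes because MessageToByteEncoder only encodes messages that match its declared type (Integer here) and forwards anything else unchanged. The following plain-Java sketch imitates that dispatch. It is a simplified stand-in, not Netty code, and OutboundTypeMatchSketch and write are hypothetical names.

```java
import java.nio.ByteBuffer;

class OutboundTypeMatchSketch {
    // Stand-in for an encoder declared for Integer:
    // Integers are converted to bytes, every other message is forwarded as-is.
    static Object write(Object msg) {
        if (msg instanceof Integer) {
            return ByteBuffer.allocate(Integer.BYTES).putInt((Integer) msg).array();
        }
        return msg; // not our type: pass through untouched
    }

    public static void main(String[] args) {
        System.out.println(write(123) instanceof byte[]);     // true: encoded to bytes
        System.out.println(write("hello") instanceof byte[]); // false: still a String
    }
}
```

In a real pipeline, a message that no encoder converts to a ByteBuf cannot be written to the socket, so each outbound type needs a matching encoder.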
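II. Building an HTTP Server
This part builds an HTTP server with the HTTP codecs that Netty provides. It is worth copying the code and running it to see the result.
2.1 Netty configuration
1. Server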
- package com.haopt.netty.codec.http;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpRequestDecoder;
- import io.netty.handler.codec.http.HttpResponseEncoder;
- import io.netty.handler.stream.ChunkedWriteHandler;
- public class NettyHttpServer {
- public static void main(String[] args) throws Exception {
- // Boss thread: handles no business logic, only accepts client connections
- EventLoopGroup boss = new NioEventLoopGroup(1);
- // Worker threads: the default thread count is CPU cores * 2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // Server bootstrap class
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(boss, worker);
- // Configure the server channel
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- // Decoder for HTTP requests
- .addLast(new HttpRequestDecoder())
- // Aggregates the URI and body of an HTTP request into one complete FullHttpRequest object
- .addLast(new HttpObjectAggregator(1024 * 128))
- .addLast(new HttpResponseEncoder()) // encoder for HTTP responses
- .addLast(new ChunkedWriteHandler()) // supports asynchronous transfer of large files and prevents running out of memory
- .addLast(new ServerHandler());
- }
- }); // handler for the worker threads
- ChannelFuture future = serverBootstrap.bind(8080).sync();
- System.out.println("Server started.");
- // Wait until the server's listening port is closed
- future.channel().closeFuture().sync();
- } finally {
- // Graceful shutdown
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
2. ServerHandler
- package com.haopt.netty.codec.http;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.*;
- import io.netty.util.CharsetUtil;
- import java.util.Map;
- public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
- @Override
- public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
- // Parse the FullHttpRequest to get the request parameters
- Map<String, String> paramMap = new RequestParser(request).parse();
- String name = paramMap.get("name");
- // Build the response object
- FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
- httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
- StringBuilder sb = new StringBuilder();
- sb.append("");
- httpResponse.content().writeBytes(Unpooled.copiedBuffer(sb,CharsetUtil.UTF_8));
- // Once the write completes, close the channel
- ctx.writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE);
- }
- }
3. RequestParser
- package com.haopt.netty.codec.http;
- import io.netty.handler.codec.http.FullHttpRequest;
- import io.netty.handler.codec.http.HttpMethod;
- import io.netty.handler.codec.http.QueryStringDecoder;
- import io.netty.handler.codec.http.multipart.Attribute;
- import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
- import io.netty.handler.codec.http.multipart.InterfaceHttpData;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
- public class RequestParser {
- private FullHttpRequest fullReq;
-
- public RequestParser(FullHttpRequest req) {
- this.fullReq = req;
- }
-
- public Map<String, String> parse() throws Exception {
- HttpMethod method = fullReq.method();
- Map<String, String> parmMap = new HashMap<>();
- if (HttpMethod.GET == method) {
- // GET request
- QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
- decoder.parameters().entrySet().forEach( entry -> {
- // entry.getValue() is a List; take only the first element
- parmMap.put(entry.getKey(), entry.getValue().get(0));
- });
- } else if (HttpMethod.POST == method) {
- // POST request
- HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(fullReq);
- decoder.offer(fullReq);
- List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
- for (InterfaceHttpData parm : parmList) {
- Attribute data = (Attribute) parm;
- parmMap.put(data.getName(), data.getValue());
- }
- } else {
- // Other methods are not supported
- throw new RuntimeException("Other methods are not supported"); // a custom exception could be used instead
- }
- return parmMap;
- }
- }
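To make the GET branch concrete, here is a plain-Java sketch of what "parse the query string and keep the first value of each key" amounts to. It is a simplified stand-in written with the JDK only, not how QueryStringDecoder is implemented; QueryParamSketch and its methods are hypothetical names.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

class QueryParamSketch {
    // Percent-decodes one component, assuming UTF-8.
    static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Parses "/path?a=1&b=2" into a map, keeping only the first value of each key.
    static Map<String, String> parse(String uri) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0) {
            return params; // no query string
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.putIfAbsent(decode(key), decode(value));
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(parse("/hello?name=netty&name=other&age=3")); // {name=netty, age=3}
    }
}
```

A repeated key such as name above keeps only its first value, which mirrors the entry.getValue().get(0) call in RequestParser.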
4. The User object
- package com.haopt.netty.codec.obj;
- public class User implements java.io.Serializable {
- private static final long serialVersionUID = -89217070354741790L;
- private Long id;
- private String name;
- private Integer age;
- public Long getId() {
- return id;
- }
- public void setId(Long id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public Integer getAge() {
- return age;
- }
- public void setAge(Integer age) {
- this.age = age;
- }
- @Override
- public String toString() {
- return "User{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", age=" + age +
- '}';
- }
- }
2.2 Server
1. NettyObjectServer
- package com.haopt.netty.codec.obj;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.serialization.ClassResolvers;
- import io.netty.handler.codec.serialization.ObjectDecoder;
- public class NettyObjectServer {
- public static void main(String[] args) throws Exception {
- // Boss thread: handles no business logic, only accepts client connections
- EventLoopGroup boss = new NioEventLoopGroup(1);
- // Worker threads: the default thread count is CPU cores * 2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // Server bootstrap class
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(boss, worker);
- // Configure the server channel
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver(
- this.getClass().getClassLoader()
- )))
- .addLast(new ServerHandler());
- }
- }); // handler for the worker threads
- ChannelFuture future = serverBootstrap.bind(6677).sync();
- System.out.println("Server started.");
- // Wait until the server's listening port is closed
- future.channel().closeFuture().sync();
- } finally {
- // Graceful shutdown
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
2. ServerHandler
- package com.haopt.netty.codec.obj;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class ServerHandler extends SimpleChannelInboundHandler<User> {
- @Override
- public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
- // The User object is available here
- System.out.println(user);
- ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
- }
- }
2.3 Client
1. NettyObjectClient
- package com.haopt.netty.codec.obj;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.serialization.ObjectEncoder;
- public class NettyObjectClient {
- public static void main(String[] args) throws Exception{
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // Client bootstrap class
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new ObjectEncoder());
- ch.pipeline().addLast(new ClientHandler());
- }
- });
- ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
- future.channel().closeFuture().sync();
- } finally {
- worker.shutdownGracefully();
- }
- }
- }
2. ClientHandler
- package com.haopt.netty.codec.obj;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.util.CharsetUtil;
- public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println("Message received from the server: " +
- msg.toString(CharsetUtil.UTF_8));
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- User user = new User();
- user.setId(1L);
- user.setName("张三");
- user.setAge(20);
- ctx.writeAndFlush(user);
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
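2.4 Optimizing JDK serialization
JDK serialization is convenient, but it performs poorly and produces fairly large output, so projects generally avoid the built-in serialization and use a third-party serialization framework instead. Here we use Hessian for encoding and decoding.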
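To get a feel for the size overhead, the following JDK-only sketch serializes a small object with ObjectOutputStream and compares the result with a hand-written field-by-field layout. It does not use Hessian and is only meant to show that the JDK stream carries class metadata in addition to the field values. Person is a hypothetical stand-in for the article's User class, and the compact layout is an assumption made for the comparison.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationSizeSketch {
    // Hypothetical stand-in for the article's User class.
    static class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        final String name;
        final int age;

        Person(long id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    // JDK serialization: writes class metadata plus the field values.
    static byte[] jdkSerialize(Person p) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(p);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Person jdkDeserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Person) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hand-written layout: 8-byte id, length-prefixed name, 4-byte age.
    static byte[] compactSerialize(Person p) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeLong(p.id);
            dos.writeUTF(p.name);
            dos.writeInt(p.age);
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Person p = new Person(1L, "Tom", 20);
        System.out.println("JDK serialization: " + jdkSerialize(p).length + " bytes");
        System.out.println("Hand-written layout: " + compactSerialize(p).length + " bytes");
    }
}
```

The hand-written layout is not a practical replacement, since it has no versioning or type information; it only gives a baseline for how few bytes the field values themselves need.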
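1. Add the dependency
- <dependency>
- <groupId>com.caucho</groupId>
- <artifactId>hessian</artifactId>
- <version>4.0.63</version>
- </dependency>
2. The User object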
- package com.haopt.netty.codec.hessian;
- public class User implements java.io.Serializable{
- private static final long serialVersionUID = -8200798627910162221L;
- private Long id;
- private String name;
- private Integer age;
- public Long getId() {
- return id;
- }
- public void setId(Long id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public Integer getAge() {
- return age;
- }
- public void setAge(Integer age) {
- this.age = age;
- }
- @Override
- public String toString() {
- return "User{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", age=" + age +
- '}';
- }
- }
3. Hessian serialization utility class
- package com.haopt.netty.codec.hessian.codec;
- import com.caucho.hessian.io.HessianInput;
- import com.caucho.hessian.io.HessianOutput;
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
-
- public class HessianSerializer {
- public <T> byte[] serialize(T obj) {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- HessianOutput ho = new HessianOutput(os);
- try {
- ho.writeObject(obj);
- ho.flush();
- return os.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- ho.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- try {
- os.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public <T> Object deserialize(byte[] bytes, Class<T> clazz) {
- ByteArrayInputStream is = new ByteArrayInputStream(bytes);
- HessianInput hi = new HessianInput(is);
- try {
- return (T) hi.readObject(clazz);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- hi.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- try {
- is.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
4. Encoder
- package com.haopt.netty.codec.hessian.codec;
- import com.haopt.netty.codec.hessian.User;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- public class HessianEncoder extends MessageToByteEncoder<User> {
- private HessianSerializer hessianSerializer = new HessianSerializer();
- protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
- byte[] bytes = hessianSerializer.serialize(msg);
- out.writeBytes(bytes);
- }
- }
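5. Decoder
- package com.haopt.netty.codec.hessian.codec;
- import com.haopt.netty.codec.hessian.User;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import java.util.List;
- public class HessianDecoder extends ByteToMessageDecoder {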
- private HessianSerializer hessianSerializer = new HessianSerializer();
-
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- // Copy all readable bytes into a byte array. readBytes() advances the reader index,
- // which avoids the exception "did not read anything but decoded a message":
- // Netty throws it when a decoder emits a message without reading any bytes.
- byte[] dst = new byte[in.readableBytes()];
- in.readBytes(dst);
- // Deserialize
- Object obj = hessianSerializer.deserialize(dst, User.class);
- out.add(obj);
- }
- }
6. Server
- public class NettyHessianServer {
- public static void main(String[] args) throws Exception {
- // System.setProperty("io.netty.noUnsafe", "true");
- // Boss thread: handles no business logic, only accepts client connections
- EventLoopGroup boss = new NioEventLoopGroup(1);
- // Worker threads: the default thread count is CPU cores * 2
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // Server bootstrap class
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(boss, worker);
- // Configure the server channel
- serverBootstrap.channel(NioServerSocketChannel.class);
- serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new HessianDecoder())
- .addLast(new ServerHandler());
- }
- }); // handler for the worker threads
- // serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
- ChannelFuture future = serverBootstrap.bind(6677).sync();
- System.out.println("Server started.");
- // Wait until the server's listening port is closed
- future.channel().closeFuture().sync();
- } finally {
- // Graceful shutdown
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- }
- }
- }
- public class ServerHandler extends SimpleChannelInboundHandler<User> {
- @Override
- public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
- // The User object is available here
- System.out.println(user);
- ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
- }
- }
7. Client (bootstrap class)
- public class NettyHessianClient {
- public static void main(String[] args) throws Exception {
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- // Client bootstrap class
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new HessianEncoder());
- ch.pipeline().addLast(new ClientHandler());
- }
- });
- ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
- future.channel().closeFuture().sync();
- } finally {
- worker.shutdownGracefully();
- }
- }
- }
- public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- System.out.println("Message received from the server: " +
- msg.toString(CharsetUtil.UTF_8));
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- User user = new User();
- user.setId(1L);
- user.setName("张三");
- user.setAge(20);
- ctx.writeAndFlush(user);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
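This article explained what encoders and decoders are and showed how to use them in practice. I hope it helps. Back to the question from the beginning: are the encoders and decoders in this article similar to exchanging intelligence? In my view, they are. The sender takes information in a form they can read and transforms it according to some rule. What the receiver gets is the transformed data, which has to be reversed according to the same rule before it can be read. In the same way, when our client sends data it has to turn the program's data into a binary stream, and when the server receives that stream it has to turn it back into a data type the program can work with.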