编码解码
Netty涉及到编解码的组件有Channel、ChannelHandler、ChannelPipe等
ChannelHandler充当了处理入站和出站数据的应用程序逻辑容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。你的业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。
ChannelPipeline提供了ChannelHandler链的容器。如果事件的运动方向是从客户端到服务端的,那么我们称这些事件对服务端来说是入站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelInboundHandler(从head到tail方向逐个调用每个handler的逻辑),并被这些Handler处理,反之则称为出站的,入站只调用pipeline里的ChannelInboundHandler逻辑,出站只调用pipelind里的ChannelOutboundHandler逻辑。
编码解码器
当你通过Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。
Netty提供了一系列实用的编码解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由已知解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。
Netty提供了很多编解码器,比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder等。
如果要实现高效的编解码可以用protobuf,但是protobuf需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff。
protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单
NettyServerTest.java
package com.mrathena.codec;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class NettyServerTest {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 默认的入站处理器,使用的是ByteBuf,未经过其他格式的编解码
// pipeline.addLast(new ByteBufChannelInboundHandler());
// String类型的入站处理器,该处理器输入String类型的消息,输出String类型的数据
// pipeline.addLast(new StringDecoder());
// pipeline.addLast(new StringEncoder());
// pipeline.addLast(new ServerStringChannelInboundHandler());
// Object类型的入站处理器
// pipeline.addLast(new ObjectDecoder(1024,ClassResolvers.cachedisabled(null)));
// pipeline.addLast(new ObjectEncoder());
// pipeline.addLast(new ServerObjectChannelInboundHandler());
// 使用自定义ProtoStuff的自定义Object类型的入站处理器
pipeline.addLast(new ProtoStuffBytetoUserDecoder());
pipeline.addLast(new ProtoStuffUserToByteEncoder());
pipeline.addLast(new ServerObjectChannelInboundHandler());
}
});
log.info("Netty Server Started");
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (Throwable cause) {
log.error("", cause);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
@Slf4j
class ServerObjectChannelInboundHandler extends SimpleChannelInboundHandler<User> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, User msg) throws Exception {
log.info("{}", msg);
ctx.writeAndFlush(new User(999L, "888"));
}
}
@Slf4j
class ServerStringChannelInboundHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info(msg);
ctx.writeAndFlush("Hello " + ctx.channel().remoteAddress());
}
}
@Slf4j
class ServerByteBufChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(((ByteBuf) msg).toString(StandardCharsets.UTF_8));
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello " + ctx.channel().remoteAddress(), StandardCharsets.UTF_8));
}
}
NettyClientTest.java
package com.mrathena.codec;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class NettyClientTest {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 默认的入站处理器,未经过其他格式的编解码
// pipeline.addLast(new ByteBufNettyClientHandler());
// String类型的入站处理器,输出String类型的数据
// pipeline.addLast(new StringDecoder());
// pipeline.addLast(new StringEncoder());
// pipeline.addLast(new ClientStringChannelInboundHandler());
// Object类型的入站处理器
// pipeline.addLast(new ObjectDecoder(1024,ClassResolvers.cachedisabled(null)));
// pipeline.addLast(new ObjectEncoder());
// pipeline.addLast(new ClientObjectChannelInboundHandler());
// 使用自定义ProtoStuff的自定义Object类型的入站处理器
pipeline.addLast(new ProtoStuffBytetoUserDecoder());
pipeline.addLast(new ProtoStuffUserToByteEncoder());
pipeline.addLast(new ClientObjectChannelInboundHandler());
}
});
log.info("Netty Client Started");
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (Throwable cause) {
log.error("", cause);
} finally {
group.shutdownGracefully();
}
}
}
@Slf4j
class ClientObjectChannelInboundHandler extends SimpleChannelInboundHandler<User> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(new User(1L, "mrathean"));
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, msg);
}
}
@Slf4j
class ClientStringChannelInboundHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("Hello Server");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info(msg);
}
}
@Slf4j
class ClientByteBufNettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server", StandardCharsets.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(((ByteBuf) msg).toString(StandardCharsets.UTF_8));
}
}
ProtoStuffBytetoUserDecoder.java
package com.mrathena.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.BytetoMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class ProtoStuffBytetoUserDecoder extends BytetoMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
byte[] bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
out.add(ProtoStuffKit.deserialize(bytes, User.class));
}
}
ProtoStuffUserToByteEncoder.java
package com.mrathena.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessagetoByteEncoder;
import lombok.extern.slf4j.Slf4j;
/**
* ProtoStuff,protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单
*/
@Slf4j
public class ProtoStuffUserToByteEncoder extends MessagetoByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
out.writeBytes(ProtoStuffKit.serialize(msg));
}
}
ProtoStuffKit .java
package com.mrathena.codec;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIoUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* protostuff 序列化工具类,基于protobuf封装
*/
public class ProtoStuffKit {
private static Map<Class<?>, Schema<?>> map = new ConcurrentHashMap<>();
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) map.get(clazz);
if (schema == null) {
schema = RuntimeSchema.getSchema(clazz);
if (schema != null) {
map.put(clazz, schema);
}
}
return schema;
}
/**
* 序列化
*/
public static <T> byte[] serialize(T object) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) object.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(clazz);
return ProtostuffIoUtil.toByteArray(object, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化
*/
public static <T> T deserialize(byte[] data, Class<T> clazz) {
try {
T object = clazz.newInstance();
Schema<T> schema = getSchema(clazz);
ProtostuffIoUtil.mergeFrom(data, object, schema);
return object;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
public static void main(String[] args) {
byte[] userBytes = ProtoStuffKit.serialize(new User(1L, "空幻"));
User user = ProtoStuffKit.deserialize(userBytes, User.class);
System.out.println(user);
}
}
User.java
package com.mrathena.codec;
import lombok.*;
import java.io.Serializable;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User implements Serializable {
private static final long serialVersionUID = 1L;
private Long id;
private String username;
}
拆包粘包
说明
TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。
举例说明,一个汉字在utf8编码下占3个字节,但是tcp发送字节可能会在任何地方分段,很可能就在一个字符的3个字节中分段了,这时如果转换为String,就会出现乱码. 所以必须有一种机制能分辨传输是否完成
如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。
解决方案
- 消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
- 在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不能出现分隔符。
- 发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。
Netty提供了多个解码器,可以进行分包的操作,如下:
- FixedLengthFrameDecoder(固定长度报文来分包)
- LineBasedFrameDecoder (回车换行分包)
- DelimiterBasedFrameDecoder(特殊分隔符分包)
- 还有我们自行实现的数据长度和数据一起发送的Decoder,据说netty也提供了这种方式,但是不太好用
NettyServerTest.java
package com.mrathena.split.stick;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServerTest {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 回车换行分包,该MessagetoByteDecoder操作的是ByteBuf,应该放在转StringDecoder之前
// pipeline.addLast(new LineBasedFrameDecoder(1024,true,true));
// pipeline.addLast(new StringDecoder());
// 特殊符号分包,应该放在转StringDecoder之前
// pipeline.addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("_",StandardCharsets.UTF_8)));
// pipeline.addLast(new StringDecoder());
// 定长换行分包,应该放在转StringDecoder之前
// pipeline.addLast(new FixedLengthFrameDecoder("Hello,World!".length()));
// pipeline.addLast(new StringDecoder());
// 定长换行分包,定长100,发送时不足的话自行补满,有必要的话可以服务端去除两端空格,应该放在转StringDecoder之前
// pipeline.addLast(new FixedLengthFrameDecoder(100));
// pipeline.addLast(new StringDecoder());
// 自定义LengthAndContentProtocol分包,该MessagetoByteDecoder将ByteBuf转成LengthAndContentProtocol对象,为了方便,顺便再转成String
pipeline.addLast(new LengthAndContentProtocolBytetoMessageDecoder());
// 最终接受String消息的处理器
pipeline.addLast(new ServerStringChannelInboundHandler());
}
});
log.info("Netty Server Started");
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (Throwable cause) {
log.error("", cause);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
@Slf4j
class ServerStringChannelInboundHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info(msg);
}
}
NettyClientTest.java
package com.mrathena.split.stick;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class NettyClientTest {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 将处理器中要发送的String转成ByteBuf
// pipeline.addLast(new StringEncoder());
// pipeline.addLast(new ClientStringChannelInboundHandler());
// 将处理器中要发送的LengthAndContentProtocol转成ByteBuf
pipeline.addLast(new LengthAndContentProtocolMessagetoByteEncoder());
pipeline.addLast(new ClientCustomChannelInboundHandler());
}
});
log.info("Netty Client Started");
ChannelFuture channelFuture = bootstrap.connect("localhost", cause);
} finally {
group.shutdownGracefully();
}
}
}
@Slf4j
class ClientCustomChannelInboundHandler extends SimpleChannelInboundHandler<LengthAndContentProtocol> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
String message = "Hello,World!";
// 自定义LengthAndContentProtocol分包
ctx.writeAndFlush(new LengthAndContentProtocol(message.getBytes(StandardCharsets.UTF_8)));
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LengthAndContentProtocol msg) throws Exception {
log.info("{}", msg);
}
}
@Slf4j
class ClientStringChannelInboundHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
String message = "Hello,World!";
// 回车换行分包
// ctx.writeAndFlush(message + "\r\n");
// 特殊符号分包
// ctx.writeAndFlush(message + "_");
// 定长换行分包
// ctx.writeAndFlush(message);
// 定长换行分包,发送时不足的话自行补满
int length = message.length();
for (int j = 0; j < 100 - length; j++) {
message = message.concat(" ");
}
ctx.writeAndFlush(message);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info(msg);
}
}
LengthAndContentProtocol.java
package com.mrathena.split.stick;
import lombok.*;
import java.io.Serializable;
@Getter
@Setter
@ToString
public class LengthAndContentProtocol implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息的长度
*/
private int length;
/**
* 消息的内容
*/
private byte[] content;
public LengthAndContentProtocol(byte[] content) {
this.content = content;
this.length = content.length;
}
}
LengthAndContentProtocolBytetoMessageDecoder.java
package com.mrathena.split.stick;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.BytetoMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* ByteBuf转LengthAndContentProtocol
*/
@Slf4j
public class LengthAndContentProtocolBytetoMessageDecoder extends BytetoMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, List<Object> out) throws Exception {
log.info("{}", in);
// 不够一个int则先不读取
if (in.readableBytes() < 4) {
return;
}
// 够一个int了,但是content还不够
// in.readInt()会导致readerIndex后移4位,如果后续内容不够length的话,还需要下次重新读取该长度,或者把该长度保存在一个成员变量中
// in.getInt(in.readerIndex())可以拿到当前最近的一个int,而且不会导致readerIndex后移
int length = in.getInt(in.readerIndex());
if (in.readableBytes() < 4 + length) {
return;
}
// int和content都够了
length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
// 返回LengthAndContentProtocol
LengthAndContentProtocol message = new LengthAndContentProtocol(content);
// out.add(message);
// 也可以直接返回string
out.add(new String(content));
}
}
LengthAndContentProtocolMessagetoByteEncoder.java
package com.mrathena.split.stick;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessagetoByteEncoder;
/**
* LengthAndContentProtocol转ByteBuf
*/
public class LengthAndContentProtocolMessagetoByteEncoder extends MessagetoByteEncoder<LengthAndContentProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, LengthAndContentProtocol msg, ByteBuf out) throws Exception {
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent());
}
}
心跳机制
说明
所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性
在 Netty 中,实现心跳机制的关键是 IdleStateHandler,服务端添加该处理器(配置读超时3秒),客户端定时发送心跳(间隔1秒),如果读超时时间间隔内服务端既没有收到业务消息,也没有收到心跳消息,则说明channel可能存在问题,服务端触发并处理读超时事件
IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds)
IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
这里解释下三个参数的含义
- readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件.
- writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
- allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码
pipeline.addLast(new IdleStateHandler(3, 0, TimeUnit.SECONDS));
跟随源码,发现在 handlerAdded 和 channelRegistered 和 channelActive 的时候都会触发 initialize 方法,判断 readerIdleTimeNanos > 0 就用线程池延迟 readerIdleTimeNanos 执行 new ReaderIdleTimeoutTask(ctx) 任务,
ReaderIdleTimeoutTask 类的 run 方法
// 假如 readerIdleTimeNanos 用的是一秒,这里值应该是1seconds对应的nanos
protected void run(ChannelHandlerContext ctx) {
// 不管是否正在读,先置 下次延迟时间 为1秒的纳秒
long nextDelay = readerIdleTimeNanos;
// 表示当前是否正在读取数据,reading 在 channelRead 时置 true,在 channelReadComplete 时置 false
if (!reading) {
// 如果当前没有正在读,则重置下次延迟时间
// nextDelay = nextDelay - (ticksInNanos() - lastReadTime)
// 初始化时,lastReadTime = ticksInNanos(); 每次 channelReadComplete 时 lastReadTime = ticksInNanos();
// (ticksInNanos() - lastReadTime),就是本次定时任务执行时,获取当前时间和上次读完成的时间间隔
// 然后用 读空闲超时时间 减这个差值,如果是负数,说明差值已经超过了读空闲时间,本次执行直接算作是一次超时,在 读空闲超时时间 后执行下次任务
// 如果是正数,说明 当前时间与上次读完成的差值 还没有达到 读空闲超时时间,需要再等一会儿再执行下次任务,而计算完的 nextDelay 正是这个间隔
nextDelay -= ticksInNanos() - lastReadTime;
}
// 本次已经超时了
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
// 构建一个 READER_IDLE 事件
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// ctx.fireUserEventTriggered(evt);
// netty中,所有 fire开头的方法都是调用下一个handler的对应方法
// 将本事件传给下一个Handler,并调用其userEventTriggered方法来处理该事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, nextDelay, TimeUnit.NANOSECONDS);
}
}
NettyServerTest.java
package com.mrathena.heartbeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.IdleState;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class NettyServerTest {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
// IdleStateHandler
// 如果readerIdleTime后还没有读取到数据(任何数据(包括心跳)),会触发IdleStateEvent事件,并且交给下一个handler来处理,其必须实现userEventTriggered方法来处理该事件
// 客户端心跳间隔是1秒,为了防止误触发,这里读超时时间要稍微大点,这里是3秒,3秒内理论上能有3次心跳了,如果一个都没有,则认为读超时了,触发一次读空闲事件
pipeline.addLast(new IdleStateHandler(3, TimeUnit.SECONDS));
pipeline.addLast(new ServerStringChannelInboundHandler());
}
});
log.info("Netty Server Started");
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} catch (Throwable cause) {
log.error("", cause);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
@Slf4j
class ServerStringChannelInboundHandler extends SimpleChannelInboundHandler<String> {
/**
* 读空闲次数
*/
private Long readIdleTimes = 0L;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// msg可能是心跳消息也可能是业务消息
readIdleTimes = 0L;
if ("heartbeat".equals(msg)) {
ctx.writeAndFlush("ok");
} else {
// 收到业务消息可以重置一下读空闲次数
log.info(msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (IdleState.READER_IDLE.equals(event.state())) {
// 读空闲的计数加1
readIdleTimes++;
} else {
return;
}
log.info("超时事件:读空闲,{}", readIdleTimes);
if (readIdleTimes > 10) {
log.info("读空闲已连续10次,关闭连接");
ctx.channel().writeAndFlush("close");
ctx.channel().close();
}
}
}
NettyClientTest.java
package com.mrathena.heartbeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyClientTest {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ClientStringChannelInboundHandler());
}
});
log.info("Netty Client Started");
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();
Channel channel = channelFuture.channel();
String heartbeat = "heartbeat";
while (channel.isActive()) {
Thread.sleep(1000);
channel.writeAndFlush(heartbeat);
}
} catch (Throwable cause) {
log.error("", cause);
} finally {
group.shutdownGracefully();
}
}
}
@Slf4j
class ClientStringChannelInboundHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("Hello Server");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info(msg);
if (msg != null && msg.equals("close")) {
log.info("服务端连接已关闭,客户端随之关闭");
ctx.channel().closeFuture();
}
}
}
断线重连
说明
- 客户端启动连接服务端时,如果网络或服务端有问题,客户端连接失败,可以重连,重连的逻辑加在客户端。
- 系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的channelInactive方法中进行重连
NettyServer.java
package com.mrathena.reconnect;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ServerByteBufChannelInboundHandler());
}
});
log.info("Netty Server Started");
ChannelFuture channelFuture = bootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
@Slf4j
class ServerByteBufChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(((ByteBuf) msg).toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Client".getBytes(CharsetUtil.UTF_8)));
}
}
NettyClient.java
package com.mrathena.reconnect;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* 实现了重连的客户端
*/
@Slf4j
public class NettyClient {
private final String host;
private final int port;
private final Bootstrap bootstrap;
public static void main(String[] args) throws Exception {
NettyClient nettyClient = new NettyClient("localhost", 8888);
nettyClient.connect();
}
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
this.bootstrap = new Bootstrap();
this.bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// NettyClient.this,this指的是当前正在访问这段代码的对象,当在内部类中使用this指的就是内部类的对象,
// 为了访问外层类对象,就可以使用外层类名.this来访问,一般也只在这种情况下使用这种形式
channel.pipeline().addLast(new ClientByteBufChannelInboundHandler(NettyClient.this));
}
});
log.info("Netty Client Started");
}
public void connect() throws Exception {
ChannelFuture channelFuture = bootstrap.connect(host, port);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// 重连交给后端线程执行
future.channel().eventLoop().schedule(() -> {
log.info("尝试连接服务端...");
try {
connect();
} catch (Exception e) {
e.printstacktrace();
}
}, 1000, TimeUnit.MILLISECONDS);
} else {
log.info("连接成功");
}
}
});
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
}
}
ClientByteBufChannelInboundHandler.java
package com.mrathena.reconnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ClientByteBufChannelInboundHandler extends ChannelInboundHandlerAdapter {
private final NettyClient nettyClient;
public ClientByteBufChannelInboundHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
/**
* 当客户端连接服务器完成就会触发该方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server".getBytes(CharsetUtil.UTF_8)));
}
// 当通道有读取事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(((ByteBuf) msg).toString(CharsetUtil.UTF_8));
}
// channel 处于不活动状态时调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("连接断开,尝试重新连接服务端...");
nettyClient.connect();
}
}
重写聊天室
BytetoStringDecoder.java
package com.mrathena.chat.optimize;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.BytetoMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* Byte转String,使用 长度加内容 的方式
*/
@Slf4j
public class BytetoStringDecoder extends BytetoMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, List<Object> out) throws Exception {
// 不够一个int则先不读取
if (in.readableBytes() < 4) {
return;
}
// 够一个int了,而且不会导致readerIndex后移
int length = in.getInt(in.readerIndex());
if (in.readableBytes() < 4 + length) {
return;
}
// int和content都够了
length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
// 返回string
out.add(new String(content));
}
}
StringToByteEncoder.java
package com.mrathena.chat.optimize;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessagetoByteEncoder;
import java.nio.charset.StandardCharsets;
/**
* String转Byte,使用 长度加内容 的方式
*/
public class StringToByteEncoder extends MessagetoByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
NewChatNettyServer.java
package com.mrathena.chat.optimize;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
@Slf4j
public class NewChatNettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 入向的byte转String
channel.pipeline().addLast(new BytetoStringDecoder());
// 出向的String转byte
channel.pipeline().addLast(new StringToByteEncoder());
// 心跳检测,3秒内没收到心跳算作一次失败,连续3次失败,服务端认为连接断开,主动关闭连接
channel.pipeline().addLast(new IdleStateHandler(3, TimeUnit.SECONDS));
// 心跳超时处理器
channel.pipeline().addLast(new HeartbeatTimeoutHandler());
// 业务处理器
channel.pipeline().addLast(new ServerBusinessHandler());
}
});
log.info("Netty Server Started");
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
@Slf4j
class HeartbeatTimeoutHandler extends SimpleChannelInboundHandler<String> {
/**
* 心跳检测失败次数,主动关闭连接
*/
private Long heartbeatCheckFailureTimes = 0L;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 消息可能是心跳消息也可能是业务消息
// 收到消息即可重置一下心跳检测失败次数
heartbeatCheckFailureTimes = 0L;
// 心跳消息不处理
if (!"heartbeat".equals(msg)) {
// 业务消息转给下一个业务处理器处理
ctx.fireChannelRead(msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (IdleState.READER_IDLE.equals(event.state())) {
// 读空闲的计数加1
heartbeatCheckFailureTimes++;
} else {
return;
}
log.info("心跳检测失败,当前连续失败次数: {}", heartbeatCheckFailureTimes);
if (heartbeatCheckFailureTimes > 3) {
log.info("心跳检测失败已达3次,关闭连接");
ctx.channel().writeAndFlush("close");
ctx.channel().close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("", cause);
ctx.close();
}
}
@Slf4j
class ServerBusinessHandler extends SimpleChannelInboundHandler<String> {
private static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
SocketAddress address = channel.remoteAddress();
log.info("{} Join", address);
GROUP.writeAndFlush("[系统通知] " + address + " 上线了");
channel.writeAndFlush("[系统通知] Hello " + address + ",Welcome to Join");
GROUP.add(channel);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
SocketAddress address = channel.remoteAddress();
log.info("{} Leave", address);
GROUP.remove(channel);
GROUP.writeAndFlush("[系统通知] " + address + " 下线了");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {
Channel channel = ctx.channel();
SocketAddress address = channel.remoteAddress();
log.info("{}: {}", address, message);
GROUP.forEach(itemChannel -> {
if (channel != itemChannel) {
itemChannel.writeAndFlush(address + " " + message);
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, cause);
ctx.close();
}
}
NewChatNettyClient.java
package com.mrathena.chat.optimize;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class NewChatNettyClient {
private static final ExecutorService POOL = Executors.newFixedThreadPool(2);
private final String host;
private final int port;
private final Bootstrap bootstrap;
public static void main(String[] args) throws Exception {
NewChatNettyClient chatNettyClient = new NewChatNettyClient("localhost", 8888);
chatNettyClient.connect();
}
public NewChatNettyClient(String host, int port) {
this.host = host;
this.port = port;
this.bootstrap = new Bootstrap();
this.bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 入向的byte转String
channel.pipeline().addLast(new BytetoStringDecoder());
// 出向的String转byte
channel.pipeline().addLast(new StringToByteEncoder());
// NettyClient.this,一般也只在这种情况下使用这种形式
channel.pipeline().addLast(new ClientBusinessHandler(NewChatNettyClient.this));
}
});
log.info("Netty Client Started");
}
public void connect() throws Exception {
// 重新连接
ChannelFuture channelFuture = bootstrap.connect(host, port);
channelFuture.addListener((ChannelFutureListener) future -> {
Channel channel = future.channel();
if (!future.isSuccess()) {
channel.eventLoop().schedule(() -> {
log.info("Connecting ...");
try {
connect();
} catch (Exception e) {
e.printstacktrace();
}
}, 1, TimeUnit.SECONDS);
} else {
log.info("Connection Established");
// 启动心跳线程
POOL.submit(() -> {
try {
while (channel.isActive()) {
channel.writeAndFlush("heartbeat");
TimeUnit.SECONDS.sleep(1);
}
} catch (Throwable cause) {
log.error("", cause);
}
});
// 启动消息线程
POOL.submit(() -> {
try {
Scanner scanner = new Scanner(System.in);
while (channel.isActive()) {
String message = scanner.nextLine();
if (StringUtils.isNotBlank(message)) {
channel.writeAndFlush(message);
}
}
} catch (Throwable cause) {
log.error("", cause);
}
});
}
});
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
}
}
@Slf4j
class ClientBusinessHandler extends SimpleChannelInboundHandler<String> {
private final NewChatNettyClient chatNettyClient;
public ClientBusinessHandler(NewChatNettyClient chatNettyClient) {
this.chatNettyClient = chatNettyClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info(msg);
}
/**
* 监控连接后断开重连
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("Lose Connection,Try Connect Again");
chatNettyClient.connect();
}
}
补充 handler 调用顺序
handler的生命周期回调接口调用顺序
handlerAdded > channelRegistered > channelActive > channelRead > channelReadComplete > channelInactive > channelUnRegistered > handlerRemoved
- handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调;
- channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。
- channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线程上,说明该channel准备就绪,可以使用了,NioServerSocketChannel在绑定端口后调用,NioSocketChannel在register0中isActive()时调用。
- channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读;
- channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕;
- channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。
- channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程;
- handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。