服务端代码:
package com.kg.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
import com.kg.netty.msg.KeepAliveMessage;
import com.kg.utils.Constants;
import com.kg.utils.Utils;
public class KeepAliveServer {
// 端口
private int port ;
public KeepAliveServer(int port) {
this.port = port;
}
ChannelFuture f ;
ServerBootstrap b ;
// 设置6秒检测chanel是否接受过心跳数据
private static final int READ_WAIT_SECONDS = 6;
// 定义客户端没有收到服务端的pong消息的最大次数
private static final int MAX_UN_REC_PING_TIMES = 3;
public void startServer() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new KeepAliveServerInitializer());
// 服务器绑定端口监听
f = b.bind(port).sync();
// 监听服务器关闭监听,此方法会阻塞
f.channel().closeFuture().sync();
// 可以简写为
/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 消息处理器
* @author cullen edward
*/
private class KeepAliveServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
/*
* 使用ObjectDecoder和ObjectEncoder
* 因为双向都有写数据和读数据,所以这里需要两个都设置
* 如果只读,那么只需要ObjectDecoder即可
*/
pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
pipeline.addLast("encoder", new ObjectEncoder());
/*
* 这里只监听读操作
* 可以根据需求,监听写操作和总得操作
*/
pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS));
pipeline.addLast("handler", new Heartbeat());
}
}
private class Heartbeat extends SimpleChannelInboundHandler<KeepAliveMessage> {
// 失败计数器:未收到client端发送的ping请求
private int unRecPingTimes = 0 ;
// 每个chanel对应一个线程,此处用来存储对应于每个线程的一些基础数据,此处不一定要为KeepAliveMessage对象
ThreadLocal<KeepAliveMessage> localMsgInfo = new ThreadLocal<KeepAliveMessage>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " Say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());
// 收到ping消息后,回复
if(Utils.notEmpty(msg.getSn())&&msg.getReqCode()==1){
msg.setReqCode(Constants.RET_CODE);
ctx.channel().writeAndFlush(msg);
// 失败计数器清零
unRecPingTimes = 0;
if(localMsgInfo.get()==null){
KeepAliveMessage localMsg = new KeepAliveMessage();
localMsg.setSn(msg.getSn());
localMsgInfo.set(localMsg);
/*
* 这里可以将设备号放入一个集合中进行统一管理
*/
// TODO
}
}else{
ctx.channel().close();
}
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
/*读超时*/
System.out.println("===服务端===(READER_IDLE 读超时)");
// 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){
System.out.println("===服务端===(读超时,关闭chanel)");
// 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
ctx.channel().close();
}else{
// 失败计数器加1
unRecPingTimes++;
}
} else if (event.state() == IdleState.WRITER_IDLE) {
/*写超时*/
System.out.println("===服务端===(WRITER_IDLE 写超时)");
} else if (event.state() == IdleState.ALL_IDLE) {
/*总超时*/
System.out.println("===服务端===(ALL_IDLE 总超时)");
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("错误原因:"+cause.getMessage());
if(localMsgInfo.get()!=null){
/*
* 从管理集合中移除设备号等唯一标示,标示设备离线
*/
// TODO
}
ctx.channel().close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client active ");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 关闭,等待重连
ctx.close();
if(localMsgInfo.get()!=null){
/*
* 从管理集合中移除设备号等唯一标示,标示设备离线
*/
// TODO
}
System.out.println("===服务端===(客户端失效)");
}
}
public void stopServer(){
if(f!=null){
f.channel().close();
}
}
/**
* @param args
*/
public static void main(String[] args) {
KeepAliveServer keepAliveServer = new KeepAliveServer(1666);
keepAliveServer.startServer();
}
}
客户端代码:
package com.kg.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.kg.netty.msg.KeepAliveMessage;
import com.kg.utils.Constants;
public class KeepAliveClient {
private String host ;
private int port ;
private EventLoopGroup group ;
private Bootstrap b ;
private Channel ch ;
// 定义客户端没有收到服务端的pong消息的最大次数
private static final int MAX_UN_REC_PONG_TIMES = 3;
// 多长时间未请求后,发送心跳
private static final int WRITE_WAIT_SECONDS = 5;
// 隔N秒后重连
private static final int RE_CONN_WAIT_SECONDS = 5;
// 客户端连续N次没有收到服务端的pong消息 计数器
private int unRecPongTimes = 0 ;
private ScheduledExecutorService executorService ;
// 是否停止
private boolean isStop = false ;
public KeepAliveClient(String host, int port) {
this.host = host ;
this.port = port ;
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new HeartbeatInitializer());
}
public void start() {
connServer();
}
private void connServer(){
isStop = false;
if(executorService!=null){
executorService.shutdown();
}
executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(new Runnable() {
boolean isConnSucc = true;
@Override
public void run() {
try {
// 重置计数器
unRecPongTimes = 0;
// 连接服务端
if(ch!=null&&ch.isOpen()){
ch.close();
}
ch = b.connect(host, port).sync().channel();
// 此方法会阻塞
// ch.closeFuture().sync();
System.out.println("connect server finish");
} catch (Exception e) {
e.printStackTrace();
isConnSucc = false ;
} finally{
if(isConnSucc){
if(executorService!=null){
executorService.shutdown();
}
}
}
}
}, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS);
}
public class HeartbeatInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("ping", new IdleStateHandler(0, WRITE_WAIT_SECONDS, 0,TimeUnit.SECONDS));
// 客户端的逻辑
pipeline.addLast("handler", new ClientHandler());
}
}
public class ClientHandler extends SimpleChannelInboundHandler<KeepAliveMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg)
throws Exception {
System.out.println("Server say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());
if (Constants.RET_CODE == msg.getReqCode()) {
// 计数器清零
unRecPongTimes = 0;
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client active ");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client close ");
super.channelInactive(ctx);
/*
* 重连
*/
if(!isStop){
connServer();
}
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
/*读超时*/
System.out.println("===服务端===(READER_IDLE 读超时)");
} else if (event.state() == IdleState.WRITER_IDLE) {
/*写超时*/
System.out.println("===服务端===(WRITER_IDLE 写超时)");
if(unRecPongTimes < MAX_UN_REC_PONG_TIMES){
ctx.channel().writeAndFlush(getSrcMsg()) ;
unRecPongTimes++;
}else{
ctx.channel().close();
}
} else if (event.state() == IdleState.ALL_IDLE) {
/*总超时*/
System.out.println("===服务端===(ALL_IDLE 总超时)");
}
}
}
}
private KeepAliveMessage getSrcMsg(){
KeepAliveMessage keepAliveMessage = new KeepAliveMessage();
// 设备码
keepAliveMessage.setSn("sn_123456abcdfef");
keepAliveMessage.setReqCode(Constants.REQ_CODE);
return keepAliveMessage ;
}
public void stop(){
isStop = true;
if(ch!=null&&ch.isOpen()){
ch.close();
}
if(executorService!=null){
executorService.shutdown();
}
}
/**
* @param args
*/
public static void main(String[] args) {
KeepAliveClient keepAliveServer = new KeepAliveClient("127.0.0.1",1666);
keepAliveServer.start();
}
}
参考网站:
http://coder.beitown.com/archives/1180
下载工程,请猛戳
http://download.csdn.net/detail/asd13141718/8492741
//===========================================================================================
心跳机制说明:
Netty 超时机制及心跳程序实现 http://www.tuicool.com/articles/m2IRZv
本文介绍了 Netty 超时机制的原理,以及如何在连接闲置时发送一个心跳来维持连接。
Netty 超时机制的介绍
Netty 的超时类型 IdleState 主要分为:
ALL_IDLE : 一段时间内没有数据接收或者发送
READER_IDLE : 一段时间内没有数据接收
WRITER_IDLE : 一段时间内没有数据发送
在 Netty 的 timeout 包下,主要类有:
IdleStateEvent : 超时的事件
IdleStateHandler : 超时状态处理
ReadTimeoutHandler : 读超时状态处理
WriteTimeoutHandler : 写超时状态处理
其中 IdleStateHandler 包含了读写超时状态处理,比如
private static final int READ_IDEL_TIME_OUT = 4; // 读超时
private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时
new IdleStateHandler(READ_IDEL_TIME_OUT,
WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS));
上述例子,在 IdleStateHandler 中定义了读超时的时间是 4 秒, 写超时的时间是 5 秒,其他所有的超时时间是 7 秒。
应用 IdleStateHandler
既然 IdleStateHandler 包括了读写超时状态处理,那么很多时候 ReadTimeoutHandler 、 WriteTimeoutHandler 都可以不用使用。定义另一个名为 HeartbeatHandlerInitializer 的 ChannelInitializer :
public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {
private static final int READ_IDEL_TIME_OUT = 4; // 读超时
private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // 1
pipeline.addLast(new HeartbeatServerHandler()); // 2
}
}
使用了 IdleStateHandler ,分别设置了读、写超时的时间
定义了一个 HeartbeatServerHandler 处理器,用来处理超时时,发送心跳
定义了一个心跳处理器
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
// Return a unreleasable view on the given ByteBuf
// which will just ignore release and retain calls.
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8)); // 1
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) { // 2
IdleStateEvent event = (IdleStateEvent) evt;
String type = "";
if (event.state() == IdleState.READER_IDLE) {
type = "read idle";
} else if (event.state() == IdleState.WRITER_IDLE) {
type = "write idle";
} else if (event.state() == IdleState.ALL_IDLE) {
type = "all idle";
}
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
ChannelFutureListener.CLOSE_ON_FAILURE); // 3
System.out.println( ctx.channel().remoteAddress()+"超时类型:" + type);
} else {
super.userEventTriggered(ctx, evt);
}
}
}
定义了心跳时,要发送的内容
判断是否是 IdleStateEvent 事件,是则处理
将心跳内容发送给客户端
服务器
服务器代码比较简单,启动后侦听 8082 端口
public final class HeartbeatServer {
static final int PORT = 8082;
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HeartbeatHandlerInitializer());
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客户端测试
客户端用操作系统自带的 Telnet 程序即可:
telnet 127.0.0.1 8082
注意事项:Netty4服务端心跳机制
Netty4与Netty3.x的心跳机制略有不同,在Netty4中已经去掉了IdleStateAwareChannelHandler这个类,但IdleStateHandler依旧保留,只是心跳超时的触发事件的写法略有不同,Netty底层实现了一套类似信号和槽的事件通信机制。
这里且看实现。