基于netty的心跳检测功能

2016-11-25 09:46:00|?次阅读|上传:jifeng520qq【已有?条评论】发表评论

关键词:Java|来源:唯设编程网

基于netty的心跳检测功能  
实现的功能思路:  
1.客户端网络空闲5秒没有进行写操作是,进行发送一次ping心跳给服务端;  
2.客户端如果在下一个发送ping心跳周期来临时,还没有收到服务端pong的心跳应答,则失败心跳计数器加1;  
3.每当客户端收到服务端的pong心跳应答后,失败心跳计数器清零;  
4.如果连续超过3次没有收到服务端的心跳回复,则断开当前连接,在5秒后进行重连操作,直到重连成功,否则每隔5秒又会进行重连;  
5.服务端网络空闲状态到达6秒后,服务端心跳失败计数器加1;  
6.只要收到客户端的ping消息,服务端心跳失败计数器清零;  
7.服务端连续3次没有收到客户端的ping消息后,将关闭链路,释放资源,等待客户端重连;  
 
服务端代码:  
package com.kg.netty.server;  
import io.netty.bootstrap.ServerBootstrap;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.ChannelPipeline;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.SimpleChannelInboundHandler;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.SocketChannel;  
import io.netty.channel.socket.nio.NioServerSocketChannel;  
import io.netty.handler.codec.serialization.ClassResolvers;  
import io.netty.handler.codec.serialization.ObjectDecoder;  
import io.netty.handler.codec.serialization.ObjectEncoder;  
import io.netty.handler.timeout.IdleState;  
import io.netty.handler.timeout.IdleStateEvent;  
import io.netty.handler.timeout.IdleStateHandler;  
import java.util.concurrent.TimeUnit;  
import com.kg.netty.msg.KeepAliveMessage;  
import com.kg.utils.Constants;  
import com.kg.utils.Utils;  
public class KeepAliveServer {  
    // 端口  
    private int port ;  
    public KeepAliveServer(int port) {  
        this.port = port;  
    }  
      
    ChannelFuture f ;  
      
    ServerBootstrap b ;  
      
    // 设置6秒检测chanel是否接受过心跳数据  
    private static final int READ_WAIT_SECONDS = 6;  
      
    // 定义客户端没有收到服务端的pong消息的最大次数  
    private static final int MAX_UN_REC_PING_TIMES = 3;  
    public void startServer() {  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try {  
            b = new ServerBootstrap();  
            b.group(bossGroup, workerGroup);  
            b.channel(NioServerSocketChannel.class);  
            b.childHandler(new KeepAliveServerInitializer());  
            // 服务器绑定端口监听  
            f = b.bind(port).sync();  
            // 监听服务器关闭监听,此方法会阻塞  
            f.channel().closeFuture().sync();  
            // 可以简写为  
            /* b.bind(portNumber).sync().channel().closeFuture().sync(); */  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } finally {  
            bossGroup.shutdownGracefully();  
            workerGroup.shutdownGracefully();  
        }  
    }  
    /** 
     * 消息处理器 
     * @author cullen edward 
     */  
    private class KeepAliveServerInitializer extends ChannelInitializer<SocketChannel> {  
        @Override  
        protected void initChannel(SocketChannel ch) throws Exception {  
            ChannelPipeline pipeline = ch.pipeline();  
              
            /* 
             * 使用ObjectDecoder和ObjectEncoder 
             * 因为双向都有写数据和读数据,所以这里需要两个都设置 
             * 如果只读,那么只需要ObjectDecoder即可 
             */  
            pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));  
            pipeline.addLast("encoder", new ObjectEncoder());  
              
            /* 
             * 这里只监听读操作 
             * 可以根据需求,监听写操作和总得操作 
             */  
            pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS));  
              
            pipeline.addLast("handler", new Heartbeat());  
        }  
    }  
      
    private class Heartbeat extends SimpleChannelInboundHandler<KeepAliveMessage> {   
          
        // 失败计数器:未收到client端发送的ping请求  
        private int unRecPingTimes = 0 ;  
          
        // 每个chanel对应一个线程,此处用来存储对应于每个线程的一些基础数据,此处不一定要为KeepAliveMessage对象  
        ThreadLocal<KeepAliveMessage> localMsgInfo = new ThreadLocal<KeepAliveMessage>();   
          
        @Override  
        protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg) throws Exception {  
            System.out.println(ctx.channel().remoteAddress() + " Say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());  
            // 收到ping消息后,回复  
            if(Utils.notEmpty(msg.getSn())&&msg.getReqCode()==1){  
                msg.setReqCode(Constants.RET_CODE);  
                ctx.channel().writeAndFlush(msg);  
                // 失败计数器清零  
                unRecPingTimes = 0;  
                if(localMsgInfo.get()==null){  
                    KeepAliveMessage localMsg = new KeepAliveMessage();  
                    localMsg.setSn(msg.getSn());  
                    localMsgInfo.set(localMsg);  
                    /* 
                     * 这里可以将设备号放入一个集合中进行统一管理 
                     */  
                    // TODO  
                }  
            }else{  
                ctx.channel().close();  
            }  
        }  
          
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
            if (evt instanceof IdleStateEvent) {  
                IdleStateEvent event = (IdleStateEvent) evt;  
                if (event.state() == IdleState.READER_IDLE) {  
                    /*读超时*/  
                    System.out.println("===服务端===(READER_IDLE 读超时)");  
                    // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连  
                    if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){  
                        System.out.println("===服务端===(读超时,关闭chanel)");  
                        // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连  
                        ctx.channel().close();  
                    }else{  
                        // 失败计数器加1  
                        unRecPingTimes++;  
                    }  
                } else if (event.state() == IdleState.WRITER_IDLE) {  
                    /*写超时*/     
                    System.out.println("===服务端===(WRITER_IDLE 写超时)");  
                } else if (event.state() == IdleState.ALL_IDLE) {  
                    /*总超时*/  
                    System.out.println("===服务端===(ALL_IDLE 总超时)");  
                }  
            }  
        }  
          
        @Override  
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
            System.out.println("错误原因:"+cause.getMessage());  
            if(localMsgInfo.get()!=null){  
                /* 
                 * 从管理集合中移除设备号等唯一标示,标示设备离线 
                 */  
                // TODO  
            }  
            ctx.channel().close();  
        }  
        @Override  
        public void channelActive(ChannelHandlerContext ctx) throws Exception {  
            System.out.println("Client active ");  
            super.channelActive(ctx);  
        }  
          
        @Override  
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
            // 关闭,等待重连  
            ctx.close();  
            if(localMsgInfo.get()!=null){  
                /* 
                 * 从管理集合中移除设备号等唯一标示,标示设备离线 
                 */  
                // TODO  
            }  
            System.out.println("===服务端===(客户端失效)");  
        }  
    }  
      
    public void stopServer(){  
        if(f!=null){  
            f.channel().close();  
        }  
    }  
    /** 
     * @param args 
     */  
    public static void main(String[] args) {  
        KeepAliveServer keepAliveServer = new KeepAliveServer(1666);  
        keepAliveServer.startServer();  
    }  
}  
客户端代码:  
package com.kg.netty.client;  
import io.netty.bootstrap.Bootstrap;  
import io.netty.channel.Channel;  
import io.netty.channel.ChannelHandlerContext;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.ChannelPipeline;  
import io.netty.channel.EventLoopGroup;  
import io.netty.channel.SimpleChannelInboundHandler;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.SocketChannel;  
import io.netty.channel.socket.nio.NioSocketChannel;  
import io.netty.handler.codec.serialization.ClassResolvers;  
import io.netty.handler.codec.serialization.ObjectDecoder;  
import io.netty.handler.codec.serialization.ObjectEncoder;  
import io.netty.handler.timeout.IdleState;  
import io.netty.handler.timeout.IdleStateEvent;  
import io.netty.handler.timeout.IdleStateHandler;  
import java.util.concurrent.Executors;  
import java.util.concurrent.ScheduledExecutorService;  
import java.util.concurrent.TimeUnit;  
import com.kg.netty.msg.KeepAliveMessage;  
import com.kg.utils.Constants;  
public class KeepAliveClient {  
    private String host ;  
    private int port ;  
      
    private EventLoopGroup group ;  
      
    private Bootstrap b ;  
      
    private Channel ch ;  
      
    // 定义客户端没有收到服务端的pong消息的最大次数  
    private static final int MAX_UN_REC_PONG_TIMES = 3;  
      
    // 多长时间未请求后,发送心跳  
    private static final int WRITE_WAIT_SECONDS = 5;  
      
    // 隔N秒后重连  
    private static final int RE_CONN_WAIT_SECONDS = 5;  
      
    // 客户端连续N次没有收到服务端的pong消息  计数器  
    private int unRecPongTimes = 0 ;  
      
    private ScheduledExecutorService executorService ;  
      
    // 是否停止  
    private boolean isStop = false ;  
    public KeepAliveClient(String host, int port) {  
        this.host = host ;  
        this.port = port ;  
        group = new NioEventLoopGroup();  
        b = new Bootstrap();  
        b.group(group).channel(NioSocketChannel.class).handler(new HeartbeatInitializer());  
    }  
    public void start() {  
        connServer();  
    }  
      
    private void connServer(){  
          
        isStop = false;  
          
        if(executorService!=null){  
            executorService.shutdown();  
        }  
        executorService = Executors.newScheduledThreadPool(1);  
        executorService.scheduleWithFixedDelay(new Runnable() {  
              
            boolean isConnSucc = true;  
              
            @Override  
            public void run() {  
                try {  
                    // 重置计数器  
                    unRecPongTimes = 0;  
                    // 连接服务端  
                    if(ch!=null&&ch.isOpen()){  
                        ch.close();  
                    }  
                    ch = b.connect(host, port).sync().channel();  
                    // 此方法会阻塞  
//                  ch.closeFuture().sync();  
                    System.out.println("connect server finish");  
                } catch (Exception e) {  
                    e.printStackTrace();  
                    isConnSucc = false ;  
                } finally{  
                    if(isConnSucc){  
                        if(executorService!=null){  
                            executorService.shutdown();  
                        }  
                    }  
                }  
            }  
        }, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS);  
    }  
      
    public class HeartbeatInitializer extends ChannelInitializer<SocketChannel> {  
           
        @Override  
        protected void initChannel(SocketChannel ch) throws Exception {  
            ChannelPipeline pipeline = ch.pipeline();  
       
            pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));  
            pipeline.addLast("encoder", new ObjectEncoder());  
       
            pipeline.addLast("ping", new IdleStateHandler(0, WRITE_WAIT_SECONDS, 0,TimeUnit.SECONDS));  
            // 客户端的逻辑  
            pipeline.addLast("handler", new ClientHandler());  
        }  
    }  
    public class ClientHandler extends SimpleChannelInboundHandler<KeepAliveMessage> {  
           
        @Override  
        protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg)  
                throws Exception {  
            System.out.println("Server say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());  
            if (Constants.RET_CODE == msg.getReqCode()) {  
                // 计数器清零  
                unRecPongTimes = 0;  
            }  
        }  
          
        @Override  
        public void channelActive(ChannelHandlerContext ctx) throws Exception {  
            System.out.println("Client active ");  
            super.channelActive(ctx);  
        }  
        @Override  
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
            System.out.println("Client close ");  
            super.channelInactive(ctx);  
            /* 
             * 重连 
             */  
            if(!isStop){  
                connServer();  
            }  
        }  
          
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
            if (evt instanceof IdleStateEvent) {  
                IdleStateEvent event = (IdleStateEvent) evt;  
                if (event.state() == IdleState.READER_IDLE) {  
                    /*读超时*/  
                    System.out.println("===服务端===(READER_IDLE 读超时)");  
                } else if (event.state() == IdleState.WRITER_IDLE) {  
                    /*写超时*/     
                    System.out.println("===服务端===(WRITER_IDLE 写超时)");  
                    if(unRecPongTimes < MAX_UN_REC_PONG_TIMES){  
                        ctx.channel().writeAndFlush(getSrcMsg()) ;  
                        unRecPongTimes++;  
                    }else{  
                        ctx.channel().close();  
                    }  
                } else if (event.state() == IdleState.ALL_IDLE) {  
                    /*总超时*/  
                    System.out.println("===服务端===(ALL_IDLE 总超时)");  
                }  
            }  
        }  
    }  
      
    private KeepAliveMessage getSrcMsg(){  
        KeepAliveMessage keepAliveMessage = new KeepAliveMessage();  
        // 设备码  
        keepAliveMessage.setSn("sn_123456abcdfef");  
        keepAliveMessage.setReqCode(Constants.REQ_CODE);  
        return keepAliveMessage ;  
    }  
      
    public void stop(){  
        isStop = true;  
        if(ch!=null&&ch.isOpen()){  
            ch.close();  
        }  
        if(executorService!=null){  
            executorService.shutdown();  
        }  
    }  
      
    /** 
     * @param args 
     */  
    public static void main(String[] args) {  
        KeepAliveClient keepAliveServer = new KeepAliveClient("127.0.0.1",1666);  
        keepAliveServer.start();  
    }  
}  
参考网站:  
http://coder.beitown.com/archives/1180  
下载工程,请猛戳  
http://download.csdn.net/detail/asd13141718/8492741  
 
 
 
//===========================================================================================
心跳机制说明:
 
Netty 超时机制及心跳程序实现    http://www.tuicool.com/articles/m2IRZv
本文介绍了 Netty 超时机制的原理,以及如何在连接闲置时发送一个心跳来维持连接。
 
Netty 超时机制的介绍
 
Netty 的超时类型 IdleState 主要分为:
 
ALL_IDLE : 一段时间内没有数据接收或者发送
READER_IDLE : 一段时间内没有数据接收
WRITER_IDLE : 一段时间内没有数据发送
在 Netty 的 timeout 包下,主要类有:
 
IdleStateEvent : 超时的事件
IdleStateHandler : 超时状态处理
ReadTimeoutHandler : 读超时状态处理
WriteTimeoutHandler : 写超时状态处理
其中 IdleStateHandler 包含了读写超时状态处理,比如
 
private static final int READ_IDEL_TIME_OUT = 4; // 读超时
private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时
 
new IdleStateHandler(READ_IDEL_TIME_OUT,
            WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); 
上述例子,在 IdleStateHandler 中定义了读超时的时间是 4 秒, 写超时的时间是 5 秒,其他所有的超时时间是 7 秒。
 
应用 IdleStateHandler
 
既然 IdleStateHandler 包括了读写超时状态处理,那么很多时候 ReadTimeoutHandler 、 WriteTimeoutHandler 都可以不用使用。定义另一个名为 HeartbeatHandlerInitializer 的 ChannelInitializer :
 
public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {
  private static final int READ_IDEL_TIME_OUT = 4; // 读超时
  private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
  private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
        WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // 1
    pipeline.addLast(new HeartbeatServerHandler()); // 2
  }
}
 
使用了 IdleStateHandler ,分别设置了读、写超时的时间
定义了一个 HeartbeatServerHandler 处理器,用来处理超时时,发送心跳
定义了一个心跳处理器
 
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
  // Return a unreleasable view on the given ByteBuf
  // which will just ignore release and retain calls.
  private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
      .unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
          CharsetUtil.UTF_8));  // 1
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
      throws Exception {
    if (evt instanceof IdleStateEvent) {  // 2
      IdleStateEvent event = (IdleStateEvent) evt;  
      String type = "";
      if (event.state() == IdleState.READER_IDLE) {
        type = "read idle";
      } else if (event.state() == IdleState.WRITER_IDLE) {
        type = "write idle";
      } else if (event.state() == IdleState.ALL_IDLE) {
        type = "all idle";
      }
      ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
          ChannelFutureListener.CLOSE_ON_FAILURE);  // 3
      System.out.println( ctx.channel().remoteAddress()+"超时类型:" + type);
    } else {
      super.userEventTriggered(ctx, evt);
    }
  }
}
 
定义了心跳时,要发送的内容
判断是否是 IdleStateEvent 事件,是则处理
将心跳内容发送给客户端
服务器
 
服务器代码比较简单,启动后侦听 8082 端口
 
public final class HeartbeatServer {
  static final int PORT = 8082;
  public static void main(String[] args) throws Exception {
    // Configure the server.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
       .option(ChannelOption.SO_BACKLOG, 100)
       .handler(new LoggingHandler(LogLevel.INFO))
       .childHandler(new HeartbeatHandlerInitializer());
      // Start the server.
      ChannelFuture f = b.bind(PORT).sync();
      // Wait until the server socket is closed.
      f.channel().closeFuture().sync();
    } finally {
      // Shut down all event loops to terminate all threads.
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}
 
客户端测试
 
客户端用操作系统自带的 Telnet 程序即可:
telnet 127.0.0.1 8082
 
 
注意事项:Netty4服务端心跳机制
Netty4与Netty3.x的心跳机制略有不同,在Netty4中已经去掉了IdleStateAwareChannelHandler这个类,但IdleStateHandler依旧保留,只是心跳超时的触发事件的写法略有不同,Netty底层实现了一套类似信号和槽的事件通信机制。
这里且看实现。

 

发表评论0条 】
网友评论(共?条评论)..
基于netty的心跳检测功能