Netty-实验
创始人
2024-01-22 05:27:26
0

Netty应用实例-群聊系统

实例要求:

(1)编写一个Netty群聊系统,实现服务端和客户端之间的数据简单通讯(非阻塞)
(2)实现多人群聊
(3)服务器端:可以监视用户上线,离线,并实现消息转发功能
(4)客户端:通过channel可以物阻塞发送消息给其他用户,同时可以接受其他用户发送的消息(由服务器转发的得到)
(5)目的:进一步理解Netty非阻塞网络编程机制

拆解过程:

首先我们建立GroupChatServer端,然后重写对应的具体业务处理的handler,在handler中具体需要捕捉的是客户端channel的上线、下线、具体对应的就是handlerAdded、handlerRemoved、channelActive、channelInactive,分别是处理发送给其他客户端的消息和服务器捕捉发送消息,最重要的就是读消息并转发,对于发送消息的channel的处理和其他客户端不一样。

然后我们建立GroupChatClient端,不同的就是我们的消息需要响应输入,然后发送,对于handler只需要读取消息打印即可。

GroupchatServer实现:
package com.sgg.Netty.GroupChat;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class GroupchatServer {private int port;public GroupchatServer(int port){this.port = port;}public void run() throws InterruptedException {//创建线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();try{b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数.childOption(ChannelOption.SO_KEEPALIVE,true)    //设置保持活动连接状态.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("decoder",new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast(new GroupchatServerhandler());}});System.out.println("服务器启动");ChannelFuture cf = b.bind(port).sync();cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if(cf.isSuccess()){System.out.println("监听端口成功");}else{System.out.println("监听端口失败");}}});cf.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new GroupchatServer(8847).run();}
}
GroupchatServerhandler实现:
package com.sgg.Netty.GroupChat;import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;import java.text.SimpleDateFormat;public class GroupchatServerhandler extends SimpleChannelInboundHandler {//定义一个channel组,管理所有的channelprivate static ChannelGroup channelGroup= new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);SimpleDateFormat sdf = new SimpleDateFormat("yyyy--mm-dd HH:mm:ss");/***处理所有在线客户端某新客户加入消息*///handlerAdded 表示连接建立,一旦连接,第一个被执行//将当前channelchannelGroup@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();//将该客户端加入聊天的消息推送给其他在线的客户端//该方法会将channelGroup中所有的channel遍历,并发送消息channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天\n");channelGroup.add(channel);}/***处理所有在线客户端某新客户离开的消息*///断开连接,将XX酷互动离开的信息推送给当前所有在线的客户//出发该方法时ChannelGroup中会自动删除该channenl,不需手动删除@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();//将该客户端离开聊天的消息推送给其他在线的客户端//该方法会将channelGroup中所有的channel遍历,并发送消息channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"离开聊天\n");}/***处理服务端显示新客户上线消息*///表示channel处于活动状态,提示XX上线@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(ctx.channel().remoteAddress()+"上线了");}/***处理服务端显示新客户上下线消息*///表示channel处于不活动状态,提示XX下线@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println(ctx.channel().remoteAddress()+"下线了~");}//读取数据并转发给当前在线的所有人@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {Channel channel = ctx.channel();channelGroup.forEach(ch->{if(channel!=ch){ch.writeAndFlush("[客户】"+channel.remoteAddress()+"发送了消息"+s+"\n");}else{ch.writeAndFlush("[自己]发送了消息"+s+"\n");}});}}
GroupchatClient实现:
package com.sgg.Netty.GroupChat;import com.sgg.Netty.simple.NettyClienthandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;public class GroupchatClient {private final String host;private final int port;public GroupchatClient(String host, int port){this.host = host;this.port = port;}public void run() throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();try {bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("decoder",new StringDecoder());pipeline.addLast("encoder",new StringEncoder());pipeline.addLast(new GroupchatCinenthandler());//加入自己的handler}});
//            System.out.println("客户端OK");ChannelFuture channelFuture = bootstrap.connect(host,port).sync();Channel channel = channelFuture.channel();System.out.println("---------"+channel.localAddress()+"--------");//客户端需要输入信息Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()){String msg = scanner.nextLine();//通过channel发送出去channel.writeAndFlush(msg+"\r\n");}}finally {//关闭group.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new GroupchatClient("127.0.0.1",8847).run();}
}
GroupchatClienthandler实现:
package com.sgg.Netty.GroupChat;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class GroupchatCinenthandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {System.out.println(s.trim());}
}

到此,我们实现了群聊系统如下:
GroupchatServer
在这里插入图片描述
GrouchatClient1
在这里插入图片描述
GrouchatClient2
在这里插入图片描述
GrouchatClient3
在这里插入图片描述

Netty心跳机制实验

我们先来看看心跳机制的产生
我们知道在TCP长连接或者WebSocket长连接中一般我们都会使用心跳机制–即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。

那么心跳机制可以用来做什么呢?

我们知道网络的传输是不可靠的,当我们发起一个链接请求的过程之中会发生什么事情谁都无法预料,或者断电,服务器重启,断网线之类。

如果有这种情况的发生对方也无法判断你是否还在线。所以这时候我们引入心跳机制,在长链接中双方没有数据交互的时候互相发送数据(可能是空包,也可能是特殊数据),对方收到该数据之后也回复相应的数据用以确保双方都在线,这样就可以确保当前链接是有效的。

1. 如何实现心跳机制

一般实现心跳机制由两种方式:

TCP协议自带的心跳机制来实现;
在应用层来实现。
但是TCP协议自带的心跳机制系统默认是设置的是2小时的心跳频率。它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。另外该心跳机制是与TCP协议绑定的,那如果我们要是使用UDP协议岂不是用不了?所以一般我们都不用。

而一般我们自己实现呢大致的策略是这样的:

Client启动一个定时器,不断发送心跳;
Server收到心跳后,做出回应;
Server启动一个定时器,判断Client是否存在,这里做判断有两种方法:时间差和简单标识。
时间差:

收到一个心跳包之后记录当前时间;
判断定时器到达时间,计算多久没收到心跳时间=当前时间-上次收到心跳时间。如果改时间大于设定值则认为超时。
简单标识:

收到心跳后设置连接标识为true;
判断定时器到达时间,如果未收到心跳则设置连接标识为false;

Netty中的心跳机制的实现:

我们来看一下Netty的心跳机制的实现,在Netty中提供了IdleStateHandler类来进行心跳的处理,它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件。
该类可以对三种类型的超时做心跳机制检测:

  • readerIdleTimeSeconds:设置读超时时间;
  • writerIdleTimeSeconds:设置写超时时间;
  • allIdleTimeSeconds:同时为读或写设置超时时间;

下面我们进行心跳机制的实验,我们只需要写服务端即可,客户端我们可以用上面的群聊实验中的客户端即可。
MyServer:

package com.sgg.Netty.HeatBeat;import com.sgg.Netty.simple.NettyServerhandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;public class MyServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try{//使用链式变成来进行设置bootstrap.group(bossGroup,workerGroup)       //设置两个线程组.channel(NioServerSocketChannel.class)     //使用NioServerSocketChannel作为服务器的通道实现.option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数.childOption(ChannelOption.SO_KEEPALIVE,true)       //设置保持活动连接状态.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer() {         //创建一个通道测试对象//给pipeline设置处理器@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//加入一个Netty提供的IdleStateHandler/*** 1、IdleStateHandler是netty提供的处理空闲状态的处理器* 2、long readerIdleTime: 表示多长时间没有读,就会发送一个心跳监测包检测是否连接* 3、long writerIdleTime: 表示多长时间没有写,就会发送一个心跳监测包检测是否连接* 4、long allIdleTime: 表示多长时间没有读和写,就会发送一个心跳监测包检测是否连接* 5、当IdleStateHandler触发后,就会传递给管道的下一个handler去处理,* 通过调用(触发)下一个handler的userEventTiggered,在该方法中去处理*/pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));pipeline.addLast(new MyServerhandler());}});ChannelFuture cf = bootstrap.bind(8847).sync();//给cf注册监听器,监控我们关心的事件cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if(cf.isSuccess()){System.out.println("监听端口成功");}else{System.out.println("监听端口失败");}}});//对关闭通道进行监听cf.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

MyServerhandler:

package com.sgg.Netty.HeatBeat;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;public class MyServerhandler extends ChannelInboundHandlerAdapter {/*** 心跳机制触发后的处理函数* @param ctx 上下文* @param evt 心跳机制发生的事件* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if(evt instanceof IdleStateEvent){//将evt向下转型 IdleStateEventIdleStateEvent event  = (IdleStateEvent)evt;String eventType = null;switch (event.state()){case READER_IDLE:eventType = "读空闲";break;case WRITER_IDLE:eventType = "写空闲";break;case ALL_IDLE:eventType = "读写空闲";break;}System.out.println(ctx.channel().remoteAddress()+"------"+eventType);}}
}

实验结果:
在这里插入图片描述
我们在handler中实现的userEventTriggered方法中我们可以捕捉到空闲事件,后续根据需要即可进行响应的处理

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...