SpringBoot简单整合Netty实现一个聊天室
今天花了点时间研究下 SpringBoot 整合 Netty,因为现在公司用的框架就有 SpringBoot,感觉我现在依赖上了 SpringBoot…….
在 SpringBoot整合 Netty 的基础上实现一个聊天室这么个小东西。废话不多说直接开始。
Maven 依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 4.1.6.Final</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.47</version > </dependency >
application.yml 1 2 3 4 5 netty: port: 7788 websocket: path: /ws
NettyConfig 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Configuration public class NettyConfig { @Autowired private IMInitializer IMInitializer; @Bean public ServerBootstrap bootstrap () { ServerBootstrap b = new ServerBootstrap (); b.group(boss(), worker()) .channel(NioServerSocketChannel.class) .childHandler(IMInitializer) .option(ChannelOption.SO_BACKLOG, 2048 ) .childOption(ChannelOption.SO_KEEPALIVE, true ); return b; } @Bean(name = "boss", destroyMethod = "shutdownGracefully") public NioEventLoopGroup boss () { return new NioEventLoopGroup (); } @Bean(name = "worker", destroyMethod = "shutdownGracefully") public NioEventLoopGroup worker () { return new NioEventLoopGroup (); } }
这里得说下**ChannelOption.SO_BACKLOG
**的概念,方便以后复习。
TCP协议中有三次握手的概念,其实在 TCP协议中有两个队列,叫半连接队列和全连接队列。客户端请求服务端时,第一次握手成功后,TCP 协议将这次请求加入到半连接队列中;当第三次握手成功后,TCP 协议将这次请求从半连接队列中移除并加入到全连接队列。
ChannelOption.SO_BACKLOG
代表这两个队列的容量之和。
IMInitializer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Component public class IMInitializer extends ChannelInitializer <SocketChannel> { @Value("${websocket.path}") private String websocketPath; @Autowired private IMHandler imHandler; @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec ()); pipeline.addLast(new HttpObjectAggregator (65536 )); pipeline.addLast(new WebSocketServerProtocolHandler (websocketPath)); pipeline.addLast(imHandler); } }
IMHandler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 @Slf4j @Component @ChannelHandler .Sharablepublic class IMHandler extends SimpleChannelInboundHandler <TextWebSocketFrame> { public static ChannelGroup channels = new DefaultChannelGroup (GlobalEventExecutor.INSTANCE); public static Map<Channel,String> map = new ConcurrentHashMap <>(1024 ); @Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame webSocketFrame) throws Exception { Channel incoming = ctx.channel(); String msg = webSocketFrame.text(); log.info("收到的消息: " +msg); MessageVo messageVo = JSONObject.parseObject(msg, MessageVo.class); map.put(incoming, messageVo.getName()); channels.writeAndFlush(new TextWebSocketFrame (msg)); } @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channels.add(channel); log.info(channel.remoteAddress() +"加入" ); } @Override public void handlerRemoved (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); String name = map.get(channel); map.remove(channel); MessageVo messageVo = new MessageVo (); messageVo.setTalkWords(name+"离开了聊天室" ); log.info(messageVo.getTalkWords()); String msg = JSONObject.toJSONString(messageVo); channels.writeAndFlush(new TextWebSocketFrame (msg)); } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); log.info("Client:" +incoming.remoteAddress()+"在线" ); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); log.info("Client:" +incoming.remoteAddress()+"掉线" ); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); log.info("Client:" +incoming.remoteAddress()+"异常:" ,cause); ctx.close(); } }
注意:IMHandler 添加了 @Sharable注解。在练习时,向管道中添加Handler 采用的是直接 new 的形式,这里与 Spring 整合,采用 @Compnent和@Sharable来实现。因为为了保证线程安全,每个 Channel 都应该有自己的 ChannelHandler,采用 new 的方式自然没问题。
与 Spring 整合后,Spring 默认是单例的,添加@Sharable 注解标识这个 Handler可以被多个 ChannelPipeline 线程安全地共享。
Netty支持 WebSocket 传输格式可以是二进制也可以是文本,这里选用 TextWebSocketFrame,只传文本。
MessageVo 前后端交互vo,使用 json 协议
1 2 3 4 5 6 7 8 9 10 11 @Getter @Setter public class MessageVo implements Serializable { private int type; private String name; private String talkWords; }
IMServer 服务类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j @Component public class IMServer { @Value("${netty.port}") private int port; @Autowired private ServerBootstrap b; @Autowired @Qualifier("boss") private NioEventLoopGroup boss; @Autowired() @Qualifier("worker") private NioEventLoopGroup worker; @PostConstruct public void run () throws Exception { try { ChannelFuture f = b.bind(port).sync(); log.info("IMServer 启动,端口:" +port); f.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } } }
@PostConstruct注解是在 Servlet的init()初始化之前执行,执行这个方法后,Netty服务启动但会阻塞主线程,所以 tomcat 服务不会启动,但是在系统中集成的其他组件如 Mybatis 是可以使用的,如果想 Netty 和 Tomcat 服务都启动,可以利用 Application 启动类继承CommandLineRunner来实现。
前端页面展示
页面代码开发者模式能看到,这里就不贴了🙄
体验地址 http://58.87.67.162:99/chat.html