SpringBoot简单整合Netty实现一个聊天室

SpringBoot简单整合Netty实现一个聊天室

薛定谔的汪

今天花了点时间研究下 SpringBoot 整合 Netty,因为现在公司用的框架就有 SpringBoot,感觉我现在依赖上了 SpringBoot…….

在 SpringBoot整合 Netty 的基础上实现一个聊天室这么个小东西。废话不多说直接开始。

Maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>

application.yml

1
2
3
4
5
netty:
port: 7788
websocket:
#websocket 路径,采用 websocket 协议实现即时通信
path: /ws

NettyConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class NettyConfig {

@Autowired
private IMInitializer IMInitializer;

@Bean
public ServerBootstrap bootstrap() {
ServerBootstrap b = new ServerBootstrap();
b.group(boss(), worker())
.channel(NioServerSocketChannel.class)
.childHandler(IMInitializer)
.option(ChannelOption.SO_BACKLOG, 2048)
.childOption(ChannelOption.SO_KEEPALIVE, true);
return b;
}

@Bean(name = "boss", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup boss() {
return new NioEventLoopGroup();
}

@Bean(name = "worker", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup worker() {
return new NioEventLoopGroup();
}
}

这里得说下**ChannelOption.SO_BACKLOG**的概念,方便以后复习。

TCP协议中有三次握手的概念,其实在 TCP协议中有两个队列,叫半连接队列和全连接队列。客户端请求服务端时,第一次握手成功后,TCP 协议将这次请求加入到半连接队列中;当第三次握手成功后,TCP 协议将这次请求从半连接队列中移除并加入到全连接队列。

ChannelOption.SO_BACKLOG代表这两个队列的容量之和。

IMInitializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class IMInitializer extends ChannelInitializer<SocketChannel> {

@Value("${websocket.path}")
private String websocketPath;

@Autowired
private IMHandler imHandler;

@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//将 Http 请求封装为 FullHttpRequest
pipeline.addLast(new HttpServerCodec());
//聚集多个消息转换为 Http请求或相应
pipeline.addLast(new HttpObjectAggregator(65536));
// ChunkedWriteHandler主要用于处理大文件流,这里不需要
// pipeline.addLast(new ChunkedWriteHandler());
//支持 WebSocket 协议的 Handler
pipeline.addLast(new WebSocketServerProtocolHandler(websocketPath));
pipeline.addLast(imHandler);

}
}

IMHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slf4j
@Component
@ChannelHandler.Sharable
public class IMHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

//@Autowired
//private TestService testService;

public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

public static Map<Channel,String> map = new ConcurrentHashMap<>(1024);

@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame webSocketFrame) throws Exception {
Channel incoming = ctx.channel();
String msg = webSocketFrame.text();
log.info("收到的消息: "+msg);
MessageVo messageVo = JSONObject.parseObject(msg, MessageVo.class);
map.put(incoming, messageVo.getName());
channels.writeAndFlush(new TextWebSocketFrame(msg));
//整合 Mybatis 可以记录日志 异步地执行
//testService.log();
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channels.add(channel);
log.info(channel.remoteAddress() +"加入");
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();

String name = map.get(channel);
map.remove(channel);
MessageVo messageVo = new MessageVo();
messageVo.setTalkWords(name+"离开了聊天室");
log.info(messageVo.getTalkWords());
String msg = JSONObject.toJSONString(messageVo);
channels.writeAndFlush(new TextWebSocketFrame(msg));
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
log.info("Client:"+incoming.remoteAddress()+"在线");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
log.info("Client:"+incoming.remoteAddress()+"掉线");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Channel incoming = ctx.channel();
log.info("Client:"+incoming.remoteAddress()+"异常:",cause);
// 当出现异常就关闭连接
ctx.close();
}

}

注意:IMHandler 添加了 @Sharable注解。在练习时,向管道中添加Handler 采用的是直接 new 的形式,这里与 Spring 整合,采用 @Compnent和@Sharable来实现。因为为了保证线程安全,每个 Channel 都应该有自己的 ChannelHandler,采用 new 的方式自然没问题。

与 Spring 整合后,Spring 默认是单例的,添加@Sharable 注解标识这个 Handler可以被多个 ChannelPipeline 线程安全地共享。

Netty支持 WebSocket 传输格式可以是二进制也可以是文本,这里选用 TextWebSocketFrame,只传文本。

MessageVo

前后端交互vo,使用 json 协议

1
2
3
4
5
6
7
8
9
10
11
@Getter
@Setter
public class MessageVo implements Serializable {

//0-提示信息 1-在线人数
private int type;
//名称
private String name;
//消息内容
private String talkWords;
}

IMServer 服务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@Component
public class IMServer {

@Value("${netty.port}")
private int port;

@Autowired
private ServerBootstrap b;

@Autowired
@Qualifier("boss")
private NioEventLoopGroup boss;

@Autowired()
@Qualifier("worker")
private NioEventLoopGroup worker;

@PostConstruct
public void run() throws Exception {
try {
ChannelFuture f = b.bind(port).sync();
log.info("IMServer 启动,端口:"+port);
//阻塞等待服务监听端口关闭
f.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}

@PostConstruct注解是在 Servlet的init()初始化之前执行,执行这个方法后,Netty服务启动但会阻塞主线程,所以 tomcat 服务不会启动,但是在系统中集成的其他组件如 Mybatis 是可以使用的,如果想 Netty 和 Tomcat 服务都启动,可以利用 Application 启动类继承CommandLineRunner来实现。

前端页面展示

页面代码开发者模式能看到,这里就不贴了🙄

体验地址

http://58.87.67.162:99/chat.html

  • Title: SpringBoot简单整合Netty实现一个聊天室
  • Author: 薛定谔的汪
  • Created at : 2018-09-15 18:01:54
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/09/15/netty/springboot-netty/
  • License: This work is licensed under CC BY-NC-SA 4.0.