Netty+WebSocket实现简单网页群聊
这两天看了下WebSocket的RFC文档,对WebSocket协议有了基本的认识,顺便写了篇博客做点笔记 WebSocket 协议。
例子说明:每个网页一个websocket连接,点发送消息后,消息会发送给除了自己之外的其它在线的websocket客户端,简单实现群聊
服务端
采用Netty实现,Netty版本是4.1.2.Final.
服务端共有以下4个类:
WebSocketServer实现IHttpService和IWebSocketService,WebSocketServerHandler持有IHttpService和 IWebSocketService的引用,若收到FullHttpRequest则交给IHttpService其处理,若收到WebSocketFrame则交给IWebSocketService去处理。
IHttpService.java
- package cc.lixiaohui.demo.netty4.websocket;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.http.FullHttpRequest;
- /**
- * @author lixiaohui
- * @date 2016年9月24日 下午3:58:31
- *
- */
- public interface IHttpService {
- void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request);
- }
IWebSocketService.java
- package cc.lixiaohui.demo.netty4.websocket;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.http.websocketx.WebSocketFrame;
- /**
- * @author lixiaohui
- * @date 2016年9月24日 下午3:46:07
- *
- */
- public interface IWebSocketService {
- void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame);
- }
WebSocketServerHandler.java
- package cc.lixiaohui.demo.netty4.websocket;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.FullHttpRequest;
- import io.netty.handler.codec.http.websocketx.WebSocketFrame;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author lixiaohui
- * @date 2016年9月24日 下午2:22:33
- *
- */
- public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class);
- private IWebSocketService websocketService;
- private IHttpService httpService;
- public WebSocketServerHandler(IWebSocketService websocketService, IHttpService httpService) {
- super();
- this.websocketService = websocketService;
- this.httpService = httpService;
- }
- /*
- * @see
- * io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel
- * .ChannelHandlerContext, java.lang.Object)
- */
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof FullHttpRequest) {
- httpService.handleHttpRequest(ctx, (FullHttpRequest) msg);
- } else if (msg instanceof WebSocketFrame) {
- websocketService.handleFrame(ctx, (WebSocketFrame) msg);
- }
- }
- /*
- * @see io.netty.channel.ChannelInboundHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext)
- */
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- }
WebSocketServer.java
- package cc.lixiaohui.demo.netty4.websocket;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelId;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.http.FullHttpRequest;
- import io.netty.handler.codec.http.HttpHeaderNames;
- import io.netty.handler.codec.http.HttpHeaders;
- import io.netty.handler.codec.http.HttpMethod;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpServerCodec;
- import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketFrame;
- import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
- import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
- import io.netty.handler.stream.ChunkedWriteHandler;
- import io.netty.util.AttributeKey;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * @author lixiaohui
- * @date 2016年9月24日 下午2:08:58
- *
- */
- public class WebSocketServer implements IWebSocketService, IHttpService {
- public static void main(String[] args) {
- new WebSocketServer(9999).start();
- }
- // ----------------------------static fields -----------------------------
- private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
- private static final String HN_HTTP_CODEC = "HN_HTTP_CODEC";
- private static final String HN_HTTP_AGGREGATOR = "HN_HTTP_AGGREGATOR";
- private static final String HN_HTTP_CHUNK = "HN_HTTP_CHUNK";
- private static final String HN_SERVER = "HN_LOGIC";
- // handshaker attachment key
- private static final AttributeKey<WebSocketServerHandshaker> ATTR_HANDSHAKER = AttributeKey.newInstance("ATTR_KEY_CHANNELID");
- private static final int MAX_CONTENT_LENGTH = 65536;
- private static final String WEBSOCKET_UPGRADE = "websocket";
- private static final String WEBSOCKET_CONNECTION = "Upgrade";
- private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d";
- // ------------------------ member fields -----------------------
- private String host; // 绑定的地址
- private int port; // 绑定的端口
- /**
- * 保存所有WebSocket连接
- */
- private Map<ChannelId, Channel> channelMap = new ConcurrentHashMap<ChannelId, Channel>();
- private final String WEBSOCKET_URI_ROOT;
- public WebSocketServer(int port) {
- this("localhost", port);
- }
- public WebSocketServer(String host, int port) {
- this.host = host;
- this.port = port;
- WEBSOCKET_URI_ROOT = String.format(WEBSOCKET_URI_ROOT_PATTERN, host, port);
- }
- public void start() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup);
- b.channel(NioServerSocketChannel.class);
- b.childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ChannelPipeline pl = ch.pipeline();
- // 保存该Channel的引用
- channelMap.put(ch.id(), ch);
- logger.info("new channel {}", ch);
- ch.closeFuture().addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) throws Exception {
- logger.info("channel close {}", future.channel());
- // Channel 关闭后不再引用该Channel
- channelMap.remove(future.channel().id());
- }
- });
- pl.addLast(HN_HTTP_CODEC, new HttpServerCodec());
- pl.addLast(HN_HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_CONTENT_LENGTH));
- pl.addLast(HN_HTTP_CHUNK, new ChunkedWriteHandler());
- pl.addLast(HN_SERVER, new WebSocketServerHandler(WebSocketServer.this, WebSocketServer.this));
- }
- });
- try {
- // 绑定端口
- ChannelFuture future = b.bind(host, port).addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- logger.info("websocket started.");
- }
- }
- }).sync();
- future.channel().closeFuture().addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture future) throws Exception {
- logger.info("server channel {} closed.", future.channel());
- }
- }).sync();
- } catch (InterruptedException e) {
- logger.error(e.toString());
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- logger.info("websocket server shutdown");
- }
- /*
- * @see cc.lixiaohui.demo.netty4.websocket.IHttpService#handleHttpRequest(io.netty.handler.codec.http.FullHttpRequest)
- */
- public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
- if (isWebSocketUpgrade(req)) { // 该请求是不是websocket upgrade请求
- logger.info("upgrade to websocket protocol");
- String subProtocols = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);
- WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(WEBSOCKET_URI_ROOT, subProtocols, false);
- WebSocketServerHandshaker handshaker = factory.newHandshaker(req);
- if (handshaker == null) {// 请求头不合法, 导致handshaker没创建成功
- WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
- } else {
- // 响应该请求
- handshaker.handshake(ctx.channel(), req);
- // 把handshaker 绑定给Channel, 以便后面关闭连接用
- ctx.channel().attr(ATTR_HANDSHAKER).set(handshaker);// attach handshaker to this channel
- }
- return;
- }
- // TODO 忽略普通http请求
- logger.info("ignoring normal http request");
- }
- /*
- * @see
- * cc.lixiaohui.demo.netty4.websocket.IWebSocketService#handleFrame(io.netty
- * .channel.Channel, io.netty.handler.codec.http.websocketx.WebSocketFrame)
- */
- public void handleFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
- // text frame
- if (frame instanceof TextWebSocketFrame) {
- String text = ((TextWebSocketFrame) frame).text();
- TextWebSocketFrame rspFrame = new TextWebSocketFrame(text);
- logger.info("recieve TextWebSocketFrame from channel {}", ctx.channel());
- // 发给其他所有channel
- for (Channel ch : channelMap.values()) {
- if (ctx.channel().equals(ch)) {
- continue;
- }
- ch.writeAndFlush(rspFrame);
- logger.info("write text[{}] to channel {}", text, ch);
- }
- return;
- }
- // ping frame, 回复pong frame即可
- if (frame instanceof PingWebSocketFrame) {
- logger.info("recieve PingWebSocketFrame from channel {}", ctx.channel());
- ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
- return;
- }
- if (frame instanceof PongWebSocketFrame) {
- logger.info("recieve PongWebSocketFrame from channel {}", ctx.channel());
- return;
- }
- // close frame,
- if (frame instanceof CloseWebSocketFrame) {
- logger.info("recieve CloseWebSocketFrame from channel {}", ctx.channel());
- WebSocketServerHandshaker handshaker = ctx.channel().attr(ATTR_HANDSHAKER).get();
- if (handshaker == null) {
- logger.error("channel {} have no HandShaker", ctx.channel());
- return;
- }
- handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
- return;
- }
- // 剩下的是binary frame, 忽略
- logger.warn("unhandle binary frame from channel {}", ctx.channel());
- }
- //三者与:1.GET? 2.Upgrade头 包含websocket字符串? 3.Connection头 包含 Upgrade字符串?
- private boolean isWebSocketUpgrade(FullHttpRequest req) {
- HttpHeaders headers = req.headers();
- return req.method().equals(HttpMethod.GET)
- && headers.get(HttpHeaderNames.UPGRADE).contains(WEBSOCKET_UPGRADE)
- && headers.get(HttpHeaderNames.CONNECTION).contains(WEBSOCKET_CONNECTION);
- }
- }
客户端
客户端采用浏览器,代码:
- <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
- <html xmlns="http://www.w3.org/1999/xhtml">
- <head>
- <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
- <title></title>
- </head>
- </head>
- <script type="text/javascript">
- var socket;
- if(!window.WebSocket){
- window.WebSocket = window.MozWebSocket;
- }
- if(window.WebSocket){
- socket = new WebSocket("ws://localhost:9999");
- socket.onmessage = function(event){
- appendln("接收:" + event.data);
- };
- socket.onopen = function(event){
- appendln("WebSocket 连接已建立");
- };
- socket.onclose = function(event){
- appendln("WebSocket 连接已关闭");
- };
- }else{
- alert("浏览器不支持WebSocket协议");
- }
- function send(message){
- if(!window.WebSocket){return;}
- if(socket.readyState == WebSocket.OPEN){
- socket.send(message);
- appendln("发送:" + message);
- }else{
- alert("WebSocket连接建立失败");
- }
- }
- function appendln(text) {
- var ta = document.getElementById('responseText');
- ta.value += text + "\r\n";
- }
- function clear() {
- var ta = document.getElementById('responseText');
- ta.value = "";
- }
- </script>
- <body>
- <form onSubmit="return false;">
- <input type = "text" name="message" value="你好啊"/>
- <br/><br/>
- <input type="button" value="发送 WebSocket 请求消息" onClick="send(this.form.message.value)"/>
- <hr/>
- <h3>服务端返回的应答消息</h3>
- <textarea id="responseText" style="width: 800px;height: 300px;"></textarea>
- </form>
- </body>
- </html>
测试
打开客户端,可以看到能接收到其他客户端发的消息
网页A:
网页B:
相关推荐
实现了好友申请、好友分组、好友聊天、群管理、群公告、用户群聊等功能。 项目技术栈 后端技术栈 Spring Boot netty nio WebSocket MyBatis Spring Data JPA Redis MySQL Spring Session Alibaba Druid Gradle 前端...
基于LayIM、Netty、Spring Boot 实现的在线聊天系统,web网络开发,可内嵌于自己的B/S系统或进行二次迭代 本项目是基于Netty实现的一个实时通讯系统,前端使用了LayIM组件。 技术栈 Spring Boot、Spring MVC、...
Netty 聊天 小例子 非常适合初学者
Netty作为之前及现在不断学习Netty道路上持续集成项目Netty心跳实现客户端及服务端聊天实现完成Netty回声服务器使用WebSocket实现点对点聊天功能WebSocket实现群聊功能及上下线提醒增加Netty UDP协议实现使用第三方...
该资源中包含了bio、nio、netty的一些常用案例,还有一些Reactor模型的案例。例如:群聊功能、WebSocket长连接的群聊,以及tcp在传输过程中的粘包、拆包问题模拟与解决。
jdk bio,nio,aio各种使用案例,深入理解netty,结合源码以及文章分析: jdk原生nio的缓冲区使用 jdk原生的nio channel使用 jdk原生的nio网络编程 jdk原生的React器编程模型(使用选择器)聊天室 零拷贝使用案例...
模仿微信做的通讯软件,主要功能:一对一聊天、群聊天、添加好友、扫一扫添加好友、创建群聊、管理群成员、聊天支持文字,图片和语音、朋友圈功能,可发动文字,图片或图文形式的动态,好友可见
实现功能:访微信聊天项目实战,用户登录、注册以及第三方微信登录,用户头像上传,添加好友,好友列表,新朋友通知与发现,发送邮箱及反馈,群聊分组,论坛设计,朋友圈发布,登录多个用户登录挤下线,单聊/群聊,...
金刚涉及技术:netty,springMVC, ,vuejs,jquery, ,redis,mysql等。开源免费功能点单聊聊天发送文件和图片О版版本金刚2.0离线消息推送群聊在线状态的实时同步修改签名查看聊天记录好友管理好友分组管理群组...
开发模块单聊,群聊LayIm实时通讯开发这可以先通简单的单聊开始,配合doc目录下的X-IM及时通信项目开发文档-version1.0.docx快速了解X-IM的项目的数据流程。熟悉整个流程后,读者可以根据自己的业务需求快速完成自己...
基于netty结合Spring Boot搭建的一个简单WebSocket聊天室Demo,WebSocket消息路由基于个人设计的策略模式,由于前端不是专业的所以比较简陋。 快速开始 运行 ChatApplication 打开聊天窗口 私聊:打开两个chatroom....
PingPangChat是一个基于netty的websocket即时聊天程序。支持文字、表情包、图片发送接收功能,以及消息提醒(聊天回显)。 聊天支持:单聊(用户列表实时刷新)、群聊(没自定义)。
特提斯 Tethys是采用开发的IM服务器,致力于服务端响应式技术开发推广与研究,可快速构建功能完备,高效且可定制化的IM服务。 特色 免费的 高效 可靠的 功能 用户认证 私人聊天 ...Tethys的实现离不开源社
PingPangChat是一个基于netty的websocket即时聊天程序。支持文字、表情包、图片发送接收功能,以及消息提醒(聊天回显)。 聊天支持:单聊(用户列表实时刷新)、群聊(没自定义)。 PingPangChat软件架构: 聊天...
简单快捷的IM方案,快速打造在线IM,可用于公司内网、外网通讯,客服系统等,实现了socket,websocket,能和安卓、IOS应用结合使用 Java后端和js消息采用Google Protobuf传输,如需修改protobuf文件请参考当前文档 ...
提供简单快捷的IM方案,快速打造在线IM,可用于公司内网、外网通讯,客服系统等,实现了socket,websocket,能和安卓、IOS应用结合使用,可用于任何商业、个人作品中,请保留作者信息,如果项目帮到了您请加个星,...
简单聊天软件CS模式 2个目标文件 一个简单的CS模式的聊天软件,用socket实现,比较简单。 凯撒加密解密程序 1个目标文件 1、程序结构化,用函数分别实现 2、对文件的加密,解密输出到文件 利用随机函数抽取幸运数字 ...
简介QIQIIM 提供简单快捷的IM方案,快速打造在线IM,可用于公司内网、外网通讯,客服系统等,实现了socket,websocket,能和安卓、IOS应用结合使用,可用于任何商业、个人作品中,请保留作者信息,如果项目帮到了您请...