SpringBoot+Netty+WebSocket实现消息发送的示例代码

一.导入Netty依赖

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.25.Final</version>
  </dependency>

二.搭建websocket服务器

@Component
public class WebSocketServer {

 /**
  * 主线程池
  */
 private EventLoopGroup bossGroup;
 /**
  * 工作线程池
  */
 private EventLoopGroup workerGroup;
 /**
  * 服务器
  */
 private ServerBootstrap server;
 /**
  * 回调
  */
 private ChannelFuture future;

 public void start() {
  future = server.bind(9001);
  System.out.println("netty server - 启动成功");
 }

 public WebSocketServer() {
  bossGroup = new NioEventLoopGroup();
  workerGroup = new NioEventLoopGroup();

  server = new ServerBootstrap();
  server.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new WebsocketInitializer());
 }
}

三.初始化Websocket

public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {

 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
  // ------------------
  // 用于支持Http协议
  // ------------------
  // websocket基于http协议,需要有http的编解码器
  pipeline.addLast(new HttpServerCodec());
  // 对写大数据流的支持
  pipeline.addLast(new ChunkedWriteHandler());
  // 添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用
  //设置单次请求的文件的大小
  pipeline.addLast(new HttpObjectAggregator(1024 * 64));
  //webSocket 服务器处理的协议,用于指定给客户端连接访问的路由 :/ws
  pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
  // 添加Netty空闲超时检查的支持
  // 1. 读空闲超时(超过一定的时间会发送对应的事件消息)
  // 2. 写空闲超时
  // 3. 读写空闲超时
  pipeline.addLast(new IdleStateHandler(4, 8, 12));
  //添加心跳处理
  pipeline.addLast(new HearBeatHandler());
  // 添加自定义的handler
  pipeline.addLast(new ChatHandler());

 }
}

四.创建Netty监听器

@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {

 @Resource
 private WebSocketServer websocketServer;

 @Override
 public void onApplicationEvent(ContextRefreshedEvent event) {
  if(event.getApplicationContext().getParent() == null) {
   try {
    websocketServer.start();
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
}

五.建立消息通道

public class UserChannelMap {
 /**
  * 用户保存用户id与通道的Map对象
  */
// private static Map<String, Channel> userChannelMap;

 /* static {
  userChannelMap = new HashMap<String, Channel>();
 }*/

 /**
  * 定义一个channel组,管理所有的channel
  * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
  */
 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

 /**
  * 存放用户与Chanel的对应信息,用于给指定用户发送消息
  */
 private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

 private UserChannelMap(){}
 /**
  * 添加用户id与channel的关联
  * @param userNum
  * @param channel
  */
 public static void put(String userNum, Channel channel) {
  userChannelMap.put(userNum, channel);
 }

 /**
  * 根据用户id移除用户id与channel的关联
  * @param userNum
  */
 public static void remove(String userNum) {
  userChannelMap.remove(userNum);
 }

 /**
  * 根据通道id移除用户与channel的关联
  * @param channelId 通道的id
  */
 public static void removeByChannelId(String channelId) {
  if(!StringUtils.isNotBlank(channelId)) {
   return;
  }
  for (String s : userChannelMap.keySet()) {
   Channel channel = userChannelMap.get(s);
   if(channelId.equals(channel.id().asLongText())) {
    System.out.println("客户端连接断开,取消用户" + s + "与通道" + channelId + "的关联");
    userChannelMap.remove(s);
    UserService userService = SpringUtil.getBean(UserService.class);
    userService.logout(s);
    break;
   }
  }
 }

 /**
  * 打印所有的用户与通道的关联数据
  */
 public static void print() {
  for (String s : userChannelMap.keySet()) {
   System.out.println("用户id:" + s + " 通道:" + userChannelMap.get(s).id());
  }
 }

 /**
  * 根据好友id获取对应的通道
  * @param receiverNum 接收人编号
  * @return Netty通道
  */
 public static Channel get(String receiverNum) {
  return userChannelMap.get(receiverNum);
 }

 /**
  * 获取channel组
  * @return
  */
 public static ChannelGroup getChannelGroup() {
  return channelGroup;
 }

 /**
  * 获取用户channel map
  * @return
  */
 public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
  return userChannelMap;
 }
}

六.自定义消息类型

public class Message {
 /**
  * 消息类型
  */
 private Integer type;
 /**
  * 聊天消息
  */
 private String message;
 /**
  * 扩展消息字段
  */
 private Object ext;
 public Integer getType() {
  return type;
 }

 public void setType(Integer type) {
  this.type = type;
 }

 public MarketChatRecord getChatRecord() {
  return marketChatRecord;
 }
 public void setChatRecord(MarketChatRecord chatRecord) {
  this.marketChatRecord = chatRecord;
 }

 public Object getExt() {
  return ext;
 }

 public void setExt(Object ext) {
  this.ext = ext;
 }

 @Override
 public String toString() {
  return "Message{" +
    "type=" + type +
    ", marketChatRecord=" + marketChatRecord +
    ", ext=" + ext +
    '}';
 }

}

七.创建处理消息的handler

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
 private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

 /**
  * 用来保存所有的客户端连接
  */
 private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

 /**
  *当Channel中有新的事件消息会自动调用
  */
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  // 当接收到数据后会自动调用
  // 获取客户端发送过来的文本消息
  Gson gson = new Gson();
  log.info("服务器收到消息:{}",msg.text());
  System.out.println("接收到消息数据为:" + msg.text());
  Message message = gson.fromJson(msg.text(), Message.class);
//根据业务要求进行消息处理
  switch (message.getType()) {
   // 处理客户端连接的消息
   case 0:
    // 建立用户与通道的关联
   // 处理客户端发送好友消息
    break;
   case 1:
   // 处理客户端的签收消息
    break;
   case 2:
    // 将消息记录设置为已读
    break;
   case 3:
    // 接收心跳消息
    break;
   default:
    break;
  }

 }

 // 当有新的客户端连接服务器之后,会自动调用这个方法
 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());
  // 添加到channelGroup 通道组
  UserChannelMap.getChannelGroup().add(ctx.channel());
//  clients.add(ctx.channel());
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  log.info("{异常:}"+cause.getMessage());
  // 删除通道
  UserChannelMap.getChannelGroup().remove(ctx.channel());
  UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
  ctx.channel().close();
 }

 @Override
 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());
  //删除通道
  UserChannelMap.getChannelGroup().remove(ctx.channel());
  UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
  UserChannelMap.print();
 }

}

八.处理心跳

public class HearBeatHandler extends ChannelInboundHandlerAdapter {

 @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if(evt instanceof IdleStateEvent) {
   IdleStateEvent idleStateEvent = (IdleStateEvent)evt;

   if(idleStateEvent.state() == IdleState.READER_IDLE) {
    System.out.println("读空闲事件触发...");
   }
   else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
    System.out.println("写空闲事件触发...");
   }
   else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
    System.out.println("---------------");
    System.out.println("读写空闲事件触发");
    System.out.println("关闭通道资源");
    ctx.channel().close();
   }
  }
 }
}

搭建完成后调用测试

1.页面访问http://localhost:9001/ws
 2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。
3.大概流程:消息发送 :用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。
消息推送 :用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。

到此这篇关于SpringBoot+Netty+WebSocket实现消息发送的示例代码的文章就介绍到这了,更多相关SpringBoot Netty WebSocket消息发送内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

时间: 2020-09-17

SpringBoot+WebSocket+Netty实现消息推送的示例代码

上一篇文章讲了Netty的理论基础,这一篇讲一下Netty在项目中的应用场景之一:消息推送功能,可以满足给所有用户推送,也可以满足给指定某一个用户推送消息,创建的是SpringBoot项目,后台服务端使用Netty技术,前端页面使用WebSocket技术. 大概实现思路: 前端使用webSocket与服务端创建连接的时候,将用户ID传给服务端 服务端将用户ID与channel关联起来存储,同时将channel放入到channel组中 如果需要给所有用户发送消息,直接执行channel组的writ

SpringBoot实现钉钉机器人消息推送的示例代码

零.前言 上一次做消息推送,是微信公众号的定时消息通知. 由于自己当时的水平不够,加上企鹅家的开发文档普遍不太友好,导致根本看不懂文档在写什么,不得不去看第三方博客来学习公众号的开发. 这次就不一样了,昨天刚看了一下,阿里的开发文档比鹅厂要清晰的多,而且在同一功能上,使用了多种语言作为示例代码,可以说很友好了.可能这就是阿里和鹅厂的区别吧...辣鸡文档和好文档的区别... 本着"授之以渔"的态度,写了这篇文章,作为官方文档的补充. 一.在群里添加机器人 在群设置的智能群助手中添加自定义

java实现钉钉机器人消息推送的示例代码

先建个钉钉群,并加好机器人 此时,机器人已经添加完毕,接下来编写我们连接机器人小哥的代码 import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; import java.util.Map; /** * @author yanghao * @version DingTalkTest.j

Java中websocket消息推送的实现代码

一.服务层 package com.demo.websocket; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.springframew

java WebSocket实现聊天消息推送功能

本文实例为大家分享了java WebSocket实现聊天消息推送功能的具体代码,供大家参考,具体内容如下 环境: JDK.1.7.0_51 apache-tomcat-7.0.53 java jar包:tomcat-coyote.jar.tomcat-juli.jar.websocket-api.jar ChatAnnotation消息发送类: import java.io.IOException; import java.util.HashMap; import java.util.Map;

基于ajax与msmq技术的消息推送功能实现代码

周末在家捣鼓了一下消息推送的简单例子,其实也没什么技术含量,欢迎大伙拍砖. 我设计的这个推送demo是基于ajax长轮询+msmq消息队列来实现的,具体交互过程如下图: 先说说这个ajax长轮询,多长时间才算长呢?这个还真不好界定. 这里是相对普通ajax请求来说的,通常处理一个请求也就是毫秒级别的时间.但是这里的长轮询方式 在ajax发送请求给服务器之后,服务器给调用端返回数据的时间多长那可还真不好说.嘿嘿,这关键要看 我们啥时候往msmq队列中推送数据了,先看看推送的效果图吧..... 抱歉

Android中利用App实现消息推送机制的代码

1.消息推送机制 服务器器端需要变被动为主动,通知客户一些开发商认为重要的信息,无论应用程序是否正在运行或者关闭. 我想到了一句话:don't call me,i will call you! qq今天在右下角弹出了一个对话框:"奥巴马宣布本拉登挂了...",正是如此. 自作聪明,就会带点小聪明,有人喜欢就有人讨厌. 2.独立进程 无论程序是否正在运行,我们都要能通知到客户,我们需要一个独立进程的后台服务. 我们需要一个独立进程的后台服务. 在androidmanifest.xml中注

Python编程实现微信企业号文本消息推送功能示例

本文实例讲述了Python微信企业号文本消息推送功能.分享给大家供大家参考,具体如下: 企业号的创建.企业号应用的创建.组.tag.part就不赘述了,一搜一大堆,但是网上拿的那些个脚本好多都不好使,所以自己修了一个 坦率的讲,这个脚本是用来作为zabbix的通知媒介脚本的,本人是个菜鸟,如果哪里不对,大神们不要笑话,python也处于学习阶段,如果有哪些地方不合理,很希望可以不吝赐教,废话不多说,脚本奉上: #!/usr/bin/python # _*_coding:utf-8 _*_ imp

微信小程序模板消息限制实现无限制主动推送的示例代码

需求背景 基于微信的通知渠道,微信小程序为开发者提供了可以高效触达用户的模板消息能力,在用户本人与小程序页面有交互行为后触发,通过微信聊天列表中的服务通知可快捷进入查看消息,点击查看详情还能跳转到下发消息的小程序的指定页面. 微信小程序允许下发模板消息的条件分为两类:支付或者提交表单.通过提交表单来下发模板消息的限制为"允许开发者向用户在7天内推送有限条数的模板消息(1次提交表单可下发1条,多次提交下条数独立,相互不影响)". 然而,用户1次触发7天内推送1条通知是明显不够用的.比如,

php实现微信模板消息推送

本文实例为大家分享了php微信模板消息推送的具体代码,供大家参考,具体内容如下 1.微信公众号模板消息配置 2.PHP代码 /** * 发送模板消息 */ public function send_notice(){ //获取access_token if ($_COOKIE['access_token']){ $access_token2=$_COOKIE['access_token']; }else{ $json_token=$this>curl_post("https://api.w