websocket+redis动态订阅和动态取消订阅的实现示例
来源:脚本之家
时间:2023-01-20 19:14:06 320浏览 收藏
怎么入门数据库编程?需要学习哪些知识点?这是新手们刚接触编程时常见的问题;下面golang学习网就来给大家整理分享一些知识点,希望能够给初学者一些帮助。本篇文章就来介绍《websocket+redis动态订阅和动态取消订阅的实现示例》,涉及到websocketredis、动态订阅,有需要的可以收藏一下
原理
websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。
订阅频道消息格式:
{ "cmd":"subscribe", "topic":[ "topic_name" ] }
模糊订阅格式
{ "cmd":"psubscribe", "topic":[ "topic_name" ] }
取消订阅格式
{ "cmd":"unsubscribe", "topic":[ "topic_name" ] }
两个核心类,一个是redis的订阅监听类,一个是websocket的发布订阅类。
redis订阅监听类
package com.curtain.core; import com.curtain.config.GetBeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; import java.util.Arrays; /** * @Author Curtain * @Date 2021/6/7 14:27 * @Description */ @Component @Slf4j public class RedisPubSub extends JedisPubSub { private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class); private Jedis jedis; //订阅 public void subscribe(String... channels) { jedis = jedisPool.getResource(); try { jedis.subscribe(this, channels); } catch (Exception e) { log.error(e.getMessage()); if (jedis != null) jedis.close(); //遇到异常后关闭连接重新订阅 log.info("监听遇到异常,四秒后重新订阅频道:"); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep(4000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } subscribe(channels); } } //模糊订阅 public void psubscribe(String... channels) { Jedis jedis = jedisPool.getResource(); try { jedis.psubscribe(this, channels); } catch (ArithmeticException e) {//取消订阅故意造成的异常 if (jedis != null) jedis.close(); } catch (Exception e) { log.error(e.getMessage()); if (jedis != null) jedis.close(); //遇到异常后关闭连接重新订阅 log.info("监听遇到异常,四秒后重新订阅频道:"); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep(4000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } psubscribe(channels); } } public void unsubscribeAndClose(String... channels){ unsubscribe(channels); if (jedis != null && !isSubscribed()) jedis.close(); } public void punsubscribeAndClose(String... channels){ punsubscribe(channels); if (jedis != null && !isSubscribed()) jedis.close(); } @Override public void onSubscribe(String channel, int subscribedChannels) { log.info("subscribe redis channel:" + channel + ", 线程id:" + Thread.currentThread().getId()); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { log.info("psubscribe redis channel:" + pattern + ", 线程id:" + Thread.currentThread().getId()); } @Override public void onPMessage(String pattern, String channel, String message) { log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 线程id:" + Thread.currentThread().getId()); WebSocketServer.publish(message, pattern); WebSocketServer.publish(message, channel); } @Override public void onMessage(String channel, String message) { log.info("receive from redis channal: " + channel + ",message:" + message + ", 线程id:" + Thread.currentThread().getId()); WebSocketServer.publish(message, channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { log.info("unsubscribe redis channel:" + channel); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { log.info("punsubscribe redis channel:" + pattern); } }
1.jedis监听redis频道的时候如果遇见异常会关闭连接导致后续没有监听该频道,所以这里在subscribe捕获到异常的时候会重新创建一个jedis连接订阅该redis频道。
webSocket订阅推送类
这个类会有两个ConcurrentHashMap
外面一层的String对应的值是topic_name,里面一层的String对应的值是sessionId。前端发送过来的消息里面对应的这三类操作其实就是对这两个map里面的。
还有个ConcurrentHashMap
信息进行增加或者删除;后端往前端推送数据也会根据不同的topic_name推送到不同的订阅者这边。
package com.curtain.core; import com.alibaba.fastjson.JSON; import com.curtain.config.WebsocketProperties; import com.curtain.service.Cancelable; import com.curtain.service.impl.TaskExecuteService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * @Author Curtain * @Date 2021/5/14 16:49 * @Description */ @ServerEndpoint("/ws") @Component @Slf4j public class WebSocketServer { /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 */ private static volatile ConcurrentHashMap> webSocketMap = new ConcurrentHashMap(); /** * 存放psub的事件 **/ private static volatile ConcurrentHashMap > pWebSocketMap = new ConcurrentHashMap(); /** * 存放topic(pattern)-对应的RedisPubsub */ private static volatile ConcurrentHashMap redisPubSubMap = new ConcurrentHashMap(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; private String sessionId = ""; //要注入的对象 private static TaskExecuteService executeService; private static WebsocketProperties properties; private Cancelable cancelable; @Autowired public void setTaskExecuteService(TaskExecuteService taskExecuteService) { WebSocketServer.executeService = taskExecuteService; } @Autowired public void setWebsocketProperties(WebsocketProperties properties) { WebSocketServer.properties = properties; } /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session) { this.session = session; this.sessionId = session.getId(); //构造推送数据 Map pubHeader = new HashMap(); pubHeader.put("name", "connect_status"); pubHeader.put("type", "create"); pubHeader.put("from", "pubsub"); pubHeader.put("time", new Date().getTime() / 1000); Map pubPayload = new HashMap(); pubPayload.put("status", "success"); Map pubMap = new HashMap(); pubMap.put("header", pubHeader); pubMap.put("payload", pubPayload); sendMessage(JSON.toJSONString(pubMap)); cancelable = executeService.runPeriodly(() -> { try { if (cancelable != null && !session.isOpen()) { log.info("断开连接,停止发送ping"); cancelable.cancel(); } else { String data = "ping"; ByteBuffer payload = ByteBuffer.wrap(data.getBytes()); session.getBasicRemote().sendPing(payload); } } catch (IOException e) { e.printStackTrace(); } }, properties.getPeriod()); } @OnMessage public void onMessage(String message) { synchronized (session) { Map msgMap = (Map) JSON.parse(message); String cmd = (String) msgMap.get("cmd"); //订阅消息 if ("subscribe".equals(cmd)) { List topics = (List ) msgMap.get("topic"); //本地记录订阅信息 for (int i = 0; i map = new ConcurrentHashMap(); map.put(this.sessionId, this); webSocketMap.put(topic, map); new Thread(() -> { RedisPubSub redisPubSub = new RedisPubSub(); //存入map redisPubSubMap.put(topic, redisPubSub); redisPubSub.subscribe(topic); }).start(); } log.info("sessionId:" + this.sessionId + ",完成订阅:" + topic); log(); log.info("============================subscribe-end============================"); } } //psubscribe if ("psubscribe".equals(cmd)) { List topics = (List ) msgMap.get("topic"); //本地记录订阅信息 for (int i = 0; i map = new ConcurrentHashMap(); map.put(this.sessionId, this); pWebSocketMap.put(topic, map); new Thread(() -> { RedisPubSub redisPubSub = new RedisPubSub(); //存入map redisPubSubMap.put(topic, redisPubSub); redisPubSub.psubscribe(topic); }).start(); } log.info("sessionId:" + this.sessionId + ",完成模糊订阅:" + topic); log(); log.info("============================psubscribe-end============================"); } } //取消订阅 if ("unsubscribe".equals(cmd)) { List topics = (List ) msgMap.get("topic"); //删除本地对应的订阅信息 for (String topic : topics) { log.info("============================unsubscribe-start============================"); log.info("sessionId:" + this.sessionId + ",开始删除订阅:" + topic); if (webSocketMap.containsKey(topic)) { ConcurrentHashMap map = webSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道 webSocketMap.remove(topic); redisPubSubMap.get(topic).unsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } if (pWebSocketMap.containsKey(topic)) { ConcurrentHashMap map = pWebSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道 pWebSocketMap.remove(topic); redisPubSubMap.get(topic).punsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } log.info("sessionId:" + this.sessionId + ",完成删除订阅:" + topic); log(); log.info("============================unsubscribe-end============================"); } } } } @OnMessage public void onPong(PongMessage pongMessage) { try { log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { synchronized (session) { log.info("============================onclose-start============================"); //删除订阅 Iterator iterator = webSocketMap.keySet().iterator(); while (iterator.hasNext()) { String topic = (String) iterator.next(); ConcurrentHashMap map = webSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道 webSocketMap.remove(topic); redisPubSubMap.get(topic).unsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } //删除模糊订阅 Iterator iteratorP = pWebSocketMap.keySet().iterator(); while (iteratorP.hasNext()) { String topic = (String) iteratorP.next(); ConcurrentHashMap map = pWebSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道 pWebSocketMap.remove(topic); redisPubSubMap.get(topic).punsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } log.info("sessionId:" + this.sessionId + ",断开连接:"); //debug log(); log.info("============================onclose-end============================"); } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { synchronized (session) { log.info("============================onError-start============================"); log.error("用户错误,sessionId:" + session.getId() + ",原因:" + error.getMessage()); error.printStackTrace(); log.info("关闭错误用户对应的连接"); //删除订阅 Iterator iterator = webSocketMap.keySet().iterator(); while (iterator.hasNext()) { String topic = (String) iterator.next(); ConcurrentHashMap map = webSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道 webSocketMap.remove(topic); redisPubSubMap.get(topic).unsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } //删除模糊订阅 Iterator iteratorP = pWebSocketMap.keySet().iterator(); while (iteratorP.hasNext()) { String topic = (String) iteratorP.next(); ConcurrentHashMap map = pWebSocketMap.get(topic); map.remove(this.sessionId); if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道 pWebSocketMap.remove(topic); redisPubSubMap.get(topic).punsubscribeAndClose(topic); redisPubSubMap.remove(topic); } } log.info("完成错误用户对应的连接关闭"); //debug log(); log.info("============================onError-end============================"); } } /** * 实现服务器主动推送 */ public void sendMessage(String message) { synchronized (session) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } } public static void publish(String msg, String topic) { ConcurrentHashMap map = webSocketMap.get(topic); if (map != null && map.values() != null) { for (WebSocketServer webSocketServer : map.values()) webSocketServer.sendMessage(msg); } map = pWebSocketMap.get(topic); if (map != null && map.values() != null) { for (WebSocketServer webSocketServer : map.values()) webSocketServer.sendMessage(msg); } } private void log() { log.info(">>>>>>>>>"); Iterator iterator1 = webSocketMap.keySet().iterator(); while (iterator1.hasNext()) { String topic = (String) iterator1.next(); log.info("topic:" + topic); Iterator iterator2 = webSocketMap.get(topic).keySet().iterator(); while (iterator2.hasNext()) { String session = (String) iterator2.next(); log.info("订阅" + topic + "的sessionId:" + session); } } log.info(">>>>>>>>>"); } }
项目地址
上面介绍了核心代码,下面是完整代码地址
https://github.com/Curtain-Wang/websocket-redis-subscribe.git
Update20220415
参考评论区老哥的建议,将redis订阅监听类里面的subscribe和psubscribe方法调整如下:
//订阅 @Override public void subscribe(String... channels) { boolean done = true; while (done){ Jedis jedis = jedisPool.getResource(); try { jedis.subscribe(this, channels); done = false; } catch (Exception e) { log.error(e.getMessage()); if (jedis != null) jedis.close(); //遇到异常后关闭连接重新订阅 log.info("监听遇到异常,四秒后重新订阅频道:"); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep(4000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } } //模糊订阅 @Override public void psubscribe(String... channels) { boolean done = true; while (done){ Jedis jedis = jedisPool.getResource(); try { jedis.psubscribe(this, channels); done = false; } catch (Exception e) { log.error(e.getMessage()); if (jedis != null) jedis.close(); //遇到异常后关闭连接重新订阅 log.info("监听遇到异常,四秒后重新订阅频道:"); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep(4000); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } }
到这里,我们也就讲完了《websocket+redis动态订阅和动态取消订阅的实现示例》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于redis的知识点!
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
342 收藏
-
361 收藏
-
159 收藏
-
164 收藏
-
221 收藏
-
156 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习
-
- 微笑的长颈鹿
- 这篇技术贴真是及时雨啊,太细致了,写的不错,mark,关注大佬了!希望大佬能多写数据库相关的文章。
- 2023-02-24 13:21:58
-
- 冷傲的鞋子
- 太详细了,已加入收藏夹了,感谢楼主的这篇博文,我会继续支持!
- 2023-02-23 15:50:31
-
- 心灵美的方盒
- 感谢大佬分享,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,看完之后很有帮助,总算是懂了,感谢作者分享博文!
- 2023-02-20 12:32:43
-
- 还单身的魔镜
- 太全面了,mark,感谢老哥的这篇文章内容,我会继续支持!
- 2023-02-14 09:36:26
-
- 优美的热狗
- 太给力了,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢up主分享文章!
- 2023-02-07 13:44:04
-
- 小巧的丝袜
- 这篇技术文章真及时,师傅加油!
- 2023-02-06 07:30:55
-
- 老迟到的自行车
- 这篇博文真及时,好细啊,受益颇多,收藏了,关注作者大大了!希望作者大大能多写数据库相关的文章。
- 2023-01-23 02:37:14