基于Redis实现阻塞队列的方式
来源:脚本之家
时间:2023-01-07 11:59:15 188浏览 收藏
IT行业相对于一般传统行业,发展更新速度更快,一旦停止了学习,很快就会被行业所淘汰。所以我们需要踏踏实实的不断学习,精进自己的技术,尤其是初学者。今天golang学习网给大家整理了《基于Redis实现阻塞队列的方式》,聊聊Redis阻塞队列,我们一起来看看吧!
日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处理的数据添加到队列当中,然后按照排队的先后顺序进行异步处理。
这个队列,可以是专业的消息队列,如 RocketMQ/RabbitMQ 等,一般项目中,如果只是为了进行异步,未免有点杀鸡用牛刀的意味。
也可以使用基于 JVM 内存实现队列,但是如果项目进行了重启,就会造成队列数据丢失。
大部分的项目都会用到 Redis 中间件作为缓存使用,此时使用 Redis 的 list 结构来实现队列则是非常合适的选择。
因此,本文主要讲解基于 Redis 的方式实现异步队列。
本文首发个人技术博客: https://nullpointer.pw/redis-block-queue.html
基于 Redis 的 list 实现队列的方式也有多种,先说第一种不推荐的方式,即使用LPUSH
生产消息,然后 while(true) 中通过RPOP
消费消息,这种方式的确可以实现,但是不断代码不断的轮询,势必会消耗一些系统的资源。
第二种方式也是不推荐的方式,也是通过 LPUSH
生产消息,然后通过 BRPOP
进行阻塞地等待并消费消息,这种方式较第一种方式减少了无用的轮询,降低系统资源的消耗,但是可能会存在队列消息丢失的情况,如果取出了消息然后处理失败,这个被取出的消息就将丢失。
第二种方式就是下文要介绍的方式,首先也是通过 LPUSH
生产消息,然后通过 BRPOPLPUSH
阻塞地等待 list 新消息到来,有了新消息才开始消费,同时将消息备份到另外一个 list 当中,这种方式具备了第二种方式的优点,即减少了无用的轮询,同时也对消息进行了备份不会丢失数据,如果处理成功,可以通过 LREM
对备份的 list 中当前的这条消息进行删除处理。这种方式实现方式可以参考 模式: 安全的队列 .
Redis 基础
# 将一个或多个值 value 插入到列表 key 的表头 LPUSH key value [value …] # 阻塞式等待,将列表 source 中的最后一个元素 (尾元素) 弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长 (block indefinitely) 。 BRPOPLPUSH source destination timeout # 根据参数 count 的值,移除列表中与参数 value 相等的元素。 LREM key count value
代码实现队列消息生产者
笔者使用的是 Spring 相关 API 实现对 Redis 指令的调用。首先实现消息的生产代码,封装到一个工具类方法当中。这里很简单,就是调用了 lpush 方法,将序列化的 key 和 value 添加到列表当中去。
@Resource private RedisConnectionFactory connectionFactory; public void lPush(@Nonnull String key, @Nonnull String value) { RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory); try { byte[] byteKey = RedisSerializer.string().serialize(getKey(key)); byte[] byteValue = RedisSerializer.string().serialize(value); assert byteKey != null; connection.lPush(byteKey, byteValue); } finally { RedisConnectionUtils.releaseConnection(connection, connectionFactory); } }
代码实现队列消息消费者
因为实现队列消费消息的代码比较多,不可能每个需要阻塞消费的地方,对需要写这一坨代码,因此使用 Java8 的函数式接口实现方法的传递,同时阻塞式获取消息代码使用新线程去执行。
有人看到以下代码要吐槽了,不是说不用 while(true) 吗,怎么你这里面还是有,这里稍微解释一下,因为 SpringBoot 一般会指定 timeout 的全局超时时间,即使 BRPOPLPUSH
设置了 0,即无限期,当超出了 timeout 设置的值时,就会抛出 QueryTimeoutException 异常导致线程退出,因此添加了 try/catch 对异常进行捕获并忽略,同时使用 while(true) 保证线程可以继续执行。
代码中记录了当前消息处理结果,如果处理结果为成功,需要对备份队列的当前消息进行删除。
public void bRPopLPush(@Nonnull String key, Consumerconsumer) { CompletableFuture.runAsync(() -> { RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory); try { byte[] srcKey = RedisSerializer.string().serialize(getKey(key)); byte[] dstKey = RedisSerializer.string().serialize(getBackupKey(key)); assert srcKey != null; assert dstKey != null; while (true) { byte[] byteValue = new byte[0]; boolean success = false; try { byteValue = connection.bRPopLPush(0, srcKey, dstKey); if (byteValue != null && byteValue.length != 0) { consumer.accept(new String(byteValue)); success = true; } } catch (Exception ignored) { // 防止获取 key 达到超时时间抛出 QueryTimeoutException 异常退出 } finally { if (success) { // 处理成功才删除备份队列的 key connection.lRem(dstKey, 1, byteValue); } } } } finally { RedisConnectionUtils.releaseConnection(connection, connectionFactory); } }); }
测试代码
@Test public void testLPush() throws InterruptedException { String queueA = "queueA"; int i = 0; while (true) { String msg = "Hello-" + i++; redisBlockQueue.lPush(queueA, msg); System.out.println("lPush: " + msg); Thread.sleep(3000); } } @Test public void testBRPopLPush() { String queueA = "queueA"; redisBlockQueue.bRPopLPush(queueA, (val) -> { // 在这里处理具体的业务逻辑 System.out.println("val: " + val); }); // 防止 Junit 进程退出 LockSupport.park(); }
项目使用方式
为了方便使用,我将其抽取为了一个工具类,使用时通过 Spring 注入使用即可,
队列消费可以使用如下方式在项目启动的时候就进行阻塞监听队列,等待消费
@Resource private RedisBlockQueue redisBlockQueue; @PostConstruct public void init() { redisBlockQueue.bRPopLPush(xx, (value) -> { //... }); }
本文完整代码下载github 地址
到这里,我们也就讲完了《基于Redis实现阻塞队列的方式》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于redis的知识点!
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
342 收藏
-
361 收藏
-
159 收藏
-
164 收藏
-
221 收藏
-
156 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习
-
- 舒服的小懒猪
- 赞 👍👍,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,看完之后很有帮助,总算是懂了,感谢up主分享博文!
- 2023-02-20 06:39:07
-
- 单薄的毛衣
- 细节满满,码住,感谢楼主的这篇技术贴,我会继续支持!
- 2023-02-06 09:13:39
-
- 失眠的冬日
- 这篇文章出现的刚刚好,太详细了,真优秀,已加入收藏夹了,关注up主了!希望up主能多写数据库相关的文章。
- 2023-01-21 20:47:49
-
- 过时的吐司
- 这篇技术文章太及时了,太全面了,太给力了,mark,关注作者了!希望作者能多写数据库相关的文章。
- 2023-01-19 18:50:19
-
- 安静的泥猴桃
- 这篇文章真是及时雨啊,太详细了,受益颇多,收藏了,关注大佬了!希望大佬能多写数据库相关的文章。
- 2023-01-19 05:37:56
-
- 精明的龙猫
- 感谢大佬分享,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢师傅分享技术贴!
- 2023-01-12 21:45:10
-
- 冷艳的小懒虫
- 太给力了,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢作者分享技术贴!
- 2023-01-08 23:58:58