关于使用Redisson订阅数问题
来源:脚本之家
时间:2023-02-17 13:26:16 314浏览 收藏
本篇文章给大家分享《关于使用Redisson订阅数问题》,覆盖了数据库的常见基础知识,其实一个语言的全部知识点一篇文章是不可能说完的,但希望通过这些问题,让读者对自己的掌握程度有一定的认识(B 数),从而弥补自己的不足,更好的掌握它。
一、前提
最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize
的大小不够,需要提高配置才能解决。
二、源码分析
下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:
1、RedissonLock#lock() 方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 尝试获取,如果ttl == null,则表示获取锁成功 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题 RFuturefuture = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 后面代码忽略 try { // 无限循环获取锁,直到获取锁成功 // ... } finally { // 取消订阅锁释放事件 unsubscribe(future, threadId); } }
总结下主要逻辑:
- 获取当前线程的线程id;
- tryAquire尝试获取锁,并返回ttl
- 如果ttl为空,则结束流程;否则进入后续逻辑;
- this.subscribe(threadId)订阅当前线程,返回一个RFuture;
- 如果在指定时间没有监听到,则会产生如上异常。
- 订阅成功后, 通过while(true)循环,一直尝试获取锁
- fially代码块,会解除订阅
所以上述这情况问题应该出现在subscribe()方法中
2、详细看下subscribe()方法
protected RFuturesubscribe(long threadId) { // entryName 格式:“id:name”; // channelName 格式:“redisson_lock__channel:name”; return pubSub.subscribe(getEntryName(), getChannelName()); }
RedissonLock#pubSub 是在RedissonLock构造函数中初始化的:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { // .... this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }
而subscribeService在MasterSlaveConnectionManager的实现中又是通过如下方式构造的
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) { this(config, id); this.config = cfg; // 初始化 initTimer(cfg); initSingleEntry(); } protected void initTimer(MasterSlaveServersConfig config) { int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()}; Arrays.sort(timeouts); int minTimeout = timeouts[0]; if (minTimeout % 100 != 0) { minTimeout = (minTimeout % 100) / 2; } else if (minTimeout == 100) { minTimeout = 50; } else { minTimeout = 100; } timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false); connectionWatcher = new IdleConnectionWatcher(this, config); // 初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例: subscribeService = new PublishSubscribeService(this, config); }
PublishSubscribeService构造函数
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this); public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) { super(); this.connectionManager = connectionManager; this.config = config; for (int i = 0; i3、回到subscribe()方法主要逻辑还是交给了 LockPubSub#subscribe()里面
private final ConcurrentMapentries = new ConcurrentHashMap(); public RFuture subscribe(String entryName, String channelName) { // 从PublishSubscribeService获取对应的信号量。 相同的channelName获取的是同一个信号量 // public AsyncSemaphore getSemaphore(ChannelName channelName) { // return locks[Math.abs(channelName.hashCode() % locks.length)]; // } AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); AtomicReference listenerHolder = new AtomicReference (); RPromise newPromise = new RedissonPromise () { @Override public boolean cancel(boolean mayInterruptIfRunning) { return semaphore.remove(listenerHolder.get()); } }; Runnable listener = new Runnable() { @Override public void run() { // 如果存在RedissonLockEntry, 则直接利用已有的监听 E entry = entries.get(entryName); if (entry != null) { entry.acquire(); semaphore.release(); entry.getPromise().onComplete(new TransferListener (newPromise)); return; } E value = createEntry(newPromise); value.acquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.acquire(); semaphore.release(); oldValue.getPromise().onComplete(new TransferListener (newPromise)); return; } // 创建监听, RedisPubSubListener AsyncSemaphore#acquire()方法
public void acquire(Runnable listener) { acquire(listener, 1); } public void acquire(Runnable listener, int permits) { boolean run = false; synchronized (this) { // counter初始化值为1 if (counter梳理上述逻辑:
1、从PublishSubscribeService获取对应的信号量, 相同的channelName获取的是同一个信号量
2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
3、如果已经存在RedissonLockEntry, 则利用已经订阅就行
4、如果不存在RedissonLockEntry, 则会创建新的RedissonLockEntry,然后进行。从上面代码看,主要逻辑是交给了PublishSubscribeService#subscribe方法
4、PublishSubscribeService#subscribe逻辑如下:
private final ConcurrentMapname2PubSubConnection = new ConcurrentHashMap(); private final Queue freePubSubConnections = new ConcurrentLinkedQueue(); public RFuture subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener>... listeners) { RPromise promise = new RedissonPromise (); // 主要逻辑入口, 这里要主要channelName每次都是新对象, 但内部覆写hashCode+equals。 subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners); return promise; } private void subscribe(Codec codec, ChannelName channelName, RPromise promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener>... listeners) { PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName); if (connEntry != null) { // 从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中 addListeners(channelName, promise, type, lock, connEntry, listeners); return; } // 没有时,才是最重要的逻辑 freePubSubLock.acquire(new Runnable() { @Override public void run() { if (promise.isDone()) { lock.release(); freePubSubLock.release(); return; } // 从队列中取头部元素 PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); if (freeEntry == null) { // 第一次肯定是没有的需要建立 connect(codec, channelName, promise, type, lock, listeners); return; } // 如果存在则尝试获取,如果remainFreeAmount小于0则抛出异常终止了。 int remainFreeAmount = freeEntry.tryAcquire(); if (remainFreeAmount == -1) { throw new IllegalStateException(); } PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); if (oldEntry != null) { freeEntry.release(); freePubSubLock.release(); addListeners(channelName, promise, type, lock, oldEntry, listeners); return; } // 如果remainFreeAmount=0, 则从队列中移除 if (remainFreeAmount == 0) { freePubSubConnections.poll(); } freePubSubLock.release(); // 增加监听 RFuture subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners); ChannelFuture future; if (PubSubType.PSUBSCRIBE == type) { future = freeEntry.psubscribe(codec, channelName); } else { future = freeEntry.subscribe(codec, channelName); } future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { if (!promise.isDone()) { subscribeFuture.cancel(false); } return; } connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { subscribeFuture.cancel(false); } }, config.getTimeout(), TimeUnit.MILLISECONDS); } }); } }); } private void connect(Codec codec, ChannelName channelName, RPromise promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener>... listeners) { // 根据channelName计算出slot获取PubSubConnection int slot = connectionManager.calcSlot(channelName.getName()); RFuture connFuture = nextPubSubConnection(slot); promise.onComplete((res, e) -> { if (e != null) { ((RPromise ) connFuture).tryFailure(e); } }); connFuture.onComplete((conn, e) -> { if (e != null) { freePubSubLock.release(); lock.release(); promise.tryFailure(e); return; } // 这里会从配置中读取subscriptionsPerConnection PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); // 每获取一次,subscriptionsPerConnection就会减直到为0 int remainFreeAmount = entry.tryAcquire(); // 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中 PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { releaseSubscribeConnection(slot, entry); freePubSubLock.release(); addListeners(channelName, promise, type, lock, oldEntry, listeners); return; } if (remainFreeAmount > 0) { // 加入到队列中 freePubSubConnections.add(entry); } freePubSubLock.release(); RFuture subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners); // 这里真正的进行订阅(底层与redis交互) ChannelFuture future; if (PubSubType.PSUBSCRIBE == type) { future = entry.psubscribe(codec, channelName); } else { future = entry.subscribe(codec, channelName); } future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { if (!promise.isDone()) { subscribeFuture.cancel(false); } return; } connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { subscribeFuture.cancel(false); } }, config.getTimeout(), TimeUnit.MILLISECONDS); } }); }); } PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:
public int tryAcquire() { while (true) { int value = subscribedChannelsAmount.get(); if (value == 0) { return -1; } if (subscribedChannelsAmount.compareAndSet(value, value - 1)) { return value - 1; } } }梳理上述逻辑:
1、还是进行重复判断, 根据channelName从name2PubSubConnection中获取,看是否存在已经订阅:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
2、从队列freePubSubConnections中取公用的PubSubConnectionEntry, 如果没有就进入connect()方法2.1 会根据subscriptionsPerConnection创建PubSubConnectionEntry, 然后调用其tryAcquire()方法 - 每调用一次就会减1
2.2 将新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后续重复使用;
2.3 同时也将PubSubConnectionEntry放入队列freePubSubConnections中。- remainFreeAmount > 0
2.4 后面就是进行底层的subscribe和addListener3、如果已经存在PubSubConnectionEntry,则利用已有的PubSubConnectionEntry进行tryAcquire;
4、如果remainFreeAmount 5、最后也是进行底层的subscribe和addListener;三 总结
根因: 从上面代码分析, 导致问题的根因是因为PublishSubscribeService 会使用公共队列中的freePubSubConnections, 如果同一个key一次性请求超过subscriptionsPerConnection它的默认值5时,remainFreeAmount就可能出现-1的情况, 那么就会导致commandExecutor.syncSubscription(future)中等待超时,也就抛出如上异常Subscribe timeout: (7500ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
解决方法: 在初始化Redisson可以可指定这个配置项的值。
相关参数的解释以及默认值请参考官网:https://github.com/redisson/redisson/wiki/2.-Configuration#23-common-settings
今天关于《关于使用Redisson订阅数问题》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
342 收藏
-
361 收藏
-
159 收藏
-
164 收藏
-
221 收藏
-
156 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习
-
- 开朗的手链
- 真优秀,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,看完之后很有帮助,总算是懂了,感谢作者大大分享技术文章!
- 2023-05-18 16:33:29
-
- 谨慎的蜡烛
- 这篇技术文章真是及时雨啊,好细啊,赞 👍👍,码起来,关注师傅了!希望师傅能多写数据库相关的文章。
- 2023-05-07 15:01:31
-
- 土豪的流沙
- 细节满满,码住,感谢大佬的这篇技术文章,我会继续支持!
- 2023-04-17 12:52:36
-
- 霸气的白羊
- 太细致了,mark,感谢老哥的这篇技术贴,我会继续支持!
- 2023-03-16 09:51:50
-
- 称心的楼房
- 很棒,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢作者分享技术文章!
- 2023-03-14 22:10:39
-
- 欢喜的八宝粥
- 这篇文章内容真及时,好细啊,受益颇多,码住,关注作者大大了!希望作者大大能多写数据库相关的文章。
- 2023-03-08 02:02:09
-
- 甜美的小土豆
- 这篇文章内容太及时了,up主加油!
- 2023-02-25 02:55:09