基于redis乐观锁实现并发排队
来源:脚本之家
时间:2023-02-25 10:17:50 136浏览 收藏
大家好,我们又见面了啊~本文《基于redis乐观锁实现并发排队》的内容中将会涉及到redis乐观锁等等。如果你正在学习数据库相关知识,欢迎关注我,以后会给大家带来更多数据库相关文章,希望我们能一起进步!下面就开始本文的正式内容~
有个需求场景是这样的,使用redis控制scrapy运行的数量。当系统的后台设置为4时,只允许scapry启动4个任务,多余的任务则进行排队。
概况
最近做了一个django + scrapy + celery + redis 的爬虫系统,客户购买的主机除了跑其他程序外,还要跑我开发的这套程序,所以需要手动控制scrapy的实例数量,避免过多的爬虫给系统造成负担。
流程设计
1、爬虫任务由用户以请求的方式发起,所有的用户的请求统一进入到celery进行排队;
2、任务数量控制的执行就交给reids,经由celery保存到redis,包含了爬虫启动所需要的必要信息,从redis取一条信息即可启动一个爬虫;
3、通过scrapyd的接口来获取当前在运行的爬虫数量,以便决定下一步流程:如果小于4,则从redis中取相应数量的信息来启动爬虫,如果大于等于4,则继续等待;
4、如果在运行爬虫的数量有所减少,则及时从reids中取相应数量的信息来启动爬虫。
代码实现
业务代码有点复杂和啰嗦,此处使用伪代码来演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | import redis # 实例化一个redis连接池 pool = redis.ConnectionPool(host = '127.0.0.1' , port = 6379 , decode_responses = True , db = 4 , password = '') r = redis.Redis(connection_pool = pool) # 爬虫实例限制为4 即只允许4个scrapy实例在运行 limited = 4 # 声明redis的乐观锁 lock = r.Lock() # lock.acquire中有while循环,即它会线程阻塞,直到当前线程获得redis的lock,才会继续往下执行代码 if lock.acquire(): # 1、从reids中取一条爬虫信息 info = redis.get() # 2、while循环监听爬虫运行的数量 while True : req = requests.get( 'http://127.0.0.1:6800/daemonstatus.json' ).json() # 统计当前有多少个爬虫在运行 running = req.get( 'running' ) + req.get( 'pending' ) # 3、判断是否等待还是要增加爬虫数量 # 3.1 如果在运行的数量大于等于设置到量 则继续等待 if running > = limited: continue # 3.2 如果小于 则启动爬虫 start_scrapy(info) # 3.3 将info从redis中删除 redis.delete(info) # 3.4 释放锁 lock.release() break |
当前,这只是伪代码而已,实际的业务逻辑可能是非常复杂的,如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | @shared_task def scrapy_control(key_uuid): r = redis.Redis(connection_pool = pool) db = MysqlDB() speed_limited = db.fetch_config( 'REPTILE_SPEED' ) speed_limited = int (speed_limited[ 0 ]) keywords_num = MysqlDB().fetch_config( 'SEARCH_RANDOM' ) keywords_num = int (keywords_num[ 0 ]) # while True: lock = r.lock( 'lock' ) with open ( 'log/celery/info.log' , 'a' ) as f: f.write( str (datetime.datetime.now()) + '--' + str (key_uuid) + ' 进入处理环节' + '\n' ) try : # acquire默认阻塞 如果获取不到锁时 会一直阻塞在这个函数的while循环中 if lock.acquire(): with open ( 'log/celery/info.log' , 'a' ) as f: f.write( str (datetime.datetime.now()) + '--' + str (key_uuid) + ' 获得锁' + '\n' ) # 1 从redis中获取信息 redis_obj = json.loads(r.get(key_uuid)) user_id = redis_obj.get( 'user_id' ) contents = redis_obj.get( 'contents' ) # 2 使用while循环处理核心逻辑 is_hold_print = True while True : req = requests.get( 'http://127.0.0.1:6800/daemonstatus.json' ).json() running = req.get( 'running' ) + req.get( 'pending' ) # 3 如果仍然有足够的爬虫在运行 则hold住redis锁,等待有空余的爬虫位置让出 if running > = speed_limited: if is_hold_print: with open ( 'log/celery/info.log' , 'a' ) as f: f.write( str (datetime.datetime.now()) + '--' + str (key_uuid) + ' 爬虫在运行,线程等待中' + '\n' ) is_hold_print = False time.sleep( 1 ) continue # 4 有空余的爬虫位置 则往下走 # 4.1 处理完所有的内容后 释放锁 if len (contents) = = 0 : r.delete(key_uuid) with open ( 'log/celery/info.log' , 'a' ) as f: f.write( str (datetime.datetime.now()) + '--' + str (key_uuid) + ' 任务已完成,从redis中删除' + '\n' ) lock.release() with open ( 'log/celery/info.log' , 'a' ) as f: f.write( str (datetime.datetime.now()) + '--' + str (key_uuid) + ' 释放锁' + '\n' ) break # 4.2 创建task任务 task_uuid = str (uuid.uuid4()) article_obj = contents.pop() article_id = article_obj.get( 'article_id' ) article = article_obj.get( 'content' ) try : Task.objects.create( task_uuid = task_uuid, user_id = user_id, article_id = article_id, content = article ) except Exception as e: with open ( 'log/celery/error.log' , 'a' ) as f: f.write( str (datetime.datetime.now()) + '--' + str (key_uuid) + '->' + str (task_uuid) + ' 创建Task出错: ' + str (e) + '\n' ) # finally: # 4.3 启动爬虫任务 即便创建task失败也会启动 try : task_chain(user_id, article, task_uuid, keywords_num) except Exception as e: with open ( 'log/celery/error.log' , 'a' ) as f: f.write( str (datetime.datetime.now()) + '--' + str (key_uuid) + ' 启动任务链失败: ' + str (e) + '\n' ) # 加入sleep 防止代码执行速度快于爬虫启动速度而导致当前线程启动额外的爬虫 time.sleep( 5 ) except Exception as e: with open ( 'log/celery/error.log' , 'a' ) as f: f.write( str (datetime.datetime.now()) + '--' + str (key_uuid) + ' 获得锁之后的操作出错: ' + str (e) + '\n' ) lock.release() |
小坑
scrapy启动速度相对较慢,所以while循环中,代码中执行到了爬虫的启动,需要sleep一下再去通过scrapyd接口获取爬虫运行的数量,如果立刻读取,可能会造成误判。
文中关于redis的知识介绍,希望对你的学习有所帮助!若是受益匪浅,那就动动鼠标收藏这篇《基于redis乐观锁实现并发排队》文章吧,也可关注golang学习网公众号了解相关技术文章。
-
265 收藏
-
274 收藏
-
327 收藏
-
240 收藏
-
471 收藏
-
316 收藏
-
407 收藏
-
170 收藏
-
190 收藏
-
170 收藏
-
111 收藏
-
342 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 508次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习