Node.js高并发调度:Redis资源队列与轮询实现
时间:2025-12-11 14:12:33 273浏览 收藏
在IT行业这个发展更新速度很快的行业,只有不停止的学习,才不会被行业所淘汰。如果你是文章学习者,那么本文《Node.js 高并发调度:Redis 实现资源队列与轮询》就很适合你!本篇内容主要包括##content_title##,希望对大家的知识积累有所帮助,助力实战开发!

本文探讨了在 Node.js 应用中如何高效管理并发资源访问,特别是当资源需要被用户独占一段时间时。针对传统单线程队列在处理不同资源类型时的局限性,文章提出了一种基于 Redis 的解耦式解决方案。通过将资源持有状态存储在 Redis 中并结合客户端轮询机制,取代了长时间阻塞的请求-响应模式,实现了更灵活、可扩展的任务调度,确保不同资源请求间的独立性,并有效处理资源过期与手动释放。
引言:并发资源访问的挑战
在 Node.js 应用中,管理对有限资源的并发访问是一个常见而复杂的挑战。例如,一个系统可能需要向用户分配特定“标题”(如“标题A”或“标题B”),并且每个标题在同一时间只能被一个用户持有。用户一旦获得标题,可以持有60秒或选择提前释放。理想情况下,如果用户请求的是不同的标题(例如,用户1请求标题A,用户3请求标题B),它们应该尽可能并行处理,互不影响,而只有当多个用户请求同一个标题时,才需要进行排队等待。
然而,传统的单线程或基于简单队列的实现往往难以满足这种精细化的并发控制需求。当一个全局队列被用于限制总并发量为1时,即使是针对不同资源的请求,也可能因为某个资源的长时间持有而被迫等待,这大大降低了系统的响应性和资源利用率。
现有 p-queue 方案的局限性分析
为了实现上述需求,一种常见的尝试是结合使用一个全局队列和多个资源特定队列。例如,使用 p-queue 库构建一个主队列(concurrency: 1)来确保应用整体只处理一个任务,同时为每个标题类型创建独立的队列(concurrency: 1)来管理特定标题的访问。
import PQueue from "p-queue";
// 主队列,确保应用一次只处理一个任务
const queue = new PQueue({ concurrency: 1 });
// 辅助函数,将任务添加到主队列
const addJobToQueue = async (cb, options) => {
await queue.add(cb, options);
};
enum Title {
A = "a",
B = "b"
}
// 为每个标题创建独立的队列
const titleRequestQueues = {
[Title.A]: new PQueue({ concurrency: 1 }),
[Title.B]: new PQueue({ concurrency: 1 }),
};
const requestTitle = async (title: Title, userId: string) => {
try {
await titleRequestQueues[title].add(
() => addJobToQueue(
() => addTitle(title, userId) // addTitle 内部包含长时间阻塞操作
)
);
} catch (error) {
// 错误处理
}
};
const addTitle = async (_title: Title, userId: string) => {
// 模拟耗时操作,例如从数据库获取标题
await fetchTitleForUser();
try {
// promptForCancel 模拟标题持有,会阻塞60秒或直到手动取消
await promptForCancel(60 * 1000);
// 标题被用户主动释放或时间到期
} catch (error) {
// 标题时间到期
}
};上述方案的问题在于,尽管为每个标题设置了独立队列,但最终所有任务都通过 addJobToQueue 被提交到主队列 queue。由于主队列的 concurrency 被设置为 1,这意味着任何一个任务(包括 addTitle 内部的 promptForCancel 模拟的60秒持有期)都会阻塞整个主队列,直到其完成。因此,如果用户1请求标题A并进入 promptForCancel 阶段,用户3请求标题B,它仍然必须等待用户1的整个60秒持有期结束,而非仅等待 fetchTitleForUser 的短暂执行时间。这与我们期望的不同标题请求可以并行处理的目标相悖。
解耦式架构:Redis 状态管理与轮询机制
为了克服上述局限性,我们需要一种更彻底的解耦方案,将资源访问的“持有”状态从 HTTP 请求的生命周期中分离出来,并通过一个共享的、高性能的状态存储来实现异步通知和管理。这里,Redis 是一个理想的选择,配合客户端轮询机制,可以构建一个高并发、可扩展的资源访问系统。
核心思想:
- 解耦请求与持有: 用户发出请求后,服务器不立即阻塞等待资源分配,而是将请求放入队列并立即响应。
- Redis 作为状态中心: 使用 Redis 存储每个标题的当前持有者、过期时间以及等待队列。
- 后台任务分配: 一个独立的后台进程或定时任务持续监控 Redis 中的状态,负责分配和回收标题。
- 客户端轮询: 用户端定期向服务器查询自己是否已获得标题的访问权限。
为何选择 Redis?
- 高性能键值存储: Redis 内存操作速度极快,适合作为共享状态的存储。
- 支持过期时间 (TTL): 可以为键设置自动过期时间,完美匹配资源持有时间限制。
- 原子操作: Redis 提供了 SETNX、列表操作等原子命令,有助于避免并发冲突。
- 数据结构丰富: 列表(List)可用于构建等待队列,字符串(String)用于存储当前持有者。
详细实现策略
以下是基于 Redis 和轮询机制的详细实现步骤及示例代码:
1. 请求入队 (Server-side, Fire-and-Forget)
当用户请求一个标题时,服务器不立即尝试获取资源,而是将用户 ID 加入对应标题的 Redis 列表(作为等待队列),然后立即向客户端返回一个“请求已接收”的响应。
// 假设 redisClient 已经通过 'redis' 库初始化并连接
const redisClient = require('redis').createClient();
redisClient.on('error', (err) => console.error('Redis Client Error', err));
(async () => {
await redisClient.connect();
console.log('Connected to Redis.');
})();
/**
* 用户请求标题,将请求加入Redis等待队列。
* @param {string} title - 请求的标题(例如 'A' 或 'B')。
* @param {string} userId - 发出请求的用户ID。
* @returns {object} - 立即返回请求状态。
*/
async function requestTitle(title: string, userId: string) {
const queueKey = `title:${title}:queue`;
await redisClient.rPush(queueKey, userId); // 将用户ID添加到队列尾部
console.log(`User ${userId} requested title ${title}, added to queue.`);
return { status: 'queued', message: '您的请求已进入队列,请稍后查询状态。' };
}
// 示例:在一个 HTTP 路由中调用
// app.post('/request-title', async (req, res) => {
// const { title, userId } = req.body;
// const result = await requestTitle(title, userId);
// res.json(result);
// });2. 资源分配与持有 (Server-side, Background Task)
一个独立的后台进程或定时任务负责监控所有标题的持有状态。它会定期检查 Redis 中每个标题的当前持有者。如果某个标题当前没有持有者或其持有时间已过期,它将从该标题的等待队列中取出下一个用户,并授予其访问权限。
/**
* 定期检查并分配标题访问权限。
* 这是一个后台任务,应持续运行。
*/
async function grantTitleAccess() {
const titles = ['a', 'b']; // 假设系统中有这些标题
const holdDurationSeconds = 60; // 标题持有时间
for (const title of titles) {
const currentHolderKey = `title:${title}:current_holder`;
const queueKey = `title:${title}:queue`;
const currentHolder = await redisClient.get(currentHolderKey);
if (!currentHolder) { // 如果当前没有持有者或持有已过期
const nextUserId = await redisClient.lPop(queueKey); // 从队列头部取出下一个用户
if (nextUserId) {
// 原子性地设置持有者和过期时间
// SETEX 命令确保设置值和过期时间是原子操作
await redisClient.setEx(currentHolderKey, holdDurationSeconds, nextUserId);
console.log(`Title ${title} granted to User ${nextUserId} for ${holdDurationSeconds} seconds.`);
// 可以在这里触发通知机制,例如通过 WebSocket 通知用户
}
}
}
}
// 每秒运行一次资源分配逻辑
setInterval(grantTitleAccess, 1000);
console.log('Background task for title granting started.');3. 用户轮询 (Client-side & Server-side)
客户端不再阻塞等待,而是定期向服务器发送请求,查询自己是否已获得某个标题的访问权限。服务器根据用户 ID 检查 Redis 中相应标题的 current_holder 键。
// Server-side endpoint for polling
// 假设使用 Express.js
// const express = require('express');
// const app = express();
/**
* 提供给客户端查询标题访问状态的API。
* @param {object} req - HTTP请求对象,包含 title 和 userId。
* @param {object} res - HTTP响应对象。
*/
// app.get('/check-access/:title/:userId', async (req, res) => {
// const { title, userId } = req.params;
// const currentHolderKey = `title:${title}:current_holder`;
// const currentHolder = await redisClient.get(currentHolderKey);
// if (currentHolder === userId) {
// const ttl = await redisClient.ttl(currentHolderKey); // 获取剩余过期时间
// return res.json({ hasAccess: true, title: title, userId: userId, expiresIn: ttl > 0 ? ttl : 0 });
// } else {
// // 还可以查询用户在队列中的位置,提供更友好的提示
// const queueKey = `title:${title}:queue`;
// const queueLength = await redisClient.lLen(queueKey);
// // 注意:lPos 是 Redis 6.0.6+ 版本才有的命令,用于查找元素位置
// // 对于旧版本Redis,可能需要获取整个列表并在应用层查找
// const position = await redisClient.lPos(queueKey, userId); // 获取用户在队列中的位置 (0-based)
// return res.json({ hasAccess: false, title: title, userId: userId, queuePosition: position !== null ? position + 1 : -1, totalInQueue: queueLength });
// }
// });
// Client-side polling (概念性示例)
/*
async function pollForTitle(title, userId) {
console.log(`User ${userId} started polling for title ${title}...`);
const intervalId = setInterval(async () => {
try {
const response = await fetch(`/check-access/${title}/${userId}`);
const data = await response.json();
if (data.hasAccess) {
console.log(`User ${userId} has gained access to title ${title}! Expires in ${data.expiresIn} seconds.`);
clearInterval(intervalId); // 停止轮询
// 在这里执行获得标题后的操作
} else if (data.queuePosition !== -1) {
console.log(`User ${userId} still waiting for title ${title}. You are #${data.queuePosition} in queue.`);
} else {
console.log(`User ${userId} is not in queue for title ${title}.`);
clearInterval(intervalId); // 如果不在到这里,我们也就讲完了《Node.js高并发调度:Redis资源队列与轮询实现》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!
-
502 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
426 收藏
-
458 收藏
-
204 收藏
-
436 收藏
-
464 收藏
-
166 收藏
-
121 收藏
-
375 收藏
-
151 收藏
-
217 收藏
-
233 收藏
-
292 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 485次学习