登录
首页 >  文章 >  linux

【Linux】进程池详解:进程间通信秘诀

时间:2025-04-21 18:55:08 115浏览 收藏

本文详细讲解了Linux进程间通信中的进程池技术。进程池通过预先创建多个子进程来处理并发任务,有效减少了频繁创建和销毁进程的开销,从而提升性能、降低资源消耗,并加快任务响应速度。文章通过代码示例,演示了如何使用管道实现进程间通信,以及如何封装进程池类、任务类和管道类来管理进程和任务。 文中包含了完整的代码、头文件和Makefile,并详细解释了进程池的初始化、任务分发、执行和清理过程,为读者理解和应用进程池提供了全面指导,尤其适用于需要处理CPU密集型任务的场景,如Web爬虫和数据处理等。

进程池(Process Pool)是一种管理进程的技术,通过预先创建一定数量的进程来减少频繁创建和销毁进程的开销。这种方法特别适用于需要并发执行大量任务的场景,尤其是在处理CPU密集型任务时。

【Linux】进程间通信——进程池

上图展示了进程池模型,其中父进程(master进程)通过创建多个子进程(worker或slaver进程)并通过管道连接,向各个子进程派发任务。

进程池的主要作用包括:

  • 提高性能:通过预先创建进程,减少了频繁创建和销毁进程的开销,提升整体执行效率。
  • 减少系统资源消耗:避免了频繁创建和销毁进程导致的资源浪费,进程池通过复用已有进程,节省了资源。
  • 提升任务响应速度:由于进程池中的进程是预先创建的,可以快速分配空闲进程处理任务。
  • 更好的资源管理:进程池可以限制系统中的最大并发进程数,保护系统资源,避免过载。
  • 并行处理:对于CPU密集型任务,进程池通过并行化处理多个任务,显著提升处理效率。

进程池适用于大规模并发任务的处理,如Web爬虫、数据处理和大规模计算等场景。

代码模拟进程池管道信息

首先,我们通过控制创建子进程的数量来实现进程池。可以通过main函数的参数控制子进程的创建个数。

enum{
    OK = 0,
    UsageError,
    PipeError,
    ForkError,
};

void Usage(string proc){
    cout << "Usage: " << proc << " " << endl;
    cout << "Example: " << proc << " 4" << endl;
    exit(UsageError);
}

// main函数的第一个参数是数组元素个数,第二个元素是创建子进程个数
int main(int argc, char *argv[]){
    if(argc != 2){
        Usage(argv[0]);
        return UsageError;
    }
    int num = atoi(argv[1]);
    // 接下来创建管道和子进程
}

我们封装一个类来管理进程池:

using work_t = function;

class ProcessPool{
public:
    ProcessPool(int n, work_t w) : num(n), work(w) {}
    ~ProcessPool() {}
    int InitProcesspool();
    void DisPatchTasks();
    void CleanProcessPool();
private:
    vector channels; // 管道
    int num; // 进程的个数
    work_t work; // 工作类型(void())
};

Channel类用于管理管道:

class Channel{
public:
    Channel(int wfd, pid_t who) : _wfd(wfd), _who(who) {
        _name = "Channel-" + to_string(wfd) + '-' + to_string(who);
    }
    string Name() { return _name; }
    void Send(int cmd) { write(_wfd, &cmd, sizeof(cmd)); }
    void Close() { close(_wfd); }
    pid_t Id() { return _who; }
    ~Channel() {}
private:
    int _wfd; // master的写端
    string _name; // 对应worker的名字
    pid_t _who; // 记录哪个子进程的管道
};

任务类模拟三种任务:

void Download() { cout << "Downloading..." << endl; }
void Log() { cout << "Logging..." << endl; }
void Sql() { cout << "Executing SQL..." << endl; }

using task_t = function;

class TaskManager{
public:
    TaskManager(){
        srand(time(nullptr));
        InsertTask(Download);
        InsertTask(Log);
        InsertTask(Sql);
    }
    ~TaskManager() {}
    void InsertTask(task_t t) { tasks[number++] = t; }
    int SelectTask() { return rand() % number; }
    void Excute(int number) {
        if(tasks.find(number) == tasks.end()) return;
        tasks[number]();
    }
private:
    unordered_map tasks;
};

static int number = 0;
TaskManager tmp;

void Usage(string proc){
    cout << "Usage: " << proc << " " << endl;
    cout << "Example: " << proc << " 4" << endl;
    exit(UsageError);
}

接下来实现InitProcesspool()DisPatchTasks()CleanProcessPool()函数:

int InitProcesspool(){
    for (int i = 0; i < num; i++) {
        int pipefd[2];
        if (pipe(pipefd) < 0) {
            perror("pipe");
            return PipeError;
        }
        pid_t pid = fork();
        if (pid < 0) {
            perror("fork");
            return ForkError;
        } else if (pid == 0) {
            close(pipefd[1]);
            dup2(pipefd[0], 0);
            close(pipefd[0]);
            work();
            exit(OK);
        } else {
            close(pipefd[0]);
            channels.push_back(Channel(pipefd[1], pid));
        }
    }
    return OK;
}

void DisPatchTasks(){
    int who = 0;
    int num = 20;
    while (num--) {
        int task = tmp.SelectTask();
        Channel &curr = channels[who];
        who++;
        who %= channels.size();
        cout << "Dispatch task " << task << " to " << curr.Name() << endl;
        curr.Send(task);
    }
}

void CleanProcessPool() {
    for (auto &c : channels) {
        c.Close();
    }
    for (auto &e : channels) {
        pid_t rid = waitpid(e.Id(), nullptr, 0);
        if (rid > 0) {
            cout << "子进程 " << rid << " 已回收" << endl;
        }
    }
}

最后,封装main.cc文件:

#include "ProcessPool.hpp"
#include "Task.hpp"

int main(int argc, char *argv[]){
    if(argc != 2) {
        Usage(argv[0]);
        return UsageError;
    }
    int num = atoi(argv[1]);
    ProcessPool *pp = new ProcessPool(num, Worker);
    pp->InitProcesspool();
    pp->DisPatchTasks();
    pp->CleanProcessPool();
    delete pp;
    return 0;
}

相关头文件和Makefile

// Channel.hpp
#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__
#include 
#include 
#include 
using namespace std;
// ... (Channel类定义)
#endif

// ProcessPool.hpp
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include "Channel.hpp"
#include "Task.hpp"
// ... (ProcessPool类定义)

// Task.hpp
#pragma once
#include 
#include 
#include 
#include 
using namespace std;
// ... (TaskManager类定义)

// Makefile
BIN=processpool
cc=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS=-o
SRC=$(shell ls *.cc)
OBJ=$(SRC:.cc=.o)
$(BIN):$(OBJ)
    $(cc) $(LDFLAGS) $@ $^
%.o:%.cc
    $(cc) $(FLAGS) $<
clean:
    rm -f $(OBJ) $(BIN)

【Linux】进程间通信——进程池

总结:

本文详细介绍了进程池的概念及其在实际应用中的作用。通过代码模拟,我们展示了如何初始化进程池、分发任务、执行任务逻辑以及清理进程池。文章还涵盖了相关的封装类和文件结构,如main.ccChannel.hppProcessPool.hppTask.hppMakefile,这些内容为理解和实现进程池提供了全面的指导。

进程池是一种有效的资源管理技术,能够提高多任务处理的效率和系统性能。通过合理的设计和实现,进程池可以在复杂的系统中发挥重要作用,减少资源浪费并提升任务执行的稳定性。希望本文的内容能为读者在实际项目中应用进程池提供有价值的参考。

【Linux】进程间通信——进程池

本篇关于《【Linux】进程池详解:进程间通信秘诀》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于文章的相关知识,请关注golang学习网公众号!

相关阅读
更多>
最新阅读
更多>
课程推荐
更多>