# ThreadPool **Repository Path**: BambooWine/thread-pool ## Basic Information - **Project Name**: ThreadPool - **Description**: 基于C++17/20实现轻量级线程池 - **Primary Language**: C++ - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 6 - **Forks**: 1 - **Created**: 2023-07-31 - **Last Updated**: 2024-09-25 ## Categories & Tags **Categories**: Uncategorized **Tags**: 多线程, 并发, 线程池, Cpp17 ## README # ThreadPool ## 1. 简介 ### 1.1 项目动机 多线程技术在提高程序并发性发挥着重大作用,对于现代高性能计算也有着举足轻重的地位;C++11标准之后,提供了并发支持库,包含线程、原子操作、互斥、条件变量和future的内建支持。但是线程的创建和销毁存在的一定的开销,特别是频繁的创建销毁可能引起程序响应速度的下降,同时不便于管理。 线程池是一种池化资源技术,负责管理和调度线程执行任务;主要的特点是:可以在整个生命周期内,不断重用线程处理用户提交的任务,避免线程频繁创建和销毁带来的开销,提高响应速度,同时增加线程的可管理性。 但是C++标准中并没有相应的线程池框架,因此本项目的宗旨是:**基于C++标准,实现轻量级线程池**,同时满足: + **高性能性:** 合理设计项目结构和代码逻辑,使得程序性能和响应速度尽可能高; + **可移植性:** 仅依赖C++标准库,不适用任何第三方库; + **易使用性:** 提供方便的用户接口以及说明文档,同时提供必要的程序运行提示信息和异常检测机制; ### 1.2 项目介绍 本项目基于**C++17标准**,部分涉及C++20(此部分不必担心,仅仅使用例如std::format等格式化库),项目的整体逻辑如下所示: 1. 使用**std::function** 和 **bind** 接收用户所提交的任务,并生成相应的任务队列;使用 **packaged_task** 延迟启动和 **future** 异步获取任务的返回值; 2. 添加新任务时会激活工作线程,使其从队头获取任务;使用mutex和unique_lock以及条件变量实现**线程的同步**问题;对于一些线程间共享变量,使用 **std::atomic原子类型** 完成**无锁并发编程**; 3. 增加可选的**守护线程**,职责为:任务过多时添加工作线程以及销毁一定数目的闲置线程; 4. 支持**并行处理循环**执行的任务,尽可能提高CPU利用率以及运行效率; 5. 该线程池轻量级、易于使用、可移植,重用线程避免单个任务创建和销毁线程的开销,提高响应速度同时增加线程的可管理性; ## 2. 使用方法 ### 2.1 编译环境 本项目的开发环境为: + Windows 10 x64;Inter(R) Core(TM) i5-7200U CPU @2.50GHZ 2.70GHZ;8.00GB RAM; + Visual Studio Code 编辑器; + MinGW gcc 13.1.0,编译参数 `-std=c++23`; 此外,该库在以下环境(最低,由于使用std::format,需要求支持C++20)编译无误: + MinGW gcc 13.1.0,编译参数 `-std=c++20`; + MinGW clang 16.0.2,编译参数 `-std=c++20`; + x86-64 gcc 13.1.0,编译参数 `-std=c++20`; + x64 msvc v19.32,编译参数 `/std:c++20`; ### 2.2 项目导入 从本仓库下载源代码文件,将其中的 `thread_pool.hpp` 和 `util.hpp`包含在本地程序中,其中: + `thread_pool.hpp`:包含线程池整体代码; + `util.hpp`:提供必要的工具,包括 **同步输出流**、**时间类**; ```cpp #include #include #include "thread_pool.hpp" int main () { using namespace tp; ThreadPool pool; std::future ans = pool.push([](int n){ int sum = 0; for (int i = 0; i < n; ++i) sum += i; return sum; }, 100); pool.wait_until_done(); cout << "ans: " << ans.get() << endl; return 0; } ``` 程序输出: ```cpp ans: 4950 ``` ### 2.3 用户接口 #### 2.3.1 构造函数 ```cpp ThreadPool(size_t count = std::thread::hardware_concurrency(), bool destroy_idle = false); ``` + `count`:线程池初始线程数; + `destroy_idle`:守护线程是否需要销毁闲置线程,默认false; 对于线程数目,有 `THREADS_MAX` 和 `THREADS_MIN` 的限制,可以根据实际任务进行具体定制。 #### 2.3.2 添加任务 ```cpp template decltype(auto) push(F&& f, Args&&... args); ``` 本函数为模板方法: + `f`:函数调用器,包含函数指针、仿函数、lambda、function等; + `args`:函数参数,是形参包类型; 该 `push` 方法会接收用户提交任务,并添加到任务队列,等待工作线程响应;使用`std::invoke_result_t`获取任务返回值类型,该 `push` 方法返回 `future` 类型的变量,表示用户任务的返回值,接着通过 `get` 接口获取实际值。 ```cpp template decltype(auto) push_loop(F&& f, const T start, const T end, const size_t num_blocks = 0); ``` 类似 `push`,本函数的功能是:提交循环执行的任务(循环之间不相关); + `f`:函数调用器; + `start`:循环起始区间,整型类型; + `end`:循环结束区间,整型类型;需要强调的是,和标准库保持一直,循环区间是 **左闭右开**; + `num_blocks`:将该循环任务分为若干个子块,默认 `std::thread::hardware_concurrency()`; #### 2.3.3 线程池状态操作 + `pause`:暂停线程池; + `resume`:线程池继续执行; + `clear`:清空剩余任务; + `wait_until_done`:等待所有任务执行完毕; + `wait_for`:等待所有任务执行完毕(一段时间); ```cpp template bool wait_for(const chrono::duration<_Rep, _Period>& __rtime); ``` #### 2.3.4 线程池属性获取 + `get_threads_count`:获取当前存活线程数; + `get_threads_running`:获取当前正在运行的线程数; + `get_tasks_count`:获取任务队列的任务数目,即剩余任务数; + `get_tasks_total`:获取当前所有未完成任务数目; + `is_running`:判断线程池是否正在运行; + `is_closed`:判断线程池是否已关闭; + `is_waiting`:判断线程池是否正在等待任务执行完毕; + `is_paused`:判断线程池是否暂停; + `is_empty`:判断任务队列是否为空; #### 2.3.5 工具类 **SyncStream**:同步输出流: 函数原型: ```cpp template void println(Args&&... args); ``` 使用方法: ```cpp SyncStream().println(std::format("Bob is {} years old.", 25)); ``` 程序输出: ```cpp Bob is 25 years old. ``` **TimeGuard**:时间管理类,可以用于打印运行时间; 使用方法: ```cpp TimeGuard tg; // ... tg.print_duration(); // 打印耗时 tg.update_start() // 更新start // ... auto interval = tg.get_duration(); // 获取耗时 ``` ## 3. 测试及使用案例 ### 3.1 提交任务 该测试中,构建一个用户任务 `ff`;对比本线程池与 `std::thread` 和 `std::async` 的效率,测试代码如下: ```cpp struct AA { string s; }; long long ff(int n, int id) { long long ans = 0; AA arr[1000]; for (int i = 0; i < n; ++i) { ans += i; arr[min(i, 999)].s = to_string(i); } return ans; } void test_task() { SyncStream().println("\n========== Test Tasks =========="); TimeGuard tg; const int mx = 100; { using namespace tp; ThreadPool pool(8); vector> v(mx); for (int i = 0; i < mx; ++i) { auto r = pool.push(ff, (i + 1) * 10000, i + 1); v[i] = std::move(r); } for (int i = 0; i < mx; ++i) { v[i].get(); } } SyncStream().println(std::format("Thread-pool cost {:.3f}s", tg.get_duration())); tg.update_start(); { thread t[mx]; for (int i = 0; i < mx; ++i) { t[i] = thread{ff, (i + 1) * 10000, i + 1}; } for (int i = 0; i < mx; ++i) t[i].join(); } SyncStream().println(std::format("std::thread cost {:.3f}s", tg.get_duration())); tg.update_start(); { vector> v(mx); for (int i = 0; i < mx; ++i) { auto r = std::async(ff, (i + 1) * 10000, i + 1); v[i] = std::move(r); } for (int i = 0; i < mx; ++i) { v[i].get(); } } SyncStream().println(std::format("std::async cost {:.3f}s", tg.get_duration())); } ``` ### 3.2 并行化执行循环任务 对于一些循环间不相关的任务,可以采用 **并行化** 手段;现在对比线程池并行化和串行执行的效率;测试代码如下: ```cpp void test_loop() { constexpr int len = 1000; vector arr(len); vector ss(len); auto loop = [&arr, &ss](const int start, const int end) { for (int i = start; i < end; ++i) { size_t sum = 0; for (size_t j = 0; j < i * 1000; ++j) sum += j; arr[i] = sum; for (size_t j = 0; j < i * 50; ++j) { ss[i].s += to_string(j * i * 12344321); } } }; SyncStream().println("\n========== Test Loop ========== "); TimeGuard tg; { using namespace tp; ThreadPool pool(8); pool.push_loop(loop, 0, len, 8); pool.wait_until_done(); } SyncStream().println(std::format("parallel loop costs {:.3f}s", tg.get_duration())); ss = vector(len); tg.update_start(); { loop(0, len); } SyncStream().println(std::format("Serial loop costs {:.3f}s", tg.get_duration())); } ``` ### 3.3 测试效果图 对比测试效果图如下所示: ![image-20230801113830197](https://bamboowine-img-1259155549.cos.ap-beijing.myqcloud.com/img/image-20230801113830197.png) + Test Tasks:三者差别基本不大,在当前实验下,线程池的效率略低于 `std::async` (可能存在线程调度开销); + Test Loop:线程池的效率比串行高两倍以上; ## 4. 关于项目 + 如果您对本项目感兴趣,欢迎拉取仓库或者下载代码,在本地使用; + 如果您在使用中遇到任何错误,或者想增加新的功能,可以随时发布一个issue; + 如果您觉得本项目有用,可以考虑给个star;