'linux通過c++實現線程池類'

Linux 電腦 CPPLinux開發架構師 2019-09-03
"

前言

初學C++,想封裝點常用的C++類,已經寫好了mutex,cond,thread的類,想用起來寫點東西,於是就決定寫線程池了,這裡拙筆記錄下學習筆記.

本文主要內容包括: 線程池的概念 、 使用原因 、 適用場景 、 線程池的實現 、 任務調度邏輯、 樣例測試 .

線程池的概念

線程池是指在一個多線程程序中創建一個線程集合,在執行新的任務的時候不是新建一個線程,而是使用線程池中已創建好的線程,一旦任務執行完畢,線程就會休眠等待新的任務分配下來,線程池中的線程數量取決於機器給進程所能分配的內存大小,以及應用程序的需求.

使用原因及適用場合

1.在服務器端編程中,最原始的方法我們使用順序化的結構,一個服務器只能處理一個客戶,如果同時2個客戶端鏈接上來了,服務器只能先處理了先到達的那個個,這樣第二個客戶端只能等了,影響客戶的響應時間.它只適用於客戶量少的短連接.這時候有方案2.

2.在多線程服務器端編程中,一個服務器如果要處理多條鏈接的客戶端,當鏈接很少的時候我們可以每來一條鏈接創建一個線程。但當併發量很大的時候呢,不停地的增加線程,在某個時間計算機資源可能耗盡.於是有了方案3

3.為了彌補方案2中每個請求創建線程的缺陷,我們使用固定大小線程池,全部IO交給IO複用線程解決(本文不涉及),而任務計算交給線程池.如果任務彼此獨立,IO壓力不大,那麼這種方案非常適合.

當然服務器模型遠不止這3種,還有很多方案,本文不涉.

線程池的實現原理

線程池類主要維繫兩個隊列: 任務隊列 , 線程隊列

"

前言

初學C++,想封裝點常用的C++類,已經寫好了mutex,cond,thread的類,想用起來寫點東西,於是就決定寫線程池了,這裡拙筆記錄下學習筆記.

本文主要內容包括: 線程池的概念 、 使用原因 、 適用場景 、 線程池的實現 、 任務調度邏輯、 樣例測試 .

線程池的概念

線程池是指在一個多線程程序中創建一個線程集合,在執行新的任務的時候不是新建一個線程,而是使用線程池中已創建好的線程,一旦任務執行完畢,線程就會休眠等待新的任務分配下來,線程池中的線程數量取決於機器給進程所能分配的內存大小,以及應用程序的需求.

使用原因及適用場合

1.在服務器端編程中,最原始的方法我們使用順序化的結構,一個服務器只能處理一個客戶,如果同時2個客戶端鏈接上來了,服務器只能先處理了先到達的那個個,這樣第二個客戶端只能等了,影響客戶的響應時間.它只適用於客戶量少的短連接.這時候有方案2.

2.在多線程服務器端編程中,一個服務器如果要處理多條鏈接的客戶端,當鏈接很少的時候我們可以每來一條鏈接創建一個線程。但當併發量很大的時候呢,不停地的增加線程,在某個時間計算機資源可能耗盡.於是有了方案3

3.為了彌補方案2中每個請求創建線程的缺陷,我們使用固定大小線程池,全部IO交給IO複用線程解決(本文不涉及),而任務計算交給線程池.如果任務彼此獨立,IO壓力不大,那麼這種方案非常適合.

當然服務器模型遠不止這3種,還有很多方案,本文不涉.

線程池的實現原理

線程池類主要維繫兩個隊列: 任務隊列 , 線程隊列

linux通過c++實現線程池類

線程池通過take方法從線程隊列提取任務,到一個線程中去執行; 有任務就提取執行,無任務則阻塞線程休眠.

任務隊列 可以單獨寫個任務類出來,也可以寫個任務類基類,預留虛任務函數接口,繼承下來泛化.

當然最便利的方法就是直接用函數地址來做任務咯.

typedef void (*Task)(void);

線程隊列線程隊列通過我自己寫的線程類實現.

#include <pthread.h>
class Thread{
public:
typedef void (*threadFun_t)(void *arg);
explicit Thread(const threadFun_t &threadRoutine, void *arg);
~Thread();
void start();
void join();
static void *threadGuide(void *arg);
pthread_t getThreadId() const{
return m_threadId;
}
private:
pthread_t m_threadId;
bool m_isRuning;
threadFun_t m_threadRoutine;
void *m_threadArg;
};
Thread::Thread(const threadFun_t &threadRoutine, void *arg)
:m_isRuning(false),
m_threadId(0),
m_threadRoutine(threadRoutine),
m_threadArg(arg){
}
Thread::~Thread(){
if(m_isRuning){//如果線程正在執行,則分離此線程.
CHECK(!pthread_detach(m_threadId));
}
}
void *Thread::threadGuide(void *arg){
Thread *p = static_cast<Thread *>(arg);
p->m_threadRoutine(p->m_threadArg);
return NULL;
}
void Thread::join(){
VERIFY(m_isRuning);
CHECK(!pthread_join(m_threadId, NULL));
m_isRuning = false;
}
void Thread::start(){
pthread_attr_t attr;
CHECK(!pthread_attr_init(&attr));
//CHECK(!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); //set thread separation state property
CHECK(!pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)); //Set thread inheritance
CHECK(!pthread_attr_setschedpolicy(&attr, SCHED_OTHER)); //set thread scheduling policy
CHECK(!pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)); //Set thread scope
CHECK(!pthread_create(&m_threadId, &attr, threadGuide, this));
m_isRuning = true;
}{
MutexLockGuard lock(m_mutex);
m_isRuning = false;
}
}

構造傳入帶一個空類型參數指針( 為什麼要帶這個空類型指針後面會提 )函數指針,通過start()方法創建線程,然後執行threadGuide()方法調用構造時候傳入的函數指針執行咱們想運行的函數實現Thread類.

這兩個隊列在線程池中的定義如下:

private:
std::vector<Thread *> m_threads;
std::deque<Task> m_tasks;

任務調度邏輯

任務分配邏輯主要靠兩個條件變量實現,(條件變量本文不做詳述)

1.任務隊列是否空.

2.任務隊列是否滿.

其邏輯如下圖所示:

"

前言

初學C++,想封裝點常用的C++類,已經寫好了mutex,cond,thread的類,想用起來寫點東西,於是就決定寫線程池了,這裡拙筆記錄下學習筆記.

本文主要內容包括: 線程池的概念 、 使用原因 、 適用場景 、 線程池的實現 、 任務調度邏輯、 樣例測試 .

線程池的概念

線程池是指在一個多線程程序中創建一個線程集合,在執行新的任務的時候不是新建一個線程,而是使用線程池中已創建好的線程,一旦任務執行完畢,線程就會休眠等待新的任務分配下來,線程池中的線程數量取決於機器給進程所能分配的內存大小,以及應用程序的需求.

使用原因及適用場合

1.在服務器端編程中,最原始的方法我們使用順序化的結構,一個服務器只能處理一個客戶,如果同時2個客戶端鏈接上來了,服務器只能先處理了先到達的那個個,這樣第二個客戶端只能等了,影響客戶的響應時間.它只適用於客戶量少的短連接.這時候有方案2.

2.在多線程服務器端編程中,一個服務器如果要處理多條鏈接的客戶端,當鏈接很少的時候我們可以每來一條鏈接創建一個線程。但當併發量很大的時候呢,不停地的增加線程,在某個時間計算機資源可能耗盡.於是有了方案3

3.為了彌補方案2中每個請求創建線程的缺陷,我們使用固定大小線程池,全部IO交給IO複用線程解決(本文不涉及),而任務計算交給線程池.如果任務彼此獨立,IO壓力不大,那麼這種方案非常適合.

當然服務器模型遠不止這3種,還有很多方案,本文不涉.

線程池的實現原理

線程池類主要維繫兩個隊列: 任務隊列 , 線程隊列

linux通過c++實現線程池類

線程池通過take方法從線程隊列提取任務,到一個線程中去執行; 有任務就提取執行,無任務則阻塞線程休眠.

任務隊列 可以單獨寫個任務類出來,也可以寫個任務類基類,預留虛任務函數接口,繼承下來泛化.

當然最便利的方法就是直接用函數地址來做任務咯.

typedef void (*Task)(void);

線程隊列線程隊列通過我自己寫的線程類實現.

#include <pthread.h>
class Thread{
public:
typedef void (*threadFun_t)(void *arg);
explicit Thread(const threadFun_t &threadRoutine, void *arg);
~Thread();
void start();
void join();
static void *threadGuide(void *arg);
pthread_t getThreadId() const{
return m_threadId;
}
private:
pthread_t m_threadId;
bool m_isRuning;
threadFun_t m_threadRoutine;
void *m_threadArg;
};
Thread::Thread(const threadFun_t &threadRoutine, void *arg)
:m_isRuning(false),
m_threadId(0),
m_threadRoutine(threadRoutine),
m_threadArg(arg){
}
Thread::~Thread(){
if(m_isRuning){//如果線程正在執行,則分離此線程.
CHECK(!pthread_detach(m_threadId));
}
}
void *Thread::threadGuide(void *arg){
Thread *p = static_cast<Thread *>(arg);
p->m_threadRoutine(p->m_threadArg);
return NULL;
}
void Thread::join(){
VERIFY(m_isRuning);
CHECK(!pthread_join(m_threadId, NULL));
m_isRuning = false;
}
void Thread::start(){
pthread_attr_t attr;
CHECK(!pthread_attr_init(&attr));
//CHECK(!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); //set thread separation state property
CHECK(!pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED)); //Set thread inheritance
CHECK(!pthread_attr_setschedpolicy(&attr, SCHED_OTHER)); //set thread scheduling policy
CHECK(!pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM)); //Set thread scope
CHECK(!pthread_create(&m_threadId, &attr, threadGuide, this));
m_isRuning = true;
}{
MutexLockGuard lock(m_mutex);
m_isRuning = false;
}
}

構造傳入帶一個空類型參數指針( 為什麼要帶這個空類型指針後面會提 )函數指針,通過start()方法創建線程,然後執行threadGuide()方法調用構造時候傳入的函數指針執行咱們想運行的函數實現Thread類.

這兩個隊列在線程池中的定義如下:

private:
std::vector<Thread *> m_threads;
std::deque<Task> m_tasks;

任務調度邏輯

任務分配邏輯主要靠兩個條件變量實現,(條件變量本文不做詳述)

1.任務隊列是否空.

2.任務隊列是否滿.

其邏輯如下圖所示:

linux通過c++實現線程池類

start()方法是線程池的運行方法.通過它創建線程池.

threadRoutine()就是我們就是線程池中創建的線程,

線程跑起來後,通過 isRunning 控制線程循環是否退出.

stop()方法關閉線程池,回收資源.

循環中判斷 : 有任務則執行,無任務則wait 阻塞等待.

void ThreadPool::start(){
m_isRuning = true;
m_threads.reserve(m_threadsSize);
for(size_t i = 0; i < m_threadsSize; i++){
m_threads.push_back(new Thread(threadRoutine, this));
m_threads[i]->start();
}
}

Thread(threadRoutine, this) 這裡就是為什麼我線程類要帶一個無符號類型指針參數的原因,因為靜態函數無法調用c++的類成員函數(主要原因是類在編譯期間未實例化沒有明確的地址.)我們只能通過線程池對象的this指針調用它的成員.

任務調度的源碼實現:

ThreadPool::ThreadPool(size_t tasksSize, size_t threadsSize)
:m_tasksSzie(tasksSize),
m_threadsSize(threadsSize),
m_mutex(),
m_tasksEmpty(m_mutex),
m_tasksFull(m_mutex),
m_isRuning(false){
}
ThreadPool::~ThreadPool(){
if(m_isRuning){
stop();
}
}
void ThreadPool::threadRoutine(void *arg){
ThreadPool *p = static_cast<ThreadPool *>(arg);
while(p->m_isRuning){
ThreadPool::Task task(p->take());
if(task){
task();
}
}
}
ThreadPool::Task ThreadPool::take(){
MutexLockGuard lock(m_mutex);
while(m_tasks.empty() && m_isRuning){
m_tasksEmpty.wait();
}
if(!m_tasks.empty()){
Task task = m_tasks.front();
m_tasks.pop_front();
m_tasksFull.notify();
return task;
}
return NULL;
}
void ThreadPool::addTask(Task task){
if(m_threads.empty()){//如果線程池是空的,直接跑任務.
task();
}
else{
MutexLockGuard lock(m_mutex);
while(m_tasksSzie > 0 && m_tasks.size() >= m_tasksSzie){
m_tasksFull.wait();
}
m_tasks.push_back(task);
m_tasksEmpty.notify();
}
}
void ThreadPool::start(){
m_isRuning = true;
m_threads.reserve(m_threadsSize);
for(size_t i = 0; i < m_threadsSize; i++){
m_threads.push_back(new Thread(threadRoutine, this));
m_threads[i]->start();
}
}
void ThreadPool::stop(){
{
MutexLockGuard lock(m_mutex);
m_isRuning = false;
m_tasksEmpty.notifyAll();
}
for(int i = m_threadsSize - 1; i >= 0; i--){
m_threads[i]->join();
delete(m_threads[i]);
m_threads.pop_back();
}
}

程序測試

測試代碼:

創建一個有兩個線程,擁有5個任務的任務隊列,執行8個加數任務.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "MutexLock.hh"
#include "Thread.hh"
#include <unistd.h>
#include "Condition.hh"
#include "ThreadPool.hh"
#include <vector>
//threadPool test
MutexLock CntLock;
int cnt = 0;
void test(void){
unsigned long i = 0xfffffff;
//MutexLockGuard loo(CntLock);
//CntLock.lock();
while(i--);
printf("%d\\n", ++cnt);
//CntLock.unlock();
sleep(1);
}
int main()
{
//ThreadPool Test
ThreadPool tp(5, 2);
tp.start();
sleep(3);
for(int i = 0; i < 8; i++)
tp.addTask(test);

getchar();
return 0;
}

簡單test結果:

thread 140068496353024 run task
thread 140068504745728 run task
1
2
thread 140068504745728 run task
thread 140068496353024 run task
3
4
thread 140068496353024 run task
thread 140068504745728 run task
5
6
thread 140068496353024 run task
thread 140068504745728 run task
7
8
"

相關推薦

推薦中...