161 lines
4.2 KiB
C++
161 lines
4.2 KiB
C++
#include "ThreadPool.h"
|
|
|
|
|
|
using namespace sequencelogic;
|
|
|
|
ThreadPool::ThreadPool() : poolSize(DEFAULT_POOL_SIZE)
|
|
{
|
|
//std::cout << "Constructed ThreadPool of size " << poolSize << std::endl;
|
|
}
|
|
|
|
ThreadPool::ThreadPool(int pool_size) : poolSize(pool_size)
|
|
{
|
|
//std::cout << "Constructed ThreadPool of size " << poolSize << std::endl;
|
|
}
|
|
|
|
ThreadPool::~ThreadPool()
|
|
{
|
|
// Release resources
|
|
if (poolState != STOPPED)
|
|
destroy();
|
|
}
|
|
|
|
// We can't pass a member function to pthread_create.
|
|
// So created the wrapper function that calls the member function
|
|
// we want to run in the thread.
|
|
extern "C"
|
|
void* start_thread(void* arg)
|
|
{
|
|
ThreadPool* tp = (ThreadPool*) arg;
|
|
tp->executeThread();
|
|
return NULL;
|
|
}
|
|
|
|
int ThreadPool::init()
|
|
{
|
|
// Create all the threads for the pool.
|
|
poolState = STARTED;
|
|
int ret = -1;
|
|
//Add all the threads
|
|
for (int i = 0; i < poolSize; i++)
|
|
{
|
|
//create a thread
|
|
pthread_t tid;
|
|
ret = pthread_create(&tid, NULL, start_thread, (void*) this);
|
|
|
|
if (ret != 0)
|
|
{
|
|
cerr << "pthread_create() failed: " << ret << std::endl;
|
|
return -1;
|
|
}
|
|
//add it to the pool
|
|
threads.push_back(tid);
|
|
}
|
|
//std::cout << poolSize << " threads created by the thread pool" << std::endl;
|
|
|
|
return 0;
|
|
}
|
|
|
|
int ThreadPool::destroy()
|
|
{
|
|
// Note: this is not for synchronization, its for thread communication!
|
|
// destroy_threadpool() will only be called from the main thread, yet
|
|
// the modified poolState may not show up to other threads until its
|
|
// modified in a lock!
|
|
taskLock.lock();
|
|
poolState = STOPPED;
|
|
taskLock.unlock();
|
|
//std::cout << "Broadcasting STOP signal to all threads..." << std::endl;
|
|
taskCond.broadcast(); // notify all threads we are shttung down
|
|
|
|
int ret = -1;
|
|
for (int i = 0; i < poolSize; i++)
|
|
{
|
|
void* result;
|
|
ret = pthread_join(threads[i], &result);
|
|
//std::cout << "pthread_join() returned " << ret << ": " << strerror(errno) << std::endl;
|
|
taskCond.broadcast(); // try waking up a bunch of threads that are still waiting
|
|
}
|
|
//std::cout << poolSize << " threads exited from the thread pool" << std::endl;
|
|
return 0;
|
|
}
|
|
|
|
void* ThreadPool::executeThread()
|
|
{
|
|
Task* task = NULL;
|
|
//std::cout << "Starting thread " << pthread_self() << std::endl;
|
|
while(true)
|
|
{
|
|
// Try to pick a task
|
|
//std::cout << "Locking: " << pthread_self() << std::endl;
|
|
taskLock.lock();
|
|
|
|
// We need to put pthread_cond_wait in a loop for two reasons:
|
|
// 1. There can be spurious wakeups (due to signal/ENITR)
|
|
// 2. When mutex is released for waiting, another thread can be waken up
|
|
// from a signal/broadcast and that thread can mess up the condition.
|
|
// So when the current thread wakes up the condition may no longer be
|
|
// actually true!
|
|
while ((poolState != STOPPED) && (tasks.empty()))
|
|
{
|
|
// Wait until there is a task in the queue
|
|
// Unlock mutex while waiting, then lock it back when signaled
|
|
//std::cout << "Unlocking and waiting: " << pthread_self() << std::endl;
|
|
taskCond.wait(taskLock.getMutexPtr());
|
|
//std::cout << "Signaled and locking: " << pthread_self() << std::endl;
|
|
}
|
|
|
|
// If the thread was woken up to notify process shutdown, return from here
|
|
if (poolState == STOPPED)
|
|
{
|
|
////std::cout << "Unlocking and exiting: " << pthread_self() << std::endl;
|
|
taskLock.unlock();
|
|
pthread_exit(NULL);
|
|
}
|
|
|
|
//get the task to be executed
|
|
|
|
task = tasks.front();
|
|
//std::cout << "Getting task " << task << std::endl;
|
|
tasks.pop_front();
|
|
//std::cout << "Unlocking: " << pthread_self() << std::endl;
|
|
taskLock.unlock();
|
|
//std::cout << "Executing thread " << pthread_self() << std::endl;
|
|
// execute the task
|
|
if (task != NULL)
|
|
{
|
|
printMsg("Running task....");
|
|
task->run();
|
|
//std::cout << " Ran task successfully!" << std::endl;
|
|
}
|
|
//std::cout << "Done executing thread " << pthread_self() << std::endl;
|
|
delete task;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
int ThreadPool::addTask(Task* task)
|
|
{
|
|
taskLock.lock();
|
|
|
|
tasks.push_back(task);
|
|
|
|
// Wake up a thread to do some work.
|
|
taskCond.signal();
|
|
|
|
taskLock.unlock();
|
|
|
|
return 0;
|
|
}
|
|
|
|
void ThreadPool::getTaskCount(int* res)
|
|
{
|
|
taskLock.lock();
|
|
*res = tasks.size();
|
|
taskLock.unlock();
|
|
}
|
|
|
|
void ThreadPool::printMsg(std::string msg)
|
|
{
|
|
std::cout << " " << msg << std::endl;
|
|
} |