#include "ThreadPool.h"

#include <cstddef>
#include <iostream>

using namespace sequencelogic;

// Builds a pool that will start DEFAULT_POOL_SIZE workers once init() is called.
// No threads are created here.
ThreadPool::ThreadPool() : poolSize(DEFAULT_POOL_SIZE)
{
    // Give poolState a defined value so the destructor never reads an
    // indeterminate state on a pool that was never init()'ed.
    poolState = STOPPED;
}

// Builds a pool that will start pool_size workers once init() is called.
// No threads are created here.
ThreadPool::ThreadPool(int pool_size) : poolSize(pool_size)
{
    // See the default constructor: the state must be defined before ~ThreadPool().
    poolState = STOPPED;
}

// Stops the workers if they are still running, then frees any tasks that
// were queued but never picked up.
ThreadPool::~ThreadPool()
{
    if (poolState != STOPPED)
        destroy();

    // executeThread() deletes each task after running it, so tasks left in
    // the queue at this point would otherwise never be deleted.
    while (!tasks.empty())
    {
        delete tasks.front();
        tasks.pop_front();
    }
}

// pthread_create() cannot take a member function, so this free function is
// the thread entry point and forwards to the member that runs the worker loop.
// arg is the ThreadPool* that init() passed to pthread_create().
extern "C"
void* start_thread(void* arg)
{
    ThreadPool* tp = static_cast<ThreadPool*>(arg);
    tp->executeThread();
    return NULL;
}

// Marks the pool STARTED and creates poolSize worker threads.
// Returns 0 when every thread was created, or -1 as soon as one
// pthread_create() call fails. Threads created before the failure keep
// running and stay recorded in `threads`, so destroy() can still join them.
int ThreadPool::init()
{
    poolState = STARTED;

    for (int i = 0; i < poolSize; i++)
    {
        pthread_t tid;
        const int ret = pthread_create(&tid, NULL, start_thread, static_cast<void*>(this));
        if (ret != 0)
        {
            std::cerr << "pthread_create() failed: " << ret << std::endl;
            return -1;
        }
        // Record only threads that actually exist.
        threads.push_back(tid);
    }

    return 0;
}

// Signals every worker to stop and joins the threads recorded by init().
// Returns 0 when every join succeeded, -1 if any pthread_join() failed.
// Calling it again afterwards is harmless because the thread list is
// emptied once the joins are done.
int ThreadPool::destroy()
{
    // The state is changed while holding taskLock because the workers read
    // poolState while holding the same lock in executeThread().
    taskLock.lock();
    poolState = STOPPED;
    taskLock.unlock();

    // Notify all threads that we are shutting down.
    taskCond.broadcast();

    int status = 0;
    // Join exactly the threads that were created. poolSize may be larger
    // than threads.size() when init() was never called or failed midway.
    for (std::size_t i = 0; i < threads.size(); i++)
    {
        void* result = NULL;
        const int ret = pthread_join(threads[i], &result);
        if (ret != 0)
        {
            std::cerr << "pthread_join() failed: " << ret << std::endl;
            status = -1;
        }
        // Wake any workers that are still waiting on the condition.
        taskCond.broadcast();
    }

    // Forget the joined ids so a later destroy() does not join them again.
    threads.clear();

    return status;
}

// Worker loop: repeatedly takes the task at the front of the queue, runs it
// and deletes it. Returns NULL once poolState is STOPPED.
void* ThreadPool::executeThread()
{
    while (true)
    {
        taskLock.lock();

        // The wait sits in a loop for two reasons:
        // 1. There can be spurious wakeups (e.g. due to a signal/EINTR).
        // 2. While the mutex is released for waiting, another thread can be
        //    woken by a signal/broadcast and take the task first, so the
        //    condition may no longer hold when this thread wakes up.
        while ((poolState != STOPPED) && tasks.empty())
        {
            // wait() is handed the mutex so it can be released while
            // blocked and re-acquired when signaled.
            taskCond.wait(taskLock.getMutexPtr());
        }

        // Woken up for shutdown: release the lock and leave the loop.
        // Returning ends the thread through start_thread() with the same
        // NULL exit value that pthread_exit(NULL) would give.
        if (poolState == STOPPED)
        {
            taskLock.unlock();
            return NULL;
        }

        // Take the next task while still holding the lock.
        Task* task = tasks.front();
        tasks.pop_front();
        taskLock.unlock();

        // Run the task outside the lock so other workers are not blocked.
        if (task != NULL)
        {
            printMsg("Running task....");
            task->run();
        }

        // The task is deleted here once it has been run (deleting NULL is a no-op).
        delete task;
    }
    return NULL;
}

// Appends task to the queue and signals the condition to wake a worker.
// executeThread() deletes the task after running it.
// Always returns 0.
int ThreadPool::addTask(Task* task)
{
    taskLock.lock();
    tasks.push_back(task);
    // Wake up a thread to do some work.
    taskCond.signal();
    taskLock.unlock();

    return 0;
}

// Writes the number of tasks currently waiting in the queue to *res.
// Does nothing when res is NULL.
void ThreadPool::getTaskCount(int* res)
{
    if (res == NULL)
        return;

    taskLock.lock();
    *res = static_cast<int>(tasks.size());
    taskLock.unlock();
}

// Prints msg to standard output, prefixed with a single space.
void ThreadPool::printMsg(std::string msg)
{
    std::cout << " " << msg << std::endl;
}