/* Thread Pool implementation for unix / linux environments Copyright (C) 2008 Shobhit Gupta This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #include "threadpool.h" #include #include #include class ScopedMutex { public: explicit ScopedMutex(pthread_mutex_t * const mutex):_mutex(mutex) { assert(mutex); int ret = pthread_mutex_lock(_mutex); switch (ret) { case 0: break; case EINVAL: throw Error("EINVAL returned by pthread_mutex_lock()"); case EAGAIN: throw Error("EAGAIN returned by pthread_mutex_lock()"); case EDEADLK: throw Error("EDEADLK returned by pthread_mutex_lock()"); default: throw Error("UNKNOWN returned by pthread_mutex_lock()"); } } ~ScopedMutex() { int ret = pthread_mutex_unlock(_mutex); switch (ret) { case 0: break; case EPERM: default: /* * No choose but die here. Destructor shall be a nofail function. */ std::cerr << ret << " returned by pthread_mutex_unlock()" << std::endl; std::terminate(); } } private: pthread_mutex_t * const _mutex; }; ThreadPool::ThreadPool(unsigned int num_thread) :_thread_pool(num_thread) ,_work() { if (0 == num_thread) { throw Error("zero thread?"); } init_sem(&_available_work); init_mutex(&_work_mutex); for (std::vector::iterator i = _thread_pool.begin(); i != _thread_pool.end(); ++i) { int ret = pthread_create(&*i, NULL, &ThreadPool::thread_execute, this); switch (ret) { case 0: break; case EAGAIN: throw Error("EAGAIN returned by pthread_create()"); case EINVAL: throw Error("EINVAL returned by pthread_create()"); case EPERM: throw Error("EPERM returned by pthread_create()"); default: throw Error("UNKNOWN returned by pthread_create()"); } } } ThreadPool::~ThreadPool() { /* * All failures are ignored in destructor. */ int ret = sem_destroy(&_available_work); if (0 != ret) { std::cerr << errno << " returned by sem_destory(). Ignore" << std::endl; } ret = pthread_mutex_destroy(&_work_mutex); if (0 != ret) { std::cerr << ret << " returned by pthread_mutex_destroy(). Ignore" << std::endl; } } void ThreadPool::assign_work(WorkerThread *workerThread) { if (!workerThread) { throw Error("null?"); } ScopedMutex mutex(&_work_mutex); _work.push_back(workerThread); post_sem(&_available_work); } WorkerThread * ThreadPool::fetch_work() { wait_sem(&_available_work); ScopedMutex mutex(&_work_mutex); WorkerThread* work = _work.front(); _work.pop_front(); return work; } void * ThreadPool::thread_execute(void *param) { ThreadPool *pool = static_cast(param); for (;;) { WorkerThread* worker = pool->fetch_work(); worker->executeThis(); } return 0; } void ThreadPool::init_mutex(pthread_mutex_t* const mutex) { int ret = pthread_mutex_init(mutex, NULL); switch (ret) { case 0: break; case EAGAIN: throw Error("EAGAIN returned by pthread_mutex_init()"); case ENOMEM: throw Error("ENOMEM returned by pthread_mutex_init()"); case EPERM: throw Error("EPERM returned by pthread_mutex_init()"); case EBUSY: throw Error("EBUSY returned by pthread_mutex_init()"); case EINVAL: throw Error("EINVAL returned by pthread_mutex_init()"); default: throw Error("UNKNOWN returned by pthread_mutex_init()"); }; } void ThreadPool::init_sem(sem_t* const sem) { int ret = sem_init(sem, 0, 0); if (0 == ret) return; switch (errno) { case EINVAL: throw Error("EINVAL returned by sem_init()"); case ENOSYS: throw Error("ENOSYS returned by sem_init()"); default: throw Error("UNKNOWN returned by sem_init()"); } } void ThreadPool::post_sem(sem_t* const sem) { int ret = sem_post(sem); if (0 != ret) { switch (errno) { case EINVAL: throw Error("EINVAL returned by sem_post()"); case EOVERFLOW: throw Error("EOVERFLOW returned by sem_post()"); default: throw Error("UNKNOWN returned by sem_post()"); } } } void ThreadPool::wait_sem(sem_t* const sem) { int ret = 0; do { ret = sem_wait(sem); } while (0 != ret && EINTR == errno); if (0 == ret) return; switch (errno) { case EINVAL: throw Error("EINVAL returned by sem_wait()"); default: throw Error("UNKNOWN returned by sem_wait()"); } } Error::Error(const char * what) :_what(what) { } Error::~Error() throw() { } const char* Error::what() const throw() { return _what; }