1616 along with this program. If not, see <http://www.gnu.org/licenses/>.
1717*/
1818#include " threadpool.h"
19+ #include < iostream>
1920#include < stdlib.h>
2021#include < errno.h>
2122
2223using namespace std ;
2324
25+ class ScopedMutex {
26+ public:
27+ explicit ScopedMutex (pthread_mutex_t * const mutex):_mutex(mutex)
28+ {
29+ int ret = pthread_mutex_lock (_mutex);
30+ switch (ret) {
31+ case 0 :
32+ break ;
33+ case EINVAL:
34+ throw Error (" EINVAL returned by pthread_mutex_lock()" );
35+ case EAGAIN:
36+ throw Error (" EAGAIN returned by pthread_mutex_lock()" );
37+ case EDEADLK:
38+ throw Error (" EDEADLK returned by pthread_mutex_lock()" );
39+ default :
40+ throw Error (" UNKNOWN returned by pthread_mutex_lock()" );
41+ }
42+ }
43+
44+ ~ScopedMutex ()
45+ {
46+ int ret = pthread_mutex_unlock (_mutex);
47+ switch (ret) {
48+ case 0 :
49+ break ;
50+ case EPERM:
51+ default :
52+ /*
53+ * No choose but die here. Destructor shall be a nofail function.
54+ */
55+ std::cerr << ret << " returned by pthread_mutex_unlock()" << std::endl;
56+
57+ std::terminate ();
58+ }
59+ }
60+ private:
61+ pthread_mutex_t * const _mutex;
62+ };
63+
2464
2565ThreadPool::ThreadPool (unsigned int num_thread)
2666:_thread_pool(num_thread)
@@ -67,36 +107,36 @@ void ThreadPool::destroy_pool(int maxPollSecs = 2)
67107
68108bool ThreadPool::assign_work (WorkerThread *workerThread)
69109{
70- pthread_mutex_lock (&_mutex_work_completion);
71- _incomplete_work++ ;
72- // cout << "assign_work...incomapleteWork=" << _incomplete_work << endl ;
73- pthread_mutex_unlock (&_mutex_work_completion);
110+ {
111+ ScopedMutex mutex (&_mutex_work_completion) ;
112+ _incomplete_work++ ;
113+ }
74114
75115 sem_wait (&_available_thread);
76116
77- pthread_mutex_lock (&_mutex_sync);
78- // workerVec[_top_index] = workerThread;
79- _worker_queue[_top_index] = workerThread;
80- // cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << _top_index << "]" << endl;
81- if (_queue_size !=1 )
82- _top_index = (_top_index+1 ) % (_queue_size-1 );
83- sem_post (&_available_work);
84- pthread_mutex_unlock (&_mutex_sync);
117+ {
118+ ScopedMutex mutex (&_mutex_sync);
119+ _worker_queue[_top_index] = workerThread;
120+ if (_queue_size !=1 )
121+ _top_index = (_top_index+1 ) % (_queue_size-1 );
122+ sem_post (&_available_work);
123+ }
85124 return true ;
86125}
87126
88127bool ThreadPool::fetch_work (WorkerThread **workerArg)
89128{
90129 sem_wait (&_available_work);
91130
92- pthread_mutex_lock (&_mutex_sync);
93- WorkerThread * workerThread = _worker_queue[_bottom_index];
94- _worker_queue[_bottom_index] = NULL ;
95- *workerArg = workerThread;
96- if (_queue_size !=1 )
97- _bottom_index = (_bottom_index+1 ) % (_queue_size-1 );
98- sem_post (&_available_thread);
99- pthread_mutex_unlock (&_mutex_sync);
131+ {
132+ ScopedMutex mutex (&_mutex_sync);
133+ WorkerThread * workerThread = _worker_queue[_bottom_index];
134+ _worker_queue[_bottom_index] = NULL ;
135+ *workerArg = workerThread;
136+ if (_queue_size !=1 )
137+ _bottom_index = (_bottom_index+1 ) % (_queue_size-1 );
138+ sem_post (&_available_thread);
139+ }
100140 return true ;
101141}
102142
@@ -114,10 +154,11 @@ void *ThreadPool::thread_execute(void *param)
114154 worker = NULL ;
115155 }
116156
117- pthread_mutex_lock ( &(((ThreadPool *)param)->_mutex_work_completion ) );
118- // cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
119- ((ThreadPool *)param)->_incomplete_work --;
120- pthread_mutex_unlock ( &(((ThreadPool *)param)->_mutex_work_completion ) );
157+ {
158+ ThreadPool * pool = static_cast <ThreadPool*>(param);
159+ ScopedMutex mutex (&pool->_mutex_work_completion );
160+ --pool->_incomplete_work ;
161+ }
121162 }
122163 return 0 ;
123164}
0 commit comments