@@ -71,9 +71,10 @@ ThreadPool::ThreadPool(unsigned int num_thread)
7171 _top_index = 0 ;
7272 _bottom_index = 0 ;
7373 _incomplete_work = 0 ;
74- sem_init (&_available_work, 0 , 0 );
74+ init_sem (&_available_work);
7575 sem_init (&_available_thread, 0 , _queue_size);
7676
77+ init_mutex (&_work_mutex);
7778 init_mutex (&_mutex_sync);
7879 init_mutex (&_mutex_work_completion);
7980
@@ -105,60 +106,29 @@ void ThreadPool::destroy_pool(int maxPollSecs = 2)
105106}
106107
107108
108- bool ThreadPool::assign_work (WorkerThread *workerThread)
109+ void ThreadPool::assign_work (WorkerThread *workerThread)
109110{
110- {
111- ScopedMutex mutex (&_mutex_work_completion);
112- _incomplete_work++;
113- }
114-
115- sem_wait (&_available_thread);
116-
117- {
118- ScopedMutex mutex (&_mutex_sync);
119- _worker_queue[_top_index] = workerThread;
120- if (_queue_size !=1 )
121- _top_index = (_top_index+1 ) % (_queue_size-1 );
122- sem_post (&_available_work);
123- }
124- return true ;
111+ ScopedMutex mutex (&_work_mutex);
112+ _work.push_back (workerThread);
113+ post_sem (&_available_work);
125114}
126115
127- bool ThreadPool::fetch_work (WorkerThread **workerArg )
116+ WorkerThread * ThreadPool::fetch_work ()
128117{
129- sem_wait (&_available_work);
130-
131- {
132- ScopedMutex mutex (&_mutex_sync);
133- WorkerThread * workerThread = _worker_queue[_bottom_index];
134- _worker_queue[_bottom_index] = NULL ;
135- *workerArg = workerThread;
136- if (_queue_size !=1 )
137- _bottom_index = (_bottom_index+1 ) % (_queue_size-1 );
138- sem_post (&_available_thread);
139- }
140- return true ;
118+ wait_sem (&_available_work);
119+ ScopedMutex mutex (&_work_mutex);
120+ WorkerThread* work = _work.front ();
121+ _work.pop_front ();
122+ return work;
141123}
142124
143- void *ThreadPool::thread_execute (void *param)
125+ void * ThreadPool::thread_execute (void *param)
144126{
145- WorkerThread *worker = NULL ;
127+ ThreadPool *pool = static_cast <ThreadPool*>(param) ;
146128
147- while (((ThreadPool *)param)->fetch_work (&worker))
148- {
149- if (worker)
150- {
151- worker->executeThis ();
152- // cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
153- delete worker;
154- worker = NULL ;
155- }
156-
157- {
158- ThreadPool * pool = static_cast <ThreadPool*>(param);
159- ScopedMutex mutex (&pool->_mutex_work_completion );
160- --pool->_incomplete_work ;
161- }
129+ for (;;) {
130+ WorkerThread* worker = pool->fetch_work ();
131+ worker->executeThis ();
162132 }
163133 return 0 ;
164134}
@@ -184,6 +154,55 @@ void ThreadPool::init_mutex(pthread_mutex_t* const mutex)
184154 };
185155}
186156
157+ void ThreadPool::init_sem (sem_t * const sem)
158+ {
159+ int ret = sem_init (sem, 0 , 0 );
160+ if (0 != ret) {
161+ switch (errno) {
162+ case EINVAL:
163+ throw Error (" EINVAL returned by sem_init()" );
164+ case ENOSYS:
165+ throw Error (" ENOSYS returned by sem_init()" );
166+ default :
167+ throw Error (" UNKNOWN returned by sem_init()" );
168+ }
169+ }
170+ }
171+
172+ void ThreadPool::post_sem (sem_t * const sem)
173+ {
174+ int ret = sem_post (sem);
175+ if (0 != ret) {
176+ switch (errno) {
177+ case EINVAL:
178+ throw Error (" EINVAL returned by sem_post()" );
179+ case EOVERFLOW:
180+ throw Error (" EOVERFLOW returned by sem_post()" );
181+ default :
182+ throw Error (" UNKNOWN returned by sem_post()" );
183+ }
184+ }
185+ }
186+
187+ void ThreadPool::wait_sem (sem_t * const sem)
188+ {
189+ for (;;) {
190+ int ret = sem_wait (sem);
191+ if (0 == ret) {
192+ return ;
193+ }
194+
195+ switch (errno) {
196+ case EINTR:
197+ break ;
198+ case EINVAL:
199+ throw Error (" EINVAL returned by sem_wait()" );
200+ default :
201+ throw Error (" UNKNOWN returned by sem_wait()" );
202+ }
203+ }
204+ }
205+
187206Error::Error (const char * what)
188207:_what(what)
189208{
0 commit comments