Skip to content

Commit 490c7e3

Browse files
committed
Change implementation of fetch_work() and assign_work()
- There is no work lost bug now.
1 parent 3a4169b commit 490c7e3

2 files changed

Lines changed: 76 additions & 50 deletions

File tree

threadpool.cpp

Lines changed: 66 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ ThreadPool::ThreadPool(unsigned int num_thread)
7171
_top_index = 0;
7272
_bottom_index = 0;
7373
_incomplete_work = 0;
74-
sem_init(&_available_work, 0, 0);
74+
init_sem(&_available_work);
7575
sem_init(&_available_thread, 0, _queue_size);
7676

77+
init_mutex(&_work_mutex);
7778
init_mutex(&_mutex_sync);
7879
init_mutex(&_mutex_work_completion);
7980

@@ -105,60 +106,29 @@ void ThreadPool::destroy_pool(int maxPollSecs = 2)
105106
}
106107

107108

108-
bool ThreadPool::assign_work(WorkerThread *workerThread)
109+
void ThreadPool::assign_work(WorkerThread *workerThread)
109110
{
110-
{
111-
ScopedMutex mutex(&_mutex_work_completion);
112-
_incomplete_work++;
113-
}
114-
115-
sem_wait(&_available_thread);
116-
117-
{
118-
ScopedMutex mutex(&_mutex_sync);
119-
_worker_queue[_top_index] = workerThread;
120-
if(_queue_size !=1 )
121-
_top_index = (_top_index+1) % (_queue_size-1);
122-
sem_post(&_available_work);
123-
}
124-
return true;
111+
ScopedMutex mutex(&_work_mutex);
112+
_work.push_back(workerThread);
113+
post_sem(&_available_work);
125114
}
126115

127-
bool ThreadPool::fetch_work(WorkerThread **workerArg)
116+
WorkerThread * ThreadPool::fetch_work()
128117
{
129-
sem_wait(&_available_work);
130-
131-
{
132-
ScopedMutex mutex(&_mutex_sync);
133-
WorkerThread * workerThread = _worker_queue[_bottom_index];
134-
_worker_queue[_bottom_index] = NULL;
135-
*workerArg = workerThread;
136-
if(_queue_size !=1 )
137-
_bottom_index = (_bottom_index+1) % (_queue_size-1);
138-
sem_post(&_available_thread);
139-
}
140-
return true;
118+
wait_sem(&_available_work);
119+
ScopedMutex mutex(&_work_mutex);
120+
WorkerThread* work = _work.front();
121+
_work.pop_front();
122+
return work;
141123
}
142124

143-
void *ThreadPool::thread_execute(void *param)
125+
void * ThreadPool::thread_execute(void *param)
144126
{
145-
WorkerThread *worker = NULL;
127+
ThreadPool *pool = static_cast<ThreadPool*>(param);
146128

147-
while(((ThreadPool *)param)->fetch_work(&worker))
148-
{
149-
if(worker)
150-
{
151-
worker->executeThis();
152-
//cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
153-
delete worker;
154-
worker = NULL;
155-
}
156-
157-
{
158-
ThreadPool * pool = static_cast<ThreadPool*>(param);
159-
ScopedMutex mutex(&pool->_mutex_work_completion);
160-
--pool->_incomplete_work;
161-
}
129+
for (;;) {
130+
WorkerThread* worker = pool->fetch_work();
131+
worker->executeThis();
162132
}
163133
return 0;
164134
}
@@ -184,6 +154,55 @@ void ThreadPool::init_mutex(pthread_mutex_t* const mutex)
184154
};
185155
}
186156

157+
void ThreadPool::init_sem(sem_t* const sem)
158+
{
159+
int ret = sem_init(sem, 0, 0);
160+
if (0 != ret) {
161+
switch (errno) {
162+
case EINVAL:
163+
throw Error("EINVAL returned by sem_init()");
164+
case ENOSYS:
165+
throw Error("ENOSYS returned by sem_init()");
166+
default:
167+
throw Error("UNKNOWN returned by sem_init()");
168+
}
169+
}
170+
}
171+
172+
void ThreadPool::post_sem(sem_t* const sem)
173+
{
174+
int ret = sem_post(sem);
175+
if (0 != ret) {
176+
switch (errno) {
177+
case EINVAL:
178+
throw Error("EINVAL returned by sem_post()");
179+
case EOVERFLOW:
180+
throw Error("EOVERFLOW returned by sem_post()");
181+
default:
182+
throw Error("UNKNOWN returned by sem_post()");
183+
}
184+
}
185+
}
186+
187+
void ThreadPool::wait_sem(sem_t* const sem)
188+
{
189+
for (;;) {
190+
int ret = sem_wait(sem);
191+
if (0 == ret) {
192+
return;
193+
}
194+
195+
switch (errno) {
196+
case EINTR:
197+
break;
198+
case EINVAL:
199+
throw Error("EINVAL returned by sem_wait()");
200+
default:
201+
throw Error("UNKNOWN returned by sem_wait()");
202+
}
203+
}
204+
}
205+
187206
Error::Error(const char * what)
188207
:_what(what)
189208
{

threadpool.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <semaphore.h>
2323
#include <iostream>
2424
#include <vector>
25+
#include <list>
2526

2627
class Error: public std::exception {
2728
public:
@@ -59,22 +60,28 @@ class ThreadPool{
5960

6061
void destroy_pool(int maxPollSecs);
6162

62-
bool assign_work(WorkerThread *worker);
63-
bool fetch_work(WorkerThread **worker);
63+
void assign_work(WorkerThread *worker);
64+
WorkerThread* fetch_work();
6465

6566
static void *thread_execute(void *param);
6667

6768

6869

6970
private:
7071
static void init_mutex(pthread_mutex_t* mutex);
72+
static void init_sem(sem_t* sem);
73+
static void post_sem(sem_t* sem);
74+
static void wait_sem(sem_t* sem);
7175
std::vector<pthread_t> _thread_pool;
76+
std::list<WorkerThread *> _work;
77+
pthread_mutex_t _work_mutex;
78+
sem_t _available_work;
79+
7280
unsigned int _num_thread;
7381

7482
pthread_mutex_t _mutex_sync;
7583
pthread_mutex_t _mutex_work_completion;
7684

77-
sem_t _available_work;
7885
sem_t _available_thread;
7986

8087
std::vector<WorkerThread *> _worker_queue;

0 commit comments

Comments
 (0)