Skip to content

Commit 3a4169b

Browse files
committed
Implement ScopedMutex for pthread_mutex_t
1 parent 2c549ad commit 3a4169b

1 file changed

Lines changed: 65 additions & 24 deletions

File tree

threadpool.cpp

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,51 @@
1616
along with this program. If not, see <http://www.gnu.org/licenses/>.
1717
*/
1818
#include "threadpool.h"
19+
#include <iostream>
1920
#include <stdlib.h>
2021
#include <errno.h>
2122

2223
using namespace std;
2324

25+
class ScopedMutex {
26+
public:
27+
explicit ScopedMutex(pthread_mutex_t * const mutex):_mutex(mutex)
28+
{
29+
int ret = pthread_mutex_lock(_mutex);
30+
switch (ret) {
31+
case 0:
32+
break;
33+
case EINVAL:
34+
throw Error("EINVAL returned by pthread_mutex_lock()");
35+
case EAGAIN:
36+
throw Error("EAGAIN returned by pthread_mutex_lock()");
37+
case EDEADLK:
38+
throw Error("EDEADLK returned by pthread_mutex_lock()");
39+
default:
40+
throw Error("UNKNOWN returned by pthread_mutex_lock()");
41+
}
42+
}
43+
44+
~ScopedMutex()
45+
{
46+
int ret = pthread_mutex_unlock(_mutex);
47+
switch (ret) {
48+
case 0:
49+
break;
50+
case EPERM:
51+
default:
52+
/*
53+
* No choose but die here. Destructor shall be a nofail function.
54+
*/
55+
std::cerr << ret << " returned by pthread_mutex_unlock()" << std::endl;
56+
57+
std::terminate();
58+
}
59+
}
60+
private:
61+
pthread_mutex_t * const _mutex;
62+
};
63+
2464

2565
ThreadPool::ThreadPool(unsigned int num_thread)
2666
:_thread_pool(num_thread)
@@ -67,36 +107,36 @@ void ThreadPool::destroy_pool(int maxPollSecs = 2)
67107

68108
/*
 * Enqueues workerThread for execution by a pool thread.
 *
 * Bumps the outstanding-work counter first (under
 * _mutex_work_completion), then blocks until a queue slot is free and
 * places the worker at the head of the ring buffer under _mutex_sync.
 * Always returns true.
 */
bool ThreadPool::assign_work(WorkerThread *workerThread)
{
    /* Record the new job before queueing it, so readers of
     * _incomplete_work never undercount outstanding work. */
    {
        ScopedMutex mutex(&_mutex_work_completion);
        _incomplete_work++;
    }

    /* Block until a producer slot is available. */
    sem_wait(&_available_thread);

    {
        ScopedMutex mutex(&_mutex_sync);
        _worker_queue[_top_index] = workerThread;
        /*
         * NOTE(review): wrapping the head with (_queue_size - 1) skips
         * the ring buffer's last slot; verify this matches the
         * semaphore's initial count and the consumer in fetch_work().
         */
        if (_queue_size != 1)
            _top_index = (_top_index + 1) % (_queue_size - 1);
        /* Publish the job to consumers while still holding the lock. */
        sem_post(&_available_work);
    }
    return true;
}
87126

88127
/*
 * Dequeues the next pending worker into *workerArg.
 *
 * Blocks on _available_work until a producer has queued a job, then
 * removes it from the tail of the ring buffer under _mutex_sync and
 * frees the slot for producers.  Always returns true.
 */
bool ThreadPool::fetch_work(WorkerThread **workerArg)
{
    /* Wait until at least one job has been published. */
    sem_wait(&_available_work);

    {
        ScopedMutex mutex(&_mutex_sync);
        WorkerThread *fetched = _worker_queue[_bottom_index];
        _worker_queue[_bottom_index] = NULL;
        *workerArg = fetched;
        /*
         * NOTE(review): wrapping the tail with (_queue_size - 1) skips
         * the ring buffer's last slot; it must advance in lockstep with
         * the producer's head in assign_work() — confirm both agree.
         */
        if (_queue_size != 1)
            _bottom_index = (_bottom_index + 1) % (_queue_size - 1);
        /* Hand the freed slot back to producers under the lock. */
        sem_post(&_available_thread);
    }
    return true;
}
102142

@@ -114,10 +154,11 @@ void *ThreadPool::thread_execute(void *param)
114154
worker = NULL;
115155
}
116156

117-
pthread_mutex_lock( &(((ThreadPool *)param)->_mutex_work_completion) );
118-
//cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
119-
((ThreadPool *)param)->_incomplete_work--;
120-
pthread_mutex_unlock( &(((ThreadPool *)param)->_mutex_work_completion) );
157+
{
158+
ThreadPool * pool = static_cast<ThreadPool*>(param);
159+
ScopedMutex mutex(&pool->_mutex_work_completion);
160+
--pool->_incomplete_work;
161+
}
121162
}
122163
return 0;
123164
}

0 commit comments

Comments
 (0)