Skip to content

Commit ac74d07

Browse files
author
shobhitgupta12
committed
Revised and correct all previous bugs.
-Shobhit git-svn-id: http://cppthreadpool.googlecode.com/svn/trunk@2 7843d62f-7e47-0410-842a-0f0505ad6057
1 parent 90be90d commit ac74d07

3 files changed

Lines changed: 313 additions & 0 deletions

File tree

main.cpp

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Thread Pool implementation for unix / linux environments
3+
Copyright (C) 2008 Shobhit Gupta
4+
5+
This program is free software: you can redistribute it and/or modify
6+
it under the terms of the GNU General Public License as published by
7+
the Free Software Foundation, either version 3 of the License, or
8+
(at your option) any later version.
9+
10+
This program is distributed in the hope that it will be useful,
11+
but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
GNU General Public License for more details.
14+
15+
You should have received a copy of the GNU General Public License
16+
along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
*/
18+
19+
#include <iostream>
20+
#include "threadpool.h"
21+
22+
using namespace std;
23+
24+
25+
#define ITERATIONS 200
26+
27+
class SampleWorkerThread : public WorkerThread
28+
{
29+
public:
30+
int id;
31+
32+
unsigned virtual executeThis()
33+
{
34+
// Instead of sleep() we could do anytime consuming work here.
35+
//Using ThreadPools is advantageous only when the work to be done is really time consuming. (atleast 1 or 2 seconds)
36+
sleep(2);
37+
38+
return(0);
39+
}
40+
41+
42+
SampleWorkerThread(int id) : WorkerThread(id), id(id)
43+
{
44+
// cout << "Creating SampleWorkerThread " << id << "\t address=" << this << endl;
45+
}
46+
47+
~SampleWorkerThread()
48+
{
49+
// cout << "Deleting SampleWorkerThread " << id << "\t address=" << this << endl;
50+
}
51+
};
52+
53+
54+
int main(int argc, char **argv)
55+
{
56+
//ThreadPool(N);
57+
//Create a Threadpool with N number of threads
58+
ThreadPool* myPool = new ThreadPool(25);
59+
myPool->initializeThreads();
60+
61+
//We will count time elapsed after initializeThreads()
62+
time_t t1=time(NULL);
63+
64+
//Lets start bullying ThreadPool with tonnes of work !!!
65+
for(unsigned int i=0;i<ITERATIONS;i++){
66+
SampleWorkerThread* myThread = new SampleWorkerThread(i);
67+
//cout << "myThread[" << myThread->id << "] = [" << myThread << "]" << endl;
68+
myPool->assignWork(myThread);
69+
}
70+
71+
// destroyPool(int maxPollSecs)
72+
// Before actually destroying the ThreadPool, this function checks if all the pending work is completed.
73+
// If the work is still not done, then it will check again after maxPollSecs
74+
// The default value for maxPollSecs is 2 seconds.
75+
// And ofcourse the user is supposed to adjust it for his needs.
76+
77+
myPool->destroyPool(2);
78+
79+
time_t t2=time(NULL);
80+
cout << t2-t1 << " seconds elapsed\n" << endl;
81+
delete myPool;
82+
83+
return 0;
84+
}

threadpool.cpp

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
Thread Pool implementation for unix / linux environments
3+
Copyright (C) 2008 Shobhit Gupta
4+
5+
This program is free software: you can redistribute it and/or modify
6+
it under the terms of the GNU General Public License as published by
7+
the Free Software Foundation, either version 3 of the License, or
8+
(at your option) any later version.
9+
10+
This program is distributed in the hope that it will be useful,
11+
but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
GNU General Public License for more details.
14+
15+
You should have received a copy of the GNU General Public License
16+
along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
*/
18+
19+
#include <stdlib.h>
20+
#include "threadpool.h"
21+
22+
using namespace std;
23+
24+
pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
25+
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
26+
27+
28+
29+
ThreadPool::ThreadPool()
30+
{
31+
ThreadPool(2);
32+
}
33+
34+
ThreadPool::ThreadPool(int maxThreads)
35+
{
36+
if (maxThreads < 1) maxThreads=1;
37+
38+
//mutexSync = PTHREAD_MUTEX_INITIALIZER;
39+
//mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
40+
41+
pthread_mutex_lock(&mutexSync);
42+
this->maxThreads = maxThreads;
43+
this->queueSize = maxThreads;
44+
//workerQueue = new WorkerThread *[maxThreads];
45+
workerQueue.resize(maxThreads, NULL);
46+
topIndex = 0;
47+
bottomIndex = 0;
48+
incompleteWork = 0;
49+
sem_init(&availableWork, 0, 0);
50+
sem_init(&availableThreads, 0, queueSize);
51+
pthread_mutex_unlock(&mutexSync);
52+
}
53+
54+
void ThreadPool::initializeThreads()
55+
{
56+
for(int i = 0; i<maxThreads; ++i)
57+
{
58+
pthread_t tempThread;
59+
pthread_create(&tempThread, NULL, &ThreadPool::threadExecute, (void *) this );
60+
//threadIdVec[i] = tempThread;
61+
}
62+
63+
}
64+
65+
ThreadPool::~ThreadPool()
66+
{
67+
workerQueue.clear();
68+
}
69+
70+
71+
72+
void ThreadPool::destroyPool(int maxPollSecs = 2)
73+
{
74+
while( incompleteWork>0 )
75+
{
76+
//cout << "Work is still incomplete=" << incompleteWork << endl;
77+
sleep(maxPollSecs);
78+
}
79+
cout << "All Done!! Wow! That was a lot of work!" << endl;
80+
sem_destroy(&availableWork);
81+
sem_destroy(&availableThreads);
82+
pthread_mutex_destroy(&mutexSync);
83+
pthread_mutex_destroy(&mutexWorkCompletion);
84+
85+
}
86+
87+
88+
bool ThreadPool::assignWork(WorkerThread *workerThread)
89+
{
90+
pthread_mutex_lock(&mutexWorkCompletion);
91+
incompleteWork++;
92+
//cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
93+
pthread_mutex_unlock(&mutexWorkCompletion);
94+
95+
sem_wait(&availableThreads);
96+
97+
pthread_mutex_lock(&mutexSync);
98+
//workerVec[topIndex] = workerThread;
99+
workerQueue[topIndex] = workerThread;
100+
//cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
101+
if(queueSize !=1 )
102+
topIndex = (topIndex+1) % (queueSize-1);
103+
sem_post(&availableWork);
104+
pthread_mutex_unlock(&mutexSync);
105+
return true;
106+
}
107+
108+
bool ThreadPool::fetchWork(WorkerThread **workerArg)
109+
{
110+
sem_wait(&availableWork);
111+
112+
pthread_mutex_lock(&mutexSync);
113+
WorkerThread * workerThread = workerQueue[bottomIndex];
114+
workerQueue[bottomIndex] = NULL;
115+
*workerArg = workerThread;
116+
if(queueSize !=1 )
117+
bottomIndex = (bottomIndex+1) % (queueSize-1);
118+
sem_post(&availableThreads);
119+
pthread_mutex_unlock(&mutexSync);
120+
return true;
121+
}
122+
123+
void *ThreadPool::threadExecute(void *param)
124+
{
125+
WorkerThread *worker = NULL;
126+
127+
while(((ThreadPool *)param)->fetchWork(&worker))
128+
{
129+
if(worker)
130+
{
131+
worker->executeThis();
132+
//cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
133+
delete worker;
134+
worker = NULL;
135+
}
136+
137+
pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) );
138+
//cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
139+
((ThreadPool *)param)->incompleteWork--;
140+
pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) );
141+
}
142+
return 0;
143+
}

threadpool.h

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Thread Pool implementation for unix / linux environments
3+
Copyright (C) 2008 Shobhit Gupta
4+
5+
This program is free software: you can redistribute it and/or modify
6+
it under the terms of the GNU General Public License as published by
7+
the Free Software Foundation, either version 3 of the License, or
8+
(at your option) any later version.
9+
10+
This program is distributed in the hope that it will be useful,
11+
but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
GNU General Public License for more details.
14+
15+
You should have received a copy of the GNU General Public License
16+
along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
*/
18+
19+
#include <pthread.h>
20+
#include <semaphore.h>
21+
#include <iostream>
22+
#include <vector>
23+
24+
using namespace std;
25+
/*
26+
WorkerThread class
27+
This class needs to be sobclassed by the user.
28+
*/
29+
class WorkerThread{
30+
public:
31+
int id;
32+
33+
unsigned virtual executeThis()
34+
{
35+
return 0;
36+
}
37+
38+
WorkerThread(int id) : id(id) {}
39+
virtual ~WorkerThread(){}
40+
};
41+
42+
/*
43+
ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and ynchronizations between all threads.
44+
*/
45+
class ThreadPool{
46+
public:
47+
ThreadPool();
48+
ThreadPool(int maxThreadsTemp);
49+
virtual ~ThreadPool();
50+
51+
void destroyPool(int maxPollSecs);
52+
53+
bool assignWork(WorkerThread *worker);
54+
bool fetchWork(WorkerThread **worker);
55+
56+
void initializeThreads();
57+
58+
static void *threadExecute(void *param);
59+
60+
static pthread_mutex_t mutexSync;
61+
static pthread_mutex_t mutexWorkCompletion;
62+
63+
64+
private:
65+
int maxThreads;
66+
67+
pthread_cond_t condCrit;
68+
sem_t availableWork;
69+
sem_t availableThreads;
70+
71+
//WorkerThread ** workerQueue;
72+
vector<WorkerThread *> workerQueue;
73+
74+
int topIndex;
75+
int bottomIndex;
76+
77+
int incompleteWork;
78+
79+
80+
int queueSize;
81+
82+
};
83+
84+
85+
86+

0 commit comments

Comments
 (0)