-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_mutithread.cpp
More file actions
117 lines (87 loc) · 3.26 KB
/
Copy pathtest_mutithread.cpp
File metadata and controls
117 lines (87 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include "../src/util.hpp"
#include "../src/typedefs.h"
#include <cstring> // memcpy
#include <iostream>
#include <stdlib.h>
#include <unistd.h>
#include <string>
#include <vector>
#include <map>
#include <curl/curl.h> // -lcurl
#include <pthread.h>
// File-scope using-directive: unqualified names such as cout/vector below
// resolve into namespace std.
using namespace std;
// Process-wide mutex. It is initialized and destroyed in main(); no active
// lock/unlock call on it is visible in this file.
pthread_mutex_t mylock;
/**
 * libcurl write callback (registered through CURLOPT_WRITEFUNCTION).
 *
 * Stores one chunk of the response body in the task's data file at the
 * task's current range offset, then advances that offset so the next chunk
 * of the same transfer lands right behind it instead of overwriting it.
 *
 * @param ptr    chunk delivered by libcurl (not NUL-terminated)
 * @param size   size of one element
 * @param nmemb  number of elements; the chunk is size * nmemb bytes long
 * @param info   user pointer from CURLOPT_WRITEDATA, interpreted as ThreadData*
 * @return number of bytes stored; any value other than size * nmemb makes
 *         libcurl abort the transfer with a write error.
 *
 * Side effect: task->m_range.first is increased by the number of bytes stored.
 */
static size_t write_data(void *ptr, size_t size, size_t nmemb, void *info)
{
    const size_t total_bytes = size * nmemb;

    ThreadData* thread_data = static_cast<ThreadData*>(info);
    if(thread_data == NULL || thread_data->mp_task == NULL){
        std::cerr << "write_data: missing task" << std::endl;
        return 0;
    }
    Task* task = thread_data->mp_task;

    // "r+b" instead of "w+": "w+" truncates the file on every callback and
    // would wipe what this and the other threads already stored. The data
    // file is expected to have been created before the download starts.
    FILE* p_data_file = fopen(task->m_data_file_path.c_str(),"r+b");
    if(p_data_file == NULL){
        std::cerr << "write_data: cannot open " << task->m_data_file_path << std::endl;
        return 0;
    }

    //TODO: save range line number in void*info
    // and persist the advanced offset to the config file so that an
    // interrupted download can be resumed.
    if(fseek(p_data_file,static_cast<long>(task->m_range.first),SEEK_SET) != 0){
        std::cerr << "write_data: fseek failed" << std::endl;
        fclose(p_data_file);
        return 0;
    }

    // Element size 1 makes fwrite report bytes, which is the unit libcurl
    // compares against size * nmemb.
    size_t written = fwrite(ptr, 1, total_bytes, p_data_file);
    std::cout << written << " " << task->m_range.second-task->m_range.first << std::endl;

    // fclose flushes the stream; a failure here means the chunk may not
    // have reached the file.
    if(fclose(p_data_file) != 0){
        std::cerr << "write_data: fclose failed" << std::endl;
        return 0;
    }

    // Move the offset forward: libcurl delivers a range in several chunks.
    task->m_range.first += written;
    return written;
}
void* get_part_of_file(void* data){
Task* task = static_cast<Task*>(data);
#if DEBUG
cout << "threadid : " << pthread_self()%1000<<" range: " ;
cout << task->m_range.first << " - " << task->m_range.second << " " << endl;
#endif
CURL* curl = curl_easy_init();
curl_slist* headers = NULL;
if(curl){
// set url
curl_easy_setopt(curl, CURLOPT_URL, task->m_url.c_str());
// set headers
headers = curl_slist_append(headers, "Accept: image/webp,*/*;q=0.8");
// set range to be download by this thread
headers = curl_slist_append(headers, Util::GenRangeHeaderStr(task->m_range).c_str());
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_data);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, task);
CURLcode res = curl_easy_perform(curl);
if(res!=CURLE_OK){
cout << "error" << endl;
}
curl_easy_cleanup(curl);
curl_slist_free_all(headers); /* free the list again */
}
}
const int POOL_SIZE = 10;
int main(int argc,char** argv){
if(argc<2){
cout << "usage : a.out url" << endl;
return 0;
}
char* url = argv[1];
pthread_mutex_init(&mylock,NULL);
// generate data_file and fill config_file first;
Util::InitFileDownload(url,POOL_SIZE);
vector<Task> tasks = Util::GenerateTasks(url,POOL_SIZE);
/* Must initialize libcurl before any threads are started */
curl_global_init(CURL_GLOBAL_ALL);
vector<pthread_t> pool(POOL_SIZE,0);
for(int i = 0; i<tasks.size();++i){
pthread_t tid;
//cout << tasks[i].m_range.first << " " << tasks[i].m_range.second << endl;
ThreadData tdata;
tdata.mp_task = &tasks[i];
tdata.m_range_id= i+3;
pthread_create(&tid,NULL,get_part_of_file,static_cast<void*>(&tdata));
pool.push_back(tid);
}
for(int i = 0; i<pool.size();++i){
pthread_join(pool[i],NULL); // write_data is called after pthread_join
}
pthread_mutex_destroy(&mylock);
}