Skip to content

Commit 7075307

Browse files
committed
Much more efficient use of threads to process requests
Fixed: correct termination for Windows
1 parent bffab87 commit 7075307

File tree

5 files changed

+107
-81
lines changed

5 files changed

+107
-81
lines changed

httpserver.pro.user

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!DOCTYPE QtCreatorProject>
3-
<!-- Written by QtCreator 3.2.1, 2014-11-13T02:24:14. -->
3+
<!-- Written by QtCreator 3.2.1, 2014-11-15T23:29:46. -->
44
<qtcreator>
55
<data>
66
<variable>EnvironmentId</variable>

httpserver/Server.cpp

Lines changed: 94 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ namespace HttpServer
318318

319319
auto it_mime = mimes_types.find(file_ext);
320320

321-
std::string file_mime_type = mimes_types.end() != it_mime ? it_mime->second : "application/octet-stream";
321+
std::string file_mime_type = mimes_types.cend() != it_mime ? it_mime->second : "application/octet-stream";
322322

323323
std::string headers("HTTP/1.1 200 OK\r\n");
324324
headers += "Content-Type: " + file_mime_type + "\r\n"
@@ -838,16 +838,57 @@ namespace HttpServer
838838
return app_exit_code;
839839
}
840840

841+
void Server::threadRequestCycle(std::queue<Socket> &sockets) const
842+
{
843+
while (true)
844+
{
845+
Socket clientSocket;
846+
847+
this->eventThreadCycle->wait();
848+
849+
if (false == process_flag)
850+
{
851+
break;
852+
}
853+
854+
this->sockets_queue_mtx.lock();
855+
856+
if (sockets.size() )
857+
{
858+
clientSocket = std::move(sockets.front() );
859+
sockets.pop();
860+
}
861+
862+
if (sockets.empty() )
863+
{
864+
this->eventThreadCycle->reset();
865+
866+
this->eventNotFullQueue->notify();
867+
}
868+
869+
this->sockets_queue_mtx.unlock();
870+
871+
if (clientSocket.is_open() )
872+
{
873+
++this->threads_working_count;
874+
875+
this->threadRequestProc(clientSocket);
876+
877+
--this->threads_working_count;
878+
}
879+
}
880+
}
881+
841882
/**
842883
* Цикл обработки очереди запросов
843884
*/
844885
int Server::cycleQueue(std::queue<Socket> &sockets)
845886
{
846-
auto it_option = settings.find("threads_max_count");
887+
auto it_option = this->settings.find("threads_max_count");
847888

848889
size_t threads_max_count = 0;
849890

850-
if (settings.cend() != it_option)
891+
if (this->settings.cend() != it_option)
851892
{
852893
threads_max_count = std::strtoull(it_option->second.c_str(), nullptr, 10);
853894
}
@@ -864,68 +905,40 @@ namespace HttpServer
864905
threads_max_count *= 2;
865906
}
866907

867-
std::function<int(Server *, Socket)> serverThreadRequestProc = std::mem_fn(&Server::threadRequestProc);
908+
this->threads_working_count = 0;
909+
this->eventThreadCycle = new Event(false, true);
910+
911+
std::function<void(Server *, std::queue<Socket> &)> serverThreadRequestCycle = std::mem_fn(&Server::threadRequestCycle);
868912

869913
std::vector<std::thread> active_threads;
870914
active_threads.reserve(threads_max_count);
871915

872916
// For update applications modules
873917
do
874918
{
875-
if (eventUpdateModule->notifed() )
919+
if (this->eventUpdateModule->notifed() )
876920
{
877921
updateModules();
878922
}
879923

880924
// Cycle creation threads applications requests
881925
do
882926
{
883-
if (threads_max_count <= active_threads.size() )
884-
{
885-
size_t i = 0;
886-
887-
while (false == System::isDoneThread(active_threads[i].native_handle() ) )
888-
{
889-
if (++i == active_threads.size() )
890-
{
891-
std::this_thread::yield();
892-
i = 0;
893-
}
894-
}
895-
}
896-
897-
for (size_t i = 0; i != active_threads.size();)
898-
{
899-
auto th = active_threads.begin() + i;
900-
901-
if (System::isDoneThread(th->native_handle() ) )
902-
{
903-
th->join();
904-
active_threads.erase(th);
905-
}
906-
else
907-
{
908-
++i;
909-
}
910-
}
911-
912-
while (active_threads.size() <= threads_max_count && sockets.empty() == false)
927+
while (this->threads_working_count == active_threads.size() && active_threads.size() < threads_max_count && sockets.empty() == false)
913928
{
914-
active_threads.emplace_back(serverThreadRequestProc, this, std::move(sockets.front() ) );
915-
sockets.pop();
929+
active_threads.emplace_back(serverThreadRequestCycle, this, std::ref(sockets) );
916930
}
917931

918-
if (false == eventNotFullQueue->notifed() )
919-
{
920-
eventNotFullQueue->notify();
921-
}
932+
this->eventThreadCycle->notify();
922933

923-
eventProcessQueue->wait();
934+
this->eventProcessQueue->wait();
924935
}
925-
while (process_flag);
936+
while (this->process_flag);
926937

927938
// Data clear
928939

940+
this->eventThreadCycle->notify();
941+
929942
if (false == active_threads.empty() )
930943
{
931944
// Join threads (wait completion)
@@ -937,9 +950,21 @@ namespace HttpServer
937950
active_threads.clear();
938951
}
939952

940-
eventNotFullQueue->notify();
953+
this->eventNotFullQueue->notify();
941954
}
942-
while (eventUpdateModule->notifed() );
955+
while (this->eventUpdateModule->notifed() );
956+
957+
if (false == this->server_sockets.empty() )
958+
{
959+
for (Socket &s : this->server_sockets)
960+
{
961+
s.close();
962+
}
963+
964+
this->server_sockets.clear();
965+
}
966+
967+
delete this->eventThreadCycle;
943968

944969
return 0;
945970
}
@@ -1678,13 +1703,11 @@ namespace HttpServer
16781703
// Applications settings list
16791704
std::unordered_set<ServerApplicationSettings *> applications;
16801705
// Get full applications settings list
1681-
apps_tree.collectApplicationSettings(applications);
1706+
this->apps_tree.collectApplicationSettings(applications);
16821707

16831708
// Bind ports set
16841709
std::unordered_set<int> ports;
16851710

1686-
std::vector<Socket> server_sockets;
1687-
16881711
// Open applications sockets
16891712
for (auto &app : applications)
16901713
{
@@ -1703,7 +1726,7 @@ namespace HttpServer
17031726
{
17041727
sock.nonblock(true);
17051728

1706-
server_sockets.emplace_back(std::move(sock) );
1729+
this->server_sockets.emplace_back(std::move(sock) );
17071730

17081731
ports.emplace(port);
17091732
}
@@ -1724,29 +1747,31 @@ namespace HttpServer
17241747
}
17251748
}
17261749

1727-
if (server_sockets.empty() )
1750+
if (this->server_sockets.empty() )
17281751
{
17291752
std::cout << "Error: do not open any socket;" << std::endl;
17301753
return 2;
17311754
}
17321755

1733-
sockets_list.create(server_sockets.size() );
1756+
SocketList sockets_list;
1757+
1758+
sockets_list.create(this->server_sockets.size() );
17341759

1735-
for (auto &sock : server_sockets)
1760+
for (auto &sock : this->server_sockets)
17361761
{
17371762
sockets_list.addSocket(sock);
17381763
}
17391764

17401765
std::cout << "Log: start server cycle;" << std::endl << std::endl;
17411766

17421767
const size_t queue_max_length = 1024;
1743-
eventNotFullQueue = new Event(true, true);
1744-
eventProcessQueue = new Event();
1745-
eventUpdateModule = new Event(false, true);
1768+
this->eventNotFullQueue = new Event(true, true);
1769+
this->eventProcessQueue = new Event();
1770+
this->eventUpdateModule = new Event(false, true);
17461771

17471772
std::queue<Socket> sockets;
17481773

1749-
process_flag = true;
1774+
this->process_flag = true;
17501775

17511776
std::function<int(Server *, std::queue<Socket> &)> serverCycleQueue = std::mem_fn(&Server::cycleQueue);
17521777

@@ -1759,45 +1784,39 @@ namespace HttpServer
17591784
{
17601785
if (sockets_list.accept(client_sockets) )
17611786
{
1787+
this->sockets_queue_mtx.lock();
1788+
17621789
for (Socket &sock : client_sockets)
17631790
{
17641791
if (sock.is_open() )
17651792
{
17661793
sock.nonblock(true);
17671794
sockets.emplace(std::move(sock) );
1795+
}
1796+
}
17681797

1769-
if (sockets.size() <= queue_max_length)
1770-
{
1771-
eventNotFullQueue->reset();
1772-
}
1798+
this->sockets_queue_mtx.unlock();
17731799

1774-
eventProcessQueue->notify();
1775-
}
1800+
this->eventProcessQueue->notify();
1801+
1802+
if (sockets.size() >= queue_max_length)
1803+
{
1804+
this->eventNotFullQueue->reset();
17761805
}
17771806

17781807
client_sockets.clear();
17791808

1780-
eventNotFullQueue->wait();
1809+
this->eventNotFullQueue->wait();
17811810
}
17821811
}
1783-
while (process_flag || eventUpdateModule->notifed() );
1812+
while (this->process_flag || this->eventUpdateModule->notifed() );
17841813

1785-
eventProcessQueue->notify();
1814+
this->eventProcessQueue->notify();
17861815

17871816
threadQueue.join();
17881817

17891818
sockets_list.destroy();
17901819

1791-
if (server_sockets.size() )
1792-
{
1793-
for (Socket &s : server_sockets)
1794-
{
1795-
s.close();
1796-
}
1797-
1798-
server_sockets.clear();
1799-
}
1800-
18011820
clear();
18021821

18031822
std::cout << "Log: final server cycle;" << std::endl;

httpserver/Server.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <string>
99
#include <unordered_map>
1010
#include <csignal>
11+
#include <atomic>
1112

1213
#include "SocketList.h"
1314
#include "DataVariantAbstract.h"
@@ -30,11 +31,15 @@ namespace HttpServer
3031

3132
ServerApplicationsTree apps_tree;
3233

33-
SocketList sockets_list;
34+
std::vector<Socket> server_sockets;
3435

3536
Event *eventNotFullQueue;
3637
Event *eventProcessQueue;
3738
Event *eventUpdateModule;
39+
Event *eventThreadCycle;
40+
41+
mutable std::atomic_size_t threads_working_count;
42+
mutable std::mutex sockets_queue_mtx;
3843

3944
// Флаг, означающий - активированы ли главные циклы сервера
4045
// (с помощью этого флага можно деактивировать циклы, чтобы завершить работу сервера)
@@ -44,7 +49,8 @@ namespace HttpServer
4449
protected:
4550
int cycleQueue(std::queue<Socket> &);
4651
void sendStatus(const Socket &, const std::chrono::milliseconds &, const size_t) const;
47-
int threadRequestProc(Socket) const;
52+
int threadRequestProc(Socket) const;
53+
void threadRequestCycle(std::queue<Socket> &) const;
4854
int transferFilePart(const Socket &, const std::chrono::milliseconds &, const std::string &, const time_t, const size_t, const std::string &, const std::string &, const std::string &, const bool) const;
4955
int transferFile(const Socket &, const std::chrono::milliseconds &, const std::string &, const std::unordered_map<std::string, std::string> &, const std::map<std::string, std::string> &, const std::string &, const bool) const;
5056
bool parseIncomingVars(std::unordered_multimap<std::string, std::string> &, const std::string &) const;

httpserver/SignalsHandles.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,10 @@ int bindSignalsHandles(HttpServer::Server *server)
115115

116116
HINSTANCE hInstance = GetModuleHandle(nullptr);
117117

118-
WNDCLASSEX wcex = {0};
118+
WNDCLASSEX wcex = {
119+
sizeof(WNDCLASSEX)
120+
};
119121

120-
wcex.cbSize = sizeof(WNDCLASSEX);
121122
wcex.lpfnWndProc = WndProc;
122123
wcex.hInstance = hInstance;
123124
wcex.lpszClassName = myWndClassName;

httpserver/SocketList.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ namespace HttpServer
138138
#elif POSIX
139139
size_t count = ::epoll_wait(obj_list, epoll_events.data(), epoll_events.size(), ~0);
140140

141-
if ( (size_t)~0 == count)
141+
if (std::numeric_limits<size_t>::max() == count)
142142
{
143143
return false;
144144
}

0 commit comments

Comments
 (0)