Skip to content

Commit 6b86d7a

Browse files
committed
add: new array lock free queue
将无锁列队换成简单的无锁列队
1 parent 054867c commit 6b86d7a

7 files changed

Lines changed: 188 additions & 5 deletions

File tree

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#ifndef _ARRAYLOCKFREEQUEUE_H___
2+
#define _ARRAYLOCKFREEQUEUE_H___
3+
4+
#include <stdint.h>
5+
6+
#define ARRAY_LOCK_FREE_Q_DEFAULT_SIZE 65535 // 2^16
7+
8+
template <typename ELEM_T, uint32_t Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE>
9+
class ArrayLockFreeQueue
10+
{
11+
public:
12+
13+
ArrayLockFreeQueue();
14+
virtual ~ArrayLockFreeQueue();
15+
16+
uint32_t size();
17+
18+
bool enqueue(const ELEM_T &a_data);
19+
20+
bool dequeue(ELEM_T &a_data);
21+
22+
bool try_dequeue(ELEM_T &a_data);
23+
24+
private:
25+
26+
ELEM_T m_thequeue[Q_SIZE];
27+
28+
volatile uint32_t m_count;
29+
volatile uint32_t m_writeIndex;
30+
31+
volatile uint32_t m_readIndex;
32+
33+
volatile uint32_t m_maximumReadIndex;
34+
35+
inline uint32_t countToIndex(uint32_t a_count);
36+
};
37+
38+
#include "ArrayLockFreeQueueImp.h"
39+
40+
#endif
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#ifndef _ARRAYLOCKFREEQUEUEIMP_H___
2+
#define _ARRAYLOCKFREEQUEUEIMP_H___
3+
#include "ArrayLockFreeQueue.h"
4+
5+
#include <assert.h>
6+
#include "atom_opt.h"
7+
8+
template <typename ELEM_T, uint32_t Q_SIZE>
9+
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::ArrayLockFreeQueue() :
10+
m_writeIndex(0),
11+
m_readIndex(0),
12+
m_maximumReadIndex(0)
13+
{
14+
15+
m_count = 0;
16+
17+
}
18+
19+
template <typename ELEM_T, uint32_t Q_SIZE>
20+
ArrayLockFreeQueue<ELEM_T, Q_SIZE>::~ArrayLockFreeQueue()
21+
{
22+
23+
}
24+
25+
template <typename ELEM_T, uint32_t Q_SIZE>
26+
inline uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
27+
{
28+
return (a_count % Q_SIZE);
29+
}
30+
31+
template <typename ELEM_T, uint32_t Q_SIZE>
32+
uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::size()
33+
{
34+
uint32_t currentWriteIndex = m_writeIndex;
35+
uint32_t currentReadIndex = m_readIndex;
36+
37+
if(currentWriteIndex>=currentReadIndex)
38+
return currentWriteIndex - currentReadIndex;
39+
else
40+
return Q_SIZE + currentWriteIndex - currentReadIndex;
41+
42+
}
43+
44+
template <typename ELEM_T, uint32_t Q_SIZE>
45+
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data)
46+
{
47+
uint32_t currentWriteIndex;
48+
uint32_t currentReadIndex;
49+
do
50+
{
51+
currentWriteIndex = m_writeIndex;
52+
currentReadIndex = m_readIndex;
53+
if(countToIndex(currentWriteIndex + 1) ==
54+
countToIndex(currentReadIndex))
55+
{
56+
return false;
57+
}
58+
} while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1)));
59+
60+
m_thequeue[countToIndex(currentWriteIndex)] = a_data;
61+
62+
while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
63+
{
64+
sched_yield();
65+
}
66+
67+
AtomicAdd(&m_count, 1);
68+
69+
return true;
70+
71+
}
72+
73+
template <typename ELEM_T, uint32_t Q_SIZE>
74+
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::try_dequeue(ELEM_T &a_data)
75+
{
76+
return dequeue(a_data);
77+
}
78+
79+
template <typename ELEM_T, uint32_t Q_SIZE>
80+
bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data)
81+
{
82+
uint32_t currentMaximumReadIndex;
83+
uint32_t currentReadIndex;
84+
85+
do
86+
{
87+
currentReadIndex = m_readIndex;
88+
currentMaximumReadIndex = m_maximumReadIndex;
89+
90+
if(countToIndex(currentReadIndex) ==
91+
countToIndex(currentMaximumReadIndex))
92+
{
93+
return false;
94+
}
95+
96+
a_data = m_thequeue[countToIndex(currentReadIndex)];
97+
98+
if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
99+
{
100+
AtomicSub(&m_count, 1);
101+
return true;
102+
}
103+
} while(true);
104+
105+
assert(0);
106+
107+
return false;
108+
109+
}
110+
111+
#endif

QuantBox_Queue/MsgQueue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#include "stdafx.h"
22
#include "MsgQueue.h"
33

4-
CMsgQueue::CMsgQueue():m_queue(1024)
4+
//CMsgQueue::CMsgQueue():m_queue(1024)
5+
CMsgQueue::CMsgQueue()
56
{
67
m_hThread = nullptr;
78
m_bRunning = false;

QuantBox_Queue/MsgQueue.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
#include "../include/ApiStruct.h"
1313

1414
//#include "readerwriterqueue.h"
15-
#include "concurrentqueue.h"
15+
//#include "concurrentqueue.h"
16+
#include "ArrayLockFreeQueue.h"
1617

1718
using namespace std;
18-
using namespace moodycamel;
19+
//using namespace moodycamel;
1920

2021
#pragma warning(push)
2122
#pragma warning(disable:4251)
@@ -220,7 +221,8 @@ class DLL_PUBLIC CMsgQueue
220221
thread* m_hThread;
221222

222223
private:
223-
ConcurrentQueue<ResponeItem*> m_queue;
224+
// ConcurrentQueue<ResponeItem*> m_queue;
225+
ArrayLockFreeQueue<ResponeItem*, 2048> m_queue;
224226
fnOnRespone m_fnOnRespone;
225227
void* m_pClass;
226228
};

QuantBox_Queue/QuantBox_Queue.vcxproj

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
<PropertyGroup Label="UserMacros" />
9595
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
9696
<LinkIncremental>true</LinkIncremental>
97-
<OutDir>C:\Program Files\SmartQuant Ltd\OpenQuant 2014\XAPI\CTP\x86</OutDir>
97+
<OutDir>..\bin</OutDir>
9898
<TargetName>QuantBox_Queue_x86</TargetName>
9999
</PropertyGroup>
100100
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
@@ -226,6 +226,9 @@
226226
<ClInclude Include="..\include\QueueEnum.h" />
227227
<ClInclude Include="..\include\QueueHeader.h" />
228228
<ClInclude Include="..\include\QueueStruct.h" />
229+
<ClInclude Include="ArrayLockFreeQueue.h" />
230+
<ClInclude Include="ArrayLockFreeQueueImp.h" />
231+
<ClInclude Include="atom_opt.h" />
229232
<ClInclude Include="MsgQueue.h" />
230233
<ClInclude Include="RemoteQueue.h" />
231234
<ClInclude Include="stdafx.h" />

QuantBox_Queue/QuantBox_Queue.vcxproj.filters

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@
4848
<ClInclude Include="RemoteQueue.h">
4949
<Filter>Header Files</Filter>
5050
</ClInclude>
51+
<ClInclude Include="ArrayLockFreeQueue.h">
52+
<Filter>Header Files</Filter>
53+
</ClInclude>
54+
<ClInclude Include="ArrayLockFreeQueueImp.h">
55+
<Filter>Header Files</Filter>
56+
</ClInclude>
57+
<ClInclude Include="atom_opt.h">
58+
<Filter>Header Files</Filter>
59+
</ClInclude>
5160
</ItemGroup>
5261
<ItemGroup>
5362
<ClCompile Include="stdafx.cpp">

QuantBox_Queue/atom_opt.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#ifndef _ATOM_OPT_H___
2+
#define _ATOM_OPT_H___
3+
4+
#ifdef __GNUC__
5+
#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
6+
#define AtomicAdd(a_ptr,a_count) __sync_fetch_and_add (a_ptr, a_count)
7+
#define AtomicSub(a_ptr,a_count) __sync_fetch_and_sub (a_ptr, a_count)
8+
#include <sched.h> // sched_yield()
9+
#else
10+
#include <Windows.h>
11+
#define CAS(a_ptr, a_oldVal, a_newVal) (a_oldVal == InterlockedCompareExchange(a_ptr, a_newVal, a_oldVal))
12+
#define sched_yield() SwitchToThread()
13+
#define AtomicAdd(a_ptr, num) InterlockedIncrement(a_ptr)
14+
#define AtomicSub(a_ptr, num) InterlockedDecrement(a_ptr)
15+
#endif
16+
17+
#endif

0 commit comments

Comments
 (0)