1515 */
1616package rx .internal .util ;
1717
18- import java .io . Serializable ;
19- import java . util . concurrent . atomic . AtomicReference ;
18+ import static java .util . concurrent . atomic . AtomicReferenceFieldUpdater . newUpdater ;
19+
2020import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
2121
22+ import rx .internal .util .MpscPaddedQueue .Node ;
23+
24+ abstract class MpscLinkedQueuePad0 <E > {
25+ long p00 , p01 , p02 , p03 , p04 , p05 , p06 , p07 ;
26+ long p30 , p31 , p32 , p33 , p34 , p35 , p36 , p37 ;
27+ }
28+
29+ abstract class MpscLinkedQueueHeadRef <E > extends MpscLinkedQueuePad0 <E > {
30+ @ SuppressWarnings ("rawtypes" )
31+ private static final AtomicReferenceFieldUpdater <MpscLinkedQueueHeadRef , Node > UPDATER =
32+ newUpdater (MpscLinkedQueueHeadRef .class , Node .class , "headRef" );
33+ private volatile Node <E > headRef ;
34+
35+ protected final Node <E > headRef () {
36+ return headRef ;
37+ }
38+ protected final void headRef (Node <E > val ) {
39+ headRef = val ;
40+ }
41+ protected final void lazySetHeadRef (Node <E > newVal ) {
42+ UPDATER .lazySet (this , newVal );
43+ }
44+ }
45+
46+ abstract class MpscLinkedQueuePad1 <E > extends MpscLinkedQueueHeadRef <E > {
47+ long p00 , p01 , p02 , p03 , p04 , p05 , p06 , p07 ;
48+ long p30 , p31 , p32 , p33 , p34 , p35 , p36 , p37 ;
49+ }
50+
51+ abstract class MpscLinkedQueueTailRef <E > extends MpscLinkedQueuePad1 <E > {
52+ @ SuppressWarnings ("rawtypes" )
53+ private static final AtomicReferenceFieldUpdater <MpscLinkedQueueTailRef , Node > UPDATER =
54+ newUpdater (MpscLinkedQueueTailRef .class , Node .class , "tailRef" );
55+ private volatile Node <E > tailRef ;
56+ protected final Node <E > tailRef () {
57+ return tailRef ;
58+ }
59+ protected final void tailRef (Node <E > val ) {
60+ tailRef = val ;
61+ }
62+ @ SuppressWarnings ("unchecked" )
63+ protected final Node <E > getAndSetTailRef (Node <E > newVal ) {
64+ return (Node <E >) UPDATER .getAndSet (this , newVal );
65+ }
66+ }
2267/**
2368 * A multiple-producer single consumer queue implementation with padded reference to tail to avoid cache-line
2469 * thrashing. Based on Netty's <a href='https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java'>MpscQueue implementation</a>
25- * but using {@code AtomicReferenceFieldUpdater} instead of {@code Unsafe}.
26- *
70+ * but using {@code AtomicReferenceFieldUpdater} instead of {@code Unsafe}.<br>
71+ * Original algorithm presented <a
72+ * href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue"> on 1024
73+ * Cores</a> by D. Vyukov.<br>
74+ * Data structure modified to avoid false sharing between head and tail references as per implementation of
75+ * MpscLinkedQueue on <a href="https://github.com/JCTools/JCTools">JCTools project</a>.
76+ *
2777 * @param <E> the element type
2878 */
29- public final class MpscPaddedQueue <E > extends AtomicReference <MpscPaddedQueue .Node <E >> {
30- /** */
31- private static final long serialVersionUID = 1L ;
32- /** The padded tail reference. */
33- final PaddedNode <E > tail ;
34-
79+ public final class MpscPaddedQueue <E > extends MpscLinkedQueueTailRef <E > {
80+ long p00 , p01 , p02 , p03 , p04 , p05 , p06 , p07 ;
81+ long p30 , p31 , p32 , p33 , p34 , p35 , p36 , p37 ;
3582 /**
3683 * Initializes the empty queue.
3784 */
3885 public MpscPaddedQueue () {
39- Node <E > first = new Node <E >(null );
40- tail = new PaddedNode <E >();
41- tail .node = first ;
42- set (first );
86+ Node <E > stub = new Node <E >(null );
87+ headRef (stub );
88+ tailRef (stub );
4389 }
4490
4591 /**
@@ -49,7 +95,7 @@ public MpscPaddedQueue() {
4995 */
5096 public void offer (E v ) {
5197 Node <E > n = new Node <E >(v );
52- getAndSet (n ).set (n );
98+ getAndSetTailRef (n ).next (n );
5399 }
54100
55101 /**
@@ -63,25 +109,23 @@ public E poll() {
63109 }
64110 E v = n .value ;
65111 n .value = null ; // do not retain this value as the node still stays in the queue
66- tail . lazySet (n );
112+ lazySetHeadRef (n );
67113 return v ;
68114 }
69-
115+
70116 /**
71117 * Check if there is a node available without changing anything.
72118 * @return
73119 */
74120 private Node <E > peekNode () {
75121 for (;;) {
76- @ SuppressWarnings (value = "unchecked" )
77- Node <E > t = tail .node ;
78- Node <E > n = t .get ();
79- if (n != null || get () == t ) {
122+ Node <E > t = headRef ();
123+ Node <E > n = t .next ();
124+ if (n != null || headRef () == t ) {
80125 return n ;
81126 }
82127 }
83128 }
84-
85129 /**
86130 * Clears the queue.
87131 */
@@ -92,46 +136,26 @@ public void clear() {
92136 }
93137 }
94138 }
95- /** The front-padded node class housing the actual value. */
96- static abstract class PaddedNodeBase <E > extends FrontPadding {
97- private static final long serialVersionUID = 2L ;
98- volatile Node <E > node ;
99- @ SuppressWarnings (value = "rawtypes" )
100- static final AtomicReferenceFieldUpdater <PaddedNodeBase , Node > NODE_UPDATER = AtomicReferenceFieldUpdater .newUpdater (PaddedNodeBase .class , Node .class , "node" );
101- public void lazySet (Node <E > newValue ) {
102- NODE_UPDATER .lazySet (this , newValue );
103- }
104- }
105- /** Post-padding of the padded node base class. */
106- static final class PaddedNode <E > extends PaddedNodeBase <E > {
107- private static final long serialVersionUID = 3L ;
108- /** Padding. */
109- public transient long p16 , p17 , p18 , p19 , p20 , p21 , p22 ; // 56 bytes (the remaining 8 is in the base)
110- /** Padding. */
111- public transient long p24 , p25 , p26 , p27 , p28 , p29 , p30 , p31 ; // 64 bytes
112- }
113139
114140 /**
115141 * Regular node with value and reference to the next node.
116142 */
117- static final class Node <E > implements Serializable {
118- private static final long serialVersionUID = 4L ;
143+ static final class Node <E > {
119144 E value ;
120145 @ SuppressWarnings (value = "rawtypes" )
121146 static final AtomicReferenceFieldUpdater <Node , Node > TAIL_UPDATER = AtomicReferenceFieldUpdater .newUpdater (Node .class , Node .class , "tail" );
122- volatile Node <E > tail ;
147+ private volatile Node <E > next ;
123148
124- public Node (E value ) {
149+ Node (E value ) {
125150 this .value = value ;
126151 }
127152
128- public void set (Node <E > newTail ) {
129- TAIL_UPDATER .lazySet (this , newTail );
153+ void next (Node <E > newNext ) {
154+ TAIL_UPDATER .lazySet (this , newNext );
130155 }
131156
132- @ SuppressWarnings (value = "unchecked" )
133- public Node <E > get () {
134- return TAIL_UPDATER .get (this );
157+ Node <E > next () {
158+ return next ;
135159 }
136160 }
137161
0 commit comments