forked from coder/coder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpubsub_internal_test.go
More file actions
140 lines (129 loc) · 3.9 KB
/
pubsub_internal_test.go
File metadata and controls
140 lines (129 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package pubsub
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/coder/coder/v2/testutil"
)
// Test_msgQueue_ListenerWithError drives a queue whose listener also accepts errors and checks
// that every enqueued message is delivered in order with a nil error, and that each call to
// dropped() is then delivered as an empty message paired with ErrDroppedMessages.
func Test_msgQueue_ListenerWithError(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
	defer cancel()

	msgCh := make(chan string)
	errCh := make(chan error)
	uut := newMsgQueue(ctx, nil, func(_ context.Context, payload []byte, err error) {
		// Every callback publishes its payload first and its error second; the
		// assertions below consume them in the same order.
		msgCh <- string(payload)
		errCh <- err
	})
	defer uut.close()

	// expectMsg waits for the next payload from the listener and compares it to want,
	// failing the test if the context expires first.
	expectMsg := func(want string) {
		t.Helper()
		select {
		case got := <-msgCh:
			require.Equal(t, want, got)
		case <-ctx.Done():
			t.Fatal("timed out")
		}
	}
	// nextErr waits for the next error value from the listener, failing the test if the
	// context expires first.
	nextErr := func() error {
		t.Helper()
		select {
		case err := <-errCh:
			return err
		case <-ctx.Done():
			t.Fatal("timed out")
		}
		return nil // not reached: t.Fatal stops the test goroutine
	}

	// Each cycle puts 5 items on the queue: 4 messages followed by one dropped-messages error.
	// Assuming BufferSize is a power of two (2048 when this test was written -- confirm against
	// its definition), a period of 5 never lines up with the end of the circular buffer, so
	// wrapping around it would expose any misaligned dequeue.
	const msgsPerCycle = 4
	cycles := (BufferSize / 5) * 2 // almost twice around the ring
	for cycle := 0; cycle < cycles; cycle++ {
		for slot := 0; slot < msgsPerCycle; slot++ {
			uut.enqueue([]byte(fmt.Sprintf("%d%d", cycle, slot)))
		}
		uut.dropped()

		// The real messages come back first, each with a nil error.
		for slot := 0; slot < msgsPerCycle; slot++ {
			expectMsg(fmt.Sprintf("%d%d", cycle, slot))
			require.NoError(t, nextErr())
		}

		// The drop notification arrives as an empty payload plus the sentinel error.
		expectMsg("")
		require.ErrorIs(t, nextErr(), ErrDroppedMessages)
	}
}
// Test_msgQueue_Listener drives a queue whose listener accepts messages only and checks that
// every enqueued message is delivered in order, while the interleaved dropped() calls never
// reach the listener.
func Test_msgQueue_Listener(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
	defer cancel()

	msgCh := make(chan string)
	uut := newMsgQueue(ctx, func(_ context.Context, payload []byte) {
		msgCh <- string(payload)
	}, nil)
	defer uut.close()

	// expectMsg waits for the next payload from the listener and compares it to want,
	// failing the test if the context expires first.
	expectMsg := func(want string) {
		t.Helper()
		select {
		case got := <-msgCh:
			require.Equal(t, want, got)
		case <-ctx.Done():
			t.Fatal("timed out")
		}
	}

	// Each cycle puts 5 items on the queue: 4 messages followed by one dropped-messages error.
	// Assuming BufferSize is a power of two (2048 when this test was written -- confirm against
	// its definition), a period of 5 never lines up with the end of the circular buffer, so
	// wrapping around it would expose any misaligned dequeue.
	const msgsPerCycle = 4
	cycles := (BufferSize / 5) * 2 // almost twice around the ring
	for cycle := 0; cycle < cycles; cycle++ {
		for slot := 0; slot < msgsPerCycle; slot++ {
			uut.enqueue([]byte(fmt.Sprintf("%d%d", cycle, slot)))
		}
		uut.dropped()

		// A message-only listener is never handed the error, so exactly the 4 real
		// messages are read back per cycle.
		for slot := 0; slot < msgsPerCycle; slot++ {
			expectMsg(fmt.Sprintf("%d%d", cycle, slot))
		}
	}
}
// Test_msgQueue_Full overfills the queue while its listener is blocked and checks that the
// overflow is reported to the listener as ErrDroppedMessages, and that exactly BufferSize
// messages were delivered before it.
func Test_msgQueue_Full(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
	defer cancel()

	firstDequeue := make(chan struct{})
	allowRead := make(chan struct{})
	errs := make(chan error)

	// n counts the messages handed to the listener and signaled records whether firstDequeue
	// has been closed. Both are touched only inside the listener callback; the test goroutine
	// reads n only after receiving from errs, and (like the final assertion) relies on the
	// error being delivered after all retained messages.
	n := 0
	signaled := false
	uut := newMsgQueue(ctx, nil, func(_ context.Context, msg []byte, err error) {
		if !signaled {
			signaled = true
			close(firstDequeue)
		}

		// Hold the listener until the test has finished enqueueing, but give up once the
		// test context ends so this goroutine is not left blocked after a failure.
		select {
		case <-allowRead:
		case <-ctx.Done():
			return
		}

		if err == nil {
			// This callback runs on the goroutine that dequeues items, not the test goroutine,
			// so mismatches are reported with t.Errorf: FailNow (used by require) must only be
			// called from the goroutine running the test.
			if want, got := fmt.Sprintf("%d", n), string(msg); want != got {
				t.Errorf("message %d: expected %q, got %q", n, want, got)
			}
			n++
			return
		}

		select {
		case errs <- err:
		case <-ctx.Done():
		}
	})
	defer uut.close()

	// We send 2 more than the capacity. One extra because the call to the listener blocks,
	// but only after a message has been dequeued, and another extra because we want to exceed
	// the capacity, not just reach it.
	for i := 0; i < BufferSize+2; i++ {
		uut.enqueue([]byte(fmt.Sprintf("%d", i)))
		// Ensure the first dequeue has happened before proceeding, so that this loop isn't
		// racing against the goroutine that dequeues items. The wait is bounded by ctx so a
		// queue that never dequeues fails the test instead of hanging it.
		select {
		case <-firstDequeue:
		case <-ctx.Done():
			t.Fatal("timed out")
		}
	}
	close(allowRead)

	select {
	case <-ctx.Done():
		t.Fatal("timed out")
	case err := <-errs:
		require.ErrorIs(t, err, ErrDroppedMessages)
	}

	// We sent 2 more than capacity but only read the capacity: the last message we send
	// doesn't get queued, AND it bumps a message out of the queue to make room for the error,
	// so we read 2 fewer than we sent.
	require.Equal(t, BufferSize, n)
}