Skip to content

Commit 465320c

Browse files
Aaron BoxerAaron Boxer
authored andcommitted
1) added tests for typed pipeline
2) added feature: pipeline is now replayable - new command class that stores the initial redis command - pipeline and transaction have init method, so they can be re-initialized - added test for replay: works for non-generic pipeline, but fails for generic pipeline
1 parent edcc62e commit 465320c

20 files changed

Lines changed: 817 additions & 391 deletions

src/ServiceStack.Redis/Generic/IRedisTypedClient.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public interface IRedisTypedClient<T>
2525
IHasNamed<IRedisSortedSet<T>> SortedSets { get; set; }
2626
IRedisHash<TKey, T> GetHash<TKey>(string hashId);
2727

28-
IRedisTypedTransaction<T> CreateTransaction();
28+
IRedisTypedTransaction<T> CreateTransaction();
29+
IRedisTypedPipeline<T> CreatePipeline();
2930

3031
IDisposable AcquireLock();
3132
IDisposable AcquireLock(TimeSpan timeOut);
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using ServiceStack.Redis.Generic;
4+
5+
namespace ServiceStack.Redis.Generic
6+
{
7+
/// <summary>
8+
/// A complete redis command, with method to send command, receive response, and run callback on success or failure
9+
/// </summary>
10+
internal class QueuedRedisTypedCommand<T> : QueuedRedisOperation
11+
{
12+
13+
public Action<IRedisTypedClient<T>> VoidReturnCommand { get; set; }
14+
public Func<IRedisTypedClient<T>, int> IntReturnCommand { get; set; }
15+
public Func<IRedisTypedClient<T>, long> LongReturnCommand { get; set; }
16+
public Func<IRedisTypedClient<T>, bool> BoolReturnCommand { get; set; }
17+
public Func<IRedisTypedClient<T>, byte[]> BytesReturnCommand { get; set; }
18+
public Func<IRedisTypedClient<T>, byte[][]> MultiBytesReturnCommand { get; set; }
19+
public Func<IRedisTypedClient<T>, string> StringReturnCommand { get; set; }
20+
public Func<IRedisTypedClient<T>, List<string>> MultiStringReturnCommand { get; set; }
21+
public Func<IRedisTypedClient<T>, List<T>> MultiObjectReturnCommand { get; set; }
22+
public Func<IRedisTypedClient<T>, double> DoubleReturnCommand { get; set; }
23+
public Func<IRedisTypedClient<T>, T> ObjectReturnCommand { get; set; }
24+
25+
public void Execute(IRedisTypedClient<T> client)
26+
{
27+
try
28+
{
29+
if (VoidReturnCommand != null)
30+
{
31+
VoidReturnCommand(client);
32+
33+
}
34+
else if (IntReturnCommand != null)
35+
{
36+
IntReturnCommand(client);
37+
38+
}
39+
else if (LongReturnCommand != null)
40+
{
41+
LongReturnCommand(client);
42+
43+
}
44+
else if (DoubleReturnCommand != null)
45+
{
46+
DoubleReturnCommand(client);
47+
48+
}
49+
else if (BytesReturnCommand != null)
50+
{
51+
BytesReturnCommand(client);
52+
53+
}
54+
else if (StringReturnCommand != null)
55+
{
56+
StringReturnCommand(client);
57+
58+
}
59+
else if (MultiBytesReturnCommand != null)
60+
{
61+
MultiBytesReturnCommand(client);
62+
63+
}
64+
else if (MultiStringReturnCommand != null)
65+
{
66+
MultiStringReturnCommand(client);
67+
68+
}
69+
}
70+
catch (Exception ex)
71+
{
72+
Log.Error(ex);
73+
}
74+
}
75+
76+
}
77+
}

src/ServiceStack.Redis/Generic/RedisTypedClient.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public IRedisPipelineBase Pipeline
9696
client.Pipeline = value;
9797
}
9898
}
99+
99100
public void Multi()
100101
{
101102
this.client.Multi();
@@ -111,14 +112,14 @@ public void Exec()
111112
client.Exec();
112113
}
113114

114-
internal void AddTypeIdsRegisteredDuringTransaction()
115+
internal void AddTypeIdsRegisteredDuringPipeline()
115116
{
116-
client.AddTypeIdsRegisteredDuringTransaction();
117+
client.AddTypeIdsRegisteredDuringPipeline();
117118
}
118119

119-
internal void ClearTypeIdsRegisteredDuringTransaction()
120+
internal void ClearTypeIdsRegisteredDuringPipeline()
120121
{
121-
client.ClearTypeIdsRegisteredDuringTransaction();
122+
client.ClearTypeIdsRegisteredDuringPipeline();
122123
}
123124

124125
public List<string> GetAllKeys()

src/ServiceStack.Redis/Generic/RedisTypedCommandQueue.cs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ public void QueueCommand(Action<IRedisTypedClient<T>> command, Action onSuccessC
2929

3030
public void QueueCommand(Action<IRedisTypedClient<T>> command, Action onSuccessCallback, Action<Exception> onErrorCallback)
3131
{
32-
BeginQueuedCommand(new QueuedRedisOperation
32+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
3333
{
34+
VoidReturnCommand = command,
3435
OnSuccessVoidCallback = onSuccessCallback,
3536
OnErrorCallback = onErrorCallback
3637
});
@@ -50,8 +51,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, int> command, Action<int> on
5051

5152
public void QueueCommand(Func<IRedisTypedClient<T>, int> command, Action<int> onSuccessCallback, Action<Exception> onErrorCallback)
5253
{
53-
BeginQueuedCommand(new QueuedRedisOperation
54+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
5455
{
56+
IntReturnCommand = command,
5557
OnSuccessIntCallback = onSuccessCallback,
5658
OnErrorCallback = onErrorCallback
5759
});
@@ -71,8 +73,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, long> command, Action<long>
7173

7274
public void QueueCommand(Func<IRedisTypedClient<T>, long> command, Action<long> onSuccessCallback, Action<Exception> onErrorCallback)
7375
{
74-
BeginQueuedCommand(new QueuedRedisOperation
76+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
7577
{
78+
LongReturnCommand = command,
7679
OnSuccessLongCallback = onSuccessCallback,
7780
OnErrorCallback = onErrorCallback
7881
});
@@ -92,8 +95,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, bool> command, Action<bool>
9295

9396
public void QueueCommand(Func<IRedisTypedClient<T>, bool> command, Action<bool> onSuccessCallback, Action<Exception> onErrorCallback)
9497
{
95-
BeginQueuedCommand(new QueuedRedisOperation
98+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
9699
{
100+
BoolReturnCommand = command,
97101
OnSuccessBoolCallback = onSuccessCallback,
98102
OnErrorCallback = onErrorCallback
99103
});
@@ -113,8 +117,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, double> command, Action<doub
113117

114118
public void QueueCommand(Func<IRedisTypedClient<T>, double> command, Action<double> onSuccessCallback, Action<Exception> onErrorCallback)
115119
{
116-
BeginQueuedCommand(new QueuedRedisOperation
120+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
117121
{
122+
DoubleReturnCommand = command,
118123
OnSuccessDoubleCallback = onSuccessCallback,
119124
OnErrorCallback = onErrorCallback
120125
});
@@ -134,8 +139,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, byte[]> command, Action<byte
134139

135140
public void QueueCommand(Func<IRedisTypedClient<T>, byte[]> command, Action<byte[]> onSuccessCallback, Action<Exception> onErrorCallback)
136141
{
137-
BeginQueuedCommand(new QueuedRedisOperation
142+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
138143
{
144+
BytesReturnCommand = command,
139145
OnSuccessBytesCallback = onSuccessCallback,
140146
OnErrorCallback = onErrorCallback
141147
});
@@ -155,8 +161,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, string> command, Action<stri
155161

156162
public void QueueCommand(Func<IRedisTypedClient<T>, string> command, Action<string> onSuccessCallback, Action<Exception> onErrorCallback)
157163
{
158-
BeginQueuedCommand(new QueuedRedisOperation
164+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
159165
{
166+
StringReturnCommand = command,
160167
OnSuccessStringCallback = onSuccessCallback,
161168
OnErrorCallback = onErrorCallback
162169
});
@@ -175,8 +182,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, T> command, Action<T> onSucc
175182

176183
public void QueueCommand(Func<IRedisTypedClient<T>, T> command, Action<T> onSuccessCallback, Action<Exception> onErrorCallback)
177184
{
178-
BeginQueuedCommand(new QueuedRedisOperation
185+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
179186
{
187+
ObjectReturnCommand = command,
180188
OnSuccessTypeCallback = x => onSuccessCallback(JsonSerializer.DeserializeFromString<T>(x)),
181189
OnErrorCallback = onErrorCallback
182190
});
@@ -196,8 +204,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, byte[][]> command, Action<by
196204

197205
public void QueueCommand(Func<IRedisTypedClient<T>, byte[][]> command, Action<byte[][]> onSuccessCallback, Action<Exception> onErrorCallback)
198206
{
199-
BeginQueuedCommand(new QueuedRedisOperation
207+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
200208
{
209+
MultiBytesReturnCommand = command,
201210
OnSuccessMultiBytesCallback = onSuccessCallback,
202211
OnErrorCallback = onErrorCallback
203212
});
@@ -217,8 +226,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, List<string>> command, Actio
217226

218227
public void QueueCommand(Func<IRedisTypedClient<T>, List<string>> command, Action<List<string>> onSuccessCallback, Action<Exception> onErrorCallback)
219228
{
220-
BeginQueuedCommand(new QueuedRedisOperation
229+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
221230
{
231+
MultiStringReturnCommand = command,
222232
OnSuccessMultiStringCallback = onSuccessCallback,
223233
OnErrorCallback = onErrorCallback
224234
});
@@ -237,8 +247,9 @@ public void QueueCommand(Func<IRedisTypedClient<T>, List<T>> command, Action<Lis
237247

238248
public void QueueCommand(Func<IRedisTypedClient<T>, List<T>> command, Action<List<T>> onSuccessCallback, Action<Exception> onErrorCallback)
239249
{
240-
BeginQueuedCommand(new QueuedRedisOperation
250+
BeginQueuedCommand(new QueuedRedisTypedCommand<T>
241251
{
252+
MultiObjectReturnCommand = command,
242253
OnSuccessMultiTypeCallback = x => onSuccessCallback(x.ConvertAll(y => JsonSerializer.DeserializeFromString<T>(y))),
243254
OnErrorCallback = onErrorCallback
244255
});

src/ServiceStack.Redis/Generic/RedisTypedPipeline.cs

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,60 @@ public class RedisTypedPipeline<T> : RedisTypedCommandQueue<T>, IRedisTypedPipel
1212
internal RedisTypedPipeline(RedisTypedClient<T> redisClient)
1313
: base(redisClient)
1414
{
15-
if (redisClient.Transaction != null)
15+
Init();
16+
}
17+
18+
protected virtual void Init()
19+
{
20+
if (RedisClient.Transaction != null)
1621
throw new InvalidOperationException("A transaction is already in use");
1722

18-
if (redisClient.Pipeline != null)
23+
if (RedisClient.Pipeline != null)
1924
throw new InvalidOperationException("A pipeline is already in use");
2025

21-
redisClient.Pipeline = this;
22-
}
26+
RedisClient.Pipeline = this;
27+
28+
}
2329
public void Flush()
2430
{
25-
// flush send buffers
26-
RedisClient.FlushSendBuffer();
27-
28-
//receive expected results
29-
foreach (var queuedCommand in QueuedCommands)
30-
{
31-
queuedCommand.ProcessResult();
32-
}
31+
try
32+
{
33+
34+
35+
// flush send buffers
36+
RedisClient.FlushSendBuffer();
37+
38+
//receive expected results
39+
foreach (var queuedCommand in QueuedCommands)
40+
{
41+
queuedCommand.ProcessResult();
42+
}
43+
44+
}
45+
finally
46+
{
47+
ClosePipeline();
48+
RedisClient.AddTypeIdsRegisteredDuringPipeline();
49+
}
3350
}
51+
protected void Execute()
52+
{
53+
foreach (var queuedCommand in QueuedCommands)
54+
{
55+
var cmd = queuedCommand as QueuedRedisTypedCommand<T>;
56+
if (cmd != null)
57+
cmd.Execute(RedisClient);
58+
}
59+
}
60+
61+
public void Replay()
62+
{
63+
RedisClient.Pipeline = this;
64+
Execute();
65+
Flush();
66+
}
3467

35-
protected void ClosePipeline()
68+
protected void ClosePipeline()
3669
{
3770
RedisClient.ResetSendBuffer();
3871
RedisClient.Pipeline = null;

src/ServiceStack.Redis/Generic/RedisTypedTransaction.cs

Lines changed: 12 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,16 @@ internal class RedisTypedTransaction<T>
2525
internal RedisTypedTransaction(RedisTypedClient<T> redisClient)
2626
: base(redisClient)
2727
{
28-
redisClient.Transaction = this;
29-
redisClient.Multi();
28+
3029
}
3130

31+
protected override void Init()
32+
{
33+
base.Init();
34+
RedisClient.Transaction = this;
35+
RedisClient.Multi();
36+
}
37+
3238
/// <summary>
3339
/// Put "QUEUED" messages at back of queue
3440
/// </summary>
@@ -82,7 +88,7 @@ public void Commit()
8288
{
8389
RedisClient.Transaction = null;
8490
base.ClosePipeline();
85-
RedisClient.AddTypeIdsRegisteredDuringTransaction();
91+
RedisClient.AddTypeIdsRegisteredDuringPipeline();
8692
}
8793
}
8894

@@ -106,7 +112,7 @@ public void Rollback()
106112
throw new InvalidOperationException("There is no current transaction to Rollback");
107113

108114
RedisClient.Transaction = null;
109-
RedisClient.ClearTypeIdsRegisteredDuringTransaction();
115+
RedisClient.ClearTypeIdsRegisteredDuringPipeline();
110116
}
111117

112118
public void Dispose()
@@ -118,44 +124,9 @@ public void Dispose()
118124

119125
#region Overrides of RedisQueueCompletableOperation methods
120126

121-
public override void CompleteVoidQueuedCommand(Action voidReadCommand)
122-
{
123-
base.CompleteVoidQueuedCommand(voidReadCommand);
124-
QueueExpectQueued();
125-
}
126-
public override void CompleteIntQueuedCommand(Func<int> intReadCommand)
127-
{
128-
base.CompleteIntQueuedCommand(intReadCommand);
129-
QueueExpectQueued();
130-
}
131-
public override void CompleteLongQueuedCommand(Func<long> longReadCommand)
132-
{
133-
base.CompleteLongQueuedCommand(longReadCommand);
134-
QueueExpectQueued();
135-
}
136-
public override void CompleteBytesQueuedCommand(Func<byte[]> bytesReadCommand)
137-
{
138-
base.CompleteBytesQueuedCommand(bytesReadCommand);
139-
QueueExpectQueued();
140-
}
141-
public override void CompleteMultiBytesQueuedCommand(Func<byte[][]> multiBytesReadCommand)
142-
{
143-
base.CompleteMultiBytesQueuedCommand(multiBytesReadCommand);
144-
QueueExpectQueued();
145-
}
146-
public override void CompleteStringQueuedCommand(Func<string> stringReadCommand)
147-
{
148-
base.CompleteStringQueuedCommand(stringReadCommand);
149-
QueueExpectQueued();
150-
}
151-
public override void CompleteMultiStringQueuedCommand(Func<List<string>> multiStringReadCommand)
152-
{
153-
base.CompleteMultiStringQueuedCommand(multiStringReadCommand);
154-
QueueExpectQueued();
155-
}
156-
public override void CompleteDoubleQueuedCommand(Func<double> doubleReadCommand)
127+
protected override void AddCurrentQueuedOperation()
157128
{
158-
base.CompleteDoubleQueuedCommand(doubleReadCommand);
129+
base.AddCurrentQueuedOperation();
159130
QueueExpectQueued();
160131
}
161132
#endregion

0 commit comments

Comments
 (0)