Skip to content
This repository was archived by the owner on Dec 24, 2022. It is now read-only.

Commit 1e6dd09

Browse files
committed
Refactor pooling impl, move all blocking code or code that can throw outside of the lock
1 parent c099685 commit 1e6dd09

4 files changed

Lines changed: 332 additions & 114 deletions

File tree

src/ServiceStack.Redis/PooledRedisClientManager.cs

Lines changed: 226 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
using System;
1414
using System.Collections.Generic;
15+
using System.IO;
1516
using System.Linq;
1617
using System.Threading;
1718
using ServiceStack.Caching;
@@ -207,91 +208,141 @@ protected virtual void OnStart()
207208
/// <returns></returns>
208209
public IRedisClient GetClient()
209210
{
210-
lock (writeClients)
211+
try
211212
{
212-
AssertValidReadWritePool();
213-
214-
RedisClient inActiveClient;
215-
while ((inActiveClient = GetInActiveWriteClient()) == null)
213+
var poolTimedOut = false;
214+
var inactivePoolIndex = -1;
215+
lock (writeClients)
216216
{
217-
if (PoolTimeout.HasValue)
217+
AssertValidReadWritePool();
218+
219+
RedisClient inActiveClient;
220+
while ((inactivePoolIndex = GetInActiveWriteClient(out inActiveClient)) == -1)
218221
{
219-
// wait for a connection, cry out if made to wait too long
220-
if (!Monitor.Wait(writeClients, PoolTimeout.Value))
221-
throw new TimeoutException(PoolTimeoutError);
222+
if (PoolTimeout.HasValue)
223+
{
224+
// wait for a connection, cry out if made to wait too long
225+
if (!Monitor.Wait(writeClients, PoolTimeout.Value))
226+
{
227+
poolTimedOut = true;
228+
break;
229+
}
230+
}
231+
else
232+
{
233+
Monitor.Wait(writeClients, RecheckPoolAfterMs);
234+
}
235+
}
236+
237+
//inActiveClient != null only for Valid InActive Clients
238+
if (inActiveClient != null)
239+
{
240+
WritePoolIndex++;
241+
inActiveClient.Active = true;
242+
243+
InitClient(inActiveClient);
244+
245+
return inActiveClient;
222246
}
223-
else
224-
Monitor.Wait(writeClients, RecheckPoolAfterMs);
225247
}
226248

227-
WritePoolIndex++;
228-
inActiveClient.Active = true;
249+
if (poolTimedOut)
250+
throw new TimeoutException(PoolTimeoutError);
229251

230-
InitClient(inActiveClient);
252+
//Reaches here when there's no Valid InActive Clients
253+
try
254+
{
255+
//inactivePoolIndex = index of reservedSlot || index of invalid client
256+
var existingClient = writeClients[inactivePoolIndex];
257+
if (existingClient != null && existingClient != reservedSlot && existingClient.HadExceptions)
258+
{
259+
RedisState.DeactivateClient(existingClient);
260+
}
261+
262+
var newClient = InitNewClient(RedisResolver.CreateMasterClient(inactivePoolIndex));
231263

264+
//Put all blocking I/O or potential Exceptions are before lock
265+
lock (writeClients)
266+
{
267+
//If existingClient at inactivePoolIndex changed (failover) return new client outside of pool
268+
if (writeClients[inactivePoolIndex] != existingClient)
269+
{
270+
if (Log.IsDebugEnabled)
271+
Log.Debug("writeClients[inactivePoolIndex] != existingClient: {0}".Fmt(writeClients[inactivePoolIndex]));
272+
273+
return newClient; //return client outside of pool
274+
}
275+
276+
WritePoolIndex++;
277+
writeClients[inactivePoolIndex] = newClient;
278+
return newClient;
279+
}
280+
}
281+
catch
282+
{
283+
//Revert free-slot for any I/O exceptions that can throw (before lock)
284+
lock (writeClients)
285+
{
286+
writeClients[inactivePoolIndex] = null; //free slot
287+
}
288+
throw;
289+
}
290+
}
291+
finally
292+
{
232293
RedisState.DisposeExpiredClients();
294+
}
295+
}
233296

234-
return inActiveClient;
297+
class ReservedClient : RedisClient
298+
{
299+
public ReservedClient()
300+
{
301+
this.DeactivatedAt = DateTime.UtcNow;
235302
}
303+
304+
public override void Dispose() {}
236305
}
237306

307+
static readonly ReservedClient reservedSlot = new ReservedClient();
308+
238309
/// <summary>
239310
/// Called within a lock
240311
/// </summary>
241312
/// <returns></returns>
242-
private RedisClient GetInActiveWriteClient()
313+
private int GetInActiveWriteClient(out RedisClient inactiveClient)
243314
{
244-
var desiredIndex = WritePoolIndex % writeClients.Length;
245315
//this will loop through all hosts in readClients once even though there are 2 for loops
246316
//both loops are used to try to get the prefered host according to the round robin algorithm
247317
var readWriteTotal = RedisResolver.ReadWriteHostsCount;
318+
var desiredIndex = WritePoolIndex % writeClients.Length;
248319
for (int x = 0; x < readWriteTotal; x++)
249320
{
250321
var nextHostIndex = (desiredIndex + x) % readWriteTotal;
251322
for (var i = nextHostIndex; i < writeClients.Length; i += readWriteTotal)
252323
{
253324
if (writeClients[i] != null && !writeClients[i].Active && !writeClients[i].HadExceptions)
254-
return writeClients[i];
255-
256-
if (writeClients[i] == null || writeClients[i].HadExceptions)
257325
{
258-
if (writeClients[i] != null)
259-
RedisState.DeactivateClient(writeClients[i]);
326+
inactiveClient = writeClients[i];
327+
return i;
328+
}
260329

261-
var client = InitNewClient(RedisResolver.CreateMasterClient(nextHostIndex));
262-
writeClients[i] = client;
330+
if (writeClients[i] == null)
331+
{
332+
writeClients[i] = reservedSlot;
333+
inactiveClient = null;
334+
return i;
335+
}
263336

264-
return client;
337+
if (writeClients[i] != reservedSlot && writeClients[i].HadExceptions)
338+
{
339+
inactiveClient = null;
340+
return i;
265341
}
266342
}
267343
}
268-
return null;
269-
}
270-
271-
private RedisClient InitNewClient(RedisClient client)
272-
{
273-
client.Id = Interlocked.Increment(ref RedisClientCounter);
274-
client.ClientManager = this;
275-
client.ConnectionFilter = ConnectionFilter;
276-
if (NamespacePrefix != null)
277-
client.NamespacePrefix = NamespacePrefix;
278-
return client;
279-
}
280-
281-
private void InitClient(RedisClient client)
282-
{
283-
if (this.ConnectTimeout != null)
284-
client.ConnectTimeout = this.ConnectTimeout.Value;
285-
if (this.SocketSendTimeout.HasValue)
286-
client.SendTimeout = this.SocketSendTimeout.Value;
287-
if (this.SocketReceiveTimeout.HasValue)
288-
client.ReceiveTimeout = this.SocketReceiveTimeout.Value;
289-
if (this.IdleTimeOutSecs.HasValue)
290-
client.IdleTimeOutSecs = this.IdleTimeOutSecs.Value;
291-
if (this.NamespacePrefix != null)
292-
client.NamespacePrefix = NamespacePrefix;
293-
if (Db != null && client.Db != Db) //Reset database to default if changed
294-
client.ChangeDb(Db.Value);
344+
inactiveClient = null;
345+
return -1;
295346
}
296347

297348
/// <summary>
@@ -300,39 +351,101 @@ private void InitClient(RedisClient client)
300351
/// <returns></returns>
301352
public virtual IRedisClient GetReadOnlyClient()
302353
{
303-
lock (readClients)
354+
try
304355
{
305-
AssertValidReadOnlyPool();
306-
307-
RedisClient inActiveClient;
308-
while ((inActiveClient = GetInActiveReadClient()) == null)
356+
var poolTimedOut = false;
357+
var inactivePoolIndex = -1;
358+
lock (readClients)
309359
{
310-
if (PoolTimeout.HasValue)
360+
AssertValidReadOnlyPool();
361+
362+
RedisClient inActiveClient;
363+
while ((inactivePoolIndex = GetInActiveReadClient(out inActiveClient)) == -1)
311364
{
312-
// wait for a connection, cry out if made to wait too long
313-
if (!Monitor.Wait(readClients, PoolTimeout.Value))
314-
throw new TimeoutException(PoolTimeoutError);
365+
if (PoolTimeout.HasValue)
366+
{
367+
// wait for a connection, break out if made to wait too long
368+
if (!Monitor.Wait(readClients, PoolTimeout.Value))
369+
{
370+
poolTimedOut = true;
371+
break;
372+
}
373+
}
374+
else
375+
{
376+
Monitor.Wait(readClients, RecheckPoolAfterMs);
377+
}
378+
}
379+
380+
//inActiveClient != null only for Valid InActive Clients
381+
if (inActiveClient != null)
382+
{
383+
ReadPoolIndex++;
384+
inActiveClient.Active = true;
385+
386+
InitClient(inActiveClient);
387+
388+
return inActiveClient;
315389
}
316-
else
317-
Monitor.Wait(readClients, RecheckPoolAfterMs);
318390
}
319391

320-
ReadPoolIndex++;
321-
inActiveClient.Active = true;
392+
if (poolTimedOut)
393+
throw new TimeoutException(PoolTimeoutError);
322394

323-
InitClient(inActiveClient);
395+
//Reaches here when there's no Valid InActive Clients
396+
try
397+
{
398+
//inactivePoolIndex = index of reservedSlot || index of invalid client
399+
var existingClient = readClients[inactivePoolIndex];
400+
if (existingClient != null && existingClient != reservedSlot && existingClient.HadExceptions)
401+
{
402+
RedisState.DeactivateClient(existingClient);
403+
}
324404

325-
RedisState.DisposeExpiredClients();
405+
var newClient = InitNewClient(RedisResolver.CreateSlaveClient(inactivePoolIndex));
326406

327-
return inActiveClient;
407+
//Put all blocking I/O or potential Exceptions are before lock
408+
lock (readClients)
409+
{
410+
//If existingClient at inactivePoolIndex changed (failover) return new client outside of pool
411+
if (readClients[inactivePoolIndex] != existingClient)
412+
{
413+
if (Log.IsDebugEnabled)
414+
Log.Debug("readClients[inactivePoolIndex] != existingClient: {0}".Fmt(readClients[inactivePoolIndex]));
415+
416+
Interlocked.Increment(ref RedisState.TotalClientsCreatedOutsidePool);
417+
418+
//Don't handle callbacks for new client outside pool
419+
newClient.ClientManager = null;
420+
return newClient; //return client outside of pool
421+
}
422+
423+
ReadPoolIndex++;
424+
readClients[inactivePoolIndex] = newClient;
425+
return newClient;
426+
}
427+
}
428+
catch
429+
{
430+
//Revert free-slot for any I/O exceptions that can throw
431+
lock (readClients)
432+
{
433+
readClients[inactivePoolIndex] = null; //free slot
434+
}
435+
throw;
436+
}
437+
}
438+
finally
439+
{
440+
RedisState.DisposeExpiredClients();
328441
}
329442
}
330443

331444
/// <summary>
332445
/// Called within a lock
333446
/// </summary>
334447
/// <returns></returns>
335-
private RedisClient GetInActiveReadClient()
448+
private int GetInActiveReadClient(out RedisClient inactiveClient)
336449
{
337450
var desiredIndex = ReadPoolIndex % readClients.Length;
338451
//this will loop through all hosts in readClients once even though there are 2 for loops
@@ -344,21 +457,56 @@ private RedisClient GetInActiveReadClient()
344457
for (var i = nextHostIndex; i < readClients.Length; i += readOnlyTotal)
345458
{
346459
if (readClients[i] != null && !readClients[i].Active && !readClients[i].HadExceptions)
347-
return readClients[i];
348-
349-
if (readClients[i] == null || readClients[i].HadExceptions)
350460
{
351-
if (readClients[i] != null)
352-
RedisState.DeactivateClient(readClients[i]);
461+
inactiveClient = readClients[i];
462+
return i;
463+
}
353464

354-
var client = InitNewClient(RedisResolver.CreateSlaveClient(nextHostIndex));
355-
readClients[i] = client;
465+
if (readClients[i] == null)
466+
{
467+
readClients[i] = reservedSlot;
468+
inactiveClient = null;
469+
return i;
470+
}
356471

357-
return client;
472+
if (readClients[i] != reservedSlot && readClients[i].HadExceptions)
473+
{
474+
inactiveClient = null;
475+
return i;
358476
}
359477
}
360478
}
361-
return null;
479+
inactiveClient = null;
480+
return -1;
481+
}
482+
483+
private RedisClient InitNewClient(RedisClient client)
484+
{
485+
client.Id = Interlocked.Increment(ref RedisClientCounter);
486+
client.Active = true;
487+
client.ClientManager = this;
488+
client.ConnectionFilter = ConnectionFilter;
489+
if (NamespacePrefix != null)
490+
client.NamespacePrefix = NamespacePrefix;
491+
492+
return InitClient(client);
493+
}
494+
495+
private RedisClient InitClient(RedisClient client)
496+
{
497+
if (this.ConnectTimeout != null)
498+
client.ConnectTimeout = this.ConnectTimeout.Value;
499+
if (this.SocketSendTimeout.HasValue)
500+
client.SendTimeout = this.SocketSendTimeout.Value;
501+
if (this.SocketReceiveTimeout.HasValue)
502+
client.ReceiveTimeout = this.SocketReceiveTimeout.Value;
503+
if (this.IdleTimeOutSecs.HasValue)
504+
client.IdleTimeOutSecs = this.IdleTimeOutSecs.Value;
505+
if (this.NamespacePrefix != null)
506+
client.NamespacePrefix = NamespacePrefix;
507+
if (Db != null && client.Db != Db) //Reset database to default if changed
508+
client.ChangeDb(Db.Value);
509+
return client;
362510
}
363511

364512
public void DisposeClient(RedisNativeClient client)

0 commit comments

Comments
 (0)