1212
1313using System ;
1414using System . Collections . Generic ;
15+ using System . IO ;
1516using System . Linq ;
1617using System . Threading ;
1718using ServiceStack . Caching ;
@@ -207,91 +208,141 @@ protected virtual void OnStart()
207208 /// <returns></returns>
208209 public IRedisClient GetClient ( )
209210 {
210- lock ( writeClients )
211+ try
211212 {
212- AssertValidReadWritePool ( ) ;
213-
214- RedisClient inActiveClient ;
215- while ( ( inActiveClient = GetInActiveWriteClient ( ) ) == null )
213+ var poolTimedOut = false ;
214+ var inactivePoolIndex = - 1 ;
215+ lock ( writeClients )
216216 {
217- if ( PoolTimeout . HasValue )
217+ AssertValidReadWritePool ( ) ;
218+
219+ RedisClient inActiveClient ;
220+ while ( ( inactivePoolIndex = GetInActiveWriteClient ( out inActiveClient ) ) == - 1 )
218221 {
219- // wait for a connection, cry out if made to wait too long
220- if ( ! Monitor . Wait ( writeClients , PoolTimeout . Value ) )
221- throw new TimeoutException ( PoolTimeoutError ) ;
222+ if ( PoolTimeout . HasValue )
223+ {
224+ // wait for a connection, cry out if made to wait too long
225+ if ( ! Monitor . Wait ( writeClients , PoolTimeout . Value ) )
226+ {
227+ poolTimedOut = true ;
228+ break ;
229+ }
230+ }
231+ else
232+ {
233+ Monitor . Wait ( writeClients , RecheckPoolAfterMs ) ;
234+ }
235+ }
236+
237+ //inActiveClient != null only for Valid InActive Clients
238+ if ( inActiveClient != null )
239+ {
240+ WritePoolIndex ++ ;
241+ inActiveClient . Active = true ;
242+
243+ InitClient ( inActiveClient ) ;
244+
245+ return inActiveClient ;
222246 }
223- else
224- Monitor . Wait ( writeClients , RecheckPoolAfterMs ) ;
225247 }
226248
227- WritePoolIndex ++ ;
228- inActiveClient . Active = true ;
249+ if ( poolTimedOut )
250+ throw new TimeoutException ( PoolTimeoutError ) ;
229251
230- InitClient ( inActiveClient ) ;
252+ //Reaches here when there's no Valid InActive Clients
253+ try
254+ {
255+ //inactivePoolIndex = index of reservedSlot || index of invalid client
256+ var existingClient = writeClients [ inactivePoolIndex ] ;
257+ if ( existingClient != null && existingClient != reservedSlot && existingClient . HadExceptions )
258+ {
259+ RedisState . DeactivateClient ( existingClient ) ;
260+ }
261+
262+ var newClient = InitNewClient ( RedisResolver . CreateMasterClient ( inactivePoolIndex ) ) ;
231263
264+ //Put all blocking I/O or potential Exceptions are before lock
265+ lock ( writeClients )
266+ {
267+ //If existingClient at inactivePoolIndex changed (failover) return new client outside of pool
268+ if ( writeClients [ inactivePoolIndex ] != existingClient )
269+ {
270+ if ( Log . IsDebugEnabled )
271+ Log . Debug ( "writeClients[inactivePoolIndex] != existingClient: {0}" . Fmt ( writeClients [ inactivePoolIndex ] ) ) ;
272+
273+ return newClient ; //return client outside of pool
274+ }
275+
276+ WritePoolIndex ++ ;
277+ writeClients [ inactivePoolIndex ] = newClient ;
278+ return newClient ;
279+ }
280+ }
281+ catch
282+ {
283+ //Revert free-slot for any I/O exceptions that can throw (before lock)
284+ lock ( writeClients )
285+ {
286+ writeClients [ inactivePoolIndex ] = null ; //free slot
287+ }
288+ throw ;
289+ }
290+ }
291+ finally
292+ {
232293 RedisState . DisposeExpiredClients ( ) ;
294+ }
295+ }
233296
234- return inActiveClient ;
297+ class ReservedClient : RedisClient
298+ {
299+ public ReservedClient ( )
300+ {
301+ this . DeactivatedAt = DateTime . UtcNow ;
235302 }
303+
304+ public override void Dispose ( ) { }
236305 }
237306
307+ static readonly ReservedClient reservedSlot = new ReservedClient ( ) ;
308+
238309 /// <summary>
239310 /// Called within a lock
240311 /// </summary>
241312 /// <returns></returns>
242- private RedisClient GetInActiveWriteClient ( )
313+ private int GetInActiveWriteClient ( out RedisClient inactiveClient )
243314 {
244- var desiredIndex = WritePoolIndex % writeClients . Length ;
245315 //this will loop through all hosts in readClients once even though there are 2 for loops
246316 //both loops are used to try to get the prefered host according to the round robin algorithm
247317 var readWriteTotal = RedisResolver . ReadWriteHostsCount ;
318+ var desiredIndex = WritePoolIndex % writeClients . Length ;
248319 for ( int x = 0 ; x < readWriteTotal ; x ++ )
249320 {
250321 var nextHostIndex = ( desiredIndex + x ) % readWriteTotal ;
251322 for ( var i = nextHostIndex ; i < writeClients . Length ; i += readWriteTotal )
252323 {
253324 if ( writeClients [ i ] != null && ! writeClients [ i ] . Active && ! writeClients [ i ] . HadExceptions )
254- return writeClients [ i ] ;
255-
256- if ( writeClients [ i ] == null || writeClients [ i ] . HadExceptions )
257325 {
258- if ( writeClients [ i ] != null )
259- RedisState . DeactivateClient ( writeClients [ i ] ) ;
326+ inactiveClient = writeClients [ i ] ;
327+ return i ;
328+ }
260329
261- var client = InitNewClient ( RedisResolver . CreateMasterClient ( nextHostIndex ) ) ;
262- writeClients [ i ] = client ;
330+ if ( writeClients [ i ] == null )
331+ {
332+ writeClients [ i ] = reservedSlot ;
333+ inactiveClient = null ;
334+ return i ;
335+ }
263336
264- return client ;
337+ if ( writeClients [ i ] != reservedSlot && writeClients [ i ] . HadExceptions )
338+ {
339+ inactiveClient = null ;
340+ return i ;
265341 }
266342 }
267343 }
268- return null ;
269- }
270-
271- private RedisClient InitNewClient ( RedisClient client )
272- {
273- client . Id = Interlocked . Increment ( ref RedisClientCounter ) ;
274- client . ClientManager = this ;
275- client . ConnectionFilter = ConnectionFilter ;
276- if ( NamespacePrefix != null )
277- client . NamespacePrefix = NamespacePrefix ;
278- return client ;
279- }
280-
281- private void InitClient ( RedisClient client )
282- {
283- if ( this . ConnectTimeout != null )
284- client . ConnectTimeout = this . ConnectTimeout . Value ;
285- if ( this . SocketSendTimeout . HasValue )
286- client . SendTimeout = this . SocketSendTimeout . Value ;
287- if ( this . SocketReceiveTimeout . HasValue )
288- client . ReceiveTimeout = this . SocketReceiveTimeout . Value ;
289- if ( this . IdleTimeOutSecs . HasValue )
290- client . IdleTimeOutSecs = this . IdleTimeOutSecs . Value ;
291- if ( this . NamespacePrefix != null )
292- client . NamespacePrefix = NamespacePrefix ;
293- if ( Db != null && client . Db != Db ) //Reset database to default if changed
294- client . ChangeDb ( Db . Value ) ;
344+ inactiveClient = null ;
345+ return - 1 ;
295346 }
296347
297348 /// <summary>
@@ -300,39 +351,101 @@ private void InitClient(RedisClient client)
300351 /// <returns></returns>
301352 public virtual IRedisClient GetReadOnlyClient ( )
302353 {
303- lock ( readClients )
354+ try
304355 {
305- AssertValidReadOnlyPool ( ) ;
306-
307- RedisClient inActiveClient ;
308- while ( ( inActiveClient = GetInActiveReadClient ( ) ) == null )
356+ var poolTimedOut = false ;
357+ var inactivePoolIndex = - 1 ;
358+ lock ( readClients )
309359 {
310- if ( PoolTimeout . HasValue )
360+ AssertValidReadOnlyPool ( ) ;
361+
362+ RedisClient inActiveClient ;
363+ while ( ( inactivePoolIndex = GetInActiveReadClient ( out inActiveClient ) ) == - 1 )
311364 {
312- // wait for a connection, cry out if made to wait too long
313- if ( ! Monitor . Wait ( readClients , PoolTimeout . Value ) )
314- throw new TimeoutException ( PoolTimeoutError ) ;
365+ if ( PoolTimeout . HasValue )
366+ {
367+ // wait for a connection, break out if made to wait too long
368+ if ( ! Monitor . Wait ( readClients , PoolTimeout . Value ) )
369+ {
370+ poolTimedOut = true ;
371+ break ;
372+ }
373+ }
374+ else
375+ {
376+ Monitor . Wait ( readClients , RecheckPoolAfterMs ) ;
377+ }
378+ }
379+
380+ //inActiveClient != null only for Valid InActive Clients
381+ if ( inActiveClient != null )
382+ {
383+ ReadPoolIndex ++ ;
384+ inActiveClient . Active = true ;
385+
386+ InitClient ( inActiveClient ) ;
387+
388+ return inActiveClient ;
315389 }
316- else
317- Monitor . Wait ( readClients , RecheckPoolAfterMs ) ;
318390 }
319391
320- ReadPoolIndex ++ ;
321- inActiveClient . Active = true ;
392+ if ( poolTimedOut )
393+ throw new TimeoutException ( PoolTimeoutError ) ;
322394
323- InitClient ( inActiveClient ) ;
395+ //Reaches here when there's no Valid InActive Clients
396+ try
397+ {
398+ //inactivePoolIndex = index of reservedSlot || index of invalid client
399+ var existingClient = readClients [ inactivePoolIndex ] ;
400+ if ( existingClient != null && existingClient != reservedSlot && existingClient . HadExceptions )
401+ {
402+ RedisState . DeactivateClient ( existingClient ) ;
403+ }
324404
325- RedisState . DisposeExpiredClients ( ) ;
405+ var newClient = InitNewClient ( RedisResolver . CreateSlaveClient ( inactivePoolIndex ) ) ;
326406
327- return inActiveClient ;
407+ //Put all blocking I/O or potential Exceptions are before lock
408+ lock ( readClients )
409+ {
410+ //If existingClient at inactivePoolIndex changed (failover) return new client outside of pool
411+ if ( readClients [ inactivePoolIndex ] != existingClient )
412+ {
413+ if ( Log . IsDebugEnabled )
414+ Log . Debug ( "readClients[inactivePoolIndex] != existingClient: {0}" . Fmt ( readClients [ inactivePoolIndex ] ) ) ;
415+
416+ Interlocked . Increment ( ref RedisState . TotalClientsCreatedOutsidePool ) ;
417+
418+ //Don't handle callbacks for new client outside pool
419+ newClient . ClientManager = null ;
420+ return newClient ; //return client outside of pool
421+ }
422+
423+ ReadPoolIndex ++ ;
424+ readClients [ inactivePoolIndex ] = newClient ;
425+ return newClient ;
426+ }
427+ }
428+ catch
429+ {
430+ //Revert free-slot for any I/O exceptions that can throw
431+ lock ( readClients )
432+ {
433+ readClients [ inactivePoolIndex ] = null ; //free slot
434+ }
435+ throw ;
436+ }
437+ }
438+ finally
439+ {
440+ RedisState . DisposeExpiredClients ( ) ;
328441 }
329442 }
330443
331444 /// <summary>
332445 /// Called within a lock
333446 /// </summary>
334447 /// <returns></returns>
335- private RedisClient GetInActiveReadClient ( )
448+ private int GetInActiveReadClient ( out RedisClient inactiveClient )
336449 {
337450 var desiredIndex = ReadPoolIndex % readClients . Length ;
338451 //this will loop through all hosts in readClients once even though there are 2 for loops
@@ -344,21 +457,56 @@ private RedisClient GetInActiveReadClient()
344457 for ( var i = nextHostIndex ; i < readClients . Length ; i += readOnlyTotal )
345458 {
346459 if ( readClients [ i ] != null && ! readClients [ i ] . Active && ! readClients [ i ] . HadExceptions )
347- return readClients [ i ] ;
348-
349- if ( readClients [ i ] == null || readClients [ i ] . HadExceptions )
350460 {
351- if ( readClients [ i ] != null )
352- RedisState . DeactivateClient ( readClients [ i ] ) ;
461+ inactiveClient = readClients [ i ] ;
462+ return i ;
463+ }
353464
354- var client = InitNewClient ( RedisResolver . CreateSlaveClient ( nextHostIndex ) ) ;
355- readClients [ i ] = client ;
465+ if ( readClients [ i ] == null )
466+ {
467+ readClients [ i ] = reservedSlot ;
468+ inactiveClient = null ;
469+ return i ;
470+ }
356471
357- return client ;
472+ if ( readClients [ i ] != reservedSlot && readClients [ i ] . HadExceptions )
473+ {
474+ inactiveClient = null ;
475+ return i ;
358476 }
359477 }
360478 }
361- return null ;
479+ inactiveClient = null ;
480+ return - 1 ;
481+ }
482+
483+ private RedisClient InitNewClient ( RedisClient client )
484+ {
485+ client . Id = Interlocked . Increment ( ref RedisClientCounter ) ;
486+ client . Active = true ;
487+ client . ClientManager = this ;
488+ client . ConnectionFilter = ConnectionFilter ;
489+ if ( NamespacePrefix != null )
490+ client . NamespacePrefix = NamespacePrefix ;
491+
492+ return InitClient ( client ) ;
493+ }
494+
495+ private RedisClient InitClient ( RedisClient client )
496+ {
497+ if ( this . ConnectTimeout != null )
498+ client . ConnectTimeout = this . ConnectTimeout . Value ;
499+ if ( this . SocketSendTimeout . HasValue )
500+ client . SendTimeout = this . SocketSendTimeout . Value ;
501+ if ( this . SocketReceiveTimeout . HasValue )
502+ client . ReceiveTimeout = this . SocketReceiveTimeout . Value ;
503+ if ( this . IdleTimeOutSecs . HasValue )
504+ client . IdleTimeOutSecs = this . IdleTimeOutSecs . Value ;
505+ if ( this . NamespacePrefix != null )
506+ client . NamespacePrefix = NamespacePrefix ;
507+ if ( Db != null && client . Db != Db ) //Reset database to default if changed
508+ client . ChangeDb ( Db . Value ) ;
509+ return client ;
362510 }
363511
364512 public void DisposeClient ( RedisNativeClient client )
0 commit comments