11using System ;
22using System . Collections . Concurrent ;
3+ using System . Diagnostics ;
34using System . Threading ;
45using JetBrains . Annotations ;
56
@@ -8,8 +9,10 @@ namespace Npgsql
89 sealed class SingleThreadSynchronizationContext : SynchronizationContext , IDisposable
910 {
1011 readonly BlockingCollection < CallbackAndState > _tasks = new BlockingCollection < CallbackAndState > ( ) ;
12+ readonly object _lockObject = new object ( ) ;
1113 [ CanBeNull ]
12- Thread _thread ;
14+ volatile Thread _thread ;
15+ bool _doingWork ;
1316
1417 const int ThreadStayAliveMs = 10000 ;
1518 readonly string _threadName ;
@@ -23,12 +26,16 @@ public override void Post(SendOrPostCallback callback, object state)
2326 {
2427 _tasks . Add ( new CallbackAndState { Callback = callback , State = state } ) ;
2528
26- if ( _thread == null )
29+ lock ( _lockObject )
2730 {
28- lock ( this )
31+ if ( ! _doingWork )
2932 {
30- if ( _thread != null )
31- return ;
33+ // Either there is no thread, or the current thread is exiting
34+ // In which case, wait for it to complete
35+ var currentThread = _thread ;
36+ currentThread ? . Join ( ) ;
37+ Debug . Assert ( _thread is null ) ;
38+ _doingWork = true ;
3239 _thread = new Thread ( WorkLoop ) { Name = _threadName , IsBackground = true } ;
3340 _thread . Start ( ) ;
3441 }
@@ -38,10 +45,10 @@ public override void Post(SendOrPostCallback callback, object state)
3845 public void Dispose ( )
3946 {
4047 _tasks . CompleteAdding ( ) ;
41- lock ( this )
42- {
43- _thread ? . Join ( ) ;
44- }
48+ var thread = _thread ;
49+ thread ? . Join ( ) ;
50+
51+ _tasks . Dispose ( ) ;
4552 }
4653
4754 void WorkLoop ( )
@@ -54,13 +61,40 @@ void WorkLoop()
5461 {
5562 var taken = _tasks . TryTake ( out var callbackAndState , ThreadStayAliveMs ) ;
5663 if ( ! taken )
57- return ;
58- callbackAndState . Callback ( callbackAndState . State ) ;
64+ {
65+ lock ( _lockObject )
66+ {
67+ if ( _tasks . Count == 0 )
68+ {
69+ _doingWork = false ;
70+ return ;
71+ }
72+ }
73+
74+ continue ;
75+ }
76+
77+ try
78+ {
79+ Debug . Assert ( _doingWork ) ;
80+ callbackAndState . Callback ( callbackAndState . State ) ;
81+ }
82+ catch ( Exception )
83+ {
84+ // No logging until 5.0
85+ }
5986 }
6087 }
88+ catch ( Exception )
89+ {
90+ // Here we attempt to catch any exception coming from BlockingCollection _tasks
91+ lock ( _lockObject )
92+ _doingWork = false ;
93+ }
6194 finally
6295 {
63- lock ( this ) { _thread = null ; }
96+ Debug . Assert ( ! _doingWork ) ;
97+ _thread = null ;
6498 }
6599 }
66100
0 commit comments