forked from npgsql/npgsql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSingleThreadSynchronizationContext.cs
More file actions
123 lines (104 loc) · 3.42 KB
/
SingleThreadSynchronizationContext.cs
File metadata and controls
123 lines (104 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using Microsoft.Extensions.Logging;
namespace Npgsql;
/// <summary>
/// A <see cref="SynchronizationContext"/> that executes posted callbacks one at a time on a
/// dedicated background thread. The thread is started on demand by <see cref="Post"/>, exits after
/// <see cref="ThreadStayAliveMs"/> milliseconds without work, and is re-created by the next post.
/// </summary>
sealed class SingleThreadSynchronizationContext : SynchronizationContext, IDisposable
{
    // Queue of pending callbacks; written by Post, drained by WorkLoop.
    readonly BlockingCollection<CallbackAndState> _tasks = new();

    // Guards the hand-off between Post and an exiting worker (see _doingWork).
    readonly object _lockObject = new();

    // The current worker thread, or null when none exists. Cleared by the worker itself on exit.
    volatile Thread? _thread;

    // True from the moment Post decides to start a worker until that worker decides to exit.
    // Every write happens while holding _lockObject.
    bool _doingWork;

    // 0 until Dispose has been called, then 1. Makes Dispose idempotent.
    int _disposed;

    // How long an idle worker waits for a new callback before shutting itself down.
    const int ThreadStayAliveMs = 10000;

    readonly string _threadName;

    static readonly ILogger Logger = NpgsqlLoggingConfiguration.ConnectionLogger;

    /// <summary>Creates the context; no thread is started until the first <see cref="Post"/>.</summary>
    /// <param name="threadName">Name given to every worker thread created by this context.</param>
    internal SingleThreadSynchronizationContext(string threadName)
        => _threadName = threadName;

    /// <summary>
    /// Makes this context the current <see cref="SynchronizationContext"/> and returns a token
    /// whose <see cref="Disposable.Dispose"/> restores the context that was current before.
    /// </summary>
    internal Disposable Enter() => new(this);

    /// <summary>
    /// Queues <paramref name="callback"/> for execution on the worker thread, starting a worker
    /// if none is currently doing work.
    /// </summary>
    /// <param name="callback">The delegate to invoke.</param>
    /// <param name="state">The argument passed to <paramref name="callback"/>.</param>
    public override void Post(SendOrPostCallback callback, object? state)
    {
        // Enqueue BEFORE taking the lock: an idle worker re-checks the queue under the same lock
        // before exiting, so it either sees this item or we see _doingWork == false below.
        _tasks.Add(new CallbackAndState { Callback = callback, State = state });

        lock (_lockObject)
        {
            if (!_doingWork)
            {
                // Either there is no thread, or the current thread is exiting
                // In which case, wait for it to complete.
                // Joining while holding the lock is safe: a worker that has already cleared
                // _doingWork does not acquire _lockObject again before it terminates.
                var currentThread = _thread;
                currentThread?.Join();
                Debug.Assert(_thread is null);

                _doingWork = true;
                _thread = new Thread(WorkLoop) { Name = _threadName, IsBackground = true };
                _thread.Start();
            }
        }
    }

    /// <summary>
    /// Stops accepting callbacks, waits for the worker thread (if any) to finish, and releases the
    /// queue. Calling this method more than once is safe; only the first call has any effect.
    /// </summary>
    public void Dispose()
    {
        // A repeated call would otherwise hit CompleteAdding() on an already-disposed
        // BlockingCollection and throw ObjectDisposedException.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
            return;

        _tasks.CompleteAdding();

        var thread = _thread;
        thread?.Join();

        _tasks.Dispose();
    }

    /// <summary>
    /// Body of the worker thread: installs this context as current, then runs queued callbacks
    /// until the queue has stayed empty for <see cref="ThreadStayAliveMs"/> milliseconds.
    /// </summary>
    void WorkLoop()
    {
        SetSynchronizationContext(this);

        try
        {
            while (true)
            {
                var taken = _tasks.TryTake(out var callbackAndState, ThreadStayAliveMs);
                if (!taken)
                {
                    // Nothing arrived in time. Decide whether to exit under the lock so the
                    // decision cannot interleave with Post's check of _doingWork.
                    lock (_lockObject)
                    {
                        if (_tasks.Count == 0)
                        {
                            _doingWork = false;
                            return;
                        }
                    }

                    // An item was enqueued between the timeout and the lock; keep going.
                    continue;
                }

                try
                {
                    Debug.Assert(_doingWork);
                    callbackAndState.Callback(callbackAndState.State);
                }
                catch (Exception e)
                {
                    // A failing callback must not take the worker thread down with it.
                    Logger.LogError(e, $"Exception caught in {nameof(SingleThreadSynchronizationContext)}");
                }
            }
        }
        catch (Exception e)
        {
            // Here we attempt to catch any exception coming from BlockingCollection _tasks
            Logger.LogError(e, $"Exception caught in {nameof(SingleThreadSynchronizationContext)}");
            lock (_lockObject)
            {
                _doingWork = false;
            }
        }
        finally
        {
            Debug.Assert(!_doingWork);
            // Signals to Post (after its Join) that no worker exists any more.
            _thread = null;
        }
    }

    /// <summary>A queued callback together with the state object to pass to it.</summary>
    struct CallbackAndState
    {
        internal SendOrPostCallback Callback;
        internal object? State;
    }

    /// <summary>
    /// Scope token returned by <see cref="Enter"/>: swaps in a synchronization context on
    /// construction and puts the previous one back on <see cref="Dispose"/>.
    /// </summary>
    internal readonly struct Disposable : IDisposable
    {
        // The context that was current when the scope was entered; may be null.
        readonly SynchronizationContext? _synchronizationContext;

        /// <summary>Remembers the current context, then installs <paramref name="synchronizationContext"/>.</summary>
        internal Disposable(SynchronizationContext synchronizationContext)
        {
            _synchronizationContext = Current;
            SetSynchronizationContext(synchronizationContext);
        }

        /// <summary>Restores the context captured by the constructor.</summary>
        public void Dispose()
            => SetSynchronizationContext(_synchronizationContext);
    }
}