-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
Copy pathServerDispatcher.cs
342 lines (303 loc) · 12.3 KB
/
ServerDispatcher.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Globalization;
using Microsoft.CodeAnalysis.CommandLine;
using Microsoft.CodeAnalysis.Symbols;
using Microsoft.CodeAnalysis.CSharp;
namespace Microsoft.CodeAnalysis.CompilerServer
{
/// <summary>
/// This class manages the connections, timeout and general scheduling of the client
/// requests.
/// </summary>
internal sealed class ServerDispatcher
{
/// <summary>
/// Lifecycle states of the dispatcher. The current value is tracked in the _state field.
/// </summary>
private enum State
{
/// <summary>
/// Server running and accepting all requests
/// </summary>
Running,
/// <summary>
/// Server is in the process of shutting down. Connections that arrive in this state are still
/// handed to ProcessClientConnectionAsync, but with allowCompilationRequests set to false
/// (presumably so the handler can reject the compilation; confirm in ClientConnectionHandler).
/// The dispatch loop exits once the remaining connections have completed.
/// </summary>
ShuttingDown,
/// <summary>
/// Server is done. Assigned in the finally block of ListenAndDispatchConnections.
/// </summary>
Completed,
}
/// <summary>
/// Default time the server will stay alive after the last request disconnects.
/// </summary>
internal static readonly TimeSpan DefaultServerKeepAlive = TimeSpan.FromMinutes(10);
/// <summary>
/// Time to delay after the last connection before initiating a garbage collection
/// in the server.
/// </summary>
internal static readonly TimeSpan GCTimeout = TimeSpan.FromSeconds(30);
// Collaborators captured by the constructor. The logger is taken from the compiler server host.
private readonly ICompilerServerHost _compilerServerHost;
private readonly ICompilerServerLogger _logger;
private readonly IClientConnectionHost _clientConnectionHost;
private readonly IDiagnosticListener _diagnosticListener;
// Current lifecycle state: Running while dispatching, ShuttingDown once a shutdown was triggered,
// Completed after ListenAndDispatchConnections has finished.
private State _state;
// Delay task for the keep alive timeout. Only created while Running with no active connections
// and a keep alive value; reset to null when a connection arrives or shutdown begins.
private Task? _timeoutTask;
// Delay task (GCTimeout) after which RunGC is invoked. Only created while Running with no
// active connections.
private Task? _gcTask;
// Pending request for the next client connection, or null when a new one must be requested.
private Task<IClientConnection>? _listenTask;
// Connections currently being processed. Entries are removed once their task has completed.
private readonly List<Task<CompletionData>> _connectionList = new List<Task<CompletionData>>();
// Idle time after which the server shuts down. When null, no timeout task is created.
private TimeSpan? _keepAlive;
// True until a client supplied keep alive has been applied; while true any client value
// replaces _keepAlive (see ChangeKeepAlive).
private bool _keepAliveIsDefault;
/// <summary>
/// Creates a dispatcher that serves the connections produced by <paramref name="clientConnectionHost"/>.
/// </summary>
/// <param name="compilerServerHost">Host passed to every connection handler; its Logger is used for all logging here.</param>
/// <param name="clientConnectionHost">Source of incoming client connections.</param>
/// <param name="diagnosticListener">Optional observer of dispatcher events. When null an
/// <see cref="EmptyDiagnosticListener"/> is substituted so callers never need a null check.</param>
internal ServerDispatcher(ICompilerServerHost compilerServerHost, IClientConnectionHost clientConnectionHost, IDiagnosticListener? diagnosticListener = null)
{
    _compilerServerHost = compilerServerHost;
    _logger = compilerServerHost.Logger;
    _clientConnectionHost = clientConnectionHost;

    // Fall back to the empty listener when the caller did not provide one.
    diagnosticListener ??= new EmptyDiagnosticListener();
    _diagnosticListener = diagnosticListener;
}
/// <summary>
/// Accepts and processes client connections until something moves the server into its passive
/// shut down mode (for example the keep alive timeout being hit or cancellation being requested).
/// From that point the method waits for the connections that are still in flight and then
/// returns after releasing the listening resources.
/// </summary>
/// <param name="keepAlive">Initial keep alive; when null no keep alive timeout is scheduled.</param>
/// <param name="cancellationToken">Token that requests the server to shut down.</param>
public void ListenAndDispatchConnections(TimeSpan? keepAlive, CancellationToken cancellationToken = default)
{
    _state = State.Running;
    _keepAlive = keepAlive;
    _keepAliveIsDefault = true;

    try
    {
        _clientConnectionHost.BeginListening();
        ListenAndDispatchConnectionsCore(cancellationToken);
    }
    finally
    {
        _state = State.Completed;
        _gcTask = null;
        _timeoutTask = null;

        if (_clientConnectionHost.IsListening)
        {
            _clientConnectionHost.EndListening();
        }

        if (_listenTask is { } outstandingListenTask)
        {
            DisposeListenTask(outstandingListenTask);
        }
    }

    _logger.Log($"End ListenAndDispatchConnections");
}

/// <summary>
/// Releases the connection held by an outstanding listen task. This type owns the resources
/// associated with the listen task: once EndListening has run the task is either completed or
/// has a completion scheduled, and a connection it produced must be disposed.
/// </summary>
private void DisposeListenTask(Task<IClientConnection> listenTask)
{
    if (!listenTask.IsCompleted)
    {
        // Block until the task finishes. Waiting on a no-op continuation rather than the task
        // itself means a faulted or cancelled listen task does not throw here.
        var completionSignal = listenTask.ContinueWith(
            _ => { },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
        completionSignal.Wait(CancellationToken.None);
    }

    if (listenTask.Status != TaskStatus.RanToCompletion)
    {
        // No connection was produced, nothing to dispose.
        return;
    }

    try
    {
        listenTask.Result.Dispose();
    }
    catch (Exception ex)
    {
        _logger.LogException(ex, $"Error disposing of {nameof(_listenTask)}");
    }
}
/// <summary>
/// The dispatch loop. Every iteration makes sure the listen, timeout and GC tasks exist where
/// applicable, blocks until one of the outstanding tasks finishes and then reacts to whatever
/// completed. The loop always runs at least once and stops when the server is no longer in the
/// Running state and no connection is left in flight.
/// </summary>
public void ListenAndDispatchConnectionsCore(CancellationToken cancellationToken)
{
    while (true)
    {
        MaybeCreateListenTask();
        MaybeCreateTimeoutTask();
        MaybeCreateGCTask();
        WaitForAnyCompletion(cancellationToken);
        CheckCompletedTasks(cancellationToken);

        var hasActiveConnections = _connectionList.Count > 0;
        if (!hasActiveConnections && _state != State.Running)
        {
            return;
        }
    }
}
/// <summary>
/// Inspects the cancellation token and the listen, timeout and GC tasks and reacts to each one
/// that has finished, then processes the connections that have completed.
/// </summary>
private void CheckCompletedTasks(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        ChangeToShuttingDown("Server cancellation");
        Debug.Assert(_gcTask is null);
        Debug.Assert(_timeoutTask is null);
    }

    if (_listenTask is { IsCompleted: true } acceptedConnection)
    {
        _diagnosticListener.ConnectionReceived();

        // A connection that arrives after shutdown began is still processed, but it is not
        // allowed to issue compilation requests.
        var isRunning = _state == State.Running;
        var processingTask = ProcessClientConnectionAsync(
            _compilerServerHost,
            acceptedConnection,
            allowCompilationRequests: isRunning,
            cancellationToken);
        _connectionList.Add(processingTask);

        // The timeout and GC delays only apply while no connection is active, and the listen
        // task has been consumed, so drop all three.
        _listenTask = null;
        _gcTask = null;
        _timeoutTask = null;
    }

    if (_timeoutTask is { IsCompleted: true })
    {
        _diagnosticListener.KeepAliveReached();
        ChangeToShuttingDown("Keep alive hit");
    }

    if (_gcTask is { IsCompleted: true })
    {
        RunGC();
    }

    HandleCompletedConnections();
}
/// <summary>
/// Blocks until at least one of the outstanding tasks (active connections, keep alive timeout,
/// listen, GC delay) has completed or <paramref name="cancellationToken"/> is cancelled.
/// </summary>
private void WaitForAnyCompletion(CancellationToken cancellationToken)
{
    var outstanding = new List<Task>(_connectionList);
    foreach (var candidate in new Task?[] { _timeoutTask, _listenTask, _gcTask })
    {
        if (candidate is not null)
        {
            outstanding.Add(candidate);
        }
    }

    try
    {
        Task.WaitAny(outstanding.ToArray(), cancellationToken);
    }
    catch (OperationCanceledException)
    {
        // Raised when cancellationToken is cancelled. The caller deals with the cancellation;
        // here the exception only serves to end the wait.
    }
}
/// <summary>
/// Moves the server into the ShuttingDown state and drops the timeout and GC delays.
/// Does nothing when the server is already shutting down.
/// </summary>
/// <param name="reason">Text describing why the shutdown was triggered; written to the log.</param>
private void ChangeToShuttingDown(string reason)
{
    if (_state != State.ShuttingDown)
    {
        _logger.Log($"Shutting down server: {reason}");

        // A shutdown is only expected to start from a running, listening server.
        Debug.Assert(_state == State.Running);
        Debug.Assert(_clientConnectionHost.IsListening);

        _gcTask = null;
        _timeoutTask = null;
        _state = State.ShuttingDown;
    }
}
/// <summary>
/// Clears the GC delay task and requests the total memory with a forced full collection,
/// which makes the runtime collect before it reports. The reported size is not used.
/// </summary>
private void RunGC()
{
    _gcTask = null;
    _ = GC.GetTotalMemory(forceFullCollection: true);
}
/// <summary>
/// Requests the next client connection from the connection host unless a request is already pending.
/// </summary>
private void MaybeCreateListenTask()
{
    _listenTask ??= _clientConnectionHost.GetNextClientConnectionAsync();
}
/// <summary>
/// Starts the keep alive countdown when the server is running, has no active connections, has
/// no countdown in progress and a keep alive value is set.
/// </summary>
private void MaybeCreateTimeoutTask()
{
    // The countdown only applies while the server is running without any active clients.
    var isIdle = _state == State.Running && _connectionList.Count == 0;
    if (!isIdle || _timeoutTask is not null)
    {
        return;
    }

    if (_keepAlive is { } keepAlive)
    {
        Debug.Assert(_listenTask != null);
        _timeoutTask = Task.Delay(keepAlive);
    }
}
/// <summary>
/// Starts the GCTimeout delay when the server is running, has no active connections and no GC
/// delay is already pending.
/// </summary>
private void MaybeCreateGCTask()
{
    if (_gcTask is not null || _connectionList.Count != 0 || _state != State.Running)
    {
        return;
    }

    _gcTask = Task.Delay(GCTimeout);
}
/// <summary>
/// Removes every completed connection from the active list, applies what its result asks for
/// (a new keep alive, a shutdown) and reports it to the diagnostic listener. When any connection
/// failed or requested a shutdown the server is moved to the shutting down state, with a reason
/// that says which of the two happened.
/// </summary>
private void HandleCompletedConnections()
{
    var errorOccurred = false;
    var shutdownRequested = false;
    var i = 0;
    while (i < _connectionList.Count)
    {
        var current = _connectionList[i];
        if (!current.IsCompleted)
        {
            i++;
            continue;
        }

        // Removing shifts the next element into slot i, so the index is not advanced here.
        _connectionList.RemoveAt(i);

        // These tasks should never fail. Unexpected errors will be caught and translated into
        // a RequestError message
        Debug.Assert(current.Status == TaskStatus.RanToCompletion);
        var completionData = current.Result;
        switch (completionData.Reason)
        {
            case CompletionReason.RequestCompleted:
                _logger.Log("Client request completed");

                if (completionData.NewKeepAlive is { } keepAlive)
                {
                    _logger.Log($"Client changed keep alive to {keepAlive}");
                    ChangeKeepAlive(keepAlive);
                }

                if (completionData.ShutdownRequest)
                {
                    _logger.Log("Client requested shutdown");
                    shutdownRequested = true;
                }

                break;
            case CompletionReason.RequestError:
                _logger.LogError("Client request failed");
                errorOccurred = true;
                break;
            default:
                _logger.LogError("Unexpected enum value");
                errorOccurred = true;
                break;
        }

        _diagnosticListener.ConnectionCompleted(completionData);
    }

    // Report the real cause of the shutdown. An error takes precedence over a client request so
    // a failure is never hidden behind a routine shutdown message.
    if (errorOccurred)
    {
        ChangeToShuttingDown("Error handling client connection");
    }
    else if (shutdownRequested)
    {
        ChangeToShuttingDown("Client requested shutdown");
    }
}
/// <summary>
/// Applies a keep alive requested by a client. The value is taken when the current keep alive
/// has not been replaced by a client value yet, when there is no current keep alive, or when
/// the requested value is longer than the current one; otherwise the request is ignored.
/// </summary>
/// <param name="keepAlive">Keep alive requested by the client.</param>
private void ChangeKeepAlive(TimeSpan keepAlive)
{
    var keepCurrent = !_keepAliveIsDefault
        && _keepAlive is { } current
        && keepAlive <= current;
    if (keepCurrent)
    {
        return;
    }

    _keepAlive = keepAlive;
    _keepAliveIsDefault = false;
    _diagnosticListener.UpdateKeepAlive(keepAlive);
}
/// <summary>
/// Processes a single client connection with a new <see cref="ClientConnectionHandler"/> and
/// returns the completion data it produces.
/// </summary>
/// <param name="compilerServerHost">Host given to the connection handler.</param>
/// <param name="clientStreamTask">Task that yields the client connection to process.</param>
/// <param name="allowCompilationRequests">Forwarded to the handler; the dispatcher passes false once it is no longer in the Running state.</param>
/// <param name="cancellationToken">Token forwarded to the handler.</param>
internal static async Task<CompletionData> ProcessClientConnectionAsync(
    ICompilerServerHost compilerServerHost,
    Task<IClientConnection> clientStreamTask,
    bool allowCompilationRequests,
    CancellationToken cancellationToken)
{
    var handler = new ClientConnectionHandler(compilerServerHost);
    var pendingCompletion = handler.ProcessAsync(clientStreamTask, allowCompilationRequests, cancellationToken);
    var completionData = await pendingCompletion.ConfigureAwait(false);
    return completionData;
}
}
}