1212using ServiceStack . Messaging ;
1313using ServiceStack . Text ;
1414
15+ #if NET6_0_OR_GREATER
16+ using System . Net . Http ;
17+ #endif
18+
1519namespace ServiceStack
1620{
1721 public class ServerEventConnect : ServerEventCommand
@@ -86,8 +90,6 @@ public partial class ServerEventsClient : IDisposable
8690 byte [ ] buffer ;
8791 readonly Encoding encoding = Encoding . UTF8 ;
8892
89- HttpWebRequest httpReq ;
90- HttpWebResponse response ;
9193 CancellationTokenSource cancel ;
9294 private ITimer heartbeatTimer ;
9395
@@ -166,24 +168,47 @@ public string EventStreamUri
166168 public Action OnReconnect ;
167169 public Action < Exception > OnException ;
168170
171+ #if NET6_0_OR_GREATER
172+ public Action < HttpRequestMessage > EventStreamRequestFilter { get ; set ; }
173+ public Action < HttpRequestMessage > HeartbeatRequestFilter { get ; set ; }
174+ public Action < HttpRequestMessage > UnRegisterRequestFilter { get ; set ; }
175+ /// <summary>
176+ /// Apply Request Filter to all ServerEventClient Requests
177+ /// </summary>
178+ public Action < HttpRequestMessage > AllRequestFilters { get ; set ; }
179+ HttpClient httpClient ;
180+ public Func < IServiceClient , HttpClientHandler > HttpClientHandlerFactory { get ; set ; } = client => new ( )
181+ {
182+ UseCookies = true ,
183+ CookieContainer = ( ( IHasCookieContainer ) client ) . CookieContainer ,
184+ UseDefaultCredentials = true ,
185+ AutomaticDecompression =
186+ DecompressionMethods . Brotli | DecompressionMethods . Deflate | DecompressionMethods . GZip ,
187+ } ;
188+ #else
169189 public Action < WebRequest > EventStreamRequestFilter { get ; set ; }
170190 public Action < WebRequest > HeartbeatRequestFilter { get ; set ; }
171191 public Action < WebRequest > UnRegisterRequestFilter { get ; set ; }
172-
173192 /// <summary>
174193 /// Apply Request Filter to all ServerEventClient Requests
175194 /// </summary>
176195 public Action < WebRequest > AllRequestFilters { get ; set ; }
196+ HttpWebRequest httpReq ;
197+ HttpWebResponse response ;
198+ #endif
177199
178- readonly Dictionary < string , List < Action < ServerEventMessage > > > listeners =
179- new Dictionary < string , List < Action < ServerEventMessage > > > ( ) ;
200+ readonly Dictionary < string , List < Action < ServerEventMessage > > > listeners = new ( ) ;
180201
181202 public ServerEventsClient ( string baseUri , params string [ ] channels )
182203 {
183204 this . eventStreamPath = baseUri . CombineWith ( "event-stream" ) ;
184205 this . Channels = channels ;
185206
207+ #if NET6_0_OR_GREATER
208+ this . ServiceClient = new JsonApiClient ( baseUri ) ;
209+ #else
186210 this . ServiceClient = new JsonServiceClient ( baseUri ) ;
211+ #endif
187212
188213 this . Resolver = new NewInstanceResolver ( ) ;
189214 this . ReceiverTypes = new List < Type > ( ) ;
@@ -207,6 +232,25 @@ public ServerEventsClient Start()
207232 {
208233 Interlocked . Increment ( ref timesStarted ) ;
209234
235+ #if NET6_0_OR_GREATER
236+ httpClient = new HttpClient ( HttpClientHandlerFactory ( ServiceClient ) , disposeHandler : true ) ;
237+
238+ var httpReq = new HttpRequestMessage ( HttpMethod . Get , EventStreamUri )
239+ . With ( c => c . Accept = "*/*" ) ;
240+
241+ EventStreamRequestFilter ? . Invoke ( httpReq ) ;
242+ if ( AllRequestFilters != null )
243+ {
244+ AllRequestFilters ( httpReq ) ;
245+ if ( ServiceClient is JsonApiClient apiClient )
246+ apiClient . RequestFilter = AllRequestFilters ;
247+ }
248+
249+ var httpRes = httpClient . Send ( httpReq ) ;
250+ httpRes . EnsureSuccessStatusCode ( ) ;
251+ var stream = httpRes . Content . ReadAsStream ( ) ;
252+
253+ #else
210254 httpReq = ( HttpWebRequest ) WebRequest . Create ( EventStreamUri ) ;
211255 //share auth cookies
212256 httpReq . CookieContainer = ( ( IHasCookieContainer ) ServiceClient ) . CookieContainer ;
@@ -222,6 +266,7 @@ public ServerEventsClient Start()
222266
223267 response = ( HttpWebResponse ) PclExport . Instance . GetResponse ( httpReq ) ;
224268 var stream = response . ResponseStream ( ) ;
269+ #endif
225270
226271 buffer = new byte [ BufferSize ] ;
227272 cancel = new CancellationTokenSource ( ) ;
@@ -249,10 +294,30 @@ public ServerEventsClient Start()
249294 return this ;
250295 }
251296
297+ private bool HasClient ( ) =>
298+ #if NET6_0_OR_GREATER
299+ this . httpClient != null ;
300+ #else
301+ this . httpReq != null ;
302+ #endif
303+
304+ private void UnsetClient ( )
305+ {
306+ #if NET6_0_OR_GREATER
307+ this . httpClient = null ;
308+ #else
309+ using ( response )
310+ {
311+ response = null ;
312+ }
313+ this . httpReq = null ;
314+ #endif
315+ }
316+
252317 private TaskCompletionSource < ServerEventConnect > connectTcs ;
253318 public Task < ServerEventConnect > Connect ( )
254319 {
255- if ( httpReq == null )
320+ if ( ! HasClient ( ) )
256321 Start ( ) ;
257322
258323 return connectTcs . Task ;
@@ -328,7 +393,16 @@ protected void Heartbeat(object state)
328393
329394 EnsureSynchronizationContext ( ) ;
330395
331- ConnectionInfo . HeartbeatUrl . GetStringFromUrlAsync ( requestFilter : req => {
396+ #if NET6_0_OR_GREATER
397+ var taskString = httpClient . SendStringToUrlAsync ( ConnectionInfo . HeartbeatUrl , method : HttpMethods . Get , requestFilter : req => {
398+ HeartbeatRequestFilter ? . Invoke ( req ) ;
399+ AllRequestFilters ? . Invoke ( req ) ;
400+
401+ if ( log . IsDebugEnabled )
402+ log . Debug ( "[SSE-CLIENT] Sending Heartbeat..." ) ;
403+ } ) ;
404+ #else
405+ var taskString = ConnectionInfo . HeartbeatUrl . GetStringFromUrlAsync ( requestFilter : req => {
332406 var hold = httpReq ;
333407 if ( hold != null )
334408 req . CookieContainer = hold . CookieContainer ;
@@ -338,8 +412,10 @@ protected void Heartbeat(object state)
338412
339413 if ( log . IsDebugEnabled )
340414 log . Debug ( "[SSE-CLIENT] Sending Heartbeat..." ) ;
341- } )
342- . Success ( t =>
415+ } ) ;
416+ #endif
417+
418+ taskString . Success ( t =>
343419 {
344420 if ( cancel . IsCancellationRequested )
345421 {
@@ -529,15 +605,14 @@ public void ProcessResponse(Stream stream)
529605 t . ObserveTaskExceptions ( ) ;
530606 if ( cancel . IsCancellationRequested || t . IsCanceled )
531607 {
532- httpReq = null ;
533-
608+ UnsetClient ( ) ;
534609 return ;
535610 }
536611
537612 if ( t . IsFaulted )
538613 {
539614 OnExceptionReceived ( t . Exception ) ;
540- httpReq = null ;
615+ UnsetClient ( ) ;
541616 return ;
542617 }
543618
@@ -780,6 +855,16 @@ public virtual Task InternalStop()
780855 {
781856 EnsureSynchronizationContext ( ) ;
782857 try {
858+ #if NET6_0_OR_GREATER
859+ httpClient . SendStringToUrl ( ConnectionInfo . UnRegisterUrl , method : HttpMethods . Get , requestFilter : req =>
860+ {
861+ if ( log . IsDebugEnabled )
862+ log . Debug ( "[SSE-CLIENT] Unregistering..." ) ;
863+
864+ UnRegisterRequestFilter ? . Invoke ( req ) ;
865+ AllRequestFilters ? . Invoke ( req ) ;
866+ } ) ;
867+ #else
783868 ConnectionInfo . UnRegisterUrl . GetStringFromUrl ( requestFilter : req =>
784869 {
785870 var hold = httpReq ;
@@ -792,16 +877,12 @@ public virtual Task InternalStop()
792877 UnRegisterRequestFilter ? . Invoke ( req ) ;
793878 AllRequestFilters ? . Invoke ( req ) ;
794879 } ) ;
880+ #endif
795881 } catch ( Exception ) { }
796882 }
797883
798- using ( response )
799- {
800- response = null ;
801- }
802-
803884 ConnectionInfo = null ;
804- httpReq = null ;
885+ UnsetClient ( ) ;
805886
806887 return TypeConstants . EmptyTask ;
807888 }
0 commit comments