44var $rdf = require ( 'rdflib' ) ;
55var redis = require ( 'redis' ) ;
66var debug = require ( './logging' ) . subscription ;
7-
87var utils = require ( './utils.js' ) ;
9-
10- var subscriptions = { } ; // Map URI to array of watchers
11- var SSEsubscriptions = { } ;
12-
138var PATCH = $rdf . Namespace ( 'http://www.w3.org/ns/pim/patch#' ) ;
149
1510exports . subscribeToChanges_SSE = function ( req , res ) {
16- var options = req . app . locals . ldp ;
11+ var ldp = req . app . locals . ldp ;
1712
1813 var messageCount ;
1914 debug ( "Server Side Events subscription" ) ;
20- var targetPath = req . originalUrl . slice ( 0 , - options . suffixChanges . length ) ; // lop off ',events'
21- if ( SSEsubscriptions [ targetPath ] === undefined ) {
22- SSEsubscriptions [ targetPath ] = redis . createClient ( ) ;
15+ var targetPath = req . originalUrl . slice ( 0 , - ldp . suffixChanges . length ) ; // lop off ',events'
16+ if ( ldp . SSEsubscriptions [ targetPath ] === undefined ) {
17+ ldp . SSEsubscriptions [ targetPath ] = redis . createClient ( ) ;
2318 }
24- var subscriber = SSEsubscriptions [ targetPath ] ;
19+ var subscriber = ldp . SSEsubscriptions [ targetPath ] ;
2520 debug ( "Server Side Events subscription: " + targetPath ) ;
2621
2722 subscriber . subscribe ( 'updates' ) ;
@@ -60,9 +55,9 @@ exports.subscribeToChanges_SSE = function(req, res) {
6055
6156exports . publishDelta_SSE = function ( req , res , patchKB , targetURI ) {
6257 // @@ TODO
63- var options = req . app . locals . ldp ;
64- var targetPath = req . originalUrl . slice ( 0 , - options . suffixChanges . length ) ; // lop off ',changes'
65- var publisherClient = SSEsubscriptions [ targetPath ] ;
58+ var ldp = req . app . locals . ldp ;
59+ var targetPath = req . originalUrl . slice ( 0 , - ldp . suffixChanges . length ) ; // lop off ',changes'
60+ var publisherClient = ldp . SSEsubscriptions [ targetPath ] ;
6661 publisherClient . publish ( 'updates' , ( '"' + targetPath + '" data changed visited' ) ) ;
6762} ;
6863
@@ -73,29 +68,29 @@ var DelayedResponse = require('http-delayed-response');
7368
7469
7570exports . subscribeToChangesLongPoll = function ( req , res ) {
76- var options = req . app . locals . ldp ;
77- var targetPath = req . originalUrl . slice ( 0 , - options . suffixChanges . length ) ; // lop off ',changes'
78- if ( subscriptions [ targetPath ] === undefined ) {
79- subscriptions [ targetPath ] = [ ] ;
71+ var ldp = req . app . locals . ldp ;
72+ var targetPath = req . originalUrl . slice ( 0 , - ldp . suffixChanges . length ) ; // lop off ',changes'
73+ if ( ldp . subscriptions [ targetPath ] === undefined ) {
74+ ldp . subscriptions [ targetPath ] = [ ] ;
8075 }
8176
8277 var delayed = new DelayedResponse ( req , res ) ;
8378
8479 var subscription = { 'targetPath' : targetPath ,
8580 'request' : req , 'response' : res , 'delayed' : delayed ,
8681 'timestamp' : utils . timestamp ( ) } ;
87- subscriptions [ targetPath ] . push ( subscription ) ;
82+ ldp . subscriptions [ targetPath ] . push ( subscription ) ;
8883
8984 var unsubscribe = function ( ) {
90- for ( var i = 0 ; i < subscriptions [ targetPath ] . length ; i ++ ) {
91- if ( subscriptions [ targetPath ] [ i ] === subscription ) {
92- subscriptions [ targetPath ] = subscriptions [ targetPath ] . splice ( i , 1 ) ;
93- debug ( "UNSUBSCRIBED " + targetPath + " now " + subscriptions [ targetPath ] . length ) ;
85+ for ( var i = 0 ; i < ldp . subscriptions [ targetPath ] . length ; i ++ ) {
86+ if ( ldp . subscriptions [ targetPath ] [ i ] === ldp . subscription ) {
87+ ldp . subscriptions [ targetPath ] = ldp . subscriptions [ targetPath ] . splice ( i , 1 ) ;
88+ debug ( "UNSUBSCRIBED " + targetPath + " now " + ldp . subscriptions [ targetPath ] . length ) ;
9489 return ;
9590 }
9691 }
9792 debug ( "ERROR - COULD NOT FIND SUB of " + subscription . timestamp +
98- " for " + targetPath + " now " + subscriptions [ targetPath ] . length ) ;
93+ " for " + targetPath + " now " + ldp . subscriptions [ targetPath ] . length ) ;
9994
10095 } ;
10196
@@ -122,7 +117,7 @@ exports.subscribeToChangesLongPoll = function(req, res) {
122117 // was: res.setTimeout
123118 req . socket . setTimeout ( 0 ) ; // Disable timeout (does this work??)
124119
125- debug ( "LONG POLL : Now " + subscriptions [ targetPath ] . length + " subscriptions for " + targetPath ) ;
120+ debug ( "LONG POLL : Now " + ldp . subscriptions [ targetPath ] . length + " subscriptions for " + targetPath ) ;
126121
127122} ;
128123
@@ -152,12 +147,12 @@ exports.publishDelta = function (req, res, patchKB, targetURI){
152147} ;
153148
154149exports . publishDelta_LongPoll = function ( req , res , patchData , targetURI ) {
155- var options = req . app . locals . ldp ;
156- debug ( " Long poll change subscription count " + ( subscriptions [ req . originalUrl ] || [ ] ) . length ) ;
157- if ( ! subscriptions [ req . originalUrl ] ) return ;
158- subscriptions [ req . originalUrl ] . map ( function ( subscription ) {
150+ var ldp = req . app . locals . ldp ;
151+ debug ( " Long poll change subscription count " + ( ldp . subscriptions [ req . originalUrl ] || [ ] ) . length ) ;
152+ if ( ! ldp . subscriptions [ req . originalUrl ] ) return ;
153+ ldp . subscriptions [ req . originalUrl ] . map ( function ( subscription ) {
159154 debug ( " Long poll change to " + req . originalUrl ) ;
160- if ( options . leavePatchConnectionOpen ) {
155+ if ( ldp . leavePatchConnectionOpen ) {
161156 subscription . response . write ( patchData ) ;
162157 } else {
163158 // debug(" --- headersSent 2 " + res.headersSent);
@@ -166,6 +161,6 @@ exports.publishDelta_LongPoll = function (req, res, patchData, targetURI){
166161 }
167162 } ) ;
168163
169- subscriptions [ req . originalUrl ] = [ ] ; // one-off polll
164+ ldp . subscriptions [ req . originalUrl ] = [ ] ; // one-off polll
170165 debug ( "LONG POLL : Now NO subscriptions for " + targetURI ) ;
171166} ;
0 commit comments