This repository was archived by the owner on Aug 15, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 605
Expand file tree
/
Copy pathexpressify.coffee
More file actions
176 lines (129 loc) · 5.54 KB
/
expressify.coffee
File metadata and controls
176 lines (129 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# Invokes `callback` as soon as the bongo instance has its DB client ready.
#
# bongo    - object exposing a `dbClientReady` flag and an emitter-style
#            `once` method.
# callback - called with `null` right away when the flag is already set;
#            otherwise registered as a one-shot 'dbClientReady' listener.
onceReady = (bongo, callback) ->
  if bongo.dbClientReady
    callback null
  else
    bongo.once 'dbClientReady', callback
# Builds an Express middleware that executes a batch ("queue") of bongo
# requests sent in `req.body.queue` and replies with one response per item.
#
# Must be invoked with a bongo instance as `this`.
#
# options.rateLimitOptions - optional { enabled, userRules, guestRules }.
# options.processPayload   - optional (payload, done) hook run before sending.
# options.queueTimeout     - optional ms to wait for a queue before replying
#                            with whatever has completed (default 15000).
#
# Returns an Express-style `(req, res, next)` handler.
module.exports = (options = {}) ->

  async     = require 'async'
  RateLimit = require('ratelimit.js').RateLimit

  bongo = this

  { rateLimitOptions } = options
  queueTimeout = options.queueTimeout ? 15000

  if rateLimitOptions?.enabled
    { userRules, guestRules } = rateLimitOptions
    rateLimiter = new RateLimit bongo.redisClient, userRules

  # Reports how long a single method call took.
  sendMetrics = (start, message) ->
    duration = new Date() - start
    { method } = message
    bongo.metrics.sendMethodMetrics { method, duration }

  # Calls back with a truthy value when `id` exceeded its quota.
  # Rate limiter errors are logged and treated as "not limited".
  rateLimit = (id, weight, callback) ->
    # `rateLimiter` only exists when limiting was enabled at construction;
    # guarding on it also covers a missing `rateLimitOptions`.
    return callback no  unless rateLimiter? and rateLimitOptions.enabled

    weight ?= 1

    # ids starting with `guest-` are throttled with the guest rules
    rules = if /^guest-/.test(id) then guestRules else userRules
    rateLimiter.rules = rateLimiter.convertRules rules

    rateLimiter.incr id, weight, (err, isRateLimited) ->
      console.log "[Expressify] Rate limit error occurred: #{err}"  if err
      callback isRateLimited

  # Runs every message of `req.body.queue` through bongo in parallel and
  # hands the collected responses to `callback` exactly once: either when
  # all messages completed or when `queueTimeout` elapsed, whichever is first.
  # `client` is optional; `processQueue req, callback` is also accepted.
  processQueue = (req, client, callback) ->
    [client, callback] = [null, client]  unless callback

    { channelName, queue } = req.body

    responses = new Array queue.length
    calls = queue.map (message) ->
      { method: "#{message.method?.method ? message.method}", called: no }

    settled = no
    timer   = null

    # Per-request guard: the timeout and the completion handler race, and
    # only the first of them may answer the request.
    respondOnce = ->
      return  if settled
      settled = yes
      clearTimeout timer
      callback responses

    workQueue = queue.map (message, i) -> (fin) ->
      start = new Date()

      # Placeholder response, replaced by the real one if it arrives in time.
      #
      # `message.callbacks` is keyed by callback id and only one callback per
      # message is supported, so the first key (as a number) is used as the
      # method of the response, e.g. { "12": Object } gives method 12. ~GG
      responses[i] =
        arguments : [{ message: 'Timed out' }]
        callbacks : {}
        method    : +(Object.keys message.callbacks ? {})[0]

      bongo.handleRequest channelName, { message, client }, (secretName, callbackId, args) ->
        sendMetrics start, message  if bongo.metrics
        responses[i].method = callbackId
        bongo.scrubResponse callbackId, args, (scrubbed) ->
          responses[i] = scrubbed
          calls[i].called = yes
          fin()

    # If the queue does not complete within `queueTimeout` ms, reply with the
    # results gathered so far. The timer is cleared by `respondOnce`, so it
    # only fires for queues that are still pending.
    timer = setTimeout ->
      console.log '[Expressify] Unhandled queue:', calls
      respondOnce()
    , queueTimeout

    # Late completions after a timeout end up here too; `respondOnce`
    # ignores them so the request is never answered twice.
    async.parallel workQueue, respondOnce

  (req, res, next) ->

    { sessionToken, userArea, customContext, queue } = req.body ? {}

    unless Array.isArray(queue) and queue.length
      return res.status(400).end()

    sendResponse = (payload) ->
      if _processPayload = options.processPayload
        _processPayload payload, (_payload) -> res.send _payload
      else
        res.send payload

    # Rejects (once) a queue containing a message without `callbacks`.
    # Returns yes when the request has been answered with an error.
    rejectUnhealthyQueue = ->
      healthy = queue.every (_req) -> Boolean _req?.callbacks
      return no  if healthy
      console.log "[Expressify] #{sessionToken} has an unhealthy queue to process, dropping it to floor."
      res.status(500).send 'An error occcurred'
      yes

    # Applies the rate limit for `context`, then processes the queue.
    throttleAndProcess = (context, client) ->
      rateLimit context, queue.length, (isRateLimited) ->
        return res.status(429).send 'Rate limit exceeded'  if isRateLimited
        processQueue req, client, sendResponse

    # NOTE(review): `customContext` comes from the request body, so a caller
    # can pick its own rate-limit bucket; confirm this is intended.

    unless sessionToken and userArea
      # anonymous request: the sessionToken (if any) is the rate-limit id
      return  if rejectUnhealthyQueue()
      return throttleAndProcess (customContext ? sessionToken), null

    onceReady bongo, ->
      bongo.fetchClient sessionToken, userArea, (client) ->

        unless client
          console.log "[Expressify] #{sessionToken} not found in #{userArea}"
          res.clearCookie 'clientId', { path: '/' }
          return res.status(500).send 'An error occcurred'

        return  if rejectUnhealthyQueue()

        # `authenticateUser` calls are answered directly from the client
        payload = []
        for _req in queue when _req.method is 'authenticateUser'
          method  = +(Object.keys _req.callbacks)[0]
          account = client.connection.delegate
          payload.push { arguments: [ account ], callbacks: {}, method }

        # nothing left for bongo when every message was `authenticateUser`
        return sendResponse payload  if queue.length is payload.length

        throttleAndProcess (customContext ? client.username), client