-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcallback-queue.ts
More file actions
153 lines (132 loc) · 4.05 KB
/
callback-queue.ts
File metadata and controls
153 lines (132 loc) · 4.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import P from 'pino'
import { setTimeout } from 'node:timers/promises'
import * as callbackQueue from '@microfleet/callback-queue'
import { serializeError, deserializeError } from 'serialize-error'
import type { Redis, Cluster } from 'ioredis'
// callback buckets
const queue = new Map<string, callbackQueue.Thunk>()
const { isArray } = Array
export type RedisInstance = Redis | Cluster
export type Publisher = (key: string, err?: Error | null, ...args: any[]) => Promise<void>
export type Consumer = (channel: string, message: string) => void
const kError = new Error('callback called multiple times')
/**
* Call functions stored in local queues
* @param queueName
* @param args
* @param logger
*/
function call(queueName: string, args: any[], logger: P.Logger): void {
const callback = queue.get(queueName)
if (!callback) {
throw kError
}
// these are async anyways - gonna schedule them
logger.debug({ queueName, args }, 'Calling callback')
callback(...args)
// clean local queue
queue.delete(queueName)
}
/**
* Add callback into local queue
* @param key - queue key
* @param callback - function to add
*/
export function add(key: string, callback: callbackQueue.Thunk): boolean {
const aggregator = callbackQueue.add(key, callback)
if (!aggregator) {
return false
}
queue.set(key, aggregator)
return true
}
/**
* Creates publish function that is used later on to process callbacks
* @param redis
* @param pubsubChannel
* @param logger
*/
export function createPublisher(redis: RedisInstance, pubsubChannel: string, logger: P.Logger): Publisher {
return async function publishResult(lockRedisKey: string, err?: Error | null, ...args: any[]): Promise<void> {
const broadcastArgs = [err ? serializeError(err) : null, ...args]
const localArgs = [err, ...args];
// post result to other processes
(async () => {
try {
await redis.publish(pubsubChannel, JSON.stringify([lockRedisKey, broadcastArgs]))
} catch (err) {
logger.warn({ err }, 'failed to publish results')
}
})()
// call local queue for faster processing
// we don't care here if it fails, it could've been already processed
try {
call(lockRedisKey, localArgs, logger)
} catch (err) {
logger.trace({ err, lockRedisKey }, 'failed to perform call')
}
}
}
/**
* Helper function to parse possible JSON from the Buffer
*/
function tryParsing(message: string, logger: P.Logger) {
try {
return JSON.parse(message)
} catch (err) {
logger.warn({ originalMessage: message, err }, 'Cant parse message')
return null
}
}
/**
* Creates publish function that is used later on to process callbacks
* @param {Object} redis
*/
export function createConsumer(redis: RedisInstance, pubsubChannel: string, logger: P.Logger): Consumer {
const connect = async (): Promise<void> => {
try {
await redis.subscribe(pubsubChannel)
logger.info({ pubsubChannel }, 'Subscribed to channel')
} catch (err) {
logger.error({ err }, 'Failed to subsctibe to pubsub channel')
await setTimeout(250)
return connect()
}
}
// init connection
connect()
return async function redisEventListener(channel, _message) {
if (channel.toString() !== pubsubChannel) {
return
}
const message = tryParsing(_message, logger)
if (!isArray(message)) {
return
}
const [key, args] = message
if (!key || !isArray(args)) {
logger.warn({ redisMsg: message }, 'Malformed message passed: no key or args')
return
}
// no listeners here
// eat the error
try {
if (args[0]) {
args[0] = deserializeError(args[0])
}
call(key, args, logger)
} catch (err) {
if (err !== kError) {
logger.warn({ err }, 'call failed')
}
}
}
}
// //////////////////////////////////////////////////////////////////////////////
// Private section
// //////////////////////////////////////////////////////////////////////////////
/**
* Reference to original call function,
* used for testing
*/
export const _call = call