Skip to content

Commit a34a023

Browse files
Upgrade to NATS 2.0 (#175)
1 parent d303798 commit a34a023

File tree

5 files changed

+325
-14451
lines changed

5 files changed

+325
-14451
lines changed

README.md

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ const sync = require('feathers-sync');
4646

4747
const app = feathers();
4848

49-
app.configure(sync({
50-
uri: 'redis://localhost:6379'
51-
}));
49+
app.configure(
50+
sync({
51+
uri: 'redis://localhost:6379',
52+
})
53+
);
5254
app.use('/todos', todoService);
5355
```
5456

@@ -78,13 +80,13 @@ app.service('messages').hooks({
7880
after: {
7981
create(context) {
8082
// Don't synchronize if more than 1000 items were created at once
81-
if(context.result.length > 1000) {
83+
if (context.result.length > 1000) {
8284
context[SYNC] = false;
8385
}
8486

8587
return context;
86-
}
87-
}
88+
},
89+
},
8890
});
8991
```
9092

@@ -94,18 +96,24 @@ app.service('messages').hooks({
9496

9597
```js
9698
// Configure Redis
97-
app.configure(sync({
98-
uri: 'redis://localhost:6379'
99-
}));
100-
101-
app.configure(sync.redis({
102-
db: redisInstance
103-
}));
99+
app.configure(
100+
sync({
101+
uri: 'redis://localhost:6379',
102+
})
103+
);
104+
105+
app.configure(
106+
sync.redis({
107+
db: redisInstance,
108+
})
109+
);
104110

105111
// Configure Redis using an existing redisClient
106-
app.configure(sync.redis({
107-
redisClient: redisClient
108-
}))
112+
app.configure(
113+
sync.redis({
114+
redisClient: redisClient,
115+
})
116+
);
109117
```
110118

111119
### Redis
@@ -125,9 +133,9 @@ app.configure(sync.redis({
125133

126134
### NATS
127135

128-
- `uri` - The connection string (must start with `nats://`)
129-
- `key` (default: `feathers-sync`) - The name exchange where sync messages will be published
130-
136+
- `uri` - The connection string (example `nats://`)
137+
- `key` (default: `feathers-sync`) - The name of subject where sync messages will be published
138+
- `natsConnectionOptions` - NATS [connection options](https://github.com/nats-io/nats.js#Connection-Options)
131139

132140
## How it works
133141

@@ -145,15 +153,17 @@ Event data are serialized and deserialized using `JSON.stringify` and `JSON.pars
145153

146154
```js
147155
// BSON can serialize / deserialize `Date` values.
148-
const bson = require('bson')
149-
150-
app.configure(sync({
151-
uri: 'redis://localhost:6379',
152-
// Replies will be sent to callbacks as Buffers instead of Strings for bson.deserialize to work.
153-
redisOptions: { return_buffers: true },
154-
serialize: bson.serialize,
155-
deserialize: bson.deserialize,
156-
}));
156+
const bson = require('bson');
157+
158+
app.configure(
159+
sync({
160+
uri: 'redis://localhost:6379',
161+
// Replies will be sent to callbacks as Buffers instead of Strings for bson.deserialize to work.
162+
redisOptions: { return_buffers: true },
163+
serialize: bson.serialize,
164+
deserialize: bson.deserialize,
165+
})
166+
);
157167
```
158168

159169
> `Redis` and `AMQP` can support binary serialization / deserialization (i.e. `Buffer` data). `NATS` currently does not support custom serialization / deserialization/
@@ -171,14 +181,14 @@ const myMessagingService = {
171181

172182
subscribe(callback) {
173183
// subscribe to message queue and emit data
174-
}
175-
}
184+
},
185+
};
176186

177-
module.exports = config => {
187+
module.exports = (config) => {
178188
// If adapter supports configurable serializer / deserializer (defaults to `JSON.stringfy` / `JSON.parse`)
179189
const { deserialize, serialize } = config;
180190

181-
return app => {
191+
return (app) => {
182192
app.configure(core);
183193
app.sync = {
184194
type: 'custom',
@@ -187,16 +197,16 @@ module.exports = config => {
187197
// reject on connection error
188198
}),
189199
serialize,
190-
deserialize
200+
deserialize,
191201
};
192202

193203
// Sent every time a service
194-
app.on('sync-out', data => {
204+
app.on('sync-out', (data) => {
195205
// Publish `data` to the message queue
196206
myMessagingService.publish(data);
197207
});
198208

199-
myMessagingService.subscribe(data => {
209+
myMessagingService.subscribe((data) => {
200210
// Send the synchronization event to the application
201211
app.emit('sync-in', data);
202212
});

lib/adapters/nats.js

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,64 @@
1-
const nats = require('nats');
1+
const { connect, StringCodec } = require('nats');
22
const debug = require('debug')('feathers-sync:nats');
33
const core = require('../core');
44

5-
module.exports = config => {
5+
module.exports = (config) => {
66
debug(`Setting up NATS connection ${config.uri}`);
77

8-
return app => {
9-
const { key, serialize, deserialize } = config;
10-
const natsClient = nats.connect({ json: true, url: config.uri, ...config.natsConnectionOptions });
8+
return (app) => {
9+
const {
10+
uri,
11+
key: subject,
12+
serialize,
13+
deserialize,
14+
natsConnectionOptions
15+
} = config;
16+
17+
// Setting up nats connection with unique servers list
18+
const natsClient = connect({
19+
...natsConnectionOptions,
20+
servers: [
21+
...new Set([
22+
uri,
23+
...(natsConnectionOptions.servers
24+
? natsConnectionOptions.servers
25+
: [])
26+
])
27+
]
28+
});
29+
const stringCodec = StringCodec();
1130

1231
app.configure(core);
1332
app.sync = {
1433
type: 'nats',
1534
serialize,
1635
deserialize,
1736
ready: new Promise((resolve, reject) => {
18-
natsClient.once('connect', resolve);
19-
natsClient.once('error', reject);
37+
natsClient
38+
.then((connection) => {
39+
const sub = connection.subscribe(subject);
40+
// listening events and resolving connection
41+
(async () => {
42+
for await (const message of sub) {
43+
const data = stringCodec.decode(message.data);
44+
debug(
45+
`[${sub.getProcessed()}]: ${stringCodec.decode(message.data)}`
46+
);
47+
app.emit('sync-in', data);
48+
}
49+
debug('subscription closed');
50+
})();
51+
resolve(connection);
52+
})
53+
.catch((error) => reject(error));
2054
})
2155
};
22-
app.on('sync-out', data => {
23-
debug(`Publishing key ${key} to Nats`);
24-
natsClient.publish(key, data);
25-
});
2656

27-
natsClient.subscribe(key, (msg) => {
28-
if (msg) {
29-
debug(`Got ${key} message from Nats`);
30-
app.emit('sync-in', msg);
31-
}
57+
app.on('sync-out', async (data) => {
58+
const natsClient = await connect(natsConnectionOptions);
59+
debug(`Publishing key ${subject} to NATS`);
60+
await natsClient.publish(subject, stringCodec.encode(data));
61+
await natsClient.drain();
3262
});
3363
};
3464
};

0 commit comments

Comments
 (0)