Skip to content

Commit dfc3931

Browse files
committed
Fix job events
1 parent 78ea07f commit dfc3931

3 files changed

Lines changed: 91 additions & 20 deletions

File tree

lib/JobService.js

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,37 @@ class JobService extends AdapterService {
1313
sorter,
1414
paginate: {
1515
max: 25
16-
}
16+
},
17+
events: ['succeeded', 'retrying', 'failed', 'progress']
1718
}, options));
1819
}
1920

21+
setup (app, path) {
22+
if (this.options.events.indexOf('succeeded') > -1) {
23+
this.queue.on('job succeeded',
24+
(id, result) => this.emit('succeeded', { id, result })
25+
);
26+
}
27+
28+
if (this.options.events.indexOf('retrying') > -1) {
29+
this.queue.on('job retrying',
30+
(id, error) => this.emit('retrying', { id, error })
31+
);
32+
}
33+
34+
if (this.options.events.indexOf('failed') > -1) {
35+
this.queue.on('job failed',
36+
(id, error) => this.emit('failed', { id, error })
37+
);
38+
}
39+
40+
if (this.options.events.indexOf('progress') > -1) {
41+
this.queue.on('job progress',
42+
(id, progress) => this.emit('progress', { id, progress })
43+
);
44+
}
45+
}
46+
2047
get queue () {
2148
return this.options.queue;
2249
}

lib/index.js

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,5 @@ module.exports = (options = {}) => {
44
if (!options.queue) {
55
throw new Error('Missing queue parameter');
66
}
7-
8-
const { queue } = options;
9-
const service = new JobService(options);
10-
11-
// Subscribe the service to the Queue PubSub events
12-
queue.on('job succeeded',
13-
(id, result) => service.emit('succeeded', { id, result })
14-
);
15-
queue.on('job retrying',
16-
(id, error) => service.emit('retrying', { id, error })
17-
);
18-
queue.on('job failed',
19-
(id, error) => service.emit('failed', { id, error })
20-
);
21-
queue.on('job progress',
22-
(id, progress) => service.emit('progress', { id, progress })
23-
);
24-
25-
return service;
7+
return new JobService(options);
268
};

test/index.test.js

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,68 @@ describe('Feathers Bee-Queue Service', () => {
8888
);
8989
});
9090

91+
describe('pubsub events', () => {
92+
describe('when enabled', () => {
93+
let app, queue, service;
94+
95+
beforeEach(() => {
96+
app = feathers();
97+
queue = new Queue('pubsub');
98+
service = app.use('/pubsub', plugin({
99+
queue,
100+
events: [ 'progress' ]
101+
})).service('/pubsub');
102+
service.setup(app, '/pubsub');
103+
});
104+
105+
afterEach(() => queue.destroy());
106+
107+
it('emits progress event', done => {
108+
service.create({})
109+
.then(job => queue.getJob(job.id))
110+
.then(job => {
111+
service.on('progress', event => {
112+
assert.strictEqual(event.id, job.id);
113+
assert.strictEqual(event.progress, 50);
114+
done();
115+
});
116+
job.reportProgress(50);
117+
})
118+
.catch(done);
119+
});
120+
});
121+
122+
describe('when disabled', () => {
123+
let app, queue, service;
124+
125+
beforeEach(() => {
126+
app = feathers();
127+
queue = new Queue('pubsub2');
128+
service = app.use('/pubsub2', plugin({
129+
queue,
130+
events: []
131+
})).service('/pubsub2');
132+
service.setup(app, '/pubsub2');
133+
});
134+
135+
afterEach(() => queue.destroy());
136+
137+
it('does not emit progress event', done => {
138+
service.create({})
139+
.then(job => queue.getJob(job.id))
140+
.then(job => {
141+
service.on('progress', () => {
142+
done(new Error('It should not emit an event'));
143+
});
144+
job.reportProgress(50);
145+
assert.ok(true);
146+
done();
147+
})
148+
.catch(done);
149+
});
150+
});
151+
});
152+
91153
describe('find', () => {
92154
it('validates the query status', async () => {
93155
try {

0 commit comments

Comments
 (0)