Skip to content

Commit 625c82a

Browse files
author
Stephane Bellity
committed
Process jobs in a stadalone worker process
1 parent 4e4be5c commit 625c82a

10 files changed

Lines changed: 205 additions & 107 deletions

File tree

Procfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
web: NODE_ENV=production node -r newrelic build/web
2+
worker: NODE_ENV=production node -r newrelic build/worker

Procfile.dev

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
web: NODE_ENV=development ./node_modules/.bin/babel-watch -L server/web
2+
worker: NODE_ENV=development ./node_modules/.bin/babel-watch -L server/worker

package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
"build:client": "NODE_ENV=production ./node_modules/.bin/webpack --config ./webpack.config.js --progress --profile --colors",
2727
"build:server": "./node_modules/.bin/babel server -d build",
2828
"clean": "./node_modules/.bin/rimraf build",
29-
"start": "node ./build",
30-
"start:dev": "./node_modules/.bin/nodemon -w server --exec ./node_modules/.bin/babel-node -- server",
29+
"start": "nf start",
30+
"start:dev": "nf -j Procfile.dev start",
3131
"test": "npm run test:lint && npm run test:units && npm run test:modules",
3232
"test:lint": "./node_modules/.bin/eslint server",
3333
"test:modules": "npm outdated --depth=0",
@@ -79,10 +79,13 @@
7979
"babel-core": "^6.10.4",
8080
"babel-eslint": "^6.0.5",
8181
"babel-loader": "^6.2.4",
82+
"babel-watch": "^2.0.4",
8283
"eslint": "^2.13.1",
8384
"eslint-config-airbnb-base": "^3.0.1",
8485
"eslint-plugin-import": "^1.9.2",
86+
"foreman": "^2.0.0",
8587
"mocha": "^2.5.3",
88+
"nf": "0.0.3",
8689
"nodemon": "^1.10.0",
8790
"sinon": "^1.17.4",
8891
"supertest": "^1.2.0",

server/index.js renamed to server/bootstrap.js

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ if (process.env.NEW_RELIC_LICENSE_KEY) {
55
}
66

77
const Hull = require("hull");
8-
const Server = require("./server");
9-
8+
const kue = require("kue");
109

1110
// Configure AWS
1211
const Aws = require("aws-sdk");
@@ -15,21 +14,24 @@ Aws.config.update({
1514
secretAccessKey: process.env.AWS_SECRET_KEY
1615
});
1716

18-
const PORT = process.env.PORT || 8082;
1917

2018
if (process.env.LOG_LEVEL) {
2119
Hull.logger.transports.console.level = process.env.LOG_LEVEL;
2220
}
2321

24-
const options = {
22+
const queue = kue.createQueue({
23+
prefix: process.env.KUE_PREFIX || "hull-sql",
24+
redis: process.env.REDIS_URL
25+
});
26+
27+
const PORT = process.env.PORT || 8082;
28+
29+
export default {
30+
PORT,
31+
queue,
2532
Hull,
2633
hostSecret: process.env.SECRET || "1234",
27-
devMode: process.env.NODE_ENV === "development"
34+
devMode: process.env.NODE_ENV === "development",
35+
workerMode: process.env.WORKER_MODE || "standalone"
2836
};
2937

30-
31-
const app = Server(options);
32-
33-
34-
console.log(`Listening on port ${PORT}`);
35-
app.listen(PORT);

server/server.js

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ import express from "express";
22
import bodyParser from "body-parser";
33
import path from "path";
44
import ejs from "ejs";
5-
import moment from "moment";
65

76
import devMode from "./dev-mode";
87
import SyncAgent from "./sync-agent";
98

9+
import KueRouter from "./util/kue-router";
10+
11+
1012
module.exports = function server(options = {}) {
11-
const { Hull, hostSecret } = options;
13+
const { Hull, hostSecret, queue } = options;
1214
const { Routes } = Hull;
1315
const { Readme, Manifest } = Routes;
1416
const app = express();
@@ -29,24 +31,38 @@ module.exports = function server(options = {}) {
2931
app.get("/", Readme);
3032
app.get("/readme", Readme);
3133

34+
app.use("/kue", KueRouter({ hostSecret, queue }));
35+
3236
app.use(Hull.Middleware({ hostSecret, fetchShip: true, cacheShip: false, requireCredentials: false }));
3337

34-
app.get("/admin.html", (req, res) => {
35-
const { ship } = req.hull;
36-
if (ship.private_settings.connection_string) {
38+
app.use((req, res, next) => {
39+
req.agent = new SyncAgent({ ...req.hull, queue });
40+
next();
41+
});
42+
43+
function checkConfiguration({ agent }, res, next) {
44+
if (!agent.isEnabled()) {
45+
res.status(403).json({ status: "ignored" });
46+
} else if (!agent.isConfigured()) {
47+
res.status(403).json({ status: "not configured" });
48+
} else {
49+
next();
50+
}
51+
}
52+
53+
app.get("/admin.html", ({ agent }, res) => {
54+
if (agent.isConfigured()) {
3755
res.render("connected.html", {
3856
last_sync_at: null,
39-
...ship.private_settings
57+
...agent.ship.private_settings
4058
});
4159
} else {
4260
res.render("home.html", {});
4361
}
4462
});
4563

46-
app.post("/run", (req, res) => {
47-
const { ship } = req.hull;
48-
const query = req.body.query || ship.private_settings.query;
49-
const agent = new SyncAgent(req.hull);
64+
app.post("/run", ({ body, agent }, res) => {
65+
const query = body.query || agent.getQuery();
5066
agent
5167
.runQuery(query, { timeout: 20000 })
5268
.then(data => res.json(data))
@@ -55,41 +71,15 @@ module.exports = function server(options = {}) {
5571
);
5672
});
5773

58-
app.post("/import", (req, res) => {
59-
const { private_settings } = req.hull.ship;
60-
const agent = new SyncAgent(req.hull);
61-
agent.streamQuery(private_settings.query)
62-
.catch((err) => {
63-
const { status, message } = err || {};
64-
res.status(status || 500).send({ message });
65-
})
66-
.then(stream => {
67-
res.json({ status: "working..." });
68-
return agent.startSync(stream, new Date());
69-
});
74+
app.post("/import", checkConfiguration, ({ agent }, res) => {
75+
agent.async("startImport");
76+
res.json({ status: "scheduled" });
7077
});
7178

72-
app.post("/sync", (req, res) => {
73-
const { private_settings = {} } = req.hull.ship;
74-
75-
const oneHourAgo = moment().subtract(1, "hour").utc();
76-
const last_updated_at = private_settings.last_updated_at || private_settings.last_sync_at || oneHourAgo.toISOString();
77-
78-
if (private_settings.enabled === true) {
79-
req.hull.client.logger.info("startSync", { last_updated_at });
80-
const agent = new SyncAgent(req.hull);
81-
agent.streamQuery(private_settings.query, { last_updated_at })
82-
.then(stream => {
83-
res.json({ status: "working", last_updated_at });
84-
return agent.startSync(stream, new Date());
85-
})
86-
.catch(({ status, message }) => {
87-
res.status(status || 500).send({ message });
88-
});
89-
} else {
90-
req.hull.client.logger.info("skipSync");
91-
res.json({ status: "ignored" });
92-
}
79+
app.post("/sync", checkConfiguration, ({ agent }, res) => {
80+
// Return early if sync not enabled
81+
agent.async("startSync");
82+
res.json({ status: "scheduled" });
9383
});
9484

9585
// Error Handler

0 commit comments

Comments
 (0)