Skip to content

Commit 6881066

Browse files
authored
chore: track schema changes (#133)
1 parent 60da79a commit 6881066

5 files changed

Lines changed: 430 additions & 2 deletions

File tree

.github/workflows/ci.yml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,39 @@ jobs:
7070
- run: yarn --frozen-lockfile
7171
- run: yarn lint
7272
- run: yarn jest -i --ci
73+
74+
database_updated:
75+
runs-on: ubuntu-18.04
76+
77+
services:
78+
postgres:
79+
image: postgres:11
80+
env:
81+
POSTGRES_USER: postgres
82+
POSTGRES_PASSWORD: postgres
83+
POSTGRES_DB: graphile_worker_test
84+
ports:
85+
- "0.0.0.0:5432:5432"
86+
# needed because the postgres container does not provide a healthcheck
87+
options:
88+
--health-cmd pg_isready --health-interval 10s --health-timeout 5s
89+
--health-retries 5
90+
91+
steps:
92+
- uses: actions/checkout@v1
93+
- name: Use Node.js ${{ matrix.node-version }}
94+
uses: actions/setup-node@v1
95+
with:
96+
node-version: 12.x
97+
- run: yarn --frozen-lockfile
98+
- name: Install pg_dump
99+
run: |
100+
sudo bash -c "echo deb http://apt.postgresql.org/pub/repos/apt/ bionic-pgdg main >> /etc/apt/sources.list.d/pgdg.list"
101+
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
102+
sudo apt-get update
103+
sudo apt-get -yqq install postgresql-client-11
104+
- name: "Check schema hasn't changed"
105+
run: |
106+
yarn db:dump
107+
git update-index -q --refresh
108+
git diff-index --quiet HEAD -- || (git diff && echo "^^^ The database schema has changed, please run 'yarn db:dump'" && exit 1)

__tests__/schema.sql

Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
SELECT pg_catalog.set_config('search_path', '', false);
2+
CREATE SCHEMA graphile_worker;
3+
ALTER SCHEMA graphile_worker OWNER TO graphile_worker_role;
4+
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public;
5+
COMMENT ON EXTENSION pgcrypto IS 'cryptographic functions';
6+
CREATE TABLE graphile_worker.jobs (
7+
id bigint NOT NULL,
8+
queue_name text DEFAULT (public.gen_random_uuid())::text,
9+
task_identifier text NOT NULL,
10+
payload json DEFAULT '{}'::json NOT NULL,
11+
priority integer DEFAULT 0 NOT NULL,
12+
run_at timestamp with time zone DEFAULT now() NOT NULL,
13+
attempts integer DEFAULT 0 NOT NULL,
14+
max_attempts integer DEFAULT 25 NOT NULL,
15+
last_error text,
16+
created_at timestamp with time zone DEFAULT now() NOT NULL,
17+
updated_at timestamp with time zone DEFAULT now() NOT NULL,
18+
key text,
19+
locked_at timestamp with time zone,
20+
locked_by text,
21+
revision integer DEFAULT 0 NOT NULL,
22+
CONSTRAINT jobs_key_check CHECK ((length(key) > 0))
23+
);
24+
ALTER TABLE graphile_worker.jobs OWNER TO graphile_worker_role;
25+
CREATE FUNCTION graphile_worker.add_job(identifier text, payload json DEFAULT NULL::json, queue_name text DEFAULT NULL::text, run_at timestamp with time zone DEFAULT NULL::timestamp with time zone, max_attempts integer DEFAULT NULL::integer, job_key text DEFAULT NULL::text, priority integer DEFAULT NULL::integer) RETURNS graphile_worker.jobs
26+
LANGUAGE plpgsql
27+
AS $$
28+
declare
29+
v_job "graphile_worker".jobs;
30+
begin
31+
-- Apply rationality checks
32+
if length(identifier) > 128 then
33+
raise exception 'Task identifier is too long (max length: 128).' using errcode = 'GWBID';
34+
end if;
35+
if queue_name is not null and length(queue_name) > 128 then
36+
raise exception 'Job queue name is too long (max length: 128).' using errcode = 'GWBQN';
37+
end if;
38+
if job_key is not null and length(job_key) > 512 then
39+
raise exception 'Job key is too long (max length: 512).' using errcode = 'GWBJK';
40+
end if;
41+
if max_attempts < 1 then
42+
raise exception 'Job maximum attempts must be at least 1' using errcode = 'GWBMA';
43+
end if;
44+
if job_key is not null then
45+
-- Upsert job
46+
insert into "graphile_worker".jobs (
47+
task_identifier,
48+
payload,
49+
queue_name,
50+
run_at,
51+
max_attempts,
52+
key,
53+
priority
54+
)
55+
values(
56+
identifier,
57+
coalesce(payload, '{}'::json),
58+
queue_name,
59+
coalesce(run_at, now()),
60+
coalesce(max_attempts, 25),
61+
job_key,
62+
coalesce(priority, 0)
63+
)
64+
on conflict (key) do update set
65+
task_identifier=excluded.task_identifier,
66+
payload=excluded.payload,
67+
queue_name=excluded.queue_name,
68+
max_attempts=excluded.max_attempts,
69+
run_at=excluded.run_at,
70+
priority=excluded.priority,
71+
revision=jobs.revision + 1,
72+
-- always reset error/retry state
73+
attempts=0,
74+
last_error=null
75+
where jobs.locked_at is null
76+
returning *
77+
into v_job;
78+
-- If upsert succeeded (insert or update), return early
79+
if not (v_job is null) then
80+
return v_job;
81+
end if;
82+
-- Upsert failed -> there must be an existing job that is locked. Remove
83+
-- existing key to allow a new one to be inserted, and prevent any
84+
-- subsequent retries by bumping attempts to the max allowed.
85+
update "graphile_worker".jobs
86+
set
87+
key = null,
88+
attempts = jobs.max_attempts
89+
where key = job_key;
90+
end if;
91+
-- insert the new job. Assume no conflicts due to the update above
92+
insert into "graphile_worker".jobs(
93+
task_identifier,
94+
payload,
95+
queue_name,
96+
run_at,
97+
max_attempts,
98+
key,
99+
priority
100+
)
101+
values(
102+
identifier,
103+
coalesce(payload, '{}'::json),
104+
queue_name,
105+
coalesce(run_at, now()),
106+
coalesce(max_attempts, 25),
107+
job_key,
108+
coalesce(priority, 0)
109+
)
110+
returning *
111+
into v_job;
112+
return v_job;
113+
end;
114+
$$;
115+
ALTER FUNCTION graphile_worker.add_job(identifier text, payload json, queue_name text, run_at timestamp with time zone, max_attempts integer, job_key text, priority integer) OWNER TO graphile_worker_role;
116+
CREATE FUNCTION graphile_worker.complete_job(worker_id text, job_id bigint) RETURNS graphile_worker.jobs
117+
LANGUAGE plpgsql
118+
AS $$
119+
declare
120+
v_row "graphile_worker".jobs;
121+
begin
122+
delete from "graphile_worker".jobs
123+
where id = job_id
124+
returning * into v_row;
125+
if v_row.queue_name is not null then
126+
update "graphile_worker".job_queues
127+
set locked_by = null, locked_at = null
128+
where queue_name = v_row.queue_name and locked_by = worker_id;
129+
end if;
130+
return v_row;
131+
end;
132+
$$;
133+
ALTER FUNCTION graphile_worker.complete_job(worker_id text, job_id bigint) OWNER TO graphile_worker_role;
134+
CREATE FUNCTION graphile_worker.complete_jobs(job_ids bigint[]) RETURNS SETOF graphile_worker.jobs
135+
LANGUAGE sql
136+
AS $$
137+
delete from "graphile_worker".jobs
138+
where id = any(job_ids)
139+
and (
140+
locked_by is null
141+
or
142+
locked_at < NOW() - interval '4 hours'
143+
)
144+
returning *;
145+
$$;
146+
ALTER FUNCTION graphile_worker.complete_jobs(job_ids bigint[]) OWNER TO graphile_worker_role;
147+
CREATE FUNCTION graphile_worker.fail_job(worker_id text, job_id bigint, error_message text) RETURNS graphile_worker.jobs
148+
LANGUAGE plpgsql STRICT
149+
AS $$
150+
declare
151+
v_row "graphile_worker".jobs;
152+
begin
153+
update "graphile_worker".jobs
154+
set
155+
last_error = error_message,
156+
run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval,
157+
locked_by = null,
158+
locked_at = null
159+
where id = job_id and locked_by = worker_id
160+
returning * into v_row;
161+
if v_row.queue_name is not null then
162+
update "graphile_worker".job_queues
163+
set locked_by = null, locked_at = null
164+
where queue_name = v_row.queue_name and locked_by = worker_id;
165+
end if;
166+
return v_row;
167+
end;
168+
$$;
169+
ALTER FUNCTION graphile_worker.fail_job(worker_id text, job_id bigint, error_message text) OWNER TO graphile_worker_role;
170+
CREATE FUNCTION graphile_worker.get_job(worker_id text, task_identifiers text[] DEFAULT NULL::text[], job_expiry interval DEFAULT '04:00:00'::interval) RETURNS graphile_worker.jobs
171+
LANGUAGE plpgsql
172+
AS $$
173+
declare
174+
v_job_id bigint;
175+
v_queue_name text;
176+
v_row "graphile_worker".jobs;
177+
v_now timestamptz = now();
178+
begin
179+
if worker_id is null or length(worker_id) < 10 then
180+
raise exception 'invalid worker id';
181+
end if;
182+
select jobs.queue_name, jobs.id into v_queue_name, v_job_id
183+
from "graphile_worker".jobs
184+
where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry))
185+
and (
186+
jobs.queue_name is null
187+
or
188+
exists (
189+
select 1
190+
from "graphile_worker".job_queues
191+
where job_queues.queue_name = jobs.queue_name
192+
and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry))
193+
for update
194+
skip locked
195+
)
196+
)
197+
and run_at <= v_now
198+
and attempts < max_attempts
199+
and (task_identifiers is null or task_identifier = any(task_identifiers))
200+
order by priority asc, run_at asc, id asc
201+
limit 1
202+
for update
203+
skip locked;
204+
if v_job_id is null then
205+
return null;
206+
end if;
207+
if v_queue_name is not null then
208+
update "graphile_worker".job_queues
209+
set
210+
locked_by = worker_id,
211+
locked_at = v_now
212+
where job_queues.queue_name = v_queue_name;
213+
end if;
214+
update "graphile_worker".jobs
215+
set
216+
attempts = attempts + 1,
217+
locked_by = worker_id,
218+
locked_at = v_now
219+
where id = v_job_id
220+
returning * into v_row;
221+
return v_row;
222+
end;
223+
$$;
224+
ALTER FUNCTION graphile_worker.get_job(worker_id text, task_identifiers text[], job_expiry interval) OWNER TO graphile_worker_role;
225+
CREATE FUNCTION graphile_worker.jobs__decrease_job_queue_count() RETURNS trigger
226+
LANGUAGE plpgsql
227+
AS $$
228+
declare
229+
v_new_job_count int;
230+
begin
231+
update "graphile_worker".job_queues
232+
set job_count = job_queues.job_count - 1
233+
where queue_name = old.queue_name
234+
returning job_count into v_new_job_count;
235+
if v_new_job_count <= 0 then
236+
delete from "graphile_worker".job_queues where queue_name = old.queue_name and job_count <= 0;
237+
end if;
238+
return old;
239+
end;
240+
$$;
241+
ALTER FUNCTION graphile_worker.jobs__decrease_job_queue_count() OWNER TO graphile_worker_role;
242+
CREATE FUNCTION graphile_worker.jobs__increase_job_queue_count() RETURNS trigger
243+
LANGUAGE plpgsql
244+
AS $$
245+
begin
246+
insert into "graphile_worker".job_queues(queue_name, job_count)
247+
values(new.queue_name, 1)
248+
on conflict (queue_name)
249+
do update
250+
set job_count = job_queues.job_count + 1;
251+
return new;
252+
end;
253+
$$;
254+
ALTER FUNCTION graphile_worker.jobs__increase_job_queue_count() OWNER TO graphile_worker_role;
255+
CREATE FUNCTION graphile_worker.permanently_fail_jobs(job_ids bigint[], error_message text DEFAULT NULL::text) RETURNS SETOF graphile_worker.jobs
256+
LANGUAGE sql
257+
AS $$
258+
update "graphile_worker".jobs
259+
set
260+
last_error = coalesce(error_message, 'Manually marked as failed'),
261+
attempts = max_attempts
262+
where id = any(job_ids)
263+
and (
264+
locked_by is null
265+
or
266+
locked_at < NOW() - interval '4 hours'
267+
)
268+
returning *;
269+
$$;
270+
ALTER FUNCTION graphile_worker.permanently_fail_jobs(job_ids bigint[], error_message text) OWNER TO graphile_worker_role;
271+
CREATE FUNCTION graphile_worker.remove_job(job_key text) RETURNS graphile_worker.jobs
272+
LANGUAGE sql STRICT
273+
AS $$
274+
delete from "graphile_worker".jobs
275+
where key = job_key
276+
and locked_at is null
277+
returning *;
278+
$$;
279+
ALTER FUNCTION graphile_worker.remove_job(job_key text) OWNER TO graphile_worker_role;
280+
CREATE FUNCTION graphile_worker.reschedule_jobs(job_ids bigint[], run_at timestamp with time zone DEFAULT NULL::timestamp with time zone, priority integer DEFAULT NULL::integer, attempts integer DEFAULT NULL::integer, max_attempts integer DEFAULT NULL::integer) RETURNS SETOF graphile_worker.jobs
281+
LANGUAGE sql
282+
AS $$
283+
update "graphile_worker".jobs
284+
set
285+
run_at = coalesce(reschedule_jobs.run_at, jobs.run_at),
286+
priority = coalesce(reschedule_jobs.priority, jobs.priority),
287+
attempts = coalesce(reschedule_jobs.attempts, jobs.attempts),
288+
max_attempts = coalesce(reschedule_jobs.max_attempts, jobs.max_attempts)
289+
where id = any(job_ids)
290+
and (
291+
locked_by is null
292+
or
293+
locked_at < NOW() - interval '4 hours'
294+
)
295+
returning *;
296+
$$;
297+
ALTER FUNCTION graphile_worker.reschedule_jobs(job_ids bigint[], run_at timestamp with time zone, priority integer, attempts integer, max_attempts integer) OWNER TO graphile_worker_role;
298+
CREATE FUNCTION graphile_worker.tg__update_timestamp() RETURNS trigger
299+
LANGUAGE plpgsql
300+
AS $$
301+
begin
302+
new.updated_at = greatest(now(), old.updated_at + interval '1 millisecond');
303+
return new;
304+
end;
305+
$$;
306+
ALTER FUNCTION graphile_worker.tg__update_timestamp() OWNER TO graphile_worker_role;
307+
CREATE FUNCTION graphile_worker.tg_jobs__notify_new_jobs() RETURNS trigger
308+
LANGUAGE plpgsql
309+
AS $$
310+
begin
311+
perform pg_notify('jobs:insert', '');
312+
return new;
313+
end;
314+
$$;
315+
ALTER FUNCTION graphile_worker.tg_jobs__notify_new_jobs() OWNER TO graphile_worker_role;
316+
CREATE TABLE graphile_worker.job_queues (
317+
queue_name text NOT NULL,
318+
job_count integer NOT NULL,
319+
locked_at timestamp with time zone,
320+
locked_by text
321+
);
322+
ALTER TABLE graphile_worker.job_queues OWNER TO graphile_worker_role;
323+
CREATE SEQUENCE graphile_worker.jobs_id_seq
324+
START WITH 1
325+
INCREMENT BY 1
326+
NO MINVALUE
327+
NO MAXVALUE
328+
CACHE 1;
329+
ALTER TABLE graphile_worker.jobs_id_seq OWNER TO graphile_worker_role;
330+
ALTER SEQUENCE graphile_worker.jobs_id_seq OWNED BY graphile_worker.jobs.id;
331+
CREATE TABLE graphile_worker.migrations (
332+
id integer NOT NULL,
333+
ts timestamp with time zone DEFAULT now() NOT NULL
334+
);
335+
ALTER TABLE graphile_worker.migrations OWNER TO graphile_worker_role;
336+
ALTER TABLE ONLY graphile_worker.jobs ALTER COLUMN id SET DEFAULT nextval('graphile_worker.jobs_id_seq'::regclass);
337+
ALTER TABLE ONLY graphile_worker.job_queues
338+
ADD CONSTRAINT job_queues_pkey PRIMARY KEY (queue_name);
339+
ALTER TABLE ONLY graphile_worker.jobs
340+
ADD CONSTRAINT jobs_key_key UNIQUE (key);
341+
ALTER TABLE ONLY graphile_worker.jobs
342+
ADD CONSTRAINT jobs_pkey PRIMARY KEY (id);
343+
ALTER TABLE ONLY graphile_worker.migrations
344+
ADD CONSTRAINT migrations_pkey PRIMARY KEY (id);
345+
CREATE INDEX jobs_priority_run_at_id_idx ON graphile_worker.jobs USING btree (priority, run_at, id);
346+
CREATE TRIGGER _100_timestamps BEFORE UPDATE ON graphile_worker.jobs FOR EACH ROW EXECUTE PROCEDURE graphile_worker.tg__update_timestamp();
347+
CREATE TRIGGER _500_decrease_job_queue_count AFTER DELETE ON graphile_worker.jobs FOR EACH ROW WHEN ((old.queue_name IS NOT NULL)) EXECUTE PROCEDURE graphile_worker.jobs__decrease_job_queue_count();
348+
CREATE TRIGGER _500_decrease_job_queue_count_update AFTER UPDATE OF queue_name ON graphile_worker.jobs FOR EACH ROW WHEN (((new.queue_name IS DISTINCT FROM old.queue_name) AND (old.queue_name IS NOT NULL))) EXECUTE PROCEDURE graphile_worker.jobs__decrease_job_queue_count();
349+
CREATE TRIGGER _500_increase_job_queue_count AFTER INSERT ON graphile_worker.jobs FOR EACH ROW WHEN ((new.queue_name IS NOT NULL)) EXECUTE PROCEDURE graphile_worker.jobs__increase_job_queue_count();
350+
CREATE TRIGGER _500_increase_job_queue_count_update AFTER UPDATE OF queue_name ON graphile_worker.jobs FOR EACH ROW WHEN (((new.queue_name IS DISTINCT FROM old.queue_name) AND (new.queue_name IS NOT NULL))) EXECUTE PROCEDURE graphile_worker.jobs__increase_job_queue_count();
351+
CREATE TRIGGER _900_notify_worker AFTER INSERT ON graphile_worker.jobs FOR EACH STATEMENT EXECUTE PROCEDURE graphile_worker.tg_jobs__notify_new_jobs();
352+
ALTER TABLE graphile_worker.job_queues ENABLE ROW LEVEL SECURITY;
353+
ALTER TABLE graphile_worker.jobs ENABLE ROW LEVEL SECURITY;

0 commit comments

Comments
 (0)