|
| 1 | +SELECT pg_catalog.set_config('search_path', '', false); |
| 2 | +CREATE SCHEMA graphile_worker; |
| 3 | +ALTER SCHEMA graphile_worker OWNER TO graphile_worker_role; |
| 4 | +CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public; |
| 5 | +COMMENT ON EXTENSION pgcrypto IS 'cryptographic functions'; |
| 6 | +CREATE TABLE graphile_worker.jobs ( |
| 7 | + id bigint NOT NULL, |
| 8 | + queue_name text DEFAULT (public.gen_random_uuid())::text, |
| 9 | + task_identifier text NOT NULL, |
| 10 | + payload json DEFAULT '{}'::json NOT NULL, |
| 11 | + priority integer DEFAULT 0 NOT NULL, |
| 12 | + run_at timestamp with time zone DEFAULT now() NOT NULL, |
| 13 | + attempts integer DEFAULT 0 NOT NULL, |
| 14 | + max_attempts integer DEFAULT 25 NOT NULL, |
| 15 | + last_error text, |
| 16 | + created_at timestamp with time zone DEFAULT now() NOT NULL, |
| 17 | + updated_at timestamp with time zone DEFAULT now() NOT NULL, |
| 18 | + key text, |
| 19 | + locked_at timestamp with time zone, |
| 20 | + locked_by text, |
| 21 | + revision integer DEFAULT 0 NOT NULL, |
| 22 | + CONSTRAINT jobs_key_check CHECK ((length(key) > 0)) |
| 23 | +); |
| 24 | +ALTER TABLE graphile_worker.jobs OWNER TO graphile_worker_role; |
| 25 | +CREATE FUNCTION graphile_worker.add_job(identifier text, payload json DEFAULT NULL::json, queue_name text DEFAULT NULL::text, run_at timestamp with time zone DEFAULT NULL::timestamp with time zone, max_attempts integer DEFAULT NULL::integer, job_key text DEFAULT NULL::text, priority integer DEFAULT NULL::integer) RETURNS graphile_worker.jobs |
| 26 | + LANGUAGE plpgsql |
| 27 | + AS $$ |
| 28 | +declare |
| 29 | + v_job "graphile_worker".jobs; |
| 30 | +begin |
| 31 | + -- Apply rationality checks |
| 32 | + if length(identifier) > 128 then |
| 33 | + raise exception 'Task identifier is too long (max length: 128).' using errcode = 'GWBID'; |
| 34 | + end if; |
| 35 | + if queue_name is not null and length(queue_name) > 128 then |
| 36 | + raise exception 'Job queue name is too long (max length: 128).' using errcode = 'GWBQN'; |
| 37 | + end if; |
| 38 | + if job_key is not null and length(job_key) > 512 then |
| 39 | + raise exception 'Job key is too long (max length: 512).' using errcode = 'GWBJK'; |
| 40 | + end if; |
| 41 | + if max_attempts < 1 then |
| 42 | + raise exception 'Job maximum attempts must be at least 1' using errcode = 'GWBMA'; |
| 43 | + end if; |
| 44 | + if job_key is not null then |
| 45 | + -- Upsert job |
| 46 | + insert into "graphile_worker".jobs ( |
| 47 | + task_identifier, |
| 48 | + payload, |
| 49 | + queue_name, |
| 50 | + run_at, |
| 51 | + max_attempts, |
| 52 | + key, |
| 53 | + priority |
| 54 | + ) |
| 55 | + values( |
| 56 | + identifier, |
| 57 | + coalesce(payload, '{}'::json), |
| 58 | + queue_name, |
| 59 | + coalesce(run_at, now()), |
| 60 | + coalesce(max_attempts, 25), |
| 61 | + job_key, |
| 62 | + coalesce(priority, 0) |
| 63 | + ) |
| 64 | + on conflict (key) do update set |
| 65 | + task_identifier=excluded.task_identifier, |
| 66 | + payload=excluded.payload, |
| 67 | + queue_name=excluded.queue_name, |
| 68 | + max_attempts=excluded.max_attempts, |
| 69 | + run_at=excluded.run_at, |
| 70 | + priority=excluded.priority, |
| 71 | + revision=jobs.revision + 1, |
| 72 | + -- always reset error/retry state |
| 73 | + attempts=0, |
| 74 | + last_error=null |
| 75 | + where jobs.locked_at is null |
| 76 | + returning * |
| 77 | + into v_job; |
| 78 | + -- If upsert succeeded (insert or update), return early |
| 79 | + if not (v_job is null) then |
| 80 | + return v_job; |
| 81 | + end if; |
| 82 | + -- Upsert failed -> there must be an existing job that is locked. Remove |
| 83 | + -- existing key to allow a new one to be inserted, and prevent any |
| 84 | + -- subsequent retries by bumping attempts to the max allowed. |
| 85 | + update "graphile_worker".jobs |
| 86 | + set |
| 87 | + key = null, |
| 88 | + attempts = jobs.max_attempts |
| 89 | + where key = job_key; |
| 90 | + end if; |
| 91 | + -- insert the new job. Assume no conflicts due to the update above |
| 92 | + insert into "graphile_worker".jobs( |
| 93 | + task_identifier, |
| 94 | + payload, |
| 95 | + queue_name, |
| 96 | + run_at, |
| 97 | + max_attempts, |
| 98 | + key, |
| 99 | + priority |
| 100 | + ) |
| 101 | + values( |
| 102 | + identifier, |
| 103 | + coalesce(payload, '{}'::json), |
| 104 | + queue_name, |
| 105 | + coalesce(run_at, now()), |
| 106 | + coalesce(max_attempts, 25), |
| 107 | + job_key, |
| 108 | + coalesce(priority, 0) |
| 109 | + ) |
| 110 | + returning * |
| 111 | + into v_job; |
| 112 | + return v_job; |
| 113 | +end; |
| 114 | +$$; |
| 115 | +ALTER FUNCTION graphile_worker.add_job(identifier text, payload json, queue_name text, run_at timestamp with time zone, max_attempts integer, job_key text, priority integer) OWNER TO graphile_worker_role; |
| 116 | +CREATE FUNCTION graphile_worker.complete_job(worker_id text, job_id bigint) RETURNS graphile_worker.jobs |
| 117 | + LANGUAGE plpgsql |
| 118 | + AS $$ |
| 119 | +declare |
| 120 | + v_row "graphile_worker".jobs; |
| 121 | +begin |
| 122 | + delete from "graphile_worker".jobs |
| 123 | + where id = job_id |
| 124 | + returning * into v_row; |
| 125 | + if v_row.queue_name is not null then |
| 126 | + update "graphile_worker".job_queues |
| 127 | + set locked_by = null, locked_at = null |
| 128 | + where queue_name = v_row.queue_name and locked_by = worker_id; |
| 129 | + end if; |
| 130 | + return v_row; |
| 131 | +end; |
| 132 | +$$; |
| 133 | +ALTER FUNCTION graphile_worker.complete_job(worker_id text, job_id bigint) OWNER TO graphile_worker_role; |
| 134 | +CREATE FUNCTION graphile_worker.complete_jobs(job_ids bigint[]) RETURNS SETOF graphile_worker.jobs |
| 135 | + LANGUAGE sql |
| 136 | + AS $$ |
| 137 | + delete from "graphile_worker".jobs |
| 138 | + where id = any(job_ids) |
| 139 | + and ( |
| 140 | + locked_by is null |
| 141 | + or |
| 142 | + locked_at < NOW() - interval '4 hours' |
| 143 | + ) |
| 144 | + returning *; |
| 145 | +$$; |
| 146 | +ALTER FUNCTION graphile_worker.complete_jobs(job_ids bigint[]) OWNER TO graphile_worker_role; |
| 147 | +CREATE FUNCTION graphile_worker.fail_job(worker_id text, job_id bigint, error_message text) RETURNS graphile_worker.jobs |
| 148 | + LANGUAGE plpgsql STRICT |
| 149 | + AS $$ |
| 150 | +declare |
| 151 | + v_row "graphile_worker".jobs; |
| 152 | +begin |
| 153 | + update "graphile_worker".jobs |
| 154 | + set |
| 155 | + last_error = error_message, |
| 156 | + run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval, |
| 157 | + locked_by = null, |
| 158 | + locked_at = null |
| 159 | + where id = job_id and locked_by = worker_id |
| 160 | + returning * into v_row; |
| 161 | + if v_row.queue_name is not null then |
| 162 | + update "graphile_worker".job_queues |
| 163 | + set locked_by = null, locked_at = null |
| 164 | + where queue_name = v_row.queue_name and locked_by = worker_id; |
| 165 | + end if; |
| 166 | + return v_row; |
| 167 | +end; |
| 168 | +$$; |
| 169 | +ALTER FUNCTION graphile_worker.fail_job(worker_id text, job_id bigint, error_message text) OWNER TO graphile_worker_role; |
| 170 | +CREATE FUNCTION graphile_worker.get_job(worker_id text, task_identifiers text[] DEFAULT NULL::text[], job_expiry interval DEFAULT '04:00:00'::interval) RETURNS graphile_worker.jobs |
| 171 | + LANGUAGE plpgsql |
| 172 | + AS $$ |
| 173 | +declare |
| 174 | + v_job_id bigint; |
| 175 | + v_queue_name text; |
| 176 | + v_row "graphile_worker".jobs; |
| 177 | + v_now timestamptz = now(); |
| 178 | +begin |
| 179 | + if worker_id is null or length(worker_id) < 10 then |
| 180 | + raise exception 'invalid worker id'; |
| 181 | + end if; |
| 182 | + select jobs.queue_name, jobs.id into v_queue_name, v_job_id |
| 183 | + from "graphile_worker".jobs |
| 184 | + where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry)) |
| 185 | + and ( |
| 186 | + jobs.queue_name is null |
| 187 | + or |
| 188 | + exists ( |
| 189 | + select 1 |
| 190 | + from "graphile_worker".job_queues |
| 191 | + where job_queues.queue_name = jobs.queue_name |
| 192 | + and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry)) |
| 193 | + for update |
| 194 | + skip locked |
| 195 | + ) |
| 196 | + ) |
| 197 | + and run_at <= v_now |
| 198 | + and attempts < max_attempts |
| 199 | + and (task_identifiers is null or task_identifier = any(task_identifiers)) |
| 200 | + order by priority asc, run_at asc, id asc |
| 201 | + limit 1 |
| 202 | + for update |
| 203 | + skip locked; |
| 204 | + if v_job_id is null then |
| 205 | + return null; |
| 206 | + end if; |
| 207 | + if v_queue_name is not null then |
| 208 | + update "graphile_worker".job_queues |
| 209 | + set |
| 210 | + locked_by = worker_id, |
| 211 | + locked_at = v_now |
| 212 | + where job_queues.queue_name = v_queue_name; |
| 213 | + end if; |
| 214 | + update "graphile_worker".jobs |
| 215 | + set |
| 216 | + attempts = attempts + 1, |
| 217 | + locked_by = worker_id, |
| 218 | + locked_at = v_now |
| 219 | + where id = v_job_id |
| 220 | + returning * into v_row; |
| 221 | + return v_row; |
| 222 | +end; |
| 223 | +$$; |
| 224 | +ALTER FUNCTION graphile_worker.get_job(worker_id text, task_identifiers text[], job_expiry interval) OWNER TO graphile_worker_role; |
| 225 | +CREATE FUNCTION graphile_worker.jobs__decrease_job_queue_count() RETURNS trigger |
| 226 | + LANGUAGE plpgsql |
| 227 | + AS $$ |
| 228 | +declare |
| 229 | + v_new_job_count int; |
| 230 | +begin |
| 231 | + update "graphile_worker".job_queues |
| 232 | + set job_count = job_queues.job_count - 1 |
| 233 | + where queue_name = old.queue_name |
| 234 | + returning job_count into v_new_job_count; |
| 235 | + if v_new_job_count <= 0 then |
| 236 | + delete from "graphile_worker".job_queues where queue_name = old.queue_name and job_count <= 0; |
| 237 | + end if; |
| 238 | + return old; |
| 239 | +end; |
| 240 | +$$; |
| 241 | +ALTER FUNCTION graphile_worker.jobs__decrease_job_queue_count() OWNER TO graphile_worker_role; |
| 242 | +CREATE FUNCTION graphile_worker.jobs__increase_job_queue_count() RETURNS trigger |
| 243 | + LANGUAGE plpgsql |
| 244 | + AS $$ |
| 245 | +begin |
| 246 | + insert into "graphile_worker".job_queues(queue_name, job_count) |
| 247 | + values(new.queue_name, 1) |
| 248 | + on conflict (queue_name) |
| 249 | + do update |
| 250 | + set job_count = job_queues.job_count + 1; |
| 251 | + return new; |
| 252 | +end; |
| 253 | +$$; |
| 254 | +ALTER FUNCTION graphile_worker.jobs__increase_job_queue_count() OWNER TO graphile_worker_role; |
| 255 | +CREATE FUNCTION graphile_worker.permanently_fail_jobs(job_ids bigint[], error_message text DEFAULT NULL::text) RETURNS SETOF graphile_worker.jobs |
| 256 | + LANGUAGE sql |
| 257 | + AS $$ |
| 258 | + update "graphile_worker".jobs |
| 259 | + set |
| 260 | + last_error = coalesce(error_message, 'Manually marked as failed'), |
| 261 | + attempts = max_attempts |
| 262 | + where id = any(job_ids) |
| 263 | + and ( |
| 264 | + locked_by is null |
| 265 | + or |
| 266 | + locked_at < NOW() - interval '4 hours' |
| 267 | + ) |
| 268 | + returning *; |
| 269 | +$$; |
| 270 | +ALTER FUNCTION graphile_worker.permanently_fail_jobs(job_ids bigint[], error_message text) OWNER TO graphile_worker_role; |
| 271 | +CREATE FUNCTION graphile_worker.remove_job(job_key text) RETURNS graphile_worker.jobs |
| 272 | + LANGUAGE sql STRICT |
| 273 | + AS $$ |
| 274 | + delete from "graphile_worker".jobs |
| 275 | + where key = job_key |
| 276 | + and locked_at is null |
| 277 | + returning *; |
| 278 | +$$; |
| 279 | +ALTER FUNCTION graphile_worker.remove_job(job_key text) OWNER TO graphile_worker_role; |
| 280 | +CREATE FUNCTION graphile_worker.reschedule_jobs(job_ids bigint[], run_at timestamp with time zone DEFAULT NULL::timestamp with time zone, priority integer DEFAULT NULL::integer, attempts integer DEFAULT NULL::integer, max_attempts integer DEFAULT NULL::integer) RETURNS SETOF graphile_worker.jobs |
| 281 | + LANGUAGE sql |
| 282 | + AS $$ |
| 283 | + update "graphile_worker".jobs |
| 284 | + set |
| 285 | + run_at = coalesce(reschedule_jobs.run_at, jobs.run_at), |
| 286 | + priority = coalesce(reschedule_jobs.priority, jobs.priority), |
| 287 | + attempts = coalesce(reschedule_jobs.attempts, jobs.attempts), |
| 288 | + max_attempts = coalesce(reschedule_jobs.max_attempts, jobs.max_attempts) |
| 289 | + where id = any(job_ids) |
| 290 | + and ( |
| 291 | + locked_by is null |
| 292 | + or |
| 293 | + locked_at < NOW() - interval '4 hours' |
| 294 | + ) |
| 295 | + returning *; |
| 296 | +$$; |
| 297 | +ALTER FUNCTION graphile_worker.reschedule_jobs(job_ids bigint[], run_at timestamp with time zone, priority integer, attempts integer, max_attempts integer) OWNER TO graphile_worker_role; |
| 298 | +CREATE FUNCTION graphile_worker.tg__update_timestamp() RETURNS trigger |
| 299 | + LANGUAGE plpgsql |
| 300 | + AS $$ |
| 301 | +begin |
| 302 | + new.updated_at = greatest(now(), old.updated_at + interval '1 millisecond'); |
| 303 | + return new; |
| 304 | +end; |
| 305 | +$$; |
| 306 | +ALTER FUNCTION graphile_worker.tg__update_timestamp() OWNER TO graphile_worker_role; |
| 307 | +CREATE FUNCTION graphile_worker.tg_jobs__notify_new_jobs() RETURNS trigger |
| 308 | + LANGUAGE plpgsql |
| 309 | + AS $$ |
| 310 | +begin |
| 311 | + perform pg_notify('jobs:insert', ''); |
| 312 | + return new; |
| 313 | +end; |
| 314 | +$$; |
| 315 | +ALTER FUNCTION graphile_worker.tg_jobs__notify_new_jobs() OWNER TO graphile_worker_role; |
| 316 | +CREATE TABLE graphile_worker.job_queues ( |
| 317 | + queue_name text NOT NULL, |
| 318 | + job_count integer NOT NULL, |
| 319 | + locked_at timestamp with time zone, |
| 320 | + locked_by text |
| 321 | +); |
| 322 | +ALTER TABLE graphile_worker.job_queues OWNER TO graphile_worker_role; |
| 323 | +CREATE SEQUENCE graphile_worker.jobs_id_seq |
| 324 | + START WITH 1 |
| 325 | + INCREMENT BY 1 |
| 326 | + NO MINVALUE |
| 327 | + NO MAXVALUE |
| 328 | + CACHE 1; |
| 329 | +ALTER TABLE graphile_worker.jobs_id_seq OWNER TO graphile_worker_role; |
| 330 | +ALTER SEQUENCE graphile_worker.jobs_id_seq OWNED BY graphile_worker.jobs.id; |
| 331 | +CREATE TABLE graphile_worker.migrations ( |
| 332 | + id integer NOT NULL, |
| 333 | + ts timestamp with time zone DEFAULT now() NOT NULL |
| 334 | +); |
| 335 | +ALTER TABLE graphile_worker.migrations OWNER TO graphile_worker_role; |
| 336 | +ALTER TABLE ONLY graphile_worker.jobs ALTER COLUMN id SET DEFAULT nextval('graphile_worker.jobs_id_seq'::regclass); |
| 337 | +ALTER TABLE ONLY graphile_worker.job_queues |
| 338 | + ADD CONSTRAINT job_queues_pkey PRIMARY KEY (queue_name); |
| 339 | +ALTER TABLE ONLY graphile_worker.jobs |
| 340 | + ADD CONSTRAINT jobs_key_key UNIQUE (key); |
| 341 | +ALTER TABLE ONLY graphile_worker.jobs |
| 342 | + ADD CONSTRAINT jobs_pkey PRIMARY KEY (id); |
| 343 | +ALTER TABLE ONLY graphile_worker.migrations |
| 344 | + ADD CONSTRAINT migrations_pkey PRIMARY KEY (id); |
| 345 | +CREATE INDEX jobs_priority_run_at_id_idx ON graphile_worker.jobs USING btree (priority, run_at, id); |
| 346 | +CREATE TRIGGER _100_timestamps BEFORE UPDATE ON graphile_worker.jobs FOR EACH ROW EXECUTE PROCEDURE graphile_worker.tg__update_timestamp(); |
| 347 | +CREATE TRIGGER _500_decrease_job_queue_count AFTER DELETE ON graphile_worker.jobs FOR EACH ROW WHEN ((old.queue_name IS NOT NULL)) EXECUTE PROCEDURE graphile_worker.jobs__decrease_job_queue_count(); |
| 348 | +CREATE TRIGGER _500_decrease_job_queue_count_update AFTER UPDATE OF queue_name ON graphile_worker.jobs FOR EACH ROW WHEN (((new.queue_name IS DISTINCT FROM old.queue_name) AND (old.queue_name IS NOT NULL))) EXECUTE PROCEDURE graphile_worker.jobs__decrease_job_queue_count(); |
| 349 | +CREATE TRIGGER _500_increase_job_queue_count AFTER INSERT ON graphile_worker.jobs FOR EACH ROW WHEN ((new.queue_name IS NOT NULL)) EXECUTE PROCEDURE graphile_worker.jobs__increase_job_queue_count(); |
| 350 | +CREATE TRIGGER _500_increase_job_queue_count_update AFTER UPDATE OF queue_name ON graphile_worker.jobs FOR EACH ROW WHEN (((new.queue_name IS DISTINCT FROM old.queue_name) AND (new.queue_name IS NOT NULL))) EXECUTE PROCEDURE graphile_worker.jobs__increase_job_queue_count(); |
| 351 | +CREATE TRIGGER _900_notify_worker AFTER INSERT ON graphile_worker.jobs FOR EACH STATEMENT EXECUTE PROCEDURE graphile_worker.tg_jobs__notify_new_jobs(); |
| 352 | +ALTER TABLE graphile_worker.job_queues ENABLE ROW LEVEL SECURITY; |
| 353 | +ALTER TABLE graphile_worker.jobs ENABLE ROW LEVEL SECURITY; |
0 commit comments