-
-
Notifications
You must be signed in to change notification settings - Fork 302
Expand file tree
/
Copy pathschedules.py
More file actions
117 lines (92 loc) · 3.65 KB
/
schedules.py
File metadata and controls
117 lines (92 loc) · 3.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import datetime
import logging
import django_rq
from redis.exceptions import ConnectionError
from vulnerabilities.tasks import enqueue_pipeline
# Module-level logger, passed as a default to functions below so callers
# can substitute their own logger.
log = logging.getLogger(__name__)
# Shared rq-scheduler instance used by every scheduling helper in this module.
scheduler = django_rq.get_scheduler()
def schedule_execution(pipeline_schedule, execute_now=False):
    """
    Schedule a recurring job for the given ``PipelineSchedule`` object
    using ``rq_scheduler`` and return the id of the created job.

    When ``execute_now`` is True the first run starts immediately (UTC now);
    otherwise it starts at the schedule's ``next_run_date``.
    """
    # run_interval is stored in hours; rq_scheduler expects seconds.
    interval_seconds = pipeline_schedule.run_interval * 60 * 60

    if execute_now:
        start_time = datetime.datetime.now(tz=datetime.timezone.utc)
    else:
        start_time = pipeline_schedule.next_run_date

    scheduled_job = scheduler.schedule(
        scheduled_time=start_time,
        func=enqueue_pipeline,
        args=[pipeline_schedule.pipeline_id],
        interval=interval_seconds,
        repeat=None,  # repeat indefinitely
        queue_name=pipeline_schedule.get_run_priority_display(),
    )
    return scheduled_job._id
def scheduled_job_exists(job_id):
    """
    Return True if a scheduled job with the given ``job_id`` exists,
    False otherwise.

    A falsy ``job_id`` (None or empty string) returns False without
    querying the scheduler.
    """
    # bool() so callers always get a real boolean instead of the falsy
    # job_id itself (None or "") leaking through from the `and` chain.
    return bool(job_id and job_id in scheduler)
def clear_job(job):
    """
    Cancel the scheduled job identified by ``job``, which may be either
    a Job object or a job ID string.
    """
    return scheduler.cancel(job)
def clear_zombie_pipeline_schedules(logger=log):
    """
    Clear scheduled jobs not associated with any PipelineSchedule object.
    """
    # Imported here to avoid an import cycle with vulnerabilities.models.
    from vulnerabilities.models import PipelineSchedule

    # Materialize the ids once as a set: a lazy QuerySet used with `in`
    # inside the loop would be re-evaluated/iterated per job; a set gives
    # a single DB hit and O(1) membership checks.
    known_ids = set(
        PipelineSchedule.objects.values_list("schedule_work_id", flat=True)
    )
    for job in scheduler.get_jobs():
        if job._id not in known_ids:
            logger.info(f"Deleting scheduled job {job}")
            clear_job(job)
def is_redis_running(logger=log):
    """
    Return True if the Redis server answers a ping, False when the
    connection fails (the error is logged).
    """
    try:
        return django_rq.get_connection().ping()
    except ConnectionError as e:
        logger.error(f"Error checking Redis status: {e}. Redis is not reachable.")
        return False
def update_pipeline_schedule():
    """Create schedules for new pipelines and delete schedules for removed pipelines."""
    # Imported here to avoid import cycles at module load time.
    from vulnerabilities.importers import IMPORTERS_REGISTRY
    from vulnerabilities.improvers import IMPROVERS_REGISTRY
    from vulnerabilities.models import PipelineSchedule
    from vulnerabilities.pipelines.exporters import EXPORTERS_REGISTRY

    # Merge all pipeline registries (dicts keyed by pipeline id).
    pipelines = IMPORTERS_REGISTRY | IMPROVERS_REGISTRY | EXPORTERS_REGISTRY

    # Remove schedules whose pipeline no longer exists in any registry.
    PipelineSchedule.objects.exclude(pipeline_id__in=pipelines.keys()).delete()

    # `pipeline_id` instead of `id`: the original name shadowed the builtin.
    for pipeline_id, pipeline_class in pipelines.items():
        run_once = getattr(pipeline_class, "run_once", False)
        run_interval = getattr(pipeline_class, "run_interval", 24)  # hours
        run_priority = getattr(
            pipeline_class, "run_priority", PipelineSchedule.ExecutionPriority.DEFAULT
        )
        schedule, created = PipelineSchedule.objects.get_or_create(
            pipeline_id=pipeline_id,
            defaults={
                "is_run_once": run_once,
                "run_interval": run_interval,
                "run_priority": run_priority,
            },
        )
        if not created:
            # Keep priority and interval in sync with the pipeline class.
            # NOTE(review): is_run_once is not refreshed for existing rows,
            # unlike run_priority/run_interval — confirm this is intended.
            schedule.run_priority = run_priority
            schedule.run_interval = run_interval
            schedule.save()