add custom RQ job class capability with --job-class #392

Open · wants to merge 1 commit into base: master
3 changes: 3 additions & 0 deletions README.md
@@ -32,6 +32,7 @@ environment variables with prefix `RQ_DASHBOARD_*`:
- RQ_DASHBOARD_REDIS_URL=redis://<redis:6379>
- RQ_DASHBOARD_USERNAME=rq
- RQ_DASHBOARD_PASSWORD=password
- RQ_DASHBOARD_JOB_CLASS=mypackage.CustomJobClass

See more info on how to pass environment variables in [Docker documentation](https://docs.docker.com/engine/reference/commandline/run/#set-environment-variables--e---env---env-file)

@@ -79,6 +80,8 @@ Options:
path)
-u, --redis-url TEXT Redis URL. Can be specified multiple times.
Default: redis://127.0.0.1:6379
-j, --job-class TEXT RQ Job class to use. It will be imported at
runtime, so it must be installed.
--poll-interval, --interval INTEGER
Refresh interval in ms
--extra-path TEXT Append specified directories to sys.path
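The README hunks above introduce `mypackage.CustomJobClass` only as a placeholder. For illustration, a custom job class is just an importable subclass of `rq.job.Job`; the sketch below is hypothetical (neither the package nor the class ships with this PR) and only shows the kind of object the new setting points at.

```python
# mypackage/__init__.py -- hypothetical example, not part of this PR.
# Any importable rq.job.Job subclass can be referenced by its dotted path.
from rq.job import Job


class CustomJobClass(Job):
    """Example Job subclass: expose a convenience accessor for a kwarg."""

    def tenant(self):
        # Job.kwargs holds the keyword arguments the job was enqueued with.
        return (self.kwargs or {}).get("tenant", "default")
```

For the dashboard to import it, the package must be importable from the dashboard process (installed, or added via the `--extra-path` option shown above); the class is then selected with `RQ_DASHBOARD_JOB_CLASS=mypackage.CustomJobClass` or `--job-class mypackage.CustomJobClass`.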
9 changes: 9 additions & 0 deletions rq_dashboard/cli.py
@@ -112,6 +112,12 @@ def make_flask_app(config, username, password, url_prefix, compatibility_mode=Tr
multiple=True,
help="Redis URL. Can be specified multiple times. Default: redis://127.0.0.1:6379",
)
@click.option(
"-j",
"--job-class",
default=None,
help="RQ Job class to use. It will be imported at runtime, so it must be installed.",
)
@click.option(
"--redis-sentinels",
default=None,
@@ -166,6 +172,7 @@ def run(
redis_password,
redis_database,
redis_url,
job_class,
redis_sentinels,
redis_master_name,
poll_interval,
@@ -196,6 +203,8 @@
app.config["RQ_DASHBOARD_REDIS_URL"] = redis_url
else:
app.config["RQ_DASHBOARD_REDIS_URL"] = "redis://127.0.0.1:6379"
if job_class:
app.config["RQ_DASHBOARD_JOB_CLASS"] = job_class
if redis_host:
app.config["DEPRECATED_OPTIONS"].append("--redis-host")
if redis_port:
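The CLI change itself only stores the dotted path in Flask config under `RQ_DASHBOARD_JOB_CLASS`, so the same key can be set directly when the dashboard is embedded in an existing Flask app. A minimal sketch, assuming the blueprint-embedding pattern described in the project README (exact setup details vary between rq-dashboard versions):

```python
# Sketch: embedding rq-dashboard and setting the new key without the CLI.
from flask import Flask

import rq_dashboard

app = Flask(__name__)
app.config.from_object(rq_dashboard.default_settings)
# Equivalent to passing `--job-class mypackage.CustomJobClass` on the CLI;
# the dotted path is hypothetical, as in the README example above.
app.config["RQ_DASHBOARD_JOB_CLASS"] = "mypackage.CustomJobClass"
app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")
```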
99 changes: 62 additions & 37 deletions rq_dashboard/web.py
@@ -32,21 +32,14 @@
url_for,
)
from redis_sentinel_url import connect as from_url
from rq import (
VERSION as rq_version,
Queue,
Worker,
pop_connection,
push_connection,
requeue_job,
)
from rq.job import Job
from rq import VERSION as rq_version, Queue, Worker, pop_connection, push_connection
from rq.registry import (
DeferredJobRegistry,
FailedJobRegistry,
FinishedJobRegistry,
StartedJobRegistry,
)
from rq.utils import import_attribute
from six import string_types

from .legacy_config import upgrade_config
@@ -86,6 +79,10 @@ def push_rq_connection():
new_instance = current_app.redis_conn
push_connection(new_instance)
current_app.redis_conn = new_instance
job_class = current_app.config.get("RQ_DASHBOARD_JOB_CLASS", "rq.job.Job")
if isinstance(job_class, string_types):
job_class = import_attribute(job_class)
current_app.job_class = job_class


@blueprint.teardown_request
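A note on the hunk above: the configured class is resolved lazily, inside the request setup hook, using rq's `import_attribute`, so a bad dotted path only surfaces once a view is hit; the new test in `tests/test_basic.py` below relies on exactly that. A standalone sketch of the resolution step (the dotted path is again hypothetical):

```python
from rq.utils import import_attribute

job_class = "mypackage.CustomJobClass"  # value of RQ_DASHBOARD_JOB_CLASS
if isinstance(job_class, str):
    # Fails at request time, not at startup, if the module or attribute is missing.
    job_class = import_attribute(job_class)
```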
@@ -117,7 +114,9 @@ def serialize_queues(instance_number, queues):
per_page="8",
page="1",
),
failed_job_registry_count=FailedJobRegistry(q.name).count,
failed_job_registry_count=FailedJobRegistry(
q.name, job_class=current_app.job_class
).count,
failed_url=url_for(
".jobs_overview",
instance_number=instance_number,
@@ -126,7 +125,9 @@
per_page="8",
page="1",
),
started_job_registry_count=StartedJobRegistry(q.name).count,
started_job_registry_count=StartedJobRegistry(
q.name, job_class=current_app.job_class
).count,
started_url=url_for(
".jobs_overview",
instance_number=instance_number,
@@ -135,7 +136,9 @@
per_page="8",
page="1",
),
deferred_job_registry_count=DeferredJobRegistry(q.name).count,
deferred_job_registry_count=DeferredJobRegistry(
q.name, job_class=current_app.job_class
).count,
deferred_url=url_for(
".jobs_overview",
instance_number=instance_number,
Expand All @@ -144,7 +147,9 @@ def serialize_queues(instance_number, queues):
per_page="8",
page="1",
),
finished_job_registry_count=FinishedJobRegistry(q.name).count,
finished_job_registry_count=FinishedJobRegistry(
q.name, job_class=current_app.job_class
).count,
finished_url=url_for(
".jobs_overview",
instance_number=instance_number,
@@ -212,19 +217,27 @@ def favicon():


def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page):
queue = Queue(queue_name)
queue = Queue(queue_name, job_class=current_app.job_class)
if registry_name != "queued":
if per_page >= 0:
per_page = offset + (per_page - 1)

if registry_name == "failed":
current_queue = FailedJobRegistry(queue_name)
current_queue = FailedJobRegistry(
queue_name, job_class=current_app.job_class
)
elif registry_name == "deferred":
current_queue = DeferredJobRegistry(queue_name)
current_queue = DeferredJobRegistry(
queue_name, job_class=current_app.job_class
)
elif registry_name == "started":
current_queue = StartedJobRegistry(queue_name)
current_queue = StartedJobRegistry(
queue_name, job_class=current_app.job_class
)
elif registry_name == "finished":
current_queue = FinishedJobRegistry(queue_name)
current_queue = FinishedJobRegistry(
queue_name, job_class=current_app.job_class
)
else:
current_queue = queue
total_items = current_queue.count
@@ -254,7 +267,7 @@ def queues_overview(instance_number):
"rq_dashboard/queues.html",
current_instance=instance_number,
instance_list=current_app.config.get("RQ_DASHBOARD_REDIS_URL"),
queues=Queue.all(),
queues=Queue.all(job_class=current_app.job_class),
rq_url_prefix=url_for(".queues_overview"),
rq_dashboard_version=rq_dashboard_version,
rq_version=rq_version,
@@ -275,7 +288,7 @@ def workers_overview(instance_number):
"rq_dashboard/workers.html",
current_instance=instance_number,
instance_list=current_app.config.get("RQ_DASHBOARD_REDIS_URL"),
workers=Worker.all(),
workers=Worker.all(job_class=current_app.job_class),
rq_url_prefix=url_for(".queues_overview"),
rq_dashboard_version=rq_dashboard_version,
rq_version=rq_version,
@@ -303,15 +316,15 @@ def workers_overview(instance_number):
)
def jobs_overview(instance_number, queue_name, registry_name, per_page, page):
if queue_name is None:
queue = Queue()
queue = Queue(job_class=current_app.job_class)
else:
queue = Queue(queue_name)
queue = Queue(queue_name, job_class=current_app.job_class)
r = make_response(
render_template(
"rq_dashboard/jobs.html",
current_instance=instance_number,
instance_list=current_app.config.get("RQ_DASHBOARD_REDIS_URL"),
queues=Queue.all(),
queues=Queue.all(job_class=current_app.job_class),
queue=queue,
per_page=per_page,
page=page,
@@ -331,7 +344,7 @@ def jobs_overview(instance_number, queue_name, registry_name, per_page, page):

@blueprint.route("/<int:instance_number>/view/job/<job_id>")
def job_view(instance_number, job_id):
job = Job.fetch(job_id)
job = current_app.job_class.fetch(job_id)
r = make_response(
render_template(
"rq_dashboard/job.html",
@@ -353,49 +366,59 @@ def job_view(instance_number, job_id):
@blueprint.route("/job/<job_id>/delete", methods=["POST"])
@jsonify
def delete_job_view(job_id):
job = Job.fetch(job_id)
job = current_app.job_class.fetch(job_id)
job.delete()
return dict(status="OK")


@blueprint.route("/job/<job_id>/requeue", methods=["POST"])
@jsonify
def requeue_job_view(job_id):
requeue_job(job_id, connection=current_app.redis_conn)
job = current_app.job_class.fetch(job_id, connection=current_app.redis_conn)
job.requeue()
return dict(status="OK")


@blueprint.route("/requeue/<queue_name>", methods=["GET", "POST"])
@jsonify
def requeue_all(queue_name):
fq = Queue(queue_name).failed_job_registry
fq = Queue(queue_name, job_class=current_app.job_class).failed_job_registry
job_ids = fq.get_job_ids()
count = len(job_ids)
for job_id in job_ids:
requeue_job(job_id, connection=current_app.redis_conn)
job = current_app.job_class.fetch(job_id, connection=current_app.redis_conn)
job.requeue()
return dict(status="OK", count=count)


@blueprint.route("/queue/<queue_name>/<registry_name>/empty", methods=["POST"])
@jsonify
def empty_queue(queue_name, registry_name):
if registry_name == "queued":
q = Queue(queue_name)
q = Queue(queue_name, job_class=current_app.job_class)
q.empty()
elif registry_name == "failed":
ids = FailedJobRegistry(queue_name).get_job_ids()
ids = FailedJobRegistry(
queue_name, job_class=current_app.job_class
).get_job_ids()
for id in ids:
delete_job_view(id)
elif registry_name == "deferred":
ids = DeferredJobRegistry(queue_name).get_job_ids()
ids = DeferredJobRegistry(
queue_name, job_class=current_app.job_class
).get_job_ids()
for id in ids:
delete_job_view(id)
elif registry_name == "started":
ids = StartedJobRegistry(queue_name).get_job_ids()
ids = StartedJobRegistry(
queue_name, job_class=current_app.job_class
).get_job_ids()
for id in ids:
delete_job_view(id)
elif registry_name == "finished":
ids = FinishedJobRegistry(queue_name).get_job_ids()
ids = FinishedJobRegistry(
queue_name, job_class=current_app.job_class
).get_job_ids()
for id in ids:
delete_job_view(id)
return dict(status="OK")
@@ -404,15 +427,17 @@ def empty_queue(queue_name, registry_name):
@blueprint.route("/queue/<queue_name>/compact", methods=["POST"])
@jsonify
def compact_queue(queue_name):
q = Queue(queue_name)
q = Queue(queue_name, job_class=current_app.job_class)
q.compact()
return dict(status="OK")


@blueprint.route("/<int:instance_number>/data/queues.json")
@jsonify
def list_queues(instance_number):
queues = serialize_queues(instance_number, sorted(Queue.all()))
queues = serialize_queues(
instance_number, sorted(Queue.all(job_class=current_app.job_class)),
)
return dict(queues=queues)


@@ -512,7 +537,7 @@ def list_jobs(instance_number, queue_name, registry_name, per_page, page):
@blueprint.route("/<int:instance_number>/data/job/<job_id>.json")
@jsonify
def job_info(instance_number, job_id):
job = Job.fetch(job_id)
job = current_app.job_class.fetch(job_id)
return dict(
id=job.id,
created_at=serialize_date(job.created_at),
@@ -542,7 +567,7 @@ def serialize_queue_names(worker):
version=getattr(worker, "version", ""),
python_version=getattr(worker, "python_version", ""),
)
for worker in Worker.all()
for worker in Worker.all(job_class=current_app.job_class)
),
key=lambda w: (w["state"], w["queues"], w["name"]),
)
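Two things stand out in this file: every `Queue`, `Worker`, and registry constructor now receives `job_class=current_app.job_class`, and the module-level `rq.requeue_job` helper is replaced by fetching the job with the configured class and calling `requeue()` on it (which assumes an rq version whose `Job` provides `requeue()`). A rough way to sanity-check the same wiring outside the dashboard, assuming a reachable Redis and the hypothetical custom class from earlier being importable:

```python
from redis import Redis
from rq import Queue
from rq.registry import FailedJobRegistry
from rq.utils import import_attribute

conn = Redis()
job_class = import_attribute("mypackage.CustomJobClass")  # hypothetical dotted path

queue = Queue("default", connection=conn, job_class=job_class)
registry = FailedJobRegistry(queue.name, connection=conn, job_class=job_class)

# Jobs pulled through the registry should deserialize as the custom class.
jobs = [job_class.fetch(job_id, connection=conn) for job_id in registry.get_job_ids()]
assert all(isinstance(job, job_class) for job in jobs)
```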
16 changes: 16 additions & 0 deletions tests/test_basic.py
@@ -107,6 +107,22 @@ def test_worker_version_field(self):
w.register_death()


class CustomRqClassTestCase(BasicTestCase):
def setUp(self):
super().setUp()
self.app.config['RQ_DASHBOARD_JOB_CLASS'] = 'rq.job.Job'

def test_invalid_job_class(self):
self.app.config['RQ_DASHBOARD_JOB_CLASS'] = 'rq.job.NotAJobClass'

with self.assertRaises(AttributeError) as ae:
self.client.get('/0/data/queues.json')

self.assertIn('NotAJobClass', str(ae.exception))



__all__ = [
'BasicTestCase',
'CustomRqClassTestCase'
]
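The new test only exercises the failure path (an unresolvable attribute). A possible companion test, sketched here on the assumption that it lives in `CustomRqClassTestCase` and reuses the same test client as the failing case, would check that a resolvable dotted path keeps the JSON views serving:

```python
    def test_valid_job_class_still_serves_views(self):
        # Sketch only: point the dashboard at the stock Job class by its
        # dotted path and make sure the queues endpoint still responds.
        self.app.config['RQ_DASHBOARD_JOB_CLASS'] = 'rq.job.Job'
        response = self.client.get('/0/data/queues.json')
        self.assertEqual(response.status_code, 200)
```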