Skip to content

Commit

Permalink
WIP: elasticsearch history support
Browse files Browse the repository at this point in the history
  • Loading branch information
jheld committed Nov 22, 2024
1 parent d689888 commit 789688c
Show file tree
Hide file tree
Showing 16 changed files with 1,079 additions and 43 deletions.
45 changes: 45 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,48 @@ Default: None
Sets the URI to which an OAuth 2.0 server redirects the user after successful authentication and authorization.

`oauth2_redirect_uri` option should be used with :ref:`auth`, :ref:`auth_provider`, :ref:`oauth2_key` and :ref:`oauth2_secret` options.

.. _elasticsearch:

elasticsearch
~~~~~~~~~~~~~

Signals the process that it should use elasticsearch for history


.. _elasticsearch_index_bulk_size:

elasticsearch_index_bulk_size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

How many documents will the elasticsearch indexer allow in a single bulk index API call.

.. _elasticsearch_index_timeout:

elasticsearch_index_timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~

How long should the background thread wait for the queue to fill up to elasticsearch_index_bulk_size

.. _elasticsearch_day_retention:

elasticsearch_day_retention
~~~~~~~~~~~~~~~~~~~~~~~~~~~

For projects that require data retention management, this will specify how many days can have indexes at once.

So if the value is 21, then any indexes older than 21 days will be deleted. This happens at startup and on day change.

.. _elasticsearch_url:

elasticsearch_url
~~~~~~~~~~~~~~~~~

Which URL is elasticsearch at?

.. _elasticsearch_dashboard:

elasticsearch_dashboard
~~~~~~~~~~~~~~~~~~~~~~~

Will the dashboard initially get its counter values from elasticsearch?
15 changes: 15 additions & 0 deletions flower/__indexer__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import absolute_import
from __future__ import print_function
import sys
from celery.bin.celery import main as _main, celery
from flower.command import indexer
from flower.utils import bugreport


def main():
celery.add_command(indexer)
sys.exit(_main())


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion flower/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
VERSION = (2, 0, 0)
VERSION = (2, 0, 0, 1)
__version__ = '.'.join(map(str, VERSION)) + '-dev'
115 changes: 115 additions & 0 deletions flower/api/elasticsearch_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import absolute_import

import logging

from tornado import web

try:
from elasticsearch import Elasticsearch, TransportError
except ImportError:
Elasticsearch = None
TransportError = None

from tornado.web import HTTPError

from ..options import options

from ..views import BaseHandler

logger = logging.getLogger(__name__)


class ElasticSearchHistoryHandler(BaseHandler):
def __init__(self):
elasticsearch_url = options.elasticsearch_url
if elasticsearch_url:
self.es = Elasticsearch([elasticsearch_url, ])
else:
self.es = None

@web.authenticated
def post(self, index_name=None):
index_name = index_name or 'task'
try:
self.es.indices.refresh(index_name)
except TransportError as e:
raise HTTPError(400, 'Invalid option: {}'.format(e))
else:
response = u'Successful refresh on index: {}'.format(index_name)
self.write(response)


class AlternativeBackendError(Exception):
pass


def list_tasks_elastic_search(argument_getter):
from elasticsearch import Elasticsearch, TransportError

elasticsearch_url = options.elasticsearch_url

es = Elasticsearch([elasticsearch_url, ])
limit = argument_getter.get_argument('limit', None)
worker = argument_getter.get_argument('workername', None)
task_name = argument_getter.get_argument('taskname', None)
state = argument_getter.get_argument('state', None)
received_start = argument_getter.get_argument('received_start', None)
received_end = argument_getter.get_argument('received_end', None)
sort_by = argument_getter.get_argument('sort_by', None)
search = argument_getter.get_argument('search', None)
started_start = argument_getter.get_argument('started_start', None)
started_end = argument_getter.get_argument('started_end', None)
root_id = argument_getter.get_argument('root_id', None)
parent_id = argument_getter.get_argument('parent_id', None)
runtime_lt = argument_getter.get_argument('runtime_lt', None)
runtime_gt = argument_getter.get_argument('runtime_gt', None)
result = []
limit = limit and int(limit)
worker = worker if worker != 'All' else None
task_name = task_name if task_name != 'All' else None
state = state if state != 'All' else None
from elasticsearch_dsl import Search
from elasticsearch_dsl.query import Term, Range
s = Search(using=es, index='task')
try:
if worker:
s = s.filter(Term(hostname=worker))
if type:
s = s.filter(Term(name=task_name))
if state:
s = s.filter(Term(state=state))
if root_id:
s = s.filter(Term(root_id=root_id))
if parent_id:
s = s.filter(Term(parent_id=parent_id))
if received_start:
s = s.filter(Range(received_time=dict(gt=received_start)))
if received_end:
s = s.filter(Range(received_time=dict(lt=received_end)))
if started_start:
s = s.filter(Range(started_time=dict(gt=started_start)))
if started_end:
s = s.filter(Range(started_time=dict(lt=started_end)))
if runtime_lt is not None:
s = s.query(Range(runtime=dict(lt=float(runtime_lt))))
if runtime_gt is not None:
s = s.query(Range(runtime=dict(gt=float(runtime_gt))))
if limit is not None:
s = s.extra(size=limit)
if sort_by is not None:
reverse = False
if sort_by.startswith('-'):
sort_by = sort_by.lstrip('-')
reverse = True
sort_keys = {'name': str, 'state': str, 'received': float, 'started': float}
if sort_by in sort_keys:
sort_key_alias = {'name': 'name', 'state': 'state', 'received': 'received_time', 'started': 'started_time'}
s = s.sort({sort_key_alias.get(sort_by, sort_by): {"order": "desc" if reverse else "asc"}})
hit_dicts = s.execute().hits.hits
for hit_dict in hit_dicts:
result.append((hit_dict['_id'], hit_dict['_source']))
except TransportError as e:
logger.warning("Issue querying task API via Elasticsearch", exc_info=True)
raise AlternativeBackendError()
else:
return result
53 changes: 40 additions & 13 deletions flower/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@
from tornado.ioloop import IOLoop
from tornado.web import HTTPError


try:
from flower.api.elasticsearch_history import AlternativeBackendError
except ImportError:
AlternativeBackendError = None

try:
from flower.api.elasticsearch_history import list_tasks_elastic_search
except ImportError:
list_tasks_elastic_search = None

from ..options import options
from ..utils import tasks
from ..utils.broker import Broker
from . import BaseApiHandler
Expand Down Expand Up @@ -497,36 +509,51 @@ def get(self):
:statuscode 200: no error
:statuscode 401: unauthorized request
"""
USE_ES = options.elasticsearch

app = self.application
limit = self.get_argument('limit', None)
offset = self.get_argument('offset', default=0, type=int)
worker = self.get_argument('workername', None)
type = self.get_argument('taskname', None)
state = self.get_argument('state', None)
use_es = self.get_argument('es', USE_ES)
received_start = self.get_argument('received_start', None)
received_end = self.get_argument('received_end', None)
sort_by = self.get_argument('sort_by', None)
search = self.get_argument('search', None)
started_start = self.get_argument('started_start', None)
started_end = self.get_argument('started_end', None)
root_id = self.get_argument('root_id', None)
parent_id = self.get_argument('parent_id', None)

limit = limit and int(limit)
offset = max(offset, 0)
worker = worker if worker != 'All' else None
type = type if type != 'All' else None
state = state if state != 'All' else None

result = []
for task_id, task in tasks.iter_tasks(
app.events, limit=limit, offset=offset, sort_by=sort_by, type=type,
worker=worker, state=state,
received_start=received_start,
received_end=received_end,
search=search
):
task = tasks.as_dict(task)
worker = task.pop('worker', None)
if worker is not None:
task['worker'] = worker.hostname
result.append((task_id, task))

if use_es:
try:
result = list_tasks_elastic_search(self)
except AlternativeBackendError:
use_es = False
if not use_es:
for task_id, task in tasks.iter_tasks(
app.events, limit=limit, offset=offset, sort_by=sort_by, type=type,
worker=worker, state=state,
received_start=received_start,
received_end=received_end,
search=search,
started_start=started_start, started_end=started_end,
root_id=root_id, parent_id=parent_id
):
task = tasks.as_dict(task)
worker = task.pop('worker', None)
if worker is not None:
task['worker'] = worker.hostname
result.append((task_id, task))
self.write(OrderedDict(result))


Expand Down
52 changes: 51 additions & 1 deletion flower/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import atexit
import signal
import logging
import time

from pprint import pformat

Expand All @@ -14,6 +15,8 @@
from tornado.log import enable_pretty_logging
from celery.bin.base import CeleryCommand

from flower.indexer_app import IndexerApp
from . import __version__
from .app import Flower
from .urls import settings
from .utils import abs_path, prepend_url, strtobool
Expand Down Expand Up @@ -43,8 +46,16 @@ def flower(ctx, tornado_argv):

extract_settings()
setup_logging()

app = ctx.obj.app
custom_es_setup = True
if custom_es_setup:
app.loader.import_default_modules()
if getattr(app.conf, 'timezone', None):
os.environ['TZ'] = app.conf.timezone
time.tzset()
flower_app = Flower(capp=app, options=options, **settings)


flower_app = Flower(capp=app, options=options, **settings)

atexit.register(flower_app.stop)
Expand Down Expand Up @@ -108,6 +119,7 @@ def warn_about_celery_args_used_in_flower_command(ctx, flower_args):
'Please specify them after celery command instead following this template: '
'celery [celery args] flower [flower args].', incorrectly_used_args
)
logger.debug('Settings: %s', pformat(settings))


def setup_logging():
Expand Down Expand Up @@ -179,3 +191,41 @@ def print_banner(app, ssl):
pformat(sorted(app.tasks.keys()))
)
logger.debug('Settings: %s', pformat(settings))



@click.command(cls=CeleryCommand,
context_settings={
'ignore_unknown_options': True
})
@click.argument("tornado_argv", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def indexer(ctx, tornado_argv):
"""Tool for alternative task indexing in a Celery cluster."""
warn_about_celery_args_used_in_flower_command(ctx, tornado_argv)
apply_env_options()
apply_options(sys.argv[0], tornado_argv)

extract_settings()
setup_logging()
app = ctx.obj.app
custom_es_setup = True
if custom_es_setup:
app.loader.import_default_modules()
if getattr(app.conf, 'timezone', None):
os.environ['TZ'] = app.conf.timezone
time.tzset()


indexer_app = IndexerApp(capp=app, options=options, **settings)

atexit.register(indexer_app.stop)
signal.signal(signal.SIGTERM, sigterm_handler)

if not ctx.obj.quiet:
print_banner(app, 'ssl_options' in settings)

try:
indexer_app.start()
except (KeyboardInterrupt, SystemExit):
pass
Loading

0 comments on commit 789688c

Please sign in to comment.