Skip to content

Commit

Permalink
Feature: Add node event hook #214 (#218)
Browse files Browse the repository at this point in the history
Signed-off-by: benero <[email protected]>
Co-authored-by: 韩数 <[email protected]>
  • Loading branch information
benero and hanshuaikang authored Jan 4, 2024
1 parent b40a935 commit ef7102c
Show file tree
Hide file tree
Showing 14 changed files with 366 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ celerybeat-schedule

# dotenv
.env
.envrc

# virtualenv
venv/
Expand All @@ -99,4 +100,3 @@ webpack_cache

# Editor setting file
.vscode/

4 changes: 2 additions & 2 deletions bamboo_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ def schedule(
# engine context prepare
set_node_info(CurrentNodeInfo(node_id=node_id, version=state.version, loop=state.loop))

# schedule alredy finished
# schedule already finished
if schedule.finished:
logger.warning(
"root pipeline[%s] schedule(%s) %s with version %s already finished",
Expand Down Expand Up @@ -1190,7 +1190,7 @@ def schedule(
interrupter.check_and_set(ScheduleKeyPoint.APPLY_LOCK_DONE, lock_get=lock_get)

if not lock_get:
# only retry at multiple calback type
# only retry at multiple callback type
if schedule.type is not ScheduleType.MULTIPLE_CALLBACK:
logger.info(
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, "
Expand Down
38 changes: 37 additions & 1 deletion bamboo_engine/eri/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

# plugin interface

__version__ = "7.1.0"
__version__ = "7.1.1"


def version():
Expand Down Expand Up @@ -598,6 +598,15 @@ class TaskMixin:
引擎任务派发相关接口
"""

def pre_execute(self, root_pipeline_id: str, node_id: str):
"""
执行节点前需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""

@abstractmethod
def execute(
self,
Expand Down Expand Up @@ -625,6 +634,24 @@ def execute(
:type headers: Optional[dict]
"""

def post_execute(self, root_pipeline_id: str, node_id: str):
"""
执行节点后需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""

def pre_schedule(self, root_pipeline_id: str, node_id: str):
"""
派发调度任务前需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""

@abstractmethod
def schedule(
self,
Expand Down Expand Up @@ -652,6 +679,15 @@ def schedule(
:type headers: Optional[dict]
"""

def post_schedule(self, root_pipeline_id: str, node_id: str):
"""
派发调度任务后需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""

@abstractmethod
def set_next_schedule(
self,
Expand Down
8 changes: 8 additions & 0 deletions bamboo_engine/eri/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ class HookType(Enum):
EXECUTE = "execute"
# 节点 schedule
SCHEDULE = "schedule"
# 节点 execute 前
PRE_EXECUTE = "pre_execute"
# 节点 execute 后
POST_EXECUTE = "post_execute"
# 节点 schedule 前
PRE_SCHEDULE = "pre_schedule"
# 节点 schedule 后
POST_SCHEDULE = "post_schedule"


class InterruptEvent:
Expand Down
36 changes: 36 additions & 0 deletions bamboo_engine/handlers/service_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,25 @@ def execute(
root_pipeline_data=root_pipeline_data,
)
try:
self.runtime.pre_execute(root_pipeline_id=root_pipeline_id, node_id=self.node.id)
self.hook_dispatch(
hook=HookType.PRE_EXECUTE,
root_pipeline_id=root_pipeline_id,
service=service,
service_data=service_data,
root_pipeline_data=root_pipeline_data,
)
execute_success = service.execute(data=service_data, root_pipeline_data=root_pipeline_data)
# 只有执行成功才会调用执行 post_execute
if execute_success:
self.runtime.post_execute(root_pipeline_id=root_pipeline_id, node_id=self.node.id)
self.hook_dispatch(
hook=HookType.POST_EXECUTE,
root_pipeline_id=root_pipeline_id,
service=service,
service_data=service_data,
root_pipeline_data=root_pipeline_data,
)
except Exception:
ENGINE_EXECUTE_EXCEPTION_COUNT.labels(type=node_type, hostname=self._hostname).inc()
ex_data = traceback.format_exc()
Expand Down Expand Up @@ -512,12 +530,30 @@ def schedule(
)
else:
try:
self.runtime.pre_schedule(root_pipeline_id=root_pipeline_id, node_id=self.node.id)
self.hook_dispatch(
hook=HookType.PRE_SCHEDULE,
root_pipeline_id=root_pipeline_id,
service=service,
service_data=service_data,
root_pipeline_data=root_pipeline_data,
)
schedule_success = service.schedule(
schedule=schedule,
data=service_data,
root_pipeline_data=root_pipeline_data,
callback_data=callback_data,
)
# 只有调度成功才会执行 post_schedule
if schedule_success:
self.runtime.post_schedule(root_pipeline_id=root_pipeline_id, node_id=self.node.id)
self.hook_dispatch(
hook=HookType.POST_SCHEDULE,
root_pipeline_id=root_pipeline_id,
service=service,
service_data=service_data,
root_pipeline_data=root_pipeline_data,
)
except Exception:
ENGINE_SCHEDULE_EXCEPTION_COUNT.labels(type=node_type, hostname=self._hostname).inc()
service_data.outputs.ex_data = traceback.format_exc()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "bamboo-engine"
version = "2.10.1"
version = "2.10.2"
description = "Bamboo-engine is a general-purpose workflow engine"
authors = ["homholueng <[email protected]>"]
license = "MIT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ def node_finish(self, data, parent_data):
"""节点执行结束"""
return True

def pre_execute(self, data, parent_data):
"""节点执行前"""
return True

def post_execute(self, data, parent_data):
"""节点执行后"""
return True

def pre_schedule(self, data, parent_data):
"""节点调度前"""
return True

def post_schedule(self, data, parent_data):
"""节点调度后"""
return True


class ServiceActivity(Activity):
result_bit = "_result"
Expand Down
56 changes: 56 additions & 0 deletions runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,3 +506,59 @@ def node_finish(self, root_pipeline_id: str, node_id: str):
event_type=self.node_finish.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)

def pre_execute(self, root_pipeline_id: str, node_id: str):
"""
节点执行前需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""
self._send_event(
PipelineEvent(
event_type=self.pre_execute.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)

def post_execute(self, root_pipeline_id: str, node_id: str):
"""
节点执行后需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""
self._send_event(
PipelineEvent(
event_type=self.post_execute.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)

def pre_schedule(self, root_pipeline_id: str, node_id: str):
"""
节点调度前需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""
self._send_event(
PipelineEvent(
event_type=self.pre_schedule.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)

def post_schedule(self, root_pipeline_id: str, node_id: str):
"""
节点调度后需要执行的钩子
:param root_pipeline_id: 任务ID
:type node_id: str
:param node_id: 节点ID
:type node_id: str
"""
self._send_event(
PipelineEvent(
event_type=self.post_schedule.__name__, data={"root_pipeline_id": root_pipeline_id, "node_id": node_id}
)
)
2 changes: 1 addition & 1 deletion runtime/bamboo-pipeline/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "bamboo-pipeline"
version = "3.29.1"
version = "3.29.2"
description = "runtime for bamboo-engine base on Django and Celery"
authors = ["homholueng <[email protected]>"]
license = "MIT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@

from pipeline.eri.runtime import BambooDjangoRuntime

from bamboo_engine.builder import * # noqa
from bamboo_engine.builder import (
EmptyEndEvent,
EmptyStartEvent,
ServiceActivity,
build_tree,
)
from bamboo_engine.engine import Engine

from ..utils import * # noqa
from ..utils import (
assert_all_finish,
assert_all_running,
assert_exec_data_equal,
assert_schedule_finish,
assert_schedule_not_finish,
sleep,
)


def test_callback_node_success():
Expand Down Expand Up @@ -38,10 +50,22 @@ def test_callback_node_success():
"_inner_loop": 1,
"_loop": 1,
"_result": True,
"hook_call_order": ["node_enter", "execute", "schedule"],
"hook_call_order": [
"node_enter",
"pre_execute",
"execute",
"post_execute",
"pre_schedule",
"schedule",
"post_schedule",
],
"pre_execute": 1,
"execute": 1,
"post_execute": 1,
"node_enter": 1,
"pre_schedule": 1,
"schedule": 1,
"post_schedule": 1,
},
}
}
Expand Down Expand Up @@ -88,16 +112,32 @@ def test_multi_callback_node_success():
"_result": True,
"hook_call_order": [
"node_enter",
"pre_execute",
"execute",
"post_execute",
"pre_schedule",
"schedule",
"post_schedule",
"pre_schedule",
"schedule",
"post_schedule",
"pre_schedule",
"schedule",
"post_schedule",
"pre_schedule",
"schedule",
"post_schedule",
"pre_schedule",
"schedule",
"post_schedule",
],
"pre_execute": 1,
"execute": 1,
"post_execute": 1,
"node_enter": 1,
"pre_schedule": 5,
"schedule": 5,
"post_schedule": 5,
"_scheduled_times": 5,
},
}
Expand Down Expand Up @@ -139,14 +179,20 @@ def test_callback_node_fail_and_skip():
"node_enter": 1,
"hook_call_order": [
"node_enter",
"pre_execute",
"execute",
"post_execute",
"pre_schedule",
"schedule",
"node_schedule_fail",
],
"pre_execute": 1,
"execute": 1,
"post_execute": 1,
"_result": False,
"_loop": 1,
"_inner_loop": 1,
"pre_schedule": 1,
"schedule": 1,
"node_schedule_fail": 1,
},
Expand Down
Loading

0 comments on commit ef7102c

Please sign in to comment.