Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions burr/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ def _process_inputs(self, inputs: Dict[str, Any], action: Action) -> Dict[str, A
raise ValueError(
BASE_ERROR_MESSAGE
+ f"Inputs starting with a double underscore ({starting_with_double_underscore}) "
f"are reserved for internal use/injected inputs. "
f"are reserved for internal use/injected inputs."
"Please do not directly pass keys starting with a double underscore."
)
inputs = inputs.copy()
Expand Down Expand Up @@ -945,12 +945,13 @@ async def _astep(self, inputs: Optional[Dict[str, Any]], _run_hooks: bool = True
return None
if inputs is None:
inputs = {}
action_inputs = self._process_inputs(inputs, next_action)
if _run_hooks:
await self._adapter_set.call_all_lifecycle_hooks_sync_and_async(
"pre_run_step",
action=next_action,
state=self._state,
inputs=inputs,
inputs=action_inputs,
sequence_id=self.sequence_id,
app_id=self._uid,
partition_key=self._partition_key,
Expand All @@ -965,12 +966,10 @@ async def _astep(self, inputs: Optional[Dict[str, Any]], _run_hooks: bool = True
# TODO -- add an option/configuration to launch a thread (yikes, not super safe, but for a pure function
# which this is supposed to be its OK).
# this delegates hooks to the synchronous version, so we'll call all of them as well
# In this case we allow the self._step to do input processing
return self._step(
inputs=inputs, _run_hooks=False
next_action, result, new_state = self._step(
inputs=action_inputs, _run_hooks=False
) # Skip hooks as we already ran all of them/will run all of them in this function's finally
# In this case we want to process inputs because we run the function directly
action_inputs = self._process_inputs(inputs, next_action)
return next_action, result, new_state
if next_action.single_step:
result, new_state = await _arun_single_step_action(
next_action, self._state, inputs=action_inputs
Expand Down
Loading