Callbacks
Overview
Executor provides a rich pipeline that manages stages, their failures, and the actions between stages.
A simplified representation of the pipeline (omitting any failures) looks like this:
Deserialization, handling, serialization and publishing are provided by Stages.
Each step of the pipeline emits an event which can be handled by the corresponding
callback.
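The idea of one callback per pipeline step can be sketched with a small stand-alone model. This is only an illustration, not happyly's implementation: ToyPipeline, its constructor arguments, and every callback name other than on_received and on_finished are assumptions made for the example.

```python
import json
from typing import Any, Callable, List, Mapping


class ToyPipeline:
    """Simplified stand-in for an executor (hypothetical, not happyly code)."""

    def __init__(
        self,
        handler: Callable[[Mapping[str, Any]], Mapping[str, Any]],
        publish: Callable[[str], None],
    ) -> None:
        self.handler = handler
        self.publish = publish
        self.events: List[str] = []  # names of emitted events, in order

    # One callback per step; here each one only records that it fired.
    def on_received(self, original_message: str) -> None:
        self.events.append("received")

    def on_deserialized(self, original_message: str, deserialized: Any) -> None:
        self.events.append("deserialized")

    def on_handled(self, original_message: str, result: Any) -> None:
        self.events.append("handled")

    def on_published(self, original_message: str, serialized: str) -> None:
        self.events.append("published")

    def on_finished(self, original_message: str) -> None:
        self.events.append("finished")

    def run(self, original_message: str) -> None:
        # Happy path only: each step is followed by its callback.
        self.on_received(original_message)
        deserialized = json.loads(original_message)
        self.on_deserialized(original_message, deserialized)
        result = self.handler(deserialized)
        self.on_handled(original_message, result)
        serialized = json.dumps(result)
        self.publish(serialized)
        self.on_published(original_message, serialized)
        self.on_finished(original_message)


published: List[str] = []
pipeline = ToyPipeline(
    handler=lambda message: {"doubled": message["x"] * 2},
    publish=published.append,
)
pipeline.run('{"x": 21}')
```

After the run, pipeline.events holds the event names in the order the steps executed, and published holds the serialized result.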
The base classes (Executor and BaseListener) do nothing but log inside their callbacks. You can customize any step by overriding the corresponding callback in a child class:
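from typing import Any, Mapping

import happyly


class NeedToRetry(Exception):
    """Hypothetical application-specific error; not part of happyly."""


class MyExecutor(happyly.Executor):
    def on_received(self, original_message: Any):
        super().on_received(original_message)
        # Acknowledge the message as soon as it arrives.
        original_message.ack()

    def on_handling_failed(
        self,
        original_message: Any,
        deserialized_message: Mapping[str, Any],
        error: Exception,
    ):
        super().on_handling_failed(
            original_message,
            deserialized_message,
            error,
        )
        # Negatively acknowledge only when a retry is wanted.
        if isinstance(error, NeedToRetry):
            original_message.nack()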
Always invoke the base class implementation first, unless you are certain you know what you are doing.
The example above uses on_handling_failed, which is called whenever the handler raises an exception.
Actually, here’s the full picture with failures:
Note that if deserialization fails, handling is skipped. Instead, the executor tries to obtain a fallback result via Deserializer.build_error_result, and this result is used in place of the result of handling.
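The fallback behaviour described above can be sketched as follows. This is a minimal model under stated assumptions, not happyly's code: ToyDeserializer, run_once, and the (message, error) signature given to build_error_result here are hypothetical.

```python
import json
from typing import Any, Callable, List, Mapping


class ToyDeserializer:
    """Hypothetical deserializer with a fallback for malformed input."""

    def deserialize(self, message: str) -> Mapping[str, Any]:
        return json.loads(message)

    def build_error_result(
        self, message: str, error: Exception
    ) -> Mapping[str, Any]:
        # Assumed signature: builds a result describing the failure.
        return {"status": "error", "reason": type(error).__name__}


def run_once(
    message: str,
    deserializer: ToyDeserializer,
    handler: Callable[[Mapping[str, Any]], Mapping[str, Any]],
) -> Mapping[str, Any]:
    try:
        deserialized = deserializer.deserialize(message)
    except Exception as error:
        # Deserialization failed: skip the handler, use the fallback result.
        return deserializer.build_error_result(message, error)
    return handler(deserialized)


handled: List[Mapping[str, Any]] = []  # messages that reached the handler


def handler(message: Mapping[str, Any]) -> Mapping[str, Any]:
    handled.append(message)
    return {"status": "ok", "echo": message["text"]}


good = run_once('{"text": "hi"}', ToyDeserializer(), handler)
bad = run_once("not json", ToyDeserializer(), handler)
```

Only the well-formed message reaches the handler; the malformed one yields the fallback result directly.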
What if I need an emergency stop?
You can raise happyly.StopPipeline inside any callback, and the pipeline will be stopped immediately. More precisely, on_stopped is then invoked as a last resort to finish up. In all other cases, i.e. if the pipeline is not stopped, on_finished is guaranteed to be called at the very end.
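The difference between the two endings can be modelled with a short stand-alone sketch. ToyStopPipeline stands in for happyly.StopPipeline, and ToyExecutor, StopOnEmpty, and the callback signatures are hypothetical; the real executor may differ in detail.

```python
from typing import List


class ToyStopPipeline(Exception):
    """Stand-in for happyly.StopPipeline (hypothetical)."""


class ToyExecutor:
    """Simplified model of a pipeline that can be stopped from a callback."""

    def __init__(self) -> None:
        self.events: List[str] = []  # names of emitted events, in order

    def on_received(self, message: str) -> None:
        self.events.append("received")

    def on_handled(self, message: str, result: str) -> None:
        self.events.append("handled")

    def on_stopped(self, message: str, reason: str) -> None:
        self.events.append("stopped")

    def on_finished(self, message: str) -> None:
        self.events.append("finished")

    def run(self, message: str) -> None:
        try:
            self.on_received(message)
            result = message.upper()  # placeholder for real handling
            self.on_handled(message, result)
        except ToyStopPipeline as stop:
            # A callback asked to stop: skip the remaining steps.
            self.on_stopped(message, str(stop))
            return
        # Reached only when the pipeline was not stopped.
        self.on_finished(message)


class StopOnEmpty(ToyExecutor):
    def on_received(self, message: str) -> None:
        super().on_received(message)
        if not message:
            raise ToyStopPipeline("empty message")


stopped = StopOnEmpty()
stopped.run("")
completed = StopOnEmpty()
completed.run("hi")
```

In this model exactly one of on_stopped and on_finished fires per run, which mirrors the guarantee stated above.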