Callbacks

Overview

Executor provides a rich pipeline that manages stages, their failures, and the actions between stages.

A simplified representation of the pipeline (omitting any failures) looks like this:

_images/callbacks.png

Deserialization, handling, serialization and publishing are provided by Stages.

Each step of the pipeline emits an event which can be handled by the corresponding callback. The base classes (Executor and BaseListener) do nothing but log inside their callbacks. You can customize any step by overriding the corresponding callback in a child class:

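from typing import Any, Mapping

import happyly


class NeedToRetry(Exception):
    """Application-specific error (hypothetical, defined here for illustration)."""


class MyExecutor(happyly.Executor):

    def on_received(self, original_message: Any):
        super().on_received(original_message)
        original_message.ack()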

    def on_handling_failed(
        self,
        original_message: Any,
        deserialized_message: Mapping[str, Any],
        error: Exception,
    ):
        super().on_handling_failed(
            original_message,
            deserialized_message,
            error,
        )
        if isinstance(error, NeedToRetry):
            original_message.nack()

Always invoke the base class implementation first, unless you are certain you know what you are doing.

The example above uses on_handling_failed, which is called whenever the handler raises an exception. Here is the full picture with failures:

_images/callbacks_with_failures.png

Note that if deserialization fails, handling is skipped. Instead, the executor tries to get a fallback result via Deserializer.build_error_result, and this result is used in place of the result of handling.
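The fallback path can be illustrated with a small self-contained model. This is only a sketch of the control flow described above, not happyly's implementation: ToyDeserializer, run_pipeline and the JSON message format are hypothetical stand-ins, and the signature given to build_error_result here is an assumption.

```python
import json
from typing import Any, Callable, Mapping


class ToyDeserializer:
    """Hypothetical stand-in for a deserializer; not happyly's class."""

    def deserialize(self, message: bytes) -> Mapping[str, Any]:
        return json.loads(message)

    def build_error_result(
        self, message: bytes, error: Exception
    ) -> Mapping[str, Any]:
        # Assumed signature: receives the raw message and the error.
        return {"status": "error", "reason": type(error).__name__}


def run_pipeline(
    message: bytes,
    deserializer: ToyDeserializer,
    handler: Callable[[Mapping[str, Any]], Mapping[str, Any]],
) -> Mapping[str, Any]:
    try:
        attributes = deserializer.deserialize(message)
    except Exception as error:
        # Deserialization failed: skip handling and use the fallback result.
        return deserializer.build_error_result(message, error)
    return handler(attributes)


def handler(attributes: Mapping[str, Any]) -> Mapping[str, Any]:
    return {"status": "ok", "doubled": attributes["value"] * 2}


good = run_pipeline(b'{"value": 21}', ToyDeserializer(), handler)
bad = run_pipeline(b"not json", ToyDeserializer(), handler)
```

In this model the malformed message never reaches the handler; the value that continues down the pipeline is whatever build_error_result returned.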

What if I need an emergency stop?

You can raise happyly.StopPipeline inside any callback, and the pipeline will stop immediately. The only thing that still runs is on_stopped, which is invoked as a last resort to finish up.

_images/stop.png

In all other cases, i.e. if the pipeline is not stopped, on_finished is guaranteed to be called at the very end.
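The difference between a stopped and a finished run can be sketched with a miniature executor. Everything below is a hypothetical model written so the example runs on its own: StopPipeline is a local stand-in for happyly.StopPipeline, and ToyExecutor, its run method and the callback signatures are assumptions, not the library's API.

```python
from typing import Any, List


class StopPipeline(Exception):
    """Local stand-in for happyly.StopPipeline."""


class ToyExecutor:
    """Hypothetical miniature executor; not happyly's implementation."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_received(self, message: Any) -> None:
        self.events.append("received")

    def on_handled(self, message: Any, result: Any) -> None:
        self.events.append("handled")

    def on_stopped(self, reason: str) -> None:
        self.events.append("stopped")

    def on_finished(self, message: Any) -> None:
        self.events.append("finished")

    def run(self, message: Any) -> None:
        try:
            self.on_received(message)
            result = str(message).upper()  # placeholder for real handling
            self.on_handled(message, result)
        except StopPipeline as stop:
            # Emergency stop: only on_stopped runs, on_finished is skipped.
            self.on_stopped(str(stop))
            return
        self.on_finished(message)


class PickyExecutor(ToyExecutor):
    def on_received(self, message: Any) -> None:
        super().on_received(message)
        if message == "poison":
            raise StopPipeline("refusing to process this message")


executor = PickyExecutor()
executor.run("hello")
normal_events = list(executor.events)

executor.events.clear()
executor.run("poison")
stopped_events = list(executor.events)
```

In the normal run the recorded events end with "finished"; in the stopped run they end with "stopped" and handling never happens.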