happyly.listening.executor.Executor

class happyly.listening.executor.Executor(handler=<happyly.handling.dummy_handler._DummyHandler object>, deserializer=None, publisher=None, serializer=None, subscriber=None)

Bases: typing.Generic

Component which is able to run handler as a part of more complex pipeline.

Implements managing of stages inside the pipeline (deserialization, handling, serialization, publishing) and introduces callbacks between the stages which can be easily overridden.

Executor does not implement stages themselves, it takes internal implementation of stages from corresponding components: Handler, Deserializer, Publisher.

It means that Executor is universal and can work with any serialization/messaging technology depending on concrete components provided to executor’s constructor.

on_deserialization_failed(original_message, …)

Callback which is called right after deserialization failure.

on_deserialized(original_message, …)

Callback which is called right after message was deserialized successfully.

on_finished(original_message, error)

Callback which is called when pipeline finishes its execution.

on_handled(original_message, …)

Callback which is called right after message was handled (successfully or not, but without raising an exception).

on_handling_failed(original_message, …)

Callback which is called if handler’s on_handling_failed raises an exception.

on_published(original_message, …)

Callback which is called right after message was published successfully.

on_publishing_failed(original_message, …)

Callback which is called when publisher fails to publish.

on_received(original_message)

Callback which is called as soon as pipeline is run.

on_serialization_failed(original, …)

on_serialized(original_message, …)

on_stopped(original_message[, reason])

Callback which is called when pipeline is stopped via StopPipeline

run([message])

Method that starts execution of pipeline stages.

run_for_result([message])

start_listening()

handler = None

Provides implementation of handling stage to Executor.

Type: Union[Handler, Callable[[Mapping[str, Any]], Optional[Mapping[str, Any]]]]

deserializer = None

Provides implementation of deserialization stage to Executor.

If not present, no deserialization is performed.

Type: ~D

publisher = None

Provides implementation of serialization and publishing stages to Executor.

If not present, no publishing is performed.

Type: Optional[~P]

on_received(original_message)

Callback which is called as soon as pipeline is run.

Override it in your custom Executor/Listener if needed, but don’t forget to call implementation from base class.

Parameters

original_message (Any) – Message as it has been received, without any deserialization

on_deserialized(original_message, deserialized_message)

Callback which is called right after message was deserialized successfully.

Override it in your custom Executor/Listener if needed, but don’t forget to call implementation from base class.

Parameters
  • original_message (Any) – Message as it has been received, without any deserialization

  • deserialized_message (Mapping[str, Any]) – Message attributes after deserialization

on_deserialization_failed(original_message, error)

Callback which is called right after deserialization failure.

Override it in your custom Executor/Listener if needed, but don’t forget to call implementation from base class.

Parameters
  • original_message (Any) – Message as it has been received, without any deserialization

  • error (Exception) – exception object which was raised

on_handled(original_message, deserialized_message, result)

Callback which is called right after message was handled (successfully or not, but without raising an exception).

Override it in your custom Executor/Listener if needed, but don’t forget to call implementation from base class.

Parameters
  • original_message (Any) – Message as it has been received, without any deserialization

  • deserialized_message (Mapping[str, Any]) – Message attributes after deserialization

  • result (Optional[Mapping[str, Any]]) – Result fetched from handler

on_handling_failed(original_message, deserialized_message, error)

Callback which is called if handler’s on_handling_failed raises an exception.

Override it in your custom Executor/Listener if needed, but don’t forget to call implementation from base class.

Parameters
  • original_message (Any) – Message as it has been received, without any deserialization

  • deserialized_message (Mapping[str, Any]) – Message attributes after deserialization

  • error (Exception) – exception object which was raised

on_published(original_message, deserialized_message, result, serialized_message)

Callback which is called right after message was published successfully.

Override it in your custom Executor/Listener if needed, but don’t forget to call implementation from base class.

Parameters
  • original_message (Any) – Message as it has been received, without any deserialization

  • deserialized_message (Optional[Mapping[str, Any]]) – Message attributes after deserialization

  • result (Optional[Mapping[str, Any]]) – Result fetched from handler

on_publishing_failed(original_message, deserialized_message, result, serialized_message, error)

Callback which is called when publisher fails to publish.

Override it in your custom Executor/Listener if needed, but don’t forget to call implementation from base class.

Parameters
  • original_message (Any) – Message as it has been received, without any deserialization

  • deserialized_message (Optional[Mapping[str, Any]]) – Message attributes after deserialization

  • result (Optional[Mapping[str, Any]]) – Result fetched from handler

  • error (Exception) – exception object which was raised

on_finished(original_message, error)

Callback which is called when pipeline finishes its execution. Is guaranteed to be called unless pipeline is stopped via StopPipeline.

Parameters
  • original_message (Any) – Message as it has been received, without any deserialization

  • error (Optional[Exception]) – exception object which was raised or None

on_stopped(original_message, reason='')

Callback which is called when pipeline is stopped via StopPipeline

Parameters
  • original_message (Any) – Message as it has been received, without any deserialization

  • reason (str) – message describing why the pipeline stopped

run(message=None)

Method that starts execution of pipeline stages.

To stop the pipeline raise StopPipeline inside any callback.

Parameters

message (Optional[Any]) – Message as is, without deserialization. Or message attributes if the executor was instantiated with neither a deserializer nor a handler (useful to quickly publish message attributes by hand)