Stages

Deserializer

The simplest deserializer is a function which takes a received message and returns a dict of attributes.

Here is an imaginary example:

def get_attributes_from_my_message(message):
    data = message.get_bytes().decode('utf-8')
    return json.loads(data)

You’ll need a different deserializer for different message transport technologies or serialization formats.

The same deserializer can be written as a class:

class MyDeserializer(happyly.Deserializer):
    def deserialize(self, message):
        data = message.get_bytes().decode('utf-8')
        return json.loads(data)

A class-based deserializer can implement a fallback method that constructs an error result:

class MyDeserializer(happyly.Deserializer):
    def deserialize(self, message):
        data = message.get_bytes().decode('utf-8')
        return json.loads(data)

    def build_error_result(self, message, error):
        return {'status': 'failed', 'error': repr(error)}

Note that if deserialization fails, then handling is skipped and the return value of build_error_result is used as a result of handling.

Class-based deserializer are also useful for parametrization, e.g. with message schemas.

Serializer

Serialization happens to the result provided by handler. This step is optional. It is useful when publishing occurs, or when the value is retrieved with Executor.run_for_result().

The simplest serializer is a function that takes dict as an input and returns… well, whatever you need.

def prepare_response(message_attributes):
    resp = flask.jsonify(message_attributes)
    if 'error' in attributes:
        resp.status = 400
    return resp

As usual, there is a class-based approach:

class MySerializer(happyly.Serializer):

    def serialize(message_attributes):
        resp = flask.jsonify(message_attributes)
        if 'error' in attributes:
            resp.status = 400
        return resp

Publisher

After result is serialized it can be either returned (if Executor.run_for_result() is used) or published (if Executor.run() is used). Note that publishing is an optional step - executor that just does the things without sending a message is a valid one too.

Publisher can be defined as a function which takes the only argument - a serialized message.

def publish_my_result(serialized_message):
    my_client.publish_a_message(serialized_message)

If you’d like a class-based approach, please subclass happyly.BasePublisher(). Here’s how one of the Happyly’s components is implemented:

class GooglePubSubPublisher(happyly.BasePublisher):
    def publish(self, serialized_message: Any):
        future = self._publisher_client.publish(
            f'projects/{self.project}/topics/{self.to_topic}', serialized_message
        )
        try:
            future.result()
            return
        except Exception as e:
            raise e

    def __init__(self, project: str, to_topic: str):
        super().__init__()
        self.project = project
        self.to_topic = to_topic
        self._publisher_client = pubsub_v1.PublisherClient()