Welcome to Happyly’s documentation!

Happyly is a scalable solution for systems which handle any kind of messages.

Happyly helps to abstract your business logic from messaging stuff, so that your code is maintainable and ensures separation of concerns.

Have you ever seen a codebase where serialization, message queue managing and business logic are mixed together like a spaghetti? I have. Imagine switching between Google Pub/Sub and Django REST Framework. Or Celery. This shouldn’t be a nightmare but it often is.

Here’s the approach of Happyly:

  • Write you business logic in universal Handlers, which don’t care at all how you serialize things or send them over network etc.

  • Describe your schemas using ORM/Framework-agnostic technology.

  • Plug-in any details of messaging protocol, serialization and networking. Change them with different drop-in replacements at any time.

Happyly can be used with Flask, Celery, Django, Kafka or whatever technology which can be utilized for messaging. Happyly also provides first-class support of Google Pub/Sub.

Use cases

Google Pub/Sub

Let’s be honest, the official Python client library is too low-level. You must serialize and deserialize things manually, as well as to ack and nack messages.

Usual way:

def callback(message):
    attributes = json.loads(message.data)
    try:
        result = process_things(attributes['ID'])
        encoded = json.dumps(result).encode('utf-8')
        PUBLISHER.publish(TOPIC, encoded)
    except NeedToRetry:
        _LOGGER.info('Not acknowledging, will retry later.')
    except Exception:
        _LOGGER.error('An error occured')
        message.ack()
    else:
        message.ack()

Happyly way:

def handle_my_stuff(message: dict):
    try:
        return process_things(message['ID'])
    except NeedToRetry as error:
        raise error from error
    except Exception:
        _LOGGER.error('An error occured')

handle_my_stuff is now also usable with Celery or Flask. Or with yaml serialization. Or with message.attributes instead of message.data. Without any change.

Painless transport change

Let’s say you are prototyping your project with Flask and are planning to move to Celery for better fault tolerance then. Or to Google Pub/Sub. You just haven’t decided yet.

Easy! Here’s how Happyly can help.

  1. Define your message schemas.

class MyInputSchema(happyly.Schema):
    request_id = marshmallow.fields.Str(required=True)

class MyOutputSchema(happyly.Schema):
    request_id = marshmallow.fields.Str(required=True)
    result = marshmallow.fields.Str(required=True)
    error = marshmallow.fields.Str()
  1. Define your handler

def handle_things(message: dict):
    try:
        req_id = message['request_id']
        if req_id in ALLOWED:
            result = get_result_for_id(req_id)
        else:
            result = 'not allowed'
        return {
            'request_id': req_id,
            'result': result,
        }
    except Exception as error:
        return {
            'request_id': message['request_id'],
            'result': 'error',
            'error': str(error),
        }
  1. Plug it into Flask:

@app.route('/', methods=['POST'])
def root():
    executor = happyly.Executor(
        handler=handle_things,
        deserializer=DummyValidator(schema=MyInputSchema()),
        serializer=JsonifyForSchema(schema=MyOutputSchema()),
    )
    request_data = request.get_json()
    return executor.run_for_result(request_data)
  1. Painlessly switch to Celery when you need:

@celery.task('hello')
def hello(message):
    result = happyly.Executor(
        handler=ProcessThings(),
        serializer=happyly.DummyValidator(schema=MyInputSchema()),
        deserializer=happyly.DummyValidator(schema=MyOutputSchema()),
    ).run_for_result(
        message
    )
    return result
  1. Or to Google Pub/Sub:

happyly.LateAckExecutor(
    subscriber=happyly.google_pubsub.GooglePubSubSubscriber(
        project='my_project',
        subscription_name='my_subscription',
    ),
    handler=ProcessThings(),
    deserializer=happyly.google_pubsub.JSONDeserializerWithRequestIdRequired(
        schema=MyInputSchema()
    ),
    serializer=happyly.google_pubsub.BinaryJSONSerializer(
        schema=MyOutputSchema()
    ),
    publisher=happyly.google_pubsub.GooglePubSubPublisher(
        topic='my_topic',
        project='my_project',
    ),
 ).start_listening()

5. Move to any other technology. Or swap serializer to another. Do whatever you need while your handler and schemas remain absolutely the same.

Installation

Happyly is hosted on PyPI, so you can use:

pip install happyly

There are extra dependencies for some components.

Happyly with Google Pub/Sub components are to be installed this way:

pip install happyly[google-cloud-pubsub]

If you want to use Happyly’s components for Flask, install it like this:

pip install happyly[flask]

There is also an extra dependency which enables cached components via Redis. If you need it, install Happyly like this:

pip install happyly[redis]

Key Concepts

Handler

Handler is the main concept of all Happyly library. Basically a handler is a callable which implements business logic, and nothing else:

  • No serialization/deserialiation here

  • No sending stuff over the network

  • No message queues’ related stuff

Let the handler do its job!

To create a handler you can simply define a function which takes a dict as an input and returns a dict:

def handle_my_stuff(message: dict):
    try
        db.update(message['user'], message['status'])
        return {
            'request_id': message['request_id'],
            'action': 'updated',
        }
    except Exception:
        return {
            'action': 'failed'
        }

Done! This handler can be plugged into your application: whether it uses Flask or Celery or whatever.

Note that you are allowed to return nothing if you don’t actually need a result from your handler. This handler is also valid:

def handle_another_stuff(message: dict):
    try
        neural_net.start_job(message['id'])
        _LOGGER.info('Job created')
    except Exception:
        _LOGGER.warning('Failed to create a job')

If you prefer class-based approach, Happyly can satisfy you too. Subclass happyly.Handler() and implement the following methods:

class MyHandler(happyly.Handler):

    def handle(message: dict)
        db.update(message['user'], message['status'])
        return {
            'request_id': message['request_id'],
            'action': 'updated',
        }

    def on_handling_failed(message: dict, error)
        return {
            'action': 'failed'
        }

Instance of MyHandler is equivalent to handle_my_stuff

Executor

To plug a handler into your application you will need happyly.Executor() (or one of its subclasses).

Executor brings the handler into a context of more pipeline steps:

  • deserialization

  • handling itself

  • serialization (optional)

  • publishing (optional)

So a typical construction of an Executor looks like this:

my_executor = Executor(
  deserializer=...
  handler=...
  serializer=...
  publisher=...
)

Executor implements two crucial methods: run() and run_for_result(). run(message) starts an execution pipeline for the provided message. run() returns nothing but can optionally publish a serialized result of handling.

_images/run.png

If you’d like to deal with the result by yourself, use run_for_result() which returns a serialized result of handling.

images/run_for_result.png

Executor manages all the stages of the pipeline, including situation when some stage fails. But the implementation of any stage itself (deserialization, handling, serialization, publishing) is provided to a constructor during executor instantiation.

You can use pre-made implementation of stages provided by Happyly or create you own (see Stages)

To customize what happens between the stages use Callbacks.

Probably you don’t want to invoke run() each time. You can bind an executor to some event by passing a subscriber to Executor()’s constructor.

There used to be a special component - Listener - for that, but it is deprecated now.

Stages

Deserializer

The simplest deserializer is a function which takes a received message and returns a dict of attributes.

Here is an imaginary example:

def get_attributes_from_my_message(message):
    data = message.get_bytes().decode('utf-8')
    return json.loads(data)

You’ll need a different deserializer for different message transport technologies or serialization formats.

The same deserializer can be written as a class:

class MyDeserializer(happyly.Deserializer):
    def deserialize(self, message):
        data = message.get_bytes().decode('utf-8')
        return json.loads(data)

A class-based deserializer can implement a fallback method that constructs an error result:

class MyDeserializer(happyly.Deserializer):
    def deserialize(self, message):
        data = message.get_bytes().decode('utf-8')
        return json.loads(data)

    def build_error_result(self, message, error):
        return {'status': 'failed', 'error': repr(error)}

Note that if deserialization fails, then handling is skipped and the return value of build_error_result is used as a result of handling.

Class-based deserializer are also useful for parametrization, e.g. with message schemas.

Serializer

Serialization happens to the result provided by handler. This step is optional. It is useful when publishing occurs, or when the value is retrieved with Executor.run_for_result().

The simplest serializer is a function that takes dict as an input and returns… well, whatever you need.

def prepare_response(message_attributes):
    resp = flask.jsonify(message_attributes)
    if 'error' in attributes:
        resp.status = 400
    return resp

As usual, there is a class-based approach:

class MySerializer(happyly.Serializer):

    def serialize(message_attributes):
        resp = flask.jsonify(message_attributes)
        if 'error' in attributes:
            resp.status = 400
        return resp

Publisher

After result is serialized it can be either returned (if Executor.run_for_result() is used) or published (if Executor.run() is used). Note that publishing is an optional step - executor that just does the things without sending a message is a valid one too.

Publisher can be defined as a function which takes the only argument - a serialized message.

def publish_my_result(serialized_message):
    my_client.publish_a_message(serialized_message)

If you’d like a class-based approach, please subclass happyly.BasePublisher(). Here’s how one of the Happyly’s components is implemented:

class GooglePubSubPublisher(happyly.BasePublisher):
    def publish(self, serialized_message: Any):
        future = self._publisher_client.publish(
            f'projects/{self.project}/topics/{self.to_topic}', serialized_message
        )
        try:
            future.result()
            return
        except Exception as e:
            raise e

    def __init__(self, project: str, to_topic: str):
        super().__init__()
        self.project = project
        self.to_topic = to_topic
        self._publisher_client = pubsub_v1.PublisherClient()

Callbacks

Overview

Executor provides a rich pipeline which manages stages, their failures and actions between stages.

A simplified representation of the pipeline (omitting any failures) looks like this:

_images/callbacks.png

Deserialization, handling, serialization and publishing are provided by Stages.

Each step of the pipeline emits an event which can be handled by the corresponding callback. Base classes (Executor and BaseListener) do nothing but logging inside their callbacks. You can customize any step by overriding any callback in a child class:

class MyExecutor(happyly.Executor):

    def on_received(original_message):
        super().on_received(original_message)
        original_message.ack()

    def on_handling_failed(
        self,
        original_message: Any,
        deserialized_message: Mapping[str, Any],
        error: Exception,
    ):
        super().on_handling_failed(
            original_message,
            deserialized_message,
            error,
        )
        if isinstance(error, NeedToRetry):
            original_message.nack()

Always invoke base class implementation first, unless you are 100% sure what you are doing.

The example above uses on_handling_failed which is called whenever handler raises an exception. Actually, here’s the full picture with failures:

_images/callbacks_with_failures.png

Note that in case deserialization fails, handling is not conducted. Instead executor tries to get a fallback result via Deserializer.build_error_result and this result is used instead of the result of handling.

What if I need an emergency stop?

You can raise happyly.StopPipeline inside any callback - and the pipeline will be stopped immediately. Well, actually on_stopped will be invoked then, as the last resort to finish up.

_images/stop.png

At the rest of the cases, i.e. if pipeline is not stopped, on_finished is guaranteed to be called at the very end.

Indices and tables