Welcome to Happyly’s documentation!¶
Happyly is a scalable solution for systems which handle any kind of messages.
Happyly helps you separate your business logic from messaging concerns, which keeps your code maintainable and enforces separation of concerns.
Have you ever seen a codebase where serialization, message queue management and business logic are mixed together like spaghetti? I have. Imagine switching between Google Pub/Sub and Django REST Framework. Or Celery. This shouldn’t be a nightmare, but it often is.
Here’s the approach of Happyly:
Write your business logic in universal Handlers, which don’t care at all how you serialize things or send them over the network.
Describe your schemas using ORM/framework-agnostic technology.
Plug in any details of messaging protocol, serialization and networking. Swap them for drop-in replacements at any time.
Happyly can be used with Flask, Celery, Django, Kafka or any other technology that can be used for messaging. Happyly also provides first-class support for Google Pub/Sub.
Use cases¶
Google Pub/Sub¶
Let’s be honest, the official Python client library is too low-level.
You must serialize and deserialize things manually, as well as ack and nack messages.
Usual way:
    def callback(message):
        attributes = json.loads(message.data)
        try:
            result = process_things(attributes['ID'])
            encoded = json.dumps(result).encode('utf-8')
            PUBLISHER.publish(TOPIC, encoded)
        except NeedToRetry:
            # No ack here, so the message will be redelivered.
            _LOGGER.info('Not acknowledging, will retry later.')
        except Exception:
            _LOGGER.error('An error occurred')
            message.ack()
        else:
            message.ack()
Happyly way:
    def handle_my_stuff(message: dict):
        try:
            return process_things(message['ID'])
        except NeedToRetry:
            # Re-raise so that the caller can decide to retry.
            raise
        except Exception:
            _LOGGER.error('An error occurred')
handle_my_stuff is now also usable with Celery or Flask.
Or with yaml serialization.
Or with message.attributes instead of message.data.
Without any change.
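To make this claim concrete, here is a minimal sketch in plain Python (it does not use Happyly) of one handler sitting behind two different deserializers. FakeMessage, from_json_data, from_attributes and run are hypothetical stand-ins invented for illustration; they are not part of Happyly or of the Pub/Sub client.

```python
import json
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a transport message (not a real Pub/Sub class)."""
    data: bytes = b''
    attributes: dict = field(default_factory=dict)


def handle_my_stuff(message: dict) -> dict:
    # Business logic only: no knowledge of bytes, JSON or acks.
    return {'ID': message['ID'], 'status': 'processed'}


def from_json_data(message: FakeMessage) -> dict:
    # Variant 1: the payload is JSON stored in message.data.
    return json.loads(message.data.decode('utf-8'))


def from_attributes(message: FakeMessage) -> dict:
    # Variant 2: the payload lives in message.attributes.
    return dict(message.attributes)


def run(deserialize, handler, message):
    # The only place that knows which deserializer is in use.
    return handler(deserialize(message))


via_data = run(from_json_data, handle_my_stuff, FakeMessage(data=b'{"ID": "42"}'))
via_attributes = run(from_attributes, handle_my_stuff, FakeMessage(attributes={'ID': '42'}))
```

Both calls go through the same unmodified handle_my_stuff; only the deserializer passed to run differs.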
Painless transport change¶
Let’s say you are prototyping your project with Flask and plan to move to Celery later for better fault tolerance. Or to Google Pub/Sub. You just haven’t decided yet.
Easy! Here’s how Happyly can help.
1. Define your message schemas.
    class MyInputSchema(happyly.Schema):
        request_id = marshmallow.fields.Str(required=True)

    class MyOutputSchema(happyly.Schema):
        request_id = marshmallow.fields.Str(required=True)
        result = marshmallow.fields.Str(required=True)
        error = marshmallow.fields.Str()
2. Define your handler.
    def handle_things(message: dict):
        try:
            req_id = message['request_id']
            if req_id in ALLOWED:
                result = get_result_for_id(req_id)
            else:
                result = 'not allowed'
            return {
                'request_id': req_id,
                'result': result,
            }
        except Exception as error:
            return {
                'request_id': message['request_id'],
                'result': 'error',
                'error': str(error),
            }
3. Plug it into Flask:
    @app.route('/', methods=['POST'])
    def root():
        executor = happyly.Executor(
            handler=handle_things,
            deserializer=DummyValidator(schema=MyInputSchema()),
            serializer=JsonifyForSchema(schema=MyOutputSchema()),
        )
        request_data = request.get_json()
        return executor.run_for_result(request_data)
4. Painlessly switch to Celery when you need to:
    @celery.task(name='hello')
    def hello(message):
        result = happyly.Executor(
            handler=handle_things,
            # The input schema validates incoming data, the output schema the result.
            deserializer=happyly.DummyValidator(schema=MyInputSchema()),
            serializer=happyly.DummyValidator(schema=MyOutputSchema()),
        ).run_for_result(message)
        return result
Or to Google Pub/Sub:
    happyly.Listener(
        subscriber=happyly.google_pubsub.GooglePubSubSubscriber(
            project='my_project',
            subscription_name='my_subscription',
        ),
        handler=handle_things,
        deserializer=happyly.google_pubsub.JSONDeserializerWithRequestIdRequired(
            schema=MyInputSchema()
        ),
        serializer=happyly.google_pubsub.BinaryJSONSerializer(
            schema=MyOutputSchema()
        ),
        publisher=happyly.google_pubsub.GooglePubSubPublisher(
            to_topic='my_topic',
            project='my_project',
        ),
    ).start_listening()
5. Move to any other technology. Or swap the serializer for another one. Do whatever you need while your handler and schemas remain exactly the same.
Installation¶
Happyly is hosted on PyPI, so you can use:
pip install happyly
There are extra dependencies for some components. If you want to use Happyly’s components for Flask, install it like this:
pip install happyly[flask]
There is also an extra dependency which enables cached components via Redis. If you need it, install Happyly like this:
pip install happyly[redis]
Key Concepts¶
Handler¶
The handler is the central concept of the Happyly library. Basically, a handler is a callable which implements business logic and nothing else:
No serialization/deserialization here
No sending stuff over the network
No message-queue-related stuff
Let the handler do its job!
To create a handler you can simply define a function which takes a dict as input and returns a dict:
    def handle_my_stuff(message: dict):
        try:
            db.update(message['user'], message['status'])
            return {
                'request_id': message['request_id'],
                'action': 'updated',
            }
        except Exception:
            return {
                'action': 'failed'
            }
Done! This handler can be plugged into your application: whether it uses Flask or Celery or whatever.
Note that you are allowed to return nothing if you don’t actually need a result from your handler. This handler is also valid:
    def handle_another_stuff(message: dict):
        try:
            neural_net.start_job(message['id'])
            _LOGGER.info('Job created')
        except Exception:
            _LOGGER.warning('Failed to create a job')
If you prefer a class-based approach, Happyly supports that too.
Subclass happyly.Handler and implement the following methods:
    class MyHandler(happyly.Handler):
        def handle(self, message: dict):
            db.update(message['user'], message['status'])
            return {
                'request_id': message['request_id'],
                'action': 'updated',
            }

        def on_handling_failed(self, message: dict, error):
            return {
                'action': 'failed'
            }
An instance of MyHandler is equivalent to handle_my_stuff.
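One way to picture why a class-based handler and a plain function can be interchangeable is the sketch below. SketchHandler is a hypothetical base class written for this illustration; it is an assumption about the general shape, and Happyly’s real happyly.Handler may be implemented differently.

```python
class SketchHandler:
    """Hypothetical base class; not Happyly's actual happyly.Handler."""

    def handle(self, message: dict):
        raise NotImplementedError

    def on_handling_failed(self, message: dict, error: Exception):
        # Default behaviour in this sketch: propagate the error.
        raise error

    def __call__(self, message: dict):
        # Calling the instance behaves like calling a function-based handler.
        try:
            return self.handle(message)
        except Exception as error:
            return self.on_handling_failed(message, error)


class UpdateStatus(SketchHandler):
    def handle(self, message: dict):
        return {'request_id': message['request_id'], 'action': 'updated'}

    def on_handling_failed(self, message: dict, error: Exception):
        return {'action': 'failed'}


handler = UpdateStatus()
ok = handler({'request_id': 'abc'})
failed = handler({})  # missing key raises KeyError, so the fallback result is used
```

In this sketch the try/except that the function-based handler writes by hand is moved into __call__, so subclasses only describe the success path and the failure path.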
Executor¶
To plug a handler into your application you will need happyly.Executor (or one of its subclasses).
An executor places the handler in a pipeline of several steps:
deserialization
handling itself
serialization (optional)
publishing (optional)
So a typical construction of an Executor looks like this:
    my_executor = Executor(
        deserializer=...,
        handler=...,
        serializer=...,
        publisher=...,
    )
Executor implements two crucial methods: run() and run_for_result().
run(message) starts an execution pipeline for the provided message. run() returns nothing but can optionally publish a serialized result of handling.
If you’d like to deal with the result yourself, use run_for_result(), which returns a serialized result of handling.

Executor manages all the stages of the pipeline, including the situation when a stage fails. But the implementation of each stage (deserialization, handling, serialization, publishing) is passed to the constructor when the executor is instantiated.
You can use the pre-made stage implementations provided by Happyly or create your own (see Stages).
To customize what happens between the stages use Callbacks.
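The relationship between run() and run_for_result() can be sketched with a toy pipeline. SketchExecutor below is a hypothetical simplification written for this page: it omits failure handling and callbacks entirely, and it is not Happyly’s implementation.

```python
import json


class SketchExecutor:
    """Toy pipeline: deserialize -> handle -> serialize -> publish."""

    def __init__(self, handler, deserializer=None, serializer=None, publisher=None):
        self.handler = handler
        # Optional stages fall back to pass-through behaviour in this sketch.
        self.deserializer = deserializer or (lambda message: message)
        self.serializer = serializer or (lambda result: result)
        self.publisher = publisher

    def run_for_result(self, message):
        attributes = self.deserializer(message)
        result = self.handler(attributes)
        if result is None:
            # The handler returned nothing, so there is nothing to serialize.
            return None
        return self.serializer(result)

    def run(self, message) -> None:
        serialized = self.run_for_result(message)
        if serialized is not None and self.publisher is not None:
            self.publisher(serialized)


published = []
executor = SketchExecutor(
    handler=lambda attrs: {'doubled': attrs['value'] * 2},
    deserializer=json.loads,
    serializer=json.dumps,
    publisher=published.append,
)
returned = executor.run_for_result('{"value": 21}')
nothing = executor.run('{"value": 1}')
```

Here run_for_result() hands the serialized value back to the caller without publishing, while run() returns None and passes the serialized value to the publisher.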
Listener¶
You probably don’t want to invoke run() manually each time.
You can bind an executor to some event by creating a BaseListener.
BaseListener is a subclass of Executor that behaves the same way but has two additions:
the constructor requires one more parameter, subscriber;
one more method is added, BaseListener.start_listening().
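As a rough illustration of these two additions, the sketch below binds a toy executor to an in-memory subscriber. All names here (SketchExecutor, SketchListener, InMemorySubscriber and its subscribe(callback) method) are hypothetical; the interface Happyly expects from a real subscriber is not shown in this document and may differ.

```python
class SketchExecutor:
    """Toy executor that only runs the handler."""

    def __init__(self, handler):
        self.handler = handler

    def run(self, message):
        self.handler(message)


class InMemorySubscriber:
    """Hypothetical subscriber that delivers a fixed list of messages."""

    def __init__(self, messages):
        self.messages = list(messages)

    def subscribe(self, callback):
        for message in self.messages:
            callback(message)


class SketchListener(SketchExecutor):
    def __init__(self, subscriber, **kwargs):
        # Addition 1: one more constructor parameter, the subscriber.
        super().__init__(**kwargs)
        self.subscriber = subscriber

    def start_listening(self):
        # Addition 2: run() is invoked for every delivered message.
        self.subscriber.subscribe(self.run)


seen = []
listener = SketchListener(
    subscriber=InMemorySubscriber(['first', 'second']),
    handler=seen.append,
)
listener.start_listening()
```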
Stages¶
Deserializer¶
The simplest deserializer is a function which takes a received message and returns a dict of attributes.
Here is an imaginary example:
    def get_attributes_from_my_message(message):
        data = message.get_bytes().decode('utf-8')
        return json.loads(data)
You’ll need a different deserializer for different message transport technologies or serialization formats.
The same deserializer can be written as a class:
    class MyDeserializer(happyly.Deserializer):
        def deserialize(self, message):
            data = message.get_bytes().decode('utf-8')
            return json.loads(data)
A class-based deserializer can implement a fallback method that constructs an error result:
    class MyDeserializer(happyly.Deserializer):
        def deserialize(self, message):
            data = message.get_bytes().decode('utf-8')
            return json.loads(data)

        def build_error_result(self, message, error):
            return {'status': 'failed', 'error': repr(error)}
Note that if deserialization fails, handling is skipped and the return value of build_error_result is used as the result of handling.
Class-based deserializers are also useful for parametrization, e.g. with message schemas.
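The fallback behaviour described above can be sketched in plain Python. JsonDeserializer and the free function run_for_result are hypothetical stand-ins that mimic the described control flow; they do not use or reproduce Happyly’s classes.

```python
import json


class JsonDeserializer:
    """Hypothetical deserializer with a fallback for broken input."""

    def deserialize(self, message: bytes) -> dict:
        return json.loads(message.decode('utf-8'))

    def build_error_result(self, message, error):
        return {'status': 'failed', 'error': type(error).__name__}


def run_for_result(deserializer, handler, message):
    try:
        attributes = deserializer.deserialize(message)
    except Exception as error:
        # Handling is skipped; the fallback result takes its place.
        return deserializer.build_error_result(message, error)
    return handler(attributes)


calls = []


def handler(attributes):
    calls.append(attributes)
    return {'status': 'ok'}


good = run_for_result(JsonDeserializer(), handler, b'{"id": 1}')
bad = run_for_result(JsonDeserializer(), handler, b'not json')
```

After both calls, the handler has been invoked only once: the malformed message never reaches it.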
Serializer¶
Serialization is applied to the result returned by the handler.
This step is optional.
It is useful when publishing occurs, or when the value is retrieved with Executor.run_for_result().
The simplest serializer is a function that takes a dict as input and returns… well, whatever you need.
    def prepare_response(message_attributes):
        resp = flask.jsonify(message_attributes)
        if 'error' in message_attributes:
            resp.status_code = 400
        return resp
As usual, there is a class-based approach:
    class MySerializer(happyly.Serializer):
        def serialize(self, message_attributes):
            resp = flask.jsonify(message_attributes)
            if 'error' in message_attributes:
                resp.status_code = 400
            return resp
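A class-based serializer is handy when the serializer needs parameters. The sketch below is a framework-free illustration: SketchBytesSerializer is a hypothetical class, and its fields parameter stands in for a real schema. It does not subclass happyly.Serializer, so treat it only as an outline of the idea.

```python
import json


class SketchBytesSerializer:
    """Hypothetical serializer: keeps the listed fields and encodes UTF-8 JSON."""

    def __init__(self, fields):
        self.fields = tuple(fields)

    def serialize(self, message_attributes: dict) -> bytes:
        # Drop anything that is not part of the declared output.
        filtered = {
            key: message_attributes[key]
            for key in self.fields
            if key in message_attributes
        }
        return json.dumps(filtered, sort_keys=True).encode('utf-8')


serializer = SketchBytesSerializer(fields=['request_id', 'result'])
payload = serializer.serialize(
    {'request_id': 'abc', 'result': 'done', 'debug': 'internal detail'}
)
```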
Publisher¶
After the result is serialized, it can be either returned (if Executor.run_for_result() is used) or published (if Executor.run() is used).
Note that publishing is an optional step: an executor that just does its work without sending a message is a valid one too.
A publisher can be defined as a function that takes a single argument, a serialized message.
    def publish_my_result(serialized_message):
        my_client.publish_a_message(serialized_message)
If you’d like a class-based approach, subclass happyly.BasePublisher.
Here’s how one of Happyly’s components is implemented:
    class GooglePubSubPublisher(happyly.BasePublisher):
        def publish(self, serialized_message: Any):
            future = self._publisher_client.publish(
                f'projects/{self.project}/topics/{self.to_topic}', serialized_message
            )
            try:
                future.result()
                return
            except Exception as e:
                raise e

        def __init__(self, project: str, to_topic: str):
            super().__init__()
            self.project = project
            self.to_topic = to_topic
            self._publisher_client = pubsub_v1.PublisherClient()
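Because a publisher only needs to accept a serialized message, a test double is easy to write. The following sketch shows a hypothetical InMemoryPublisher that records messages instead of sending them; it deliberately does not subclass happyly.BasePublisher, so adapting it to Happyly is left as an assumption.

```python
class InMemoryPublisher:
    """Hypothetical publisher for tests: records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def publish(self, serialized_message):
        self.sent.append(serialized_message)


def publish_all(publisher, serialized_messages):
    # Stand-in for the part of a pipeline that calls the publisher.
    for serialized_message in serialized_messages:
        publisher.publish(serialized_message)


publisher = InMemoryPublisher()
publish_all(publisher, [b'{"id": 1}', b'{"id": 2}'])
```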
Callbacks¶
Overview¶
Executor (as well as BaseListener) provides a rich pipeline which manages stages, their failures and actions between stages.
A simplified representation of the pipeline (omitting any failures) looks like this:

Deserialization, handling, serialization and publishing are provided by Stages.
Each step of the pipeline emits an event which can be handled by the corresponding callback.
The base classes (Executor and BaseListener) do nothing but log inside their callbacks. You can customize any step by overriding any callback in a child class:
    class MyExecutor(happyly.Executor):
        def on_received(self, original_message):
            original_message.ack()

        def on_handling_failed(
            self,
            original_message: Any,
            deserialized_message: Mapping[str, Any],
            error: Exception,
        ):
            if isinstance(error, NeedToRetry):
                original_message.nack()
The example above uses on_handling_failed, which is called whenever the handler raises an exception.
Actually, here’s the full picture with failures:

Note that if deserialization fails, handling is not performed. Instead, the executor tries to get a fallback result via Deserializer.build_error_result, and this result is used instead of the result of handling.
What if I need an emergency stop?¶
You can raise happyly.StopPipeline inside any callback, and the pipeline will be stopped immediately.
Well, actually on_stopped will be invoked then, as a last resort to finish up.

In all other cases, i.e. if the pipeline is not stopped, on_finished is guaranteed to be called at the very end.
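The interplay of callbacks, on_stopped and on_finished can be sketched with a toy pipeline. Everything below is a hypothetical simplification: StopPipeline is a local stand-in for happyly.StopPipeline, on_handled is an invented callback name, and the callback signatures are assumptions rather than Happyly’s real ones.

```python
class StopPipeline(Exception):
    """Local stand-in for happyly.StopPipeline."""


class SketchExecutor:
    def __init__(self, handler):
        self.handler = handler
        self.events = []  # records which callbacks fired, in order

    def on_received(self, message):
        self.events.append('received')

    def on_handled(self, message, result):
        self.events.append('handled')

    def on_handling_failed(self, message, error):
        self.events.append('handling_failed')

    def on_stopped(self, message, reason):
        self.events.append('stopped')

    def on_finished(self, message, result):
        self.events.append('finished')

    def run(self, message):
        result = None
        try:
            self.on_received(message)
            try:
                result = self.handler(message)
            except Exception as error:
                self.on_handling_failed(message, error)
            else:
                self.on_handled(message, result)
        except StopPipeline as stop:
            # Raised by a callback: skip the remaining steps.
            self.on_stopped(message, str(stop))
            return
        self.on_finished(message, result)


class StopOnEmpty(SketchExecutor):
    def on_received(self, message):
        super().on_received(message)
        if not message:
            raise StopPipeline('empty message')


normal = StopOnEmpty(handler=lambda text: text.upper())
normal.run('hi')
stopped = StopOnEmpty(handler=lambda text: text.upper())
stopped.run('')
```

In this sketch a run ends with exactly one of the two final callbacks: on_stopped when a callback raised StopPipeline, on_finished otherwise.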
API Reference¶