Async and Sync Python Pub/Sub with Redis

· Sakti's blog

Using redis-py for Pub/Sub with support both async and sync mode

Redis pub/sub feature #

Usually, Redis is used for caching layer, but also can be leveraged for indexing, communication, data storage, and time series patterns.

In this communication pattern, senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Rather, published messages send into channels, without knowledge of what subscribers there may be.

In Redis, these messages are fire-and-forget, in that if a message is published and no subscribers exists, the message evaporates and cannot be recovered, so no queue.

For more reliable message delivery using a queue, the command RPOPLPUSH can be utilized. But all logic moved into the client side, responsible to do message queue management, e.g: https://github.com/adjust/rmq

Example pub/sub client, listen to command channel using the command subscribe, using command-line:

1$ redis-cli
2127.0.0.1:6379> subscribe command
3Reading messages... (press Ctrl-C to quit)
41) "subscribe"
52) "command"
63) (integer) 1

then try sending a message using the publish command

1$ redis-cli publish command whoami
2(integer) 1

The publish command returns an integer reply: the number of clients that received the message.

Note that in a Redis Cluster, only clients that are connected to the same node as the publishing client is included in the count.

On client console will get:

11) "message"
22) "command"
33) "whoami"

Python async #

Since Python 3.4, async concurrent programming added to the standard library using asyncio module, before in Python ecosystem to use lightweight thread (goroutine in Golang, task in Rust Tokio) Python program/app need to set up Stackless, greenlet, or using epoll (Tornado, Twisted).

Using async makes the Python program more scalable and handles better concurrency, since Python use GIL only one thread can be run in one interpreter process.

For comparison, let's use httpx library both support sync and async to do HTTP get request to my website:

> GET /test/1 HTTP/2
> Host: saktidwicahyono.name
> accept: */*

< HTTP/2 200 
< content-type: text/html
< vary: Accept-Encoding
< date: Sat, 30 Jul 2022 13:51:42 GMT
< content-length: 633
< server: deno/asia-southeast1-a

The program will fetch GET request 100x times in /test/{i} path and assert response status code should be 200. Here is the sync code:

1import httpx
2
3def main():
4    for i in range(1, 101):
5        r = httpx.get(f"https://saktidwicahyono.name/test/{i}")
6        assert r.status_code == 200
7
8if __name__ == "__main__":
9    main()

Async version:

 1import asyncio
 2import httpx
 3
 4async def main():
 5    async with httpx.AsyncClient() as client:
 6        for i in range(1, 101):
 7            r = await client.get(f"https://saktidwicahyono.name/test/{i}")
 8            assert r.status_code == 200
 9
10
11if __name__ == "__main__":
12    asyncio.run(main())

To do benchmarking hyperfine is used, and close others program to remove noise and keep resource usage not affected by other programs (network connection also should be conditioned e.g: using cabled and no other network activity, but I use shared internet and WiFi from my laptop)

 1$ hyperfine 'python request_async.py' 'python request_sync.py'
 2Benchmark 1: python request_async.py
 3  Time (mean ± σ):      3.432 s ±  0.189 s    [User: 0.777 s, System: 0.145 s]
 4  Range (min … max):    3.179 s …  3.785 s    10 runs
 5
 6Benchmark 2: python request_sync.py
 7  Time (mean ± σ):     12.569 s ±  0.257 s    [User: 2.835 s, System: 0.301 s]
 8  Range (min … max):   12.334 s … 13.091 s    10 runs
 9
10Summary
11  'python request_async.py' ran
12    3.66 ± 0.22 times faster than 'python request_sync.py'

In this use case, async run 3.66x faster than sync since sync will do HTTP requests sequentially but async will do interleave between requests.

Note: since the comparison program is IO-bound, using async gives significant performance improvement, but if the program is CPU-bound better use thread-pool with work-stealing scheduler (in real multi-thread). In Python use multiprocessing module.

redis-py #

redis-py is one of many Redis clients available in Python, and listed by Redis documentation, also this lib used by Django if you are using Django cache framework using Redis which officially supported in Django v4.

This library support pub/sub feature both in sync and async mode. For this post scenario, there is a use case to communicate between web app using sync mode and bot app using async mode. The web app act as the publisher and the bot app act as the subscriber, web app want to send notification as a chat message using functionality available in bot app, so these apps need to communicate with each others.

Let's define a schema, assume the bot app needs only user id and notification content and encoded it using JSON. Since the purpose is for notification, simply name the channel as notification.

1{
2  "user_id": 42,
3  "message": "Your order is on its way!"
4}

web app publisher #

 1import json
 2import redis
 3
 4CHANNEL_NAME = "notification"
 5r = redis.Redis()
 6
 7
 8def send_notification(user_id: int, message: str):
 9    """
10    Send a notification to a user.
11    note: this maybe located in services.py
12    """
13    ...
14    r.publish(CHANNEL_NAME, json.dumps({"user_id": user_id, "message": message}))
15
16
17def main():
18    # imagine this in view / API endpoint or signals
19    send_notification(42, "Your order is on its way!")
20
21
22if __name__ == "__main__":
23    main()
24

bot app subscriber #

 1import asyncio
 2import async_timeout
 3import json
 4import logging
 5import redis.asyncio as redis
 6
 7CHANNEL_NAME = "notification"
 8
 9logging.basicConfig(level=logging.DEBUG)
10
11
12async def send_message(user_id: int, message: str):
13    # write real code here
14    logging.debug(f"Sending message to user {user_id}: {message}")
15    await asyncio.sleep(0.2)
16
17
18async def handle_notification():
19    r = redis.Redis()
20    pubsub = r.pubsub()
21    await pubsub.subscribe(CHANNEL_NAME)
22    while True:
23        try:
24            async with async_timeout.timeout(1):
25                message = await pubsub.get_message()
26                if message and message["type"] == "message":
27                    payload = json.loads(message["data"])
28                    # TODO: do validation on payload
29                    await send_message(payload["user_id"], payload["message"])
30        except (asyncio.TimeoutError, json.decoder.JSONDecodeError) as e:
31            logging.error(e)
32
33
34if __name__ == "__main__":
35    asyncio.run(handle_notification())
36

Summary #

With the simple pub/sub feature provided by Redis, two apps can communicate to do some use cases. As stated above in Redis, the messages are fire-and-forget so not as reliable as using dedicated message broker, distributed commit log or dedicated task queue.

But, depends on the requirements if your app only needs simple functionality maybe Redis pub/sub solution is enough, and don't bother with additional setup/operational costs dedicated to those services.


         

古池や 蛙飛び込む 水の音

last updated: