Basic Redis Usage Example Part 2: PUB/SUB in Depth using Redis & Python
Introduction
In this article, I’m going to show you how you can utilize Redis and combine it with python to implement the Publish-Subscribe Pattern. We’ll learn and use this pattern with a simple Redis usage example. I’m assuming you are familiar with Redis, and also have a good understanding of Python basics. Before, we begin I would like to outline the prerequisites.
Basic Redis Usage Example Part 2: PUB/SUB in Depth using Redis & Python
Explore publish-subscribe pattern in depth using python’s popular Redis Wrapper
In our last article, we discussed the basics of the pub/sub pattern, also showed how to install Redis and other prerequisites to learn. This article is the continuation of that exploration so, you might want to take a look into the last article and install all the required dependencies before proceeding with this one! Okay, I’m assuming you’ve read it and installed all the necessary dependencies i.e. Redis-Server, redis-py (Python’s Redis client)
so, without further ado let’s jump right into it.
Thorough look at Pub/Sub of Redis-py
We already know that there are three core properties of PUB/SUB pattern. These are sender, receiver & broker. Using these three properties we can solve various real-life problems. We already know how to connect with redis using python(details of installation are given in the previous article). The code is given below:
1 2 3 | import redis r = redis.Redis(host='localhost', port=6379, db=0) |
The above code will connect to redis and provide us the connection object as r. Using r we will explore some more functionalities of redis-py.
Another Method for Subscribing in Channels
First, let’s publish multiple channels using the object that we’ve created just a moment ago:
1 2 3 | >>>r.publish('hello', 'Test Message 1') >>>r.publish('hello2', 'Test Message 2') |
So, we now have two channels ‘hello’, ‘hello2’. Notice that, both are starting with the word ‘hello’. Redis-py has a feature called subscribe using pattern. This feature allows a user to subscribe to multiple channels with just a single instruction using pattern matching!
1 2 3 4 | import redis r = redis.Redis(host='localhost', port=6379, db=0) p = r.pubsub() p.psubscribe('hello*') |
The above command will subscribe, the object p to all the channels that starts with hello. In our case, both hello1 and hello2. To see the patterns in use we can simply write the following:
1 | p.patterns |
To verify the technique is working we can try publishing new messages using r. After that we can simply call the get_message() method to fetch new messages from both channels:
1 2 3 4 5 6 | >>>r.publish('hello', 'New Test Message 1') >>>r.publish('hello2', 'New Test Message 2') >>> p.get_message() {'type': 'pmessage', 'pattern': b'hello*', 'channel': b'hello', 'data': b'New Test Message 1'} >>> p.get_message() {'type': 'pmessage', 'pattern': b'hello*', 'channel': b'hello2', 'data': b'New Test Message 2'} |
Unsubscribing from a Subscribed Channel
Unsubscribing from a channel is very easy! It is similar to the way we subscribe. See below:
1 | >>> p.unsubscribe() |
And, that’s it! The subscriber object p will successfully unsubscribe from all the published channel that it was listening to.
Note: object p won’t be able to access the new messages from those until it subscribes back (just like the typical pub/sub pattern).
Explore Different Parts of Redis Message Object
Every Message of Pub/Sub Instance of Redis-py client is a dictionary object. Let’s inspect it using a python built-in named type:
1 2 | getmsg = p.get_message() print(type(getmsg)) |
The output will be something similar to this:
1 | <class 'dict'> |
We can also use the built-in dir to inspect the available methods and objects like below:
1 2 | >>>dir(getmsg) ['__class__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'clear', 'copy', 'fromkeys', 'get', 'items', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'] |
All the dict methods are available here for example keys, items etc, lets go ahead and print all the keys available:
1 2 | >>>getmsg.keys() dict_keys(['type', 'pattern', 'channel', 'data']) |
As, you can see there are four keys available i.e. type, pattern, channel & data. Now, I will briefly introduce you to all of them:
Type: It indicates the type of message. There are six types: ‘subscribe’, ‘unsubscribe’, ‘psubscribe’, ‘punsubscribe’, ‘message’, ‘pmessage’
Pattern: By default, it will be None, however, if a pattern matches a published channel then it will store that pattern
Channel: The channel denotes the name of the channel.
Data: The actual message data. If the message type is pmessage than it will contain the published message! We will be mostly using data, it is better to check the type whenever listening to a published message from the subscribed channel.
So far, we have explored various functionality of pub/sub pattern with the help of the python redis wrapper redis-py. We learned how to publish, subscribe, unsubscribe, send or, get message. However, there is still one last thing that we need to explore before finishing this exploration.
Strategies for Continuously Listening to Subscribed Channel
There are multiple ways to accomplish this in redis-py. If we need to do something very simple, we can use the method called listen(). This is a generator that blocks the execution and waits for new messages on the channel. The code is like below:
1 2 | >>> for new_message in p.listen(): do_something(new_message) |
Here, whenever a new message appears do_something() message will process it but there is a catch. The whole script will get blocked if there are no new messages. So it will wait forever for new messages. This is okay for basic use case however, I would recommend using a separate thread with an event loop to run this in the background so that the script can do something else instead of waiting for the messages. This can be achieved using the method run_in_thread()
like below:
1 2 3 4 | >>>thread = p.run_in_thread(sleep_time=0.001) >>># do some other stuff >>> x = 1 + 1 2 |
To terminate the above thread you can use the stop method whenever you want:
1 | >>>thread.stop() |
As run_in_thread()
is running the whole task in separate thread hence, we must have to handle the incoming messages with custom handlers. In other words, run_in_thread won’t work on channels without a custom handler. Assigning custom handler is easy:
1 2 3 4 5 | >>>def custom_handler(message): # do_something with the message print(message) >>>p.psubscribe(**{'hello*':custom_handler}) >>>thread = p.run_in_thread(sleep_time=0.001) |
Conclusion
We have checked out various functionalities of Redis-py and explored PUB/SUB Pattern. In our next article, we will be looking at the different ways you can use Redis to do cache management in python. Till then stay tuned!
Pilot the ObjectRocket Platform Free!
Try Fully-Managed CockroachDB, Elasticsearch, MongoDB, PostgreSQL (Beta) or Redis.
Get Started