Redis keyspace notifications in Python
Introduction
Redis is an in-memory data structure store used for caching, high-speed data ingest, message queues, distributed locking and much more.
The advantage of using Redis over other in-memory stores is that Redis offers persistence and data structures such as lists, sets, sorted sets, and hashes.
In this article I would like to give you a short overview of Redis keyspace notifications. I will explain what keyspace notifications are and demonstrate how to configure Redis to publish them. Then I will show you how to subscribe to Redis notifications in Python.
Before we start, please install and start a Redis server as described here: https://redis.io/topics/quickstart.
Enable keyspace notifications
By default, keyspace event notifications are disabled. We can enable them in redis.conf or with redis-cli as below:
$ redis-cli config set notify-keyspace-events KEA
OK
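The KEA string enables both keyspace (K) and keyevent (E) notifications for every event class covered by the A alias. Newer Redis versions define a few event classes, such as key-miss events, that A does not include. To see the meaning of each character, check the documentation.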
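To make the flag string less opaque, here is a small sketch that expands a notify-keyspace-events value into readable descriptions. The helper name describe_flags is hypothetical, and the table lists only the flags from the original keyspace notifications feature; newer Redis releases define additional characters that this sketch does not know about.

```python
# Flag characters as described in the Redis keyspace notifications docs.
# Assumption: only the original set of flags is listed; newer Redis
# versions add more characters and extend the "A" alias accordingly.
FLAGS = {
    "K": "keyspace events, published on __keyspace@<db>__ channels",
    "E": "keyevent events, published on __keyevent@<db>__ channels",
    "g": "generic commands such as DEL, EXPIRE, RENAME",
    "$": "string commands",
    "l": "list commands",
    "s": "set commands",
    "h": "hash commands",
    "z": "sorted set commands",
    "x": "expired events",
    "e": "evicted events",
}
ALIAS_A = "g$lshzxe"  # what "A" stands for in this sketch


def describe_flags(value):
    """Return one description per enabled flag (hypothetical helper)."""
    expanded = []
    for char in value:
        for flag in (ALIAS_A if char == "A" else char):
            if flag not in expanded:
                expanded.append(flag)
    return [f"{flag}: {FLAGS.get(flag, 'unknown to this sketch')}" for flag in expanded]


for line in describe_flags("KEA"):
    print(line)
```

A narrower value such as Kx would enable only keyspace channels for expired events, which keeps the amount of published traffic down.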
The CLI can work in a special mode that allows you to subscribe to a channel in order to receive messages.
Now let's check if events are working:
$ redis-cli --csv psubscribe '*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","*",1
psubscribe '*' means that we want to subscribe to all channels matching the pattern *.
In a new terminal, enter redis-cli and SET key1 to value1.
127.0.0.1:6379> set key1 value1
OK
In the first terminal you will see:
$ redis-cli --csv psubscribe '*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","*",1
"pmessage","*","__keyspace@0__:key1","set"
"pmessage","*","__keyevent@0__:set","key1"
Notifications are working :)
Redis keyspace notifications
Redis keyspace notifications have been available since version 2.8.0. For each action altering any Redis key, we can configure Redis to publish a message to a Pub/Sub channel. Then we can subscribe to those notifications. It is worth mentioning that events are generated only if a key has really been modified. Deleting a non-existent key, for instance, will not generate an event.
Above you received three messages:
"psubscribe","*",1
"pmessage","*","__keyspace@0__:key1","set"
"pmessage","*","__keyevent@0__:set","key1"
The first message means that we have successfully subscribed to the pattern given as the second element in the reply. 1 represents the number of channels and patterns we are currently subscribed to. The second message is a key-space notification. On the keyspace channel, which is named after the key, we received the name of the event, set, as the message. The third message is a key-event notification. On the keyevent channel, which is named after the event, we received the name of the key, key1, as the message.
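The two channel layouts can be normalized into one shape with a few lines of code. The following is a minimal sketch, assuming channel and message have already been decoded to str; the function name parse_notification is hypothetical.

```python
import re

# Channel names look like __keyspace@<db>__:<key> or __keyevent@<db>__:<event>.
CHANNEL_RE = re.compile(r"^__(keyspace|keyevent)@(\d+)__:(.*)$", re.DOTALL)


def parse_notification(channel, data):
    """Return (kind, db, key, event), or None if channel is not a notification."""
    match = CHANNEL_RE.match(channel)
    if match is None:
        return None
    kind, db, suffix = match.groups()
    if kind == "keyspace":
        key, event = suffix, data  # channel carries the key, message the event
    else:
        key, event = data, suffix  # channel carries the event, message the key
    return kind, int(db), key, event


print(parse_notification("__keyspace@0__:key1", "set"))  # ('keyspace', 0, 'key1', 'set')
print(parse_notification("__keyevent@0__:set", "key1"))  # ('keyevent', 0, 'key1', 'set')
```

Both notifications describe the same change, so a consumer usually needs to subscribe to only one of the two channel families.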
Redis Pub/Sub
Events are delivered using Redis's Pub/Sub layer.
In order to subscribe to channels channel1 and channel2, the client issues a SUBSCRIBE command with the names of the channels:
SUBSCRIBE channel1 channel2
Messages sent by other clients (publishers) to these channels will be pushed by Redis to all the subscribed clients (subscribers).
The Redis Pub/Sub implementation supports pattern matching. Clients may subscribe to glob-style patterns in order to receive all messages sent to channel names matching a given pattern using PSUBSCRIBE.
For instance:
PSUBSCRIBE channel*
will receive all the messages sent to channel1, channel.b, etc.
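To get a feel for which channel names a pattern would match, the standard library's fnmatch module can serve as a rough local approximation. This is only a sketch: Redis uses its own glob matcher, which differs from fnmatch in details such as escaping, and the helper name matching_channels is hypothetical.

```python
from fnmatch import fnmatchcase


def matching_channels(pattern, channels):
    """Return the channel names that a glob-style pattern would select."""
    # fnmatchcase is case-sensitive on every platform, like Redis channel names.
    return [name for name in channels if fnmatchcase(name, pattern)]


channels = ["channel1", "channel.b", "news", "my-channel"]
print(matching_channels("channel*", channels))  # ['channel1', 'channel.b']
```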
If your Pub/Sub client disconnects and reconnects later, all the events delivered during the time the client was disconnected are lost.
Redis maintains a client output buffer for each client. The default limits for the client output buffer for Pub/Sub are set as:
client-output-buffer-limit pubsub 32mb 8mb 60
Redis will force clients to disconnect under two conditions: if the output buffer grows beyond 32MB, or if the output buffer holds 8MB of data consistently for 60 seconds.
These are indications that clients are consuming data more slowly than it is being published.
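The two conditions can be expressed as a small predicate. This is a simplified model of the rule described above, not Redis's actual implementation; the function name and parameters are hypothetical, and the defaults mirror the 32mb 8mb 60 setting.

```python
MB = 1024 * 1024


def should_disconnect(buffer_bytes, seconds_above_soft,
                      hard_limit=32 * MB, soft_limit=8 * MB, soft_seconds=60):
    """Model of the client-output-buffer-limit rule for a Pub/Sub client."""
    # Hard limit: disconnect as soon as the buffer grows beyond it.
    if buffer_bytes > hard_limit:
        return True
    # Soft limit: disconnect only if the buffer has stayed above it long enough.
    return buffer_bytes > soft_limit and seconds_above_soft >= soft_seconds


print(should_disconnect(40 * MB, 0))   # True: hard limit exceeded
print(should_disconnect(10 * MB, 30))  # False: above the soft limit, but not for long enough
print(should_disconnect(10 * MB, 61))  # True: above the soft limit for more than 60 seconds
```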
There are plans to allow for more reliable delivery of events in the future, but this will probably be addressed at a more general level, either by bringing reliability to Pub/Sub itself or by allowing Lua scripts to intercept Pub/Sub messages and perform operations such as pushing the events into a list.
Subscribe to notifications in python
First we need redis-py, the Python client for Redis, so let's install it:
$ pip install redis
Event loop
Take a look at the following code. It subscribes to all keyspace notifications and prints any that are received.
import time
from redis import StrictRedis

redis = StrictRedis(host='localhost', port=6379)
pubsub = redis.pubsub()
# Subscribe to keyspace notifications for every key in database 0.
pubsub.psubscribe('__keyspace@0__:*')

print('Starting message loop')
while True:
    message = pubsub.get_message()
    if message:
        print(message)
    else:
        # Nothing to read yet; sleep briefly to avoid a busy loop.
        time.sleep(0.01)
This is how we create a Redis connection:
redis = StrictRedis(host='localhost', port=6379)
By default, all responses are returned as bytes. The user is responsible for decoding them. If all string responses from a client should be decoded, the user can specify decode_responses=True to StrictRedis. In this case, any Redis command that returns a string type will be decoded with the specified encoding.
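If you prefer to keep the default and decode only where needed, the bytes values of a message can be decoded by hand. The sketch below is one way to do it, assuming UTF-8 data; the helper name decode_message is hypothetical.

```python
def decode_message(message, encoding="utf-8"):
    """Return a copy of a pubsub message dict with bytes values decoded to str."""
    return {
        key: value.decode(encoding) if isinstance(value, bytes) else value
        for key, value in message.items()
    }


raw = {
    "type": "pmessage",
    "pattern": b"__keyspace@0__:*",
    "channel": b"__keyspace@0__:mykey",
    "data": b"set",
}
print(decode_message(raw))
```

Values that are not bytes, such as the integer count in a psubscribe confirmation or a pattern of None, pass through unchanged.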
Next we create a pubsub object that subscribes to a channel and listens for new messages:
pubsub = redis.pubsub()
pubsub.psubscribe('__keyspace@0__:*')
Then we wait for events by means of an infinite loop:
while True:
    message = pubsub.get_message()
    ...
If there's data, get_message() will read and return it. If there's no data, the method will return None.
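As a variation on the polling loop, get_message() can be asked to wait for data and to skip subscription confirmations. The sketch below assumes a redis-py version whose get_message() accepts the ignore_subscribe_messages and timeout keyword arguments. The function collect_notifications and the FakePubSub stand-in are hypothetical; the stand-in exists only so the example runs without a server.

```python
def collect_notifications(pubsub, count, timeout=1.0):
    """Block until `count` notifications have arrived and return them."""
    received = []
    while len(received) < count:
        # With a timeout, get_message() waits for data instead of returning
        # None immediately, so no explicit time.sleep() is needed.
        message = pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout)
        if message is not None:
            received.append(message)
    return received


class FakePubSub:
    """Stand-in for a redis-py PubSub object (illustration only)."""

    def __init__(self, messages):
        self._messages = list(messages)

    def get_message(self, ignore_subscribe_messages=False, timeout=0.0):
        while self._messages:
            message = self._messages.pop(0)
            # Covers subscribe, unsubscribe, psubscribe and punsubscribe.
            if ignore_subscribe_messages and message["type"].endswith("subscribe"):
                continue
            return message
        return None


fake = FakePubSub([
    {"type": "psubscribe", "pattern": None, "channel": b"__keyspace@0__:*", "data": 1},
    {"type": "pmessage", "pattern": b"__keyspace@0__:*",
     "channel": b"__keyspace@0__:mykey", "data": b"set"},
])
notifications = collect_notifications(fake, count=1)
print(notifications)
```

With a real connection, pass the object returned by redis.pubsub() after calling psubscribe() on it.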
Every message read from a pubsub instance is a dictionary with the following keys:
- type: one of the following: subscribe, unsubscribe, psubscribe, punsubscribe, message, pmessage
- channel: the channel subscribed to or the channel a message was published to
- pattern: the pattern that matched a published message's channel (None in all cases except pmessage types)
- data: the message data
Now start the Python script and, in another terminal, enter redis-cli and SET the key mykey to the value myvalue:
127.0.0.1:6379> set mykey myvalue
OK
You will see the following output from the script:
$ python subscribe.py
Starting message loop
{'type': 'psubscribe', 'data': 1, 'channel': b'__keyspace@0__:*', 'pattern': None}
{'type': 'pmessage', 'data': b'set', 'channel': b'__keyspace@0__:mykey', 'pattern': b'__keyspace@0__:*'}
Callbacks
It is also possible to register callback functions to handle published messages. Message handlers take a single argument, the message. To subscribe to a channel or pattern with a message handler, pass the channel or pattern name as a keyword argument with its value being the callback function. When a message is read on a channel or pattern with a message handler, the message dictionary is created and passed to the message handler. In this case, a None value is returned from get_message() since the message has already been handled.
import time
from redis import StrictRedis

redis = StrictRedis(host='localhost', port=6379)
pubsub = redis.pubsub()

def event_handler(msg):
    print('Handler', msg)

# The pattern is passed as a keyword argument whose value is the callback.
pubsub.psubscribe(**{'__keyspace@0__:*': event_handler})

print('Starting message loop')
while True:
    message = pubsub.get_message()
    if message:
        # Only messages without a handler (e.g. the psubscribe reply) end up here.
        print(message)
    else:
        time.sleep(0.01)
127.0.0.1:6379> set mykey myvalue
OK
As you can see, the set event for mykey was handled by the event_handler callback.
$ python subscribe2.py
Starting message loop
{'pattern': None, 'channel': b'__keyspace@0__:*', 'data': 1, 'type': 'psubscribe'}
Handler {'pattern': b'__keyspace@0__:*', 'channel': b'__keyspace@0__:mykey', 'data': b'set', 'type': 'pmessage'}
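When a single pattern covers many kinds of events, the callback can route each message to a per-event function. This is a minimal sketch of that idea, using a message shaped like the keyspace notification above; make_dispatcher and the handler table are hypothetical names.

```python
def make_dispatcher(handlers, default=None):
    """Build a callback that routes keyspace messages by event name."""
    def dispatch(message):
        event = message["data"]
        if isinstance(event, bytes):
            event = event.decode("utf-8")
        handler = handlers.get(event, default)
        if handler is not None:
            handler(message)
    return dispatch


seen = []
dispatch = make_dispatcher({
    "set": lambda msg: seen.append(("set", msg["channel"])),
    "del": lambda msg: seen.append(("del", msg["channel"])),
})

base = {"type": "pmessage", "pattern": b"__keyspace@0__:*",
        "channel": b"__keyspace@0__:mykey"}
dispatch({**base, "data": b"set"})
dispatch({**base, "data": b"expire"})  # no handler registered, so it is ignored
print(seen)  # [('set', b'__keyspace@0__:mykey')]
```

The resulting dispatch function can be registered in the same way as event_handler, as the value for the pattern passed to psubscribe().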
Event loop in separate thread
Another option is to run an event loop in a separate thread:
import time
from redis import StrictRedis

redis = StrictRedis(host='localhost', port=6379)

def event_handler(msg):
    print(msg)
    # `thread` is assigned below, before the first message can arrive.
    thread.stop()

pubsub = redis.pubsub()
pubsub.psubscribe(**{'__keyevent@0__:expired': event_handler})
thread = pubsub.run_in_thread(sleep_time=0.01)
The above code creates a new thread and starts the event loop. After handling the first expired event we use the thread.stop() method to shut down the event loop and thread.
Behind the scenes, this is simply a wrapper around get_message() that runs in a separate thread. run_in_thread() takes an optional sleep_time argument. If specified, the event loop will call time.sleep() with that value in each iteration of the loop.
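To illustrate what "a wrapper around get_message()" means, here is a simplified sketch of such a worker thread. It is not redis-py's actual implementation; PollingThread and FakePubSub are hypothetical names, and the fake object stands in for a real pubsub instance so the example runs without a server.

```python
import threading
import time


class PollingThread(threading.Thread):
    """Calls pubsub.get_message() in a loop until stop() is requested."""

    def __init__(self, pubsub, sleep_time=0.0):
        super().__init__(daemon=True)
        self.pubsub = pubsub
        self.sleep_time = sleep_time
        self.stop_requested = threading.Event()

    def run(self):
        while not self.stop_requested.is_set():
            self.pubsub.get_message()  # registered handlers run inside this call
            if self.sleep_time:
                time.sleep(self.sleep_time)

    def stop(self):
        self.stop_requested.set()


class FakePubSub:
    """Stand-in that feeds canned messages to a handler (illustration only)."""

    def __init__(self, messages, handler):
        self.messages = list(messages)
        self.handler = handler

    def get_message(self):
        if self.messages:
            self.handler(self.messages.pop(0))
        return None


handled = []
done = threading.Event()


def event_handler(msg):
    handled.append(msg)
    done.set()


fake = FakePubSub([{"type": "pmessage", "data": b"mykey"}], event_handler)
thread = PollingThread(fake, sleep_time=0.01)
thread.start()
done.wait(timeout=2)    # wait until the handler has seen the message
thread.stop()
thread.join(timeout=2)  # the loop exits after its current iteration
print(handled)
```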
127.0.0.1:6379> set mykey myvalue ex 1
OK
Expected output:
$ python subscribe3.py
{'type': 'pmessage', 'channel': b'__keyevent@0__:expired', 'pattern': b'__keyevent@0__:expired', 'data': b'mykey'}
Summary
A common use case for Redis is an application that needs to respond to changes to the value stored in a particular key or set of keys. Thanks to keyspace notifications and Pub/Sub we can respond to changes in the Redis data. Notifications are quite easy to use, and event processors may be distributed geographically.
The biggest disadvantage is that the Pub/Sub implementation requires publishers and subscribers to be up all the time. Subscribers lose data when they are stopped or when the connection is lost.
Links
- https://redis.io/topics/notifications - Redis keyspace notifications documentation
- https://redis.io/topics/pubsub - Redis Pub/Sub documentation
- https://github.com/andymccurdy/redis-py - Python client for Redis
- https://www.infoworld.com/article/3212768/database/how-to-use-redis-for-real-time-stream-processing.html - How to use Redis for real-time stream processing
- https://matt.sh/advanced-redis-pubsub-scripts - Subscribe script to Pub/Sub channel