Persistent DEVICE client

This page explains how TBMQ stores and delivers messages for persistent DEVICE clients, covering the original PostgreSQL-based design, its limitations, and the Redis-based architecture introduced in v2.0.

In TBMQ 1.x, PostgreSQL handled message persistence and retrieval for persistent DEVICE clients. While it performed well initially, PostgreSQL could only scale vertically. As the number of persistent MQTT sessions grew, its architecture became a bottleneck. Redis was chosen as the replacement due to its horizontal scalability, native clustering support, and widespread adoption.

Two tables were central to the original PostgreSQL-based design.

device_session_ctx — maintained session state per persistent MQTT client:

Table "public.device_session_ctx"
Column | Type | Nullable
--------------------+------------------------+---------
client_id | character varying(255) | not null
last_updated_time | bigint | not null
last_serial_number | bigint |
last_packet_id | integer |
Indexes:
"device_session_ctx_pkey" PRIMARY KEY, btree (client_id)
  • last_packet_id — packet ID of the last MQTT message received.
  • last_serial_number — ever-increasing counter; prevents ordering issues when the MQTT packet ID wraps around at 65535.

device_publish_msg — stored messages pending delivery to persistent subscribers:

Table "public.device_publish_msg"
Column | Type | Nullable
--------------------------+------------------------+---------
client_id | character varying(255) | not null
serial_number | bigint | not null
topic | character varying | not null
time | bigint | not null
packet_id | integer |
packet_type | character varying(255) |
qos | integer | not null
payload | bytea | not null
user_properties | character varying |
retain | boolean |
msg_expiry_interval | integer |
payload_format_indicator | integer |
content_type | character varying(255) |
response_topic | character varying(255) |
correlation_data | bytea |
Indexes:
"device_publish_msg_pkey" PRIMARY KEY, btree (client_id, serial_number)
"idx_device_publish_msg_packet_id" btree (client_id, packet_id)

Key columns:

  • time — system timestamp when the message was stored; used for periodic cleanup of expired messages.
  • msg_expiry_interval — expiration time (seconds) for MQTT 5.0 messages with an expiry property. If absent, the message remains valid until removed by time or size-based cleanup.

When messages arrive for a client from the shared Kafka topic, the broker queries device_session_ctx to fetch the latest last_packet_id and last_serial_number. These are incremented and assigned to each message before insertion into device_publish_msg.

Limitation: based on the TimescaleDB blog, vanilla PostgreSQL handles up to ~300k inserts per second under ideal conditions, but this depends on hardware, schema, and workload. Vertical scaling can extend this ceiling, but a per-table hard limit is eventually reached. For an MQTT broker expecting millions of persistent sessions, this was a fundamental constraint.

Redis was chosen as the replacement due to in-memory operation (low latency), native cluster support, and horizontal scalability. The migration also enabled replacing periodic time-based cleanup with per-message TTL.

Using multiple Redis data structures per client means using multiple keys per persistent session. Redis Cluster distributes keys across hash slots for horizontal scaling, but multi-key operations must target the same slot — otherwise a cross-slot error is raised.

TBMQ uses the persistent MQTT client ID as a hash tag in key names — the client ID is enclosed in {}. Redis hashes all keys sharing the same {} content to the same slot, guaranteeing that all keys for one client reside together and multi-key operations proceed without errors.
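
For example, this can be checked with redis-cli: because both keys below embed the same hash tag, CLUSTER KEYSLOT reports the same slot for each.

# Both keys contain the same {client_id} hash tag, so they map to the same hash slot
CLUSTER KEYSLOT {client_id}_messages
CLUSTER KEYSLOT {client_id}_last_packet_id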

In high-throughput environments, multiple messages can arrive simultaneously for the same client. Without atomic operations, sequential updates to multiple data structures risk race conditions and partial updates, leading to message loss or incorrect ordering.

Lua scripts execute as a single isolated unit — no other commands can run concurrently during execution. TBMQ uses a separate Lua script for each operation (saving messages, retrieving undelivered messages on reconnect). Each script touches only the keys of a single client, which all map to the same hash slot thanks to the hash tag, so its updates are applied atomically.
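
As a minimal illustration (not TBMQ's actual script; the key and argument layout here is an assumption), a single EVAL can update both the sorted set and the last-packet-id key atomically, because both keys carry the client's hash tag:

# Atomically add a message reference with score 65536 and record packet ID 1
EVAL "redis.call('ZADD', KEYS[1], ARGV[1], ARGV[3]) redis.call('SET', KEYS[2], ARGV[2]) return redis.status_reply('OK')" 2 {client_id}_messages {client_id}_last_packet_id 65536 1 {client_id}_messages_1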

TBMQ uses two Redis data structures per client:

Sorted sets (ZSETs) — maintain message order using the score as a continuously increasing counter (equivalent to serial_number in PostgreSQL). The sorted set stores references to message payload keys, not the payloads themselves, to avoid memory overhead and enable per-message TTL.

Strings — store the full message payload with a TTL (EX), providing O(1) write, read, and delete complexity without affecting the sorted set.

Example — adding messages to the sorted set:

# Message with MQTT packet ID 65534
ZADD {client_id}_messages 65534 {client_id}_messages_65534
# Message with packet ID 65535 (maximum)
ZADD {client_id}_messages 65535 {client_id}_messages_65535
# Packet ID wraps around to 1 — score continues growing to 65536
ZADD {client_id}_messages 65536 {client_id}_messages_1

The score grows monotonically even when the MQTT packet ID wraps at 65535, preserving correct message ordering.

Example — storing a message payload with TTL:

SET {client_id}_messages_1 "{
\"packetType\":\"PUBLISH\",
\"payload\":\"eyJkYXRhIjoidGJtcWlzYXdlc29tZSJ9\",
\"time\":1736333110026,
\"clientId\":\"client\",
\"retained\":false,
\"packetId\":1,
\"topicName\":\"europe/ua/kyiv/client/0\",
\"qos\":1
}" EX 600

Message payloads can also be retrieved or removed individually with O(1) complexity, without affecting the sorted set:

GET {client_id}_messages_1
DEL {client_id}_messages_1

Last packet ID — a separate string key stores the last MQTT packet ID assigned:

GET {client_id}_last_packet_id
"1"

This is needed on reconnect to determine the correct packet ID for the next message. The sorted set cannot be relied on for this because it may be empty or fully removed.
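
For example, if the counter reads 65535 when the client reconnects, the next message is assigned packet ID 1, while the sorted-set score keeps growing (the value below is illustrative):

GET {client_id}_last_packet_id
"65535"
# Packet IDs range from 1 to 65535, so the next message gets packet ID 1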

The maximum number of stored messages per persistent DEVICE client is controlled by:

# Maximum number of PUBLISH messages stored for each persisted DEVICE client
limit: "${MQTT_PERSISTENT_SESSION_DEVICE_PERSISTED_MESSAGES_LIMIT:10000}"

This limit controls memory allocation per persistent client. For example, a client might connect, register a persistent session, and then disconnect immediately. Without a limit, messages would accumulate indefinitely while waiting for a potential reconnection, causing unbounded memory usage.

The maximum configurable value is 65535 (MQTT protocol limit). When new messages arrive and the limit is reached, the oldest messages are trimmed from the sorted set and their payload string keys are deleted:

-- KEYS[1]: sorted set with message references for the client
local messagesKey = KEYS[1]
-- ARGV[1]: configured per-client message limit
local maxMessagesSize = tonumber(ARGV[1])
-- Get the number of elements to be removed
local numElementsToRemove = redis.call('ZCARD', messagesKey) - maxMessagesSize
-- Check if trimming is needed
if numElementsToRemove > 0 then
    -- Get the elements to be removed (oldest ones)
    local trimmedElements = redis.call('ZRANGE', messagesKey, 0, numElementsToRemove - 1)
    -- Iterate over the elements and remove them
    for _, key in ipairs(trimmedElements) do
        -- Remove the message from the string data structure
        redis.call('DEL', key)
        -- Remove the message reference from the sorted set
        redis.call('ZREM', messagesKey, key)
    end
end

Message retrieval and cleanup on reconnect

When a DEVICE client reconnects, undelivered messages are retrieved and references to expired messages are cleaned up at the same time:

local messagesKey = KEYS[1]
local maxMessagesSize = tonumber(ARGV[1])
local elements = redis.call('ZRANGE', messagesKey, 0, -1)
local messages = {}
for _, key in ipairs(elements) do
    if redis.call('EXISTS', key) == 1 then
        local msgJson = redis.call('GET', key)
        table.insert(messages, msgJson)
    else
        -- Expired — remove the stale reference from the sorted set
        redis.call('ZREM', messagesKey, key)
    end
end
return messages
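
A script like this could be invoked with the client's sorted set as its only key and the message limit as its argument, for example via redis-cli (the file name fetch_messages.lua is a hypothetical placeholder):

# Hypothetical invocation: keys come before the comma, arguments after it
redis-cli --eval fetch_messages.lua {client_id}_messages , 10000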

To validate Redis scalability, TBMQ was tested with a P2P MQTT pattern — one publisher per subscriber, each pair creating its own persistent session. This stresses exactly the per-session write path Redis needs to handle.

A pre-migration prototype test established a 30k msg/sec ceiling with PostgreSQL. After migrating to Redis with the existing Jedis client (synchronous), throughput reached only 40k msg/sec — a modest improvement because Jedis processes each Redis command sequentially, blocking until completion before issuing the next.

Migrating to Lettuce, an asynchronous client built on Netty, raised throughput to 60k msg/sec by enabling parallel, non-blocking Redis operations.

For the full P2P performance test methodology and results, see the 1M msg/sec P2P performance test.