Persistent DEVICE client
This page explains how TBMQ stores and delivers messages for persistent DEVICE clients, covering the original PostgreSQL-based design, its limitations, and the Redis-based architecture introduced in v2.0.
In TBMQ 1.x, PostgreSQL handled message persistence and retrieval for persistent DEVICE clients. While it performed well initially, PostgreSQL could only scale vertically. As the number of persistent MQTT sessions grew, its architecture became a bottleneck. Redis was chosen as the replacement due to its horizontal scalability, native clustering support, and widespread adoption.
PostgreSQL — original design
In TBMQ 1.x, persistent DEVICE clients used PostgreSQL for message persistence. Two tables were central to this design.
device_session_ctx — maintained session state per persistent MQTT client:
    Table "public.device_session_ctx"
           Column       |          Type          | Nullable
    --------------------+------------------------+----------
     client_id          | character varying(255) | not null
     last_updated_time  | bigint                 | not null
     last_serial_number | bigint                 |
     last_packet_id     | integer                |
    Indexes:
        "device_session_ctx_pkey" PRIMARY KEY, btree (client_id)

last_packet_id: packet ID of the last MQTT message received.
last_serial_number: ever-increasing counter; prevents ordering issues when the MQTT packet ID wraps around at 65535.
device_publish_msg — stored messages pending delivery to persistent subscribers:
    Table "public.device_publish_msg"
              Column          |          Type          | Nullable
    --------------------------+------------------------+----------
     client_id                | character varying(255) | not null
     serial_number            | bigint                 | not null
     topic                    | character varying      | not null
     time                     | bigint                 | not null
     packet_id                | integer                |
     packet_type              | character varying(255) |
     qos                      | integer                | not null
     payload                  | bytea                  | not null
     user_properties          | character varying      |
     retain                   | boolean                |
     msg_expiry_interval      | integer                |
     payload_format_indicator | integer                |
     content_type             | character varying(255) |
     response_topic           | character varying(255) |
     correlation_data         | bytea                  |
    Indexes:
        "device_publish_msg_pkey" PRIMARY KEY, btree (client_id, serial_number)
        "idx_device_publish_msg_packet_id" btree (client_id, packet_id)

Key columns:
time: system timestamp when the message was stored; used for periodic cleanup of expired messages.
msg_expiry_interval: expiration time (seconds) for MQTT 5.0 messages with an expiry property. If absent, the message remains valid until removed by time or size-based cleanup.
When messages arrive for a client from the shared Kafka topic, the broker queries device_session_ctx to fetch
the latest last_packet_id and last_serial_number. These are incremented and assigned to each message before
insertion into device_publish_msg.
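The assignment step described above can be sketched as follows. This is a minimal illustration, not TBMQ's actual code: the `SessionCtx` class, the `assign_ids` helper, and the starting values of 0 are assumptions made for the example. The only protocol fact relied on is that MQTT packet IDs are 16-bit, non-zero values that wrap from 65535 back to 1.

```python
from dataclasses import dataclass

MAX_PACKET_ID = 65535  # MQTT packet IDs are 16-bit and never 0


@dataclass
class SessionCtx:
    """Hypothetical in-memory mirror of one device_session_ctx row."""
    last_packet_id: int = 0       # assumed starting value
    last_serial_number: int = 0   # assumed starting value


def assign_ids(ctx, count):
    """Return (serial_number, packet_id) pairs for `count` new messages.

    The serial number only ever grows; the packet ID wraps 65535 -> 1.
    """
    assigned = []
    for _ in range(count):
        ctx.last_serial_number += 1
        ctx.last_packet_id = ctx.last_packet_id % MAX_PACKET_ID + 1
        assigned.append((ctx.last_serial_number, ctx.last_packet_id))
    return assigned
```

Because rows are keyed by (client_id, serial_number), ordering by serial number stays correct across a packet ID wrap, which is why both counters are tracked.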
Limitation: based on the TimescaleDB blog, vanilla PostgreSQL handles up to ~300k inserts per second under ideal conditions, but this depends on hardware, schema, and workload. Vertical scaling can extend this ceiling, but a per-table hard limit is eventually reached. For an MQTT broker expecting millions of persistent sessions, this was a fundamental constraint.
Redis — scalable architecture (v2.0+)
Redis was chosen as the replacement due to in-memory operation (low latency), native cluster support, and horizontal scalability. The migration also enabled replacing periodic time-based cleanup with per-message TTL.
Redis Cluster constraints
Using multiple Redis data structures per client means using multiple keys per persistent session. Redis Cluster distributes keys across hash slots for horizontal scaling, but multi-key operations must target the same slot; otherwise a cross-slot error is raised.
TBMQ uses the persistent MQTT client ID as a hash tag
in key names — the client ID is enclosed in {}. Redis hashes all keys sharing the same {} content to the same
slot, guaranteeing that all keys for one client reside together and multi-key operations proceed without errors.
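To see why the hash tag keeps a client's keys together, the slot computation can be reproduced locally. The sketch below follows the rule from the Redis Cluster specification (CRC16 of the key, or of the hash tag content when a non-empty `{...}` is present, modulo 16384). The function names `hash_tag` and `key_slot` and the client ID `client-1` are illustrative only.

```python
import binascii

HASH_SLOTS = 16384  # number of hash slots in a Redis Cluster


def hash_tag(key: str) -> str:
    """Return the part of the key that Redis Cluster actually hashes."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        # Only a non-empty tag counts; "{}" means the whole key is hashed.
        if end > start + 1:
            return key[start + 1:end]
    return key


def key_slot(key: str) -> int:
    """CRC16 (XMODEM variant, initial value 0) of the hashed part, mod 16384."""
    return binascii.crc_hqx(hash_tag(key).encode("utf-8"), 0) % HASH_SLOTS
```

With these helpers, `key_slot("{client-1}_messages")` and `key_slot("{client-1}_last_packet_id")` return the same slot, since only `client-1` is hashed in both cases.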
Atomic operations via Lua scripts
In high-throughput environments, multiple messages can arrive simultaneously for the same client. Without atomic operations, sequential updates to multiple data structures risk race conditions and partial updates, leading to message loss or incorrect ordering.
Lua scripts execute as a single isolated unit — no other commands can run concurrently during execution. TBMQ uses a separate Lua script for each operation (save messages, retrieve undelivered messages on reconnect), ensuring all keys accessed within a script reside in the same hash slot and all updates are atomic.
Redis data structures
TBMQ uses two Redis data structures per client:
Sorted sets (ZSETs) — maintain message order using the score as a continuously increasing counter (equivalent
to serial_number in PostgreSQL). The sorted set stores references to message payload keys, not the payloads
themselves, to avoid memory overhead and enable per-message TTL.
Strings — store the full message payload with a TTL (EX), providing O(1) write, read, and delete
complexity without affecting the sorted set.
Example — adding messages to the sorted set:
    # Message with MQTT packet ID 65534
    ZADD {client_id}_messages 65534 {client_id}_messages_65534
    # Message with packet ID 65535 (maximum)
    ZADD {client_id}_messages 65535 {client_id}_messages_65535
    # Packet ID wraps around to 1; the score continues growing to 65536
    ZADD {client_id}_messages 65536 {client_id}_messages_1

The score grows monotonically even when the MQTT packet ID wraps at 65535, preserving correct message ordering.
Example — storing a message payload with TTL:
    SET {client_id}_messages_1 "{ \"packetType\":\"PUBLISH\", \"payload\":\"eyJkYXRhIjoidGJtcWlzYXdlc29tZSJ9\", \"time\":1736333110026, \"clientId\":\"client\", \"retained\":false, \"packetId\":1, \"topicName\":\"europe/ua/kyiv/client/0\", \"qos\":1}" EX 600

Message payloads can also be retrieved or removed individually with O(1) complexity, without affecting the sorted set:

    GET {client_id}_messages_1
    DEL {client_id}_messages_1

Last packet ID: a separate string key stores the last MQTT packet ID assigned:

    GET {client_id}_last_packet_id
    "1"

This is needed on reconnect to determine the correct packet ID for the next message. The sorted set cannot be relied on for this because it may be empty or fully removed.
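The way these three key types fit together can be modelled in memory. The sketch below is not a Redis client and not TBMQ code: `FakeRedis` and `save_message` are hypothetical names, time is passed in explicitly as `now` (seconds), and the stored JSON is reduced to two fields. It only mimics the commands shown above (`SET ... EX`, `GET`, `ZADD`, and an ascending range read).

```python
import json


class FakeRedis:
    """Tiny in-memory stand-in for the key types described above."""

    def __init__(self):
        self.strings = {}  # key -> (value, expires_at or None)
        self.zsets = {}    # key -> {member: score}

    def set(self, key, value, now, ex=None):
        self.strings[key] = (value, None if ex is None else now + ex)

    def get(self, key, now):
        value, expires_at = self.strings.get(key, (None, None))
        if expires_at is not None and now >= expires_at:
            del self.strings[key]  # the TTL has elapsed
            return None
        return value

    def zadd(self, key, score, member):
        self.zsets.setdefault(key, {})[member] = score

    def zrange_all(self, key):
        members = self.zsets.get(key, {})
        return sorted(members, key=members.get)  # ascending by score


def save_message(r, client_id, score, packet_id, payload, now, ttl):
    """Store the payload with a TTL, reference it from the sorted set,
    and remember the last packet ID. All keys share the {client_id} tag."""
    messages_key = f"{{{client_id}}}_messages"
    msg_key = f"{messages_key}_{packet_id}"
    body = json.dumps({"packetId": packet_id, "payload": payload})
    r.set(msg_key, body, now, ex=ttl)
    r.zadd(messages_key, score, msg_key)
    r.set(f"{{{client_id}}}_last_packet_id", str(packet_id), now)
    return msg_key
```

One consequence the model makes visible: when a payload key expires, its reference is still present in the sorted set, because sorted set members carry no TTL of their own.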
Managing sorted set size
The maximum number of stored messages per persistent DEVICE client is controlled by:

    # Maximum number of PUBLISH messages stored for each persisted DEVICE client
    limit: "${MQTT_PERSISTENT_SESSION_DEVICE_PERSISTED_MESSAGES_LIMIT:10000}"

This limit controls memory allocation per persistent client. For example, a client might connect, register a persistent session, and then disconnect immediately. Without a limit, messages would accumulate indefinitely while waiting for a potential reconnection, causing unbounded memory usage.
The maximum configurable value is 65535 (MQTT protocol limit). When new messages arrive and the limit is reached, the oldest messages are trimmed from the sorted set and their payload string keys are deleted:
    -- Get the number of elements to be removed
    local numElementsToRemove = redis.call('ZCARD', messagesKey) - maxMessagesSize
    -- Check if trimming is needed
    if numElementsToRemove > 0 then
        -- Get the elements to be removed (oldest ones)
        local trimmedElements = redis.call('ZRANGE', messagesKey, 0, numElementsToRemove - 1)
        -- Iterate over the elements and remove them
        for _, key in ipairs(trimmedElements) do
            -- Remove the message from the string data structure
            redis.call('DEL', key)
            -- Remove the message reference from the sorted set
            redis.call('ZREM', messagesKey, key)
        end
    end

Message retrieval and cleanup on reconnect
When a DEVICE client reconnects, undelivered messages are retrieved and references to expired messages are cleaned up at the same time:
    local messagesKey = KEYS[1]
    local maxMessagesSize = tonumber(ARGV[1])
    local elements = redis.call('ZRANGE', messagesKey, 0, -1)
    local messages = {}
    for _, key in ipairs(elements) do
        if redis.call('EXISTS', key) == 1 then
            local msgJson = redis.call('GET', key)
            table.insert(messages, msgJson)
        else
            -- Expired: remove the stale reference from the sorted set
            redis.call('ZREM', messagesKey, key)
        end
    end
    return messages

Migration from Jedis to Lettuce
To validate Redis scalability, TBMQ was tested with a P2P MQTT pattern: one publisher per subscriber, each pair creating its own persistent session. This stresses exactly the per-session write path Redis needs to handle.
A pre-migration prototype test established a 30k msg/sec ceiling with PostgreSQL. After migrating to Redis with the existing Jedis client (synchronous), throughput reached only 40k msg/sec — a modest improvement because Jedis processes each Redis command sequentially, blocking until completion before issuing the next.
Migrating to Lettuce, an asynchronous client built on Netty, raised throughput to 60k msg/sec by enabling parallel, non-blocking Redis operations.
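The difference between the two client models can be illustrated without either library. The sketch below uses neither Jedis nor Lettuce: `fake_command` is a hypothetical stand-in that simulates one network round trip with a fixed delay (10 ms is an arbitrary assumption), and the two runners contrast waiting for each reply before sending the next command with keeping many commands in flight at once.

```python
import asyncio
import time


async def fake_command(i, latency=0.01):
    """Simulated Redis round trip; the latency value is an assumption."""
    await asyncio.sleep(latency)
    return f"OK-{i}"


async def run_sequential(n):
    # Blocking style: each command waits for the previous reply.
    return [await fake_command(i) for i in range(n)]


async def run_concurrent(n):
    # Non-blocking style: all commands are in flight at the same time.
    return await asyncio.gather(*(fake_command(i) for i in range(n)))


def timed(coro_fn, n):
    """Run one of the runners and return (results, elapsed_seconds)."""
    start = time.perf_counter()
    results = asyncio.run(coro_fn(n))
    return list(results), time.perf_counter() - start
```

Calling `timed(run_sequential, 20)` and `timed(run_concurrent, 20)` yields the same replies in the same order, but the sequential run takes roughly 20 round trips of wall-clock time while the concurrent run takes roughly one. The throughput figures quoted above come from the source's own tests, not from this toy model.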
For the full P2P performance test methodology and results, see the 1M msg/sec P2P performance test.