Deduplication
Use this node to suppress duplicate messages from the same originator within a configurable time window — for example, collecting all telemetry updates received in a 5-second burst from a single device and forwarding only the most recent value, or merging all updates into a single combined message for batch processing.
Configuration
| Field | Description |
|---|---|
| Interval | Required. Duration of the deduplication window in seconds. All messages from the same originator that arrive within this window are buffered until the window expires, at which point the node emits a single output message. |
| Strategy | Required. Determines which message(s) to emit when the interval expires. See strategies below. |
| Max pending messages | Maximum number of messages to buffer per originator. Additional messages received after the limit is reached are dropped. |
| Max retries | Maximum number of attempts to acknowledge and deliver the output message before routing to Failure. |
| Out message type | Message type to set on the outgoing merged message. Applies only when Strategy is ALL; for FIRST and LAST the original message type is preserved. |
| Queue | Optional. Output queue for the emitted message. |
Deduplication strategies
| Strategy | Behavior |
|---|---|
| `FIRST` | Emit the first message that arrived during the window. All subsequent messages from the same originator are discarded. |
| `LAST` | Emit the last message that arrived during the window. Earlier messages are discarded. |
| `ALL` | Merge all buffered messages into a single combined message. The output message data is a JSON array where each element has `msg` (original data) and `metadata` properties. |
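
The three strategies can be pictured as a small selection function over the messages buffered for one originator. This is a minimal sketch, not the node's implementation: the in-memory message shape (`type`, `data`, `metadata` keys) and the function name `apply_strategy` are assumptions made for illustration, and the metadata of the merged message is left empty because this section does not specify it.

```python
import json


def apply_strategy(buffered, strategy, out_msg_type=None):
    """Pick the output for one expired window.

    `buffered` is a list of dicts with "type", "data" and "metadata" keys,
    in arrival order (a hypothetical shape chosen for this sketch).
    """
    if not buffered:
        return None
    if strategy == "FIRST":
        return buffered[0]   # emitted as-is, original type preserved
    if strategy == "LAST":
        return buffered[-1]  # emitted as-is, original type preserved
    if strategy == "ALL":
        merged = [{"msg": m["data"], "metadata": m["metadata"]} for m in buffered]
        return {
            "type": out_msg_type,        # Out message type applies only here
            "data": json.dumps(merged),  # JSON array of {msg, metadata} pairs
            "metadata": {},              # assumption: not specified in this section
        }
    raise ValueError(f"unknown strategy: {strategy}")
```

With three buffered temperature readings, `FIRST` and `LAST` return one of the original messages unchanged, while `ALL` returns a new message whose data is an array of three elements.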
Message processing algorithm
- Receive a message from an originator.
- If no deduplication window is active for this originator, start a new window of `interval` seconds and buffer the message.
- If a window is already active, buffer the message (subject to the Max pending messages limit; excess messages are dropped).
- When the window expires, apply the strategy:
  - `FIRST`: retrieve the first buffered message and emit it as-is.
  - `LAST`: retrieve the last buffered message and emit it as-is.
  - `ALL`: construct a new message whose data is a JSON array of all buffered `{msg, metadata}` pairs, with message type set to Out message type.
- Route the emitted message via `Success`.
- On delivery failure after Max retries is exhausted, route via `Failure`.
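
The buffering steps above can be sketched as a per-originator buffer driven by explicit timestamps. This is an illustrative model only: the class name `DedupBuffer`, its methods, and the use of caller-supplied `now` values in seconds are assumptions; the node's real scheduling and storage are not described in this section.

```python
from collections import defaultdict


class DedupBuffer:
    """Per-originator windowing, driven by explicit timestamps (seconds)."""

    def __init__(self, interval, max_pending):
        self.interval = interval
        self.max_pending = max_pending
        self.window_start = {}            # originator -> time the window opened
        self.pending = defaultdict(list)  # originator -> buffered messages

    def on_message(self, originator, msg, now):
        """Buffer a message; return False if it was dropped."""
        if originator not in self.window_start:
            self.window_start[originator] = now  # first message opens the window
        if len(self.pending[originator]) >= self.max_pending:
            return False  # limit reached: excess messages are dropped
        self.pending[originator].append(msg)
        return True

    def expire(self, now):
        """Return {originator: buffered messages} for every window that has ended."""
        done = {}
        for originator, start in list(self.window_start.items()):
            if now - start >= self.interval:
                done[originator] = self.pending.pop(originator)
                del self.window_start[originator]
        return done


buf = DedupBuffer(interval=10, max_pending=2)
buf.on_message("dev-1", {"temperature": 22.1}, now=0)
buf.on_message("dev-1", {"temperature": 22.4}, now=1)
buf.on_message("dev-1", {"temperature": 22.7}, now=2)  # dropped: limit of 2 reached
print(buf.expire(now=5))   # {} (window still open)
print(buf.expire(now=10))  # both buffered readings for "dev-1"
```

The messages returned by `expire` are what the configured strategy would then be applied to. Passing time in as an argument keeps the sketch deterministic and easy to test.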
Output connections
| Connection | Condition |
|---|---|
| `Success` | Message emitted after the deduplication window expired. |
| `Failure` | Delivery failed after maximum retries, or an unexpected error occurred. |
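
The relationship between Max retries and the two connections can be pictured as a bounded loop. This is a minimal sketch under stated assumptions: Max retries is treated as the total number of delivery attempts, and `send` is a hypothetical callable that raises an exception when delivery fails. Neither detail is confirmed by this section.

```python
def deliver_with_retries(send, msg, max_retries):
    """Try `send(msg)` up to `max_retries` times; return the connection name."""
    for _ in range(max_retries):
        try:
            send(msg)  # hypothetical delivery call; raises on failure
            return "Success"
        except Exception:
            continue   # try again until the attempts are used up
    return "Failure"
```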
Examples
Scenario: A temperature sensor sends readings every second. Deduplicate over a 10-second window using the LAST strategy so only the most recent reading is forwarded.

Incoming messages (multiple, same originator):

    { "temperature": 22.1 }
    { "temperature": 22.4 }
    { "temperature": 22.7 }

Node configuration:

    {
      "interval": 10,
      "strategy": "LAST",
      "maxPendingMsgs": 100,
      "maxRetries": 3
    }

Outgoing message (emitted once, 10 seconds after the first message):

    { "temperature": 22.7 }

Routed via Success. Only the last temperature reading within the 10-second window was forwarded.

Scenario (ALL strategy): Collect all readings from the same device over 5 seconds into a single batch message.

Outgoing message data (ALL strategy):

    [
      { "msg": { "temperature": 22.1 }, "metadata": { "deviceName": "Sensor-01" } },
      { "msg": { "temperature": 22.4 }, "metadata": { "deviceName": "Sensor-01" } },
      { "msg": { "temperature": 22.7 }, "metadata": { "deviceName": "Sensor-01" } }
    ]

Routed via Success with the configured Out message type.
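
A downstream consumer has to unpack the array before it can work with individual readings. The sketch below parses the merged data from the ALL example and computes a simple summary. The function name `summarize` and the choice of summary fields are illustrative assumptions, not part of the node.

```python
import json

# Data of a merged message, copied from the ALL strategy example.
merged_data = """
[
  {"msg": {"temperature": 22.1}, "metadata": {"deviceName": "Sensor-01"}},
  {"msg": {"temperature": 22.4}, "metadata": {"deviceName": "Sensor-01"}},
  {"msg": {"temperature": 22.7}, "metadata": {"deviceName": "Sensor-01"}}
]
"""


def summarize(data):
    """Reduce a merged batch to one summary record per window."""
    entries = json.loads(data)
    temps = [entry["msg"]["temperature"] for entry in entries]
    return {
        "deviceName": entries[0]["metadata"]["deviceName"],
        "count": len(temps),
        "min": min(temps),
        "max": max(temps),
        "avg": round(sum(temps) / len(temps), 2),
    }


print(summarize(merged_data))
```

Because every element carries its own `metadata`, a consumer can also group or filter entries by metadata fields before aggregating.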
JSON schema

    {
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "TbMsgDeduplicationNodeConfiguration",
      "type": "object",
      "additionalProperties": false,
      "properties": {
        "interval": {
          "type": "integer",
          "description": "Deduplication window duration in seconds."
        },
        "strategy": {
          "type": "string",
          "enum": ["FIRST", "LAST", "ALL"]
        },
        "maxPendingMsgs": {
          "type": "integer",
          "description": "Maximum messages to buffer per originator."
        },
        "maxRetries": {
          "type": "integer",
          "description": "Maximum delivery retry attempts."
        },
        "outMsgType": {
          "type": "string",
          "description": "Message type for the combined output message (ALL strategy only)."
        },
        "queueName": {
          "type": "string",
          "description": "Output queue name."
        }
      },
      "required": ["interval", "strategy"]
    }
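
To show what the schema accepts and rejects, the sketch below re-implements its constraints by hand using only the Python standard library. It is a simplified illustration, not a full JSON Schema validator: `validate_config` is a hypothetical helper, and it treats only Python `int` values as integers, which is stricter than JSON Schema's handling of values such as `10.0`.

```python
ALLOWED_KEYS = {"interval", "strategy", "maxPendingMsgs", "maxRetries",
                "outMsgType", "queueName"}
REQUIRED_KEYS = {"interval", "strategy"}
INTEGER_KEYS = {"interval", "maxPendingMsgs", "maxRetries"}
STRING_KEYS = {"strategy", "outMsgType", "queueName"}
STRATEGIES = {"FIRST", "LAST", "ALL"}


def validate_config(config):
    """Return a list of problems; an empty list means the config passes."""
    keys = set(config)
    errors = []
    for key in sorted(REQUIRED_KEYS - keys):
        errors.append(f"missing required property: {key}")
    for key in sorted(keys - ALLOWED_KEYS):  # additionalProperties is false
        errors.append(f"unexpected property: {key}")
    for key in sorted(INTEGER_KEYS & keys):
        value = config[key]
        # bool is a subclass of int in Python, but JSON true/false are not integers
        if isinstance(value, bool) or not isinstance(value, int):
            errors.append(f"{key} must be an integer")
    for key in sorted(STRING_KEYS & keys):
        if not isinstance(config[key], str):
            errors.append(f"{key} must be a string")
    strategy = config.get("strategy")
    if isinstance(strategy, str) and strategy not in STRATEGIES:
        errors.append("strategy must be one of FIRST, LAST, ALL")
    return errors


print(validate_config({"interval": 10, "strategy": "LAST",
                       "maxPendingMsgs": 100, "maxRetries": 3}))  # []
```

Returning a list of messages instead of raising on the first problem lets a caller report every issue in a configuration at once.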