Deduplication

Use this node to suppress duplicate messages from the same originator within a configurable time window — for example, collecting all telemetry updates received in a 5-second burst from a single device and forwarding only the most recent value, or merging all updates into a single combined message for batch processing.

| Field | Description |
|---|---|
| Interval | Required. Duration of the deduplication window in seconds. All messages from the same originator that arrive within this window are buffered before one is emitted. |
| Strategy | Required. Determines which message(s) to emit when the interval expires. See strategies below. |
| Max pending messages | Maximum number of messages to buffer per originator. Additional messages received after the limit is reached are dropped. |
| Max retries | Maximum number of attempts to acknowledge and deliver the output message before routing to Failure. |
| Out message type | Message type to set on the outgoing merged message. Applies only when Strategy is ALL; for FIRST and LAST the original message type is preserved. |
| Queue | Optional. Output queue for the emitted message. |

| Strategy | Behavior |
|---|---|
| FIRST | Emit the first message that arrived during the window. All subsequent messages from the same originator are discarded. |
| LAST | Emit the last message that arrived during the window. Earlier messages are discarded. |
| ALL | Merge all buffered messages into a single combined message. The output message data is a JSON array where each element has msg (original data) and metadata properties. |

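The three strategies can be sketched as a single selection function. This is a minimal illustration, not the node's actual implementation: the name `apply_strategy` and the message shape (a dict with `type`, `data`, and `metadata` keys) are assumptions made for the example, and the metadata of the merged message is left empty because the description above does not specify it.

```python
from typing import Any

Message = dict[str, Any]  # assumed shape: {"type": ..., "data": ..., "metadata": ...}


def apply_strategy(buffered: list[Message], strategy: str, out_msg_type: str = "") -> Message:
    """Pick or build the message to emit from messages buffered in arrival order."""
    if not buffered:
        raise ValueError("no buffered messages to emit")
    if strategy == "FIRST":
        return buffered[0]   # emitted as-is, original type preserved
    if strategy == "LAST":
        return buffered[-1]  # emitted as-is, original type preserved
    if strategy == "ALL":
        # One array element per buffered message, each with msg and metadata.
        merged = [{"msg": m["data"], "metadata": m["metadata"]} for m in buffered]
        # Assumption: the merged message carries no metadata of its own.
        return {"type": out_msg_type, "data": merged, "metadata": {}}
    raise ValueError(f"unknown strategy: {strategy}")
```

Note that only the ALL branch uses `out_msg_type`, which mirrors the rule that Out message type applies to the merged message only.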
Processing steps:

  1. Receive a message from an originator.
  2. If no deduplication window is active for this originator, start a new window of interval seconds and buffer the message.
  3. If a window is already active, buffer the message (subject to Max pending messages limit; excess messages are dropped).
  4. When the window expires, apply the strategy:
    • FIRST: retrieve the first buffered message and emit it as-is.
    • LAST: retrieve the last buffered message and emit it as-is.
    • ALL: construct a new message whose data is a JSON array of all buffered {msg, metadata} pairs, with message type set to Out message type.
  5. Route the emitted message via Success.
  6. On delivery failure after Max retries exhausted: route via Failure.
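
The window bookkeeping in steps 1 to 4 can be sketched as a small per-originator buffer. This is an illustrative model only: `DedupBuffer`, `Window`, and the explicit `now` argument are hypothetical names chosen for the example (a real node would rely on a scheduler rather than caller-supplied timestamps), and strategy handling is deliberately left out.

```python
from dataclasses import dataclass, field


@dataclass
class Window:
    deadline: float                      # time at which the window expires
    messages: list = field(default_factory=list)


class DedupBuffer:
    """Per-originator buffering with a fixed window and a pending-message cap."""

    def __init__(self, interval_s: float, max_pending: int):
        self.interval_s = interval_s
        self.max_pending = max_pending
        self.windows: dict[str, Window] = {}

    def on_message(self, originator: str, msg: dict, now: float) -> bool:
        """Buffer msg; return False if it was dropped because the cap is reached."""
        win = self.windows.get(originator)
        if win is None:
            # Step 2: no active window, so the first message starts one.
            win = Window(deadline=now + self.interval_s)
            self.windows[originator] = win
        if len(win.messages) >= self.max_pending:
            return False                 # Step 3: excess messages are dropped
        win.messages.append(msg)
        return True

    def expire(self, now: float) -> dict[str, list]:
        """Step 4: close every window whose deadline has passed and return its buffer."""
        due = [o for o, w in self.windows.items() if w.deadline <= now]
        return {o: self.windows.pop(o).messages for o in due}
```

Because an expired window is removed from `windows`, the next message from that originator starts a fresh window, which matches step 2.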

| Connection | Condition |
|---|---|
| Success | Message emitted after the deduplication window expired. |
| Failure | Delivery failed after maximum retries, or an unexpected error occurred. |
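
The choice between the two connections can be illustrated with a bounded retry loop. This sketch assumes that Max retries counts total delivery attempts, as the field description suggests; the actual node may count retries differently. `deliver_with_retries` and the `send` callback are hypothetical names used only for this example.

```python
from typing import Callable


def deliver_with_retries(send: Callable[[dict], None], msg: dict, max_retries: int) -> str:
    """Try to deliver msg up to max_retries times; return the connection to route via."""
    for _ in range(max_retries):
        try:
            send(msg)          # assumed to raise on a failed delivery
            return "Success"
        except Exception:
            continue           # attempt failed, try again if attempts remain
    return "Failure"           # attempts exhausted without a successful delivery
```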

Scenario: A temperature sensor sends readings every second. Deduplicate over a 10-second window using the LAST strategy so only the most recent reading is forwarded.

Incoming messages (multiple, same originator):

{ "temperature": 22.1 }
{ "temperature": 22.4 }
{ "temperature": 22.7 }

Node configuration:

{
  "interval": 10,
  "strategy": "LAST",
  "maxPendingMsgs": 100,
  "maxRetries": 3
}

Outgoing message (emitted once, 10 seconds after the first message):

{ "temperature": 22.7 }

Routed via Success. Only the last temperature reading within the 10-second window was forwarded.


Scenario (ALL strategy): Collect all readings from the same device over 5 seconds into a single batch message.

Outgoing message data (ALL strategy):

[
  { "msg": { "temperature": 22.1 }, "metadata": { "deviceName": "Sensor-01" } },
  { "msg": { "temperature": 22.4 }, "metadata": { "deviceName": "Sensor-01" } },
  { "msg": { "temperature": 22.7 }, "metadata": { "deviceName": "Sensor-01" } }
]

Routed via Success with the configured Out message type.
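
A downstream consumer of the merged message might unpack the array as follows. This is a sketch under the assumption that the message data arrives as a JSON string with the structure shown above; `summarize_batch` is a hypothetical helper, and the `temperature` key comes from this scenario rather than from the node itself.

```python
import json
from statistics import mean


def summarize_batch(data: str) -> dict:
    """Summarize temperature readings from an ALL-strategy output array."""
    entries = json.loads(data)
    # Each element carries the original payload under "msg".
    temps = [e["msg"]["temperature"] for e in entries if "temperature" in e["msg"]]
    if not temps:
        return {"count": 0}
    return {"count": len(temps), "min": min(temps), "max": max(temps), "mean": mean(temps)}
```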

Configuration JSON schema:

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TbMsgDeduplicationNodeConfiguration",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "interval": { "type": "integer", "description": "Deduplication window duration in seconds." },
    "strategy": { "type": "string", "enum": ["FIRST", "LAST", "ALL"] },
    "maxPendingMsgs": { "type": "integer", "description": "Maximum messages to buffer per originator." },
    "maxRetries": { "type": "integer", "description": "Maximum delivery retry attempts." },
    "outMsgType": { "type": "string", "description": "Message type for the combined output message (ALL strategy only)." },
    "queueName": { "type": "string", "description": "Output queue name." }
  },
  "required": ["interval", "strategy"]
}
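
The constraints in this schema can be checked by hand before a configuration is saved. The sketch below is a hand-rolled approximation rather than a full JSON Schema validator (for instance, it rejects `10.0` where JSON Schema's `integer` type would accept it); `validate_config` and its error strings are hypothetical.

```python
# Property names and types taken from the schema above.
ALLOWED = {
    "interval": int,
    "strategy": str,
    "maxPendingMsgs": int,
    "maxRetries": int,
    "outMsgType": str,
    "queueName": str,
}
REQUIRED = ("interval", "strategy")
STRATEGIES = ("FIRST", "LAST", "ALL")


def validate_config(cfg: dict) -> list[str]:
    """Return a list of problems; an empty list means the config passes these checks."""
    errors = [f"missing required property: {key}" for key in REQUIRED if key not in cfg]
    for key, value in cfg.items():
        expected = ALLOWED.get(key)
        if expected is None:
            errors.append(f"unknown property: {key}")  # additionalProperties is false
        elif isinstance(value, bool) or not isinstance(value, expected):
            # bool is a subclass of int in Python, so it is excluded explicitly.
            errors.append(f"{key}: expected {expected.__name__}")
    strategy = cfg.get("strategy")
    if isinstance(strategy, str) and strategy not in STRATEGIES:
        errors.append(f"strategy: must be one of {', '.join(STRATEGIES)}")
    return errors
```

Applied to the node configuration from the LAST scenario, this check reports no problems.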