Skip to content
Stand with Ukraine flag

Aggregate Stream Node

Use this node when you need to compute rolling statistics over a stream of incoming messages — for example, hourly average temperature, daily total energy consumption, or the maximum pressure reading per shift. It groups messages by originator and aggregation interval, computes the result in memory, and emits a POST_TELEMETRY_REQUEST at the end of each interval.

Intervals can be persisted to storage periodically to avoid data loss on restart, and a default-value mechanism lets you fill in gaps for entities that stopped reporting.


FieldRequiredDescription
Aggregation functionYesMIN, MAX, SUM, AVG, COUNT, or UNIQUE
IntervalYesThe time window over which values are aggregated (e.g., 1 hour)
Queue nameNoThe rule engine queue to use for emitting result messages
FieldDescription
Interval persistence policyControls when in-progress intervals are written to storage: After each message, Periodically, or On interval end
Interval check valueInterval in seconds between persistence writes (for the Periodically policy)
Interval TTL valueHow long completed intervals are cached in memory before eviction (seconds)
State persistence policyControls how often the aggregation state is saved: After each message or Periodically
State persistence valueInterval in seconds between state saves (for the Periodically policy)
FieldDescription
Create intervals automaticallyWhen enabled, default-value messages are emitted for entities in the configured entity group even if no data arrives during an interval
Interval entitiesEntity group whose members receive default-value messages when no data is reported

At the end of each aggregation interval the node emits a POST_TELEMETRY_REQUEST message with the aggregated result:

{
"ts": <interval end timestamp in ms>,
"values": {
"<telemetry key>": <aggregated value>
}
}

The originator of the output message is the same as the originator of the input messages.


ConnectionCondition
SuccessAggregated result message emitted at interval end
FailureUnexpected error during processing

Configuration: Function = AVG, Interval = 1 hour, Interval persistence = On interval end.

Incoming messages: a stream of POST_TELEMETRY_REQUEST messages from a temperature sensor with key temperature.

Result: at the end of each hour, one message is emitted with the average temperature for that hour:

{
"ts": 1700006400000,
"values": { "temperature": 22.3 }
}

This message can be piped directly into a Save time series node to persist the hourly average alongside raw readings.