Aggregate Stream Node
Use this node to compute statistics over fixed time intervals from a stream of incoming messages, for example hourly average temperature, daily total energy consumption, or the maximum pressure reading per shift. It groups messages by originator and aggregation interval, computes the result in memory, and emits a POST_TELEMETRY_REQUEST message at the end of each interval.
Intervals can be persisted to storage periodically to avoid data loss on restart, and a default-value mechanism lets you fill in gaps for entities that stopped reporting.
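The grouping step described above can be sketched in a few lines of Python. This is an illustration only, not the node's actual implementation: the names `interval_start` and `group_messages`, the `(originator, ts, value)` tuple layout, and the assumption that intervals are aligned to the Unix epoch are all hypothetical.

```python
from collections import defaultdict

INTERVAL_MS = 60 * 60 * 1000  # 1 hour, standing in for the "Interval" setting


def interval_start(ts_ms, interval_ms=INTERVAL_MS):
    """Align a timestamp to the start of its interval (assumes epoch-aligned intervals)."""
    return ts_ms - (ts_ms % interval_ms)


def group_messages(messages, interval_ms=INTERVAL_MS):
    """Bucket (originator, ts_ms, value) tuples by originator and interval start."""
    buckets = defaultdict(list)
    for originator, ts_ms, value in messages:
        buckets[(originator, interval_start(ts_ms, interval_ms))].append(value)
    return dict(buckets)


# Hypothetical input: three readings from sensor-a, one from sensor-b.
messages = [
    ("sensor-a", 1700002900000, 21.0),
    ("sensor-a", 1700004000000, 23.0),
    ("sensor-a", 1700006500000, 25.0),  # falls into the next hour
    ("sensor-b", 1700003000000, 18.5),
]
buckets = group_messages(messages)
```

Each key in `buckets` corresponds to one in-memory interval for one originator; the aggregation function is applied to its values when the interval ends.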
Configuration
Aggregation
| Field | Required | Description |
|---|---|---|
| Aggregation function | Yes | MIN, MAX, SUM, AVG, COUNT, or UNIQUE |
| Interval | Yes | The time window over which values are aggregated (e.g., 1 hour) |
| Queue name | No | The rule engine queue to use for emitting result messages |
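As a rough guide to what each aggregation function computes over the values collected in one interval, the following Python sketch maps the function names to standard operations. The `AGGREGATORS` table and `aggregate` helper are hypothetical, and treating UNIQUE as "number of distinct values" is an assumption that the table above does not confirm.

```python
from statistics import fmean

# Hypothetical mapping from the configured function name to a Python callable.
AGGREGATORS = {
    "MIN": min,
    "MAX": max,
    "SUM": sum,
    "AVG": fmean,
    "COUNT": len,
    # Assumption: UNIQUE counts the distinct values seen in the interval.
    "UNIQUE": lambda values: len(set(values)),
}


def aggregate(function_name, values):
    """Apply the named aggregation to the values of a single interval."""
    if not values:
        raise ValueError("cannot aggregate an empty interval")
    return AGGREGATORS[function_name](values)


readings = [21.0, 23.0, 23.0]
results = {name: aggregate(name, readings) for name in AGGREGATORS}
```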
Persistence
| Field | Description |
|---|---|
| Interval persistence policy | Controls when in-progress intervals are written to storage: After each message, Periodically, or On interval end |
| Interval check value | Interval in seconds between persistence writes (for the Periodically policy) |
| Interval TTL value | How long completed intervals are cached in memory before eviction (seconds) |
| State persistence policy | Controls how often the aggregation state is saved: After each message or Periodically |
| State persistence value | Interval in seconds between state saves (for the Periodically policy) |
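One way to picture the three interval persistence policies is as a decision made each time a message is processed. The sketch below is a simplified model under stated assumptions: the policy identifiers, the `should_persist` function, and its parameters are hypothetical and do not reflect the node's internal API.

```python
def should_persist(policy, *, now_s, last_write_s, check_interval_s, interval_ended):
    """Decide whether an in-progress interval should be written to storage now."""
    if policy == "AFTER_EACH_MESSAGE":
        return True  # every processed message triggers a write
    if policy == "PERIODICALLY":
        # write only once the configured number of seconds has elapsed
        return now_s - last_write_s >= check_interval_s
    if policy == "ON_INTERVAL_END":
        return interval_ended  # write once, when the interval closes
    raise ValueError(f"unknown policy: {policy}")


# Hypothetical check: 45 s since the last write, with a 60 s check value.
due = should_persist(
    "PERIODICALLY",
    now_s=145,
    last_write_s=100,
    check_interval_s=60,
    interval_ended=False,
)
```

The trade-off is the usual one: more frequent writes lose less data on restart but put more load on storage.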
Default values
| Field | Description |
|---|---|
| Create intervals automatically | When enabled, default-value messages are emitted for entities in the configured entity group even if no data arrives during an interval |
| Interval entities | Entity group whose members receive default-value messages when no data is reported |
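The gap-filling behaviour can be modelled as a lookup with a fallback: every member of the entity group gets a result for the interval, and members that reported nothing get the default. In this Python sketch the `fill_gaps` helper, the entity names, and the default of `0` are all assumptions made for illustration.

```python
def fill_gaps(group_members, reported, default_value=0):
    """Return one value per group member, using default_value for silent entities."""
    return {entity: reported.get(entity, default_value) for entity in group_members}


# Hypothetical entity group and the aggregated results for one interval.
group_members = ["sensor-a", "sensor-b", "sensor-c"]
reported = {"sensor-a": 22.3, "sensor-c": 19.8}  # sensor-b sent no data

filled = fill_gaps(group_members, reported)
```

Here `sensor-b` still appears in `filled`, which mirrors how a default-value message keeps its time series free of gaps.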
Output message
At the end of each aggregation interval the node emits a POST_TELEMETRY_REQUEST message with the aggregated result:

    { "ts": <interval end timestamp in ms>, "values": { "<telemetry key>": <aggregated value> }}

The originator of the output message is the same as the originator of the input messages.
Output connections
| Connection | Condition |
|---|---|
| Success | Aggregated result message emitted at interval end |
| Failure | Unexpected error during processing |
Example — Hourly average temperature
Configuration: Function = AVG, Interval = 1 hour, Interval persistence = On interval end.
Incoming messages: a stream of POST_TELEMETRY_REQUEST messages from a temperature sensor with key temperature.
Result: at the end of each hour, one message is emitted with the average temperature for that hour:
    { "ts": 1700006400000, "values": { "temperature": 22.3 }}

This message can be piped directly into a Save time series node to persist the hourly average alongside raw readings.
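To make the arithmetic behind this example concrete, here is a small Python sketch that reproduces the message above from three made-up readings. The reading values and timestamps are invented for illustration, the interval is assumed to start at 1700002800000 (one hour before the emitted `ts`), and rounding to one decimal place is a presentation choice of the sketch, not documented node behaviour.

```python
from statistics import fmean

INTERVAL_MS = 60 * 60 * 1000           # Interval = 1 hour
interval_start_ms = 1700002800000      # assumed start of the hour being aggregated
interval_end_ms = interval_start_ms + INTERVAL_MS

# Hypothetical (timestamp in ms, temperature) readings from one sensor.
readings = [
    (1700002900000, 21.8),
    (1700004500000, 22.3),
    (1700006000000, 22.8),
]

# Keep only readings that fall inside the interval, then apply AVG.
in_window = [value for ts, value in readings if interval_start_ms <= ts < interval_end_ms]
message = {
    "ts": interval_end_ms,
    "values": {"temperature": round(fmean(in_window), 1)},
}
```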