Aggregate Stream Node
Use this node when you need to compute rolling statistics over a stream of incoming messages — for example, hourly average temperature, daily total energy consumption, or the maximum pressure reading per shift. It groups messages by originator and aggregation interval, computes the result in memory, and emits a POST_TELEMETRY_REQUEST (or a custom message type) according to the configured interval persistence policy.
The running aggregator can be persisted to storage so values are not lost on restart, and a default‑value mechanism lets you fill in gaps for entities that stopped reporting.
This node is order-sensitive: delayed or out-of-order messages that arrive after their interval has already ended are silently ignored and not included in the aggregated result. Duplicate messages are not deduplicated — each copy updates the aggregator and is counted again.
If your data sources produce late-arriving telemetry or you need the ability to re-process historical data, consider using Time Series Data Aggregation or Related Entities Aggregation (Calculated Fields) instead — they aggregate from stored data on demand and are not affected by message ordering or duplicates.
Concepts
A few terms recur throughout the configuration; defining them up front keeps the rest of the page short.
Interval. A bucket keyed by (originatorId, intervalStartTs). For each originator the node maintains an in‑memory map of intervals — one per time window (hour, day, custom, …). Each bucket holds a small aggregator object (e.g. {sum, count} for Average, the smallest value seen for Min, a set of values for Count unique). An interval is created lazily, the first time a message for that originator falls into the window. Old intervals are evicted from memory after a TTL of 2 × the interval duration.
State. The aggregator object inside an interval. It is the small “scratch‑pad” the node uses to accumulate the result — not the user’s telemetry. Every incoming message updates the matching state.
Reporting. Emitting an outgoing message (default POST_TELEMETRY_REQUEST) with the current aggregated value on the Success branch. Reporting does not write to the database by itself — to store the result you must wire Success to a Save Time Series node.
State persistence. Writing the in‑memory state back to the time‑series DB so the node can resume aggregation after a restart, partition switch, or rule‑chain reload. The state is stored as a JSON string under a special key prefix (ruleNodeState_<ruleNodeId>) and uses saveWithoutLatest, so it never appears in the entity’s latest telemetry. This is independent of, and additional to, any user‑level Save Time Series step you might do downstream.
Check. A periodic internal self‑tick re‑scheduled every Interval check value units (minimum 1 minute). At each check the node walks the interval table, decides which intervals to report, and evicts intervals past TTL. A separate internal tick drives state persistence when the Periodically policy is selected.
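The interval/state bookkeeping described above can be pictured with a minimal Python sketch — an illustration of the bucketing model, not ThingsBoard's actual implementation. It shows the Average aggregator ({sum, count}), lazy interval creation keyed by (originator, interval start), and TTL eviction during a check tick:

```python
INTERVAL_MS = 3_600_000      # one hour, as in the "Hour" interval type
TTL_MS = 2 * INTERVAL_MS     # intervals are evicted 2x the duration after start

# One bucket per (originatorId, intervalStartTs); the value is the small
# aggregator "state" object -- here the {sum, count} pair used for Average.
intervals: dict[tuple[str, int], dict] = {}

def on_message(originator: str, ts_ms: int, value: float) -> None:
    start = ts_ms - ts_ms % INTERVAL_MS          # bucket key: interval start
    state = intervals.setdefault((originator, start), {"sum": 0.0, "count": 0})
    state["sum"] += value                        # update the scratch-pad
    state["count"] += 1

def check(now_ms: int) -> None:
    # Periodic "check" tick: evict buckets whose TTL has elapsed.
    for key in [k for k in intervals if now_ms - k[1] > TTL_MS]:
        del intervals[key]
```

Note that the bucket only ever holds the scratch-pad, not the raw messages, which is why memory use stays bounded regardless of message rate.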
Configuration
The fields below follow the order of the rule node editor.
General
| Field | Required | Description |
|---|---|---|
| Name | Yes | Display name of the rule node. |
| Output message type | Yes | Type used for outgoing aggregate messages. Defaults to POST_TELEMETRY_REQUEST. Selecting a built‑in type fills Message type value automatically; selecting Custom lets you type any string. |
| Input value key | Yes | JSON key inside incoming messages whose numeric value is fed into the aggregator. |
| Output value key | Yes | JSON key written to the outgoing aggregate message under the originator entity. |
Aggregation
| Field | Required | Description |
|---|---|---|
| Aggregation function | Yes | Min, Max, Sum, Average, Count, or Count unique. |
| Aggregate interval type | Yes | Hour, Day, Week, Week (Sun–Sat), Month, Year, or Custom. Fixed types are aligned to the start of the period in the configured time zone. Custom lets you specify any duration. |
| Time zone | Conditional | Shown for fixed interval types. Determines the local moment at which the period boundary falls (for example, when “day” actually starts). |
| Interval value / time unit | Conditional | Shown when Aggregate interval type is Custom. The duration of one aggregation window. |
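Time-zone alignment matters because a "Day" interval starts at local midnight, which is a different UTC instant in every zone. The following Python sketch (an illustration, not the node's code) floors an epoch-millis timestamp to the local day start:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def day_interval_start_ms(ts_ms: int, tz_name: str) -> int:
    """Floor an epoch-millis timestamp to the start of the local day."""
    tz = ZoneInfo(tz_name)
    local = datetime.fromtimestamp(ts_ms / 1000, tz)
    day_start = local.replace(hour=0, minute=0, second=0, microsecond=0)
    return int(day_start.timestamp() * 1000)

# 2024-01-01T03:00:00Z falls on Jan 1 in UTC, but on Dec 31 in New York,
# so the same message lands in different daily buckets per time zone.
```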
Interval persistence
Controls when an aggregate result is emitted downstream. Despite the field name, this setting does not by itself write to the database — it only triggers an outgoing message on the Success branch.
| Field | Description |
|---|---|
| Interval persistence policy | On each message — emit a result right after every incoming message that updated the state (rolling result). On each check — on every check tick, emit a result for every interval that has changed since the last tick, including intervals whose window is still open. On each check after interval end (default) — on every check tick, emit a result only for intervals whose window has fully closed (one final result per closed interval). |
| Interval check value / time unit | Period of the internal check tick; minimum 1 minute. Used by the two On each check… policies. |
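The difference between the two check-driven policies can be sketched as follows; the policy names and flag fields are illustrative, not the node's internal identifiers:

```python
def intervals_to_report(intervals, policy, now_ms, interval_ms):
    """Which buckets a check tick emits under the two 'On each check...' policies."""
    report = []
    for (originator, start), state in intervals.items():
        closed = now_ms >= start + interval_ms
        if policy == "ON_EACH_CHECK" and state.get("dirty"):
            # Changed since the last tick -- report even if still open.
            report.append((originator, start))
        elif policy == "ON_EACH_CHECK_AFTER_INTERVAL_END" and closed and not state.get("reported"):
            # Window fully closed -- one final result per interval.
            report.append((originator, start))
    return report
```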
State persistence
Controls when the in‑memory aggregator is written to the time‑series DB so values are not lost on restart. Each write goes to a special internal key prefix (ruleNodeState_<ruleNodeId>) and never appears in the entity’s latest telemetry.
| Field | Description |
|---|---|
| State persistence policy | On each change (default) — every accepted incoming message synchronously writes the state to the DB and is acked only after the write succeeds. Most durable, highest write rate. Periodically — incoming messages only mark the state dirty; a separate scheduled tick writes all dirty states in batch. Cheaper on the DB; updates between ticks are lost on crash. |
| State persistence value / time unit | Shown only when policy is Periodically. Period of the state‑persist tick; minimum 1 minute. |
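A compact sketch of the trade-off between the two policies — synchronous write per change versus dirty-flag batching on a tick (illustrative code, not the node's implementation):

```python
class StatePersistence:
    """Sketch: synchronous per-change write vs. dirty-flag batching."""

    def __init__(self, policy, db_write):
        self.policy = policy        # "ON_EACH_CHANGE" or "PERIODICALLY"
        self.db_write = db_write    # callable(key, state)
        self.dirty = {}             # states awaiting the next persist tick

    def on_state_change(self, key, state):
        if self.policy == "ON_EACH_CHANGE":
            self.db_write(key, state)   # message is acked only after this returns
        else:
            self.dirty[key] = state     # just mark dirty; cheap, but lost on crash

    def on_persist_tick(self):
        for key, state in self.dirty.items():   # batched write of dirty states
            self.db_write(key, state)
        self.dirty.clear()
```

Note how Periodically collapses any number of changes to the same interval into a single write per tick — that collapse is exactly where the DB savings come from.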
Create intervals automatically
By default an interval is created only when a message for that originator arrives. If a device goes silent, no interval and no result message are produced for the missing windows. Tick this checkbox to emit results for silent entities.
When enabled the node periodically iterates the configured entity set and fills any missing intervals between the last seen one and “now” with a default‑valued state. Those default intervals follow the normal report path, so a result is emitted even when no telemetry arrived.
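The gap-filling step amounts to enumerating the interval-start timestamps between an entity's last seen window and the current one. A sketch of that enumeration (illustrative, not the node's code):

```python
def missing_interval_starts(last_seen_start: int, now_ms: int, interval_ms: int) -> list[int]:
    """Interval-start timestamps between the last seen window and 'now'
    that need a default-valued state for a silent entity."""
    first_missing = last_seen_start + interval_ms
    current = now_ms - now_ms % interval_ms   # the window 'now' falls into
    return list(range(first_missing, current + interval_ms, interval_ms))
```

Each returned timestamp gets a default-valued state, which then flows through the normal report path.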
Ticking the checkbox reveals an Interval entities card with the following structure.
Interval entities
| Field | Description |
|---|---|
| Execution period value / time unit | How often the entity list is re‑resolved and missing intervals are back‑filled. Default 5 minutes, minimum 1 minute. New members start receiving defaults automatically; removed members stop. This is independent of the Interval check period configured under Interval persistence. |
Entities
A nested card under Interval entities. Selects the universe of entities for which default‑value intervals are generated. Pick one mode via the radio button.
| Mode | Behavior |
|---|---|
| Single entity | One fixed entity. Configure Type and the entity itself. Only that entity receives default intervals. |
| Group of entities | Every member of a specific entity group becomes an originator that may receive default intervals. Configure Type (e.g. Device) and Select entity group. The list of members is refreshed on each Execution period tick. |
| Relations query | For each parent entity returned by a root‑and‑direction relations query, the configured child‑entities relations query is run, and each resolved child becomes an originator. Configure the root entity, direction, max relation level, and relation type filters. |
Queue
Optional. Selects a rule engine queue for tick and outgoing messages. If empty, the node uses the queue of the rule chain.
Output message
At each emission the node produces a message of the configured Output message type (default POST_TELEMETRY_REQUEST). The originator is the entity whose interval is being reported.
```json
{ "<output value key>": <aggregated value> }
```

Metadata: ts is set to the interval start timestamp, and overwriteValue=true so a downstream Save Time Series node replaces any earlier write at the same timestamp.
Output connections
| Connection | Condition |
|---|---|
| Success | Aggregated result message emitted at interval end (or per the configured Interval persistence policy). |
| Failure | Unexpected error during processing — for example, the input message is not a JSON object, or the configured input key is missing or null. |
Effects on time‑series DB write volume
Three sources of writes to consider when sizing the DB:
- Reporting never writes to the DB by itself. If you wire Success to a Save Time Series node you get one telemetry write per emission, per originator, per output key.
- State persistence:
  - On each change ≈ 1 internal write per accepted incoming message.
  - Periodically ≈ 1 internal write per dirty (entity, interval) per state‑persist tick (batched).
- Create intervals automatically multiplies emissions by the size of the configured entity set, regardless of how many of those entities actually sent data.
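A back-of-the-envelope calculation makes the difference concrete. The fleet size and message rate below are illustrative assumptions, not recommendations:

```python
# Assumed fleet: 100 devices, each sending one message per minute,
# hourly intervals, and a 10-minute state-persist tick.
devices = 100
msgs_per_device_per_day = 24 * 60   # one message per minute

# "On each change": one internal write per accepted message.
writes_on_each_change = devices * msgs_per_device_per_day

# "Periodically": at most one write per dirty (entity, interval) per tick;
# with hourly intervals each device has one open interval at a time,
# so at most one write per device per tick.
ticks_per_day = 24 * 6              # a tick every 10 minutes
writes_periodically = devices * ticks_per_day

print(writes_on_each_change)        # 144000 internal writes/day
print(writes_periodically)          # 14400 internal writes/day
```

Under these assumptions, switching policies cuts internal write volume by an order of magnitude, at the cost of losing up to one tick's worth of updates on a crash.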
Example — Hourly average temperature
Configuration: Aggregation function = Average, Aggregate interval type = Hour, Interval persistence policy = On each check after interval end, State persistence policy = On each change.
Incoming messages: a stream of POST_TELEMETRY_REQUEST messages from a temperature sensor with key temperature.
Result: at the end of each hour, one message is emitted with the average temperature for that hour:
```json
{ "avgHourlyTemperature": 22.3 }
```

This message can be piped directly into a Save Time Series node to persist the hourly average alongside raw readings.