RabbitMQ
Use this node to publish incoming message data to a RabbitMQ exchange — for example, routing device telemetry to microservices via a topic exchange, delivering alarms to severity-specific queues, or integrating with legacy AMQP-based systems.
Configuration
Exchange and routing
- Exchange name pattern: target exchange. Leave empty to use the default exchange. Supports templatization.
- Routing key pattern: routing key for message distribution. Supports templatization. Leave empty for fanout exchanges.
- Message properties: predefined AMQP properties. Supported values:

| Value | Persistence | Content type |
|---|---|---|
| (empty) | Non-persistent | None |
| `BASIC` | Non-persistent | None |
| `TEXT_PLAIN` | Non-persistent | text/plain |
| `MINIMAL_BASIC` | Non-persistent | None |
| `MINIMAL_PERSISTENT_BASIC` | Persistent | None |
| `PERSISTENT_BASIC` | Persistent | None |
| `PERSISTENT_TEXT_PLAIN` | Persistent | text/plain |
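The message property values in the table above can be captured as a small lookup, which is handy when checking a configuration outside the node. This is only a sketch that mirrors the table rows; the names `AmqpProps`, `MESSAGE_PROPERTIES`, and `is_persistent` are hypothetical and are not part of the node or of any RabbitMQ client.

```python
from typing import NamedTuple, Optional


class AmqpProps(NamedTuple):
    persistent: bool              # True when the broker should persist the message
    content_type: Optional[str]   # None when no content type is set


# Rows copied from the table above; None stands for the "(empty)" row.
MESSAGE_PROPERTIES = {
    None: AmqpProps(False, None),
    "BASIC": AmqpProps(False, None),
    "TEXT_PLAIN": AmqpProps(False, "text/plain"),
    "MINIMAL_BASIC": AmqpProps(False, None),
    "MINIMAL_PERSISTENT_BASIC": AmqpProps(True, None),
    "PERSISTENT_BASIC": AmqpProps(True, None),
    "PERSISTENT_TEXT_PLAIN": AmqpProps(True, "text/plain"),
}


def is_persistent(value: Optional[str]) -> bool:
    """Return True when the configured value marks messages as persistent."""
    # An empty string is treated like the "(empty)" row.
    return MESSAGE_PROPERTIES[value or None].persistent
```

In AMQP 0-9-1, persistence is expressed through the message's delivery mode (2 for persistent). A persistent message only survives a broker restart if the queue it lands in is durable as well.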
Connection
- Host: required. RabbitMQ server hostname or IP.
- Port: AMQP port. Default: `5672`.
- Virtual host: RabbitMQ virtual host. Default: `/`.
- Username / Password: broker authentication. Default: `guest`/`guest`.
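To try the same connection settings from another AMQP client or a command-line tool, it can help to express them as an `amqp://` URI. The sketch below assumes the defaults listed above; the `ConnectionSettings` class and its `to_amqp_uri` method are hypothetical helpers, not something the node exposes.

```python
from dataclasses import dataclass
from urllib.parse import quote


@dataclass
class ConnectionSettings:
    host: str                     # required, no default
    port: int = 5672
    virtual_host: str = "/"
    username: str = "guest"
    password: str = "guest"

    def to_amqp_uri(self) -> str:
        """Build an amqp:// URI with reserved characters percent-encoded."""
        user = quote(self.username, safe="")
        pwd = quote(self.password, safe="")
        vhost = quote(self.virtual_host, safe="")  # "/" becomes "%2F"
        return f"amqp://{user}:{pwd}@{self.host}:{self.port}/{vhost}"


print(ConnectionSettings(host="rabbitmq.example.com").to_amqp_uri())
# amqp://guest:guest@rabbitmq.example.com:5672/%2F
```

Note that a stock RabbitMQ broker typically allows the `guest` user to connect only from localhost, so a dedicated user is usually needed for a remote host.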
Advanced
- Automatic recovery: reconnect and restore topology automatically after network failures. Default: `false`.
- Connection timeout (ms): TCP connection establishment timeout. Default: `60000`.
- Handshake timeout (ms): AMQP handshake timeout after TCP connects. Default: `10000`.
- Client properties: custom key-value metadata sent to RabbitMQ during connection (appears in the management UI).
Outgoing message format
On Success: message data and metadata are not modified.

On Failure: the `error` key is added to message metadata in the format `class ExceptionClass: message`.
Message processing algorithm
- Resolve exchange and routing key patterns from message data and metadata.
- Convert message data to bytes (UTF-8) and publish to RabbitMQ with the configured properties.
- On success: forward the unchanged message via `Success`.
- On error: add `error` to metadata; route via `Failure`.
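The steps above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the node's implementation: `resolve` and `process` are hypothetical names, `publish` is a caller-supplied stand-in for a real AMQP client, message data is modelled as a dict, and placeholders with no matching key are assumed to be left as-is. The placeholder syntax (`${key}` for metadata, `$[key]` for data) follows the Examples section of this page.

```python
import json
import re
from typing import Callable, Dict, Tuple

_META = re.compile(r"\$\{([^}]+)\}")   # ${key} reads from metadata
_DATA = re.compile(r"\$\[([^\]]+)\]")  # $[key] reads from message data


def resolve(pattern: str, data: dict, metadata: Dict[str, str]) -> str:
    """Substitute placeholders; unknown keys are left untouched (assumption)."""
    pattern = _META.sub(lambda m: str(metadata.get(m.group(1), m.group(0))), pattern)
    return _DATA.sub(lambda m: str(data.get(m.group(1), m.group(0))), pattern)


def process(
    data: dict,
    metadata: Dict[str, str],
    exchange_pattern: str,
    routing_pattern: str,
    publish: Callable[[str, str, bytes], None],
) -> Tuple[str, Dict[str, str]]:
    """Return the output connection name and the outgoing metadata."""
    exchange = resolve(exchange_pattern, data, metadata)
    routing_key = resolve(routing_pattern, data, metadata)
    # json.dumps stands in for the message's JSON text, encoded as UTF-8.
    body = json.dumps(data).encode("utf-8")
    try:
        publish(exchange, routing_key, body)
    except Exception as exc:
        failed = dict(metadata)  # copy so the caller's metadata is not mutated
        # Imitates the "class ExceptionClass: message" format using the Python type name.
        failed["error"] = f"class {type(exc).__name__}: {exc}"
        return "Failure", failed
    return "Success", metadata


sent = []
label, meta = process(
    {"type": "HighTemperature", "severity": "CRITICAL"},
    {"deviceType": "temperature-sensor"},
    "alarms-$[severity]",
    "alarm.$[type].${deviceType}",
    lambda exchange, routing_key, body: sent.append((exchange, routing_key, body)),
)
print(label, sent[0][:2])
# Success ('alarms-CRITICAL', 'alarm.HighTemperature.temperature-sensor')
```

Passing the publisher in as a callable keeps the sketch free of any client library and makes the failure path easy to exercise with a function that raises.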
Output connections
| Connection | Condition |
|---|---|
| `Success` | Message published. Outgoing message is unchanged. |
| `Failure` | Broker rejected the message, invalid exchange/routing, connection error, or unexpected error. |
Examples
Example 1: Publish telemetry with device-type routing

Data: `{ "temperature": 25.5, "humidity": 60.2 }` | Metadata: `{ "deviceType": "TH-Sensor" }`.

```json
{
  "exchangeNamePattern": "telemetry",
  "routingKeyPattern": "${deviceType}",
  "messageProperties": "PERSISTENT_TEXT_PLAIN",
  "host": "rabbitmq.example.com",
  "port": 5672,
  "virtualHost": "/",
  "username": "iot-user",
  "password": "secure-password",
  "automaticRecoveryEnabled": true,
  "connectionTimeout": 60000,
  "handshakeTimeout": 10000,
  "clientProperties": {}
}
```

Result: message published to the `telemetry` exchange with routing key `TH-Sensor`, persistent. Routes via `Success`.
Example 2: Dynamic exchange and routing key from alarm data

Data: `{ "type": "HighTemperature", "severity": "CRITICAL" }` | Metadata: `{ "deviceType": "temperature-sensor" }`.

```json
{
  "exchangeNamePattern": "alarms-$[severity]",
  "routingKeyPattern": "alarm.$[type].${deviceType}",
  "messageProperties": "PERSISTENT_BASIC",
  "host": "rabbitmq.example.com",
  "port": 5672,
  "virtualHost": "/iot",
  "username": "alarm-publisher",
  "password": "secure-password",
  "automaticRecoveryEnabled": true,
  "connectionTimeout": 60000,
  "handshakeTimeout": 10000,
  "clientProperties": {}
}
```

Result: published to exchange `alarms-CRITICAL` with routing key `alarm.HighTemperature.temperature-sensor`. Routes via `Success`.
JSON schema
```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TbRabbitMqNodeConfiguration",
  "type": "object",
  "required": ["host"],
  "additionalProperties": false,
  "properties": {
    "exchangeNamePattern": { "type": "string" },
    "routingKeyPattern": { "type": "string" },
    "messageProperties": {
      "type": ["string", "null"],
      "enum": [null, "BASIC", "TEXT_PLAIN", "MINIMAL_BASIC", "MINIMAL_PERSISTENT_BASIC", "PERSISTENT_BASIC", "PERSISTENT_TEXT_PLAIN"]
    },
    "host": { "type": "string" },
    "port": { "type": "integer" },
    "virtualHost": { "type": "string" },
    "username": { "type": "string" },
    "password": { "type": "string" },
    "automaticRecoveryEnabled": { "type": "boolean" },
    "connectionTimeout": { "type": "integer" },
    "handshakeTimeout": { "type": "integer" },
    "clientProperties": { "type": "object", "additionalProperties": { "type": "string" } }
  }
}
```
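Where a full JSON Schema validator is not at hand, the main constraints of the schema (required `host`, no unknown properties, the `messageProperties` enum, and basic types) can be approximated by hand. The following is a rough sketch only; `validate_config` is a hypothetical helper, and it does not check that `clientProperties` values are strings.

```python
ALLOWED_MESSAGE_PROPERTIES = (
    None, "BASIC", "TEXT_PLAIN", "MINIMAL_BASIC",
    "MINIMAL_PERSISTENT_BASIC", "PERSISTENT_BASIC", "PERSISTENT_TEXT_PLAIN",
)

# Expected Python type for each property other than messageProperties.
FIELD_TYPES = {
    "exchangeNamePattern": str, "routingKeyPattern": str, "host": str,
    "port": int, "virtualHost": str, "username": str, "password": str,
    "automaticRecoveryEnabled": bool, "connectionTimeout": int,
    "handshakeTimeout": int, "clientProperties": dict,
}


def validate_config(config: dict) -> list:
    """Return a list of problems; an empty list means no problem was found."""
    problems = []
    if "host" not in config:
        problems.append("missing required property: host")
    for key, value in config.items():
        if key == "messageProperties":
            if value not in ALLOWED_MESSAGE_PROPERTIES:
                problems.append(f"messageProperties: unsupported value {value!r}")
        elif key not in FIELD_TYPES:
            # Mirrors "additionalProperties": false in the schema.
            problems.append(f"unknown property: {key}")
        elif not isinstance(value, FIELD_TYPES[key]) or (
            FIELD_TYPES[key] is int and isinstance(value, bool)
        ):
            # bool is a subclass of int in Python, so it is rejected explicitly.
            problems.append(f"{key}: expected {FIELD_TYPES[key].__name__}")
    return problems


print(validate_config({"host": "rabbitmq.example.com", "port": "5672"}))
# ['port: expected int']
```

For anything beyond a quick sanity check, a dedicated JSON Schema library is the safer choice, since it covers the full specification.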