Skip to content
Stand with Ukraine flag

RabbitMQ

Use this node to publish incoming message data to a RabbitMQ exchange — for example, routing device telemetry to microservices via a topic exchange, delivering alarms to severity-specific queues, or integrating with legacy AMQP-based systems.

  • Exchange name pattern — target exchange. Leave empty to use the default exchange. Supports templatization.
  • Routing key pattern — routing key for message distribution. Supports templatization. Leave empty for fanout exchanges.
  • Message properties — predefined AMQP properties:
ValuePersistenceContent type
(empty)Non-persistentNone
BASICNon-persistentNone
TEXT_PLAINNon-persistenttext/plain
MINIMAL_BASICNon-persistentNone
MINIMAL_PERSISTENT_BASICPersistentNone
PERSISTENT_BASICPersistentNone
PERSISTENT_TEXT_PLAINPersistenttext/plain
  • Host — required. RabbitMQ server hostname or IP.
  • Port — AMQP port. Default: 5672.
  • Virtual host — RabbitMQ virtual host. Default: /.
  • Username / Password — broker authentication. Default: guest/guest.
  • Automatic recovery — reconnect and restore topology automatically after network failures. Default: false.
  • Connection timeout (ms) — TCP connection establishment timeout. Default: 60000.
  • Handshake timeout (ms) — AMQP handshake timeout after TCP connects. Default: 10000.
  • Client properties — custom key-value metadata sent to RabbitMQ during connection (appears in the management UI).

On Success: message data and metadata are not modified.

On Failure: the error key is added to message metadata in the format class ExceptionClass: message.

  1. Resolve exchange and routing key patterns from message data and metadata.
  2. Convert message data to bytes (UTF-8) and publish to RabbitMQ with the configured properties.
  3. On success: forward the unchanged message via Success.
  4. On error: add error to metadata; route via Failure.
ConnectionCondition
SuccessMessage published. Outgoing message is unchanged.
FailureBroker rejected the message, invalid exchange/routing, connection error, or unexpected error.

Example 1 — Publish telemetry with device-type routing

Section titled “Example 1 — Publish telemetry with device-type routing”

Data: { "temperature": 25.5, "humidity": 60.2 } | Metadata: { "deviceType": "TH-Sensor" }.

{
"exchangeNamePattern": "telemetry",
"routingKeyPattern": "${deviceType}",
"messageProperties": "PERSISTENT_TEXT_PLAIN",
"host": "rabbitmq.example.com",
"port": 5672,
"virtualHost": "/",
"username": "iot-user",
"password": "secure-password",
"automaticRecoveryEnabled": true,
"connectionTimeout": 60000,
"handshakeTimeout": 10000,
"clientProperties": {}
}

Result: message published to telemetry exchange with routing key TH-Sensor, persistent. Routes via Success.


Example 2 — Dynamic exchange and routing key from alarm data

Section titled “Example 2 — Dynamic exchange and routing key from alarm data”

Data: { "type": "HighTemperature", "severity": "CRITICAL" } | Metadata: { "deviceType": "temperature-sensor" }.

{
"exchangeNamePattern": "alarms-$[severity]",
"routingKeyPattern": "alarm.$[type].${deviceType}",
"messageProperties": "PERSISTENT_BASIC",
"host": "rabbitmq.example.com",
"port": 5672,
"virtualHost": "/iot",
"username": "alarm-publisher",
"password": "secure-password",
"automaticRecoveryEnabled": true,
"connectionTimeout": 60000,
"handshakeTimeout": 10000,
"clientProperties": {}
}

Result: published to exchange alarms-CRITICAL with routing key alarm.HighTemperature.temperature-sensor. Routes via Success.

{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "TbRabbitMqNodeConfiguration",
"type": "object",
"required": ["host"],
"additionalProperties": false,
"properties": {
"exchangeNamePattern": { "type": "string" },
"routingKeyPattern": { "type": "string" },
"messageProperties": { "type": ["string", "null"], "enum": [null, "BASIC", "TEXT_PLAIN", "MINIMAL_BASIC", "MINIMAL_PERSISTENT_BASIC", "PERSISTENT_BASIC", "PERSISTENT_TEXT_PLAIN"] },
"host": { "type": "string" },
"port": { "type": "integer" },
"virtualHost": { "type": "string" },
"username": { "type": "string" },
"password": { "type": "string" },
"automaticRecoveryEnabled": { "type": "boolean" },
"connectionTimeout": { "type": "integer" },
"handshakeTimeout": { "type": "integer" },
"clientProperties": { "type": "object", "additionalProperties": { "type": "string" } }
}
}