# Kafka
Use this node to publish incoming message data to an Apache Kafka topic — for example, streaming device telemetry to a data lake pipeline, forwarding alarms to a real-time analytics system, or integrating with event-driven microservices.
## Configuration

### Topic and key

- Topic pattern — required. Target Kafka topic. Supports templatization.
- Key pattern — partition key for deterministic routing. Supports templatization. Leave empty for default distribution.
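Both patterns can embed values from the incoming message. As a minimal sketch of how such templatization might behave, the helper below assumes ThingsBoard-style placeholders (`${key}` drawn from metadata, `$[key]` from message data); the function name and exact placeholder syntax are illustrative assumptions, not the node's actual internals.

```python
import re

# Hypothetical pattern resolver: ${key} is replaced from message metadata,
# $[key] from message data; unknown keys are left as-is.
def resolve_pattern(pattern: str, data: dict, metadata: dict) -> str:
    pattern = re.sub(r"\$\{(\w+)\}",
                     lambda m: str(metadata.get(m.group(1), m.group(0))), pattern)
    pattern = re.sub(r"\$\[(\w+)\]",
                     lambda m: str(data.get(m.group(1), m.group(0))), pattern)
    return pattern

topic = resolve_pattern("telemetry-${deviceType}", {}, {"deviceType": "TH-Sensor"})
print(topic)  # telemetry-TH-Sensor
```

With a key pattern such as `$[serialNumber]`, messages from the same device would consistently land on the same partition.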
## Producer settings

- Automatically retry times if fails (`retries`) — number of retries on transient errors. `0` disables retries.
- Produces batch size in bytes (`batch.size`) — maximum batch size for accumulation. `0` disables batching.
- Time to buffer locally (ms) (`linger.ms`) — additional wait time to accumulate more records into a batch.
- Client buffer max size in bytes (`buffer.memory`) — total producer buffer memory.
- Number of acknowledgments (`acks`):
| Value | Meaning |
|---|---|
| `0` | No acknowledgment — lowest latency, messages may be lost |
| `1` | Leader write acknowledged — balanced durability |
| `-1` / `all` | All in-sync replicas acknowledged — strongest durability |
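Each of these settings maps onto a standard Kafka producer property. The sketch below shows that mapping as a plain dictionary transformation; `to_producer_properties` is a hypothetical helper for illustration, not part of the node's API.

```python
# Hypothetical helper: map this node's config fields to the standard
# Kafka producer property names documented above.
def to_producer_properties(cfg: dict) -> dict:
    mapping = {
        "retries": "retries",
        "batchSize": "batch.size",
        "linger": "linger.ms",
        "bufferMemory": "buffer.memory",
        "acks": "acks",
    }
    props = {"bootstrap.servers": cfg["bootstrapServers"]}
    for field, prop in mapping.items():
        if field in cfg:
            props[prop] = cfg[field]
    # "Other properties" (e.g. SSL/TLS settings) are passed through verbatim.
    props.update(cfg.get("otherProperties", {}))
    return props

cfg = {"bootstrapServers": "kafka.example.com:9092", "retries": 3,
       "batchSize": 16384, "linger": 10, "bufferMemory": 33554432, "acks": "-1"}
print(to_producer_properties(cfg))
```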
## Connection

- Bootstrap servers — comma-separated list of Kafka broker addresses (`host:port`).
## Advanced

- Other properties — custom Kafka producer key-value properties (e.g., SSL/TLS configuration).
- Add Message metadata key-value pairs to Kafka record headers — when enabled, all metadata entries are added as Kafka record headers with the prefix `tb_msg_md_`.
- Charset encoding — encoding for metadata header values (applies when headers are enabled).
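The header option can be sketched as a simple transformation: each metadata entry becomes one record header, with the documented prefix and the configured charset applied to the value. The helper name below is illustrative.

```python
# Hypothetical sketch: convert message metadata into Kafka record headers.
# Each key gets the tb_msg_md_ prefix; values are encoded with the
# configured charset (Kafka header values are byte arrays).
def metadata_to_headers(metadata: dict, charset: str = "UTF-8") -> list:
    return [("tb_msg_md_" + key, str(value).encode(charset))
            for key, value in metadata.items()]

headers = metadata_to_headers({"deviceName": "Sensor-001", "deviceType": "TH-Sensor"})
print(headers)  # [('tb_msg_md_deviceName', b'Sensor-001'), ('tb_msg_md_deviceType', b'TH-Sensor')]
```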
## Outgoing message format

### On Success

The following fields are added to message metadata:
| Key | Description |
|---|---|
| `offset` | Partition offset of the published record |
| `partition` | Partition number |
| `topic` | Topic name |
### On Failure

| Key | Description |
|---|---|
| `error` | Exception class and message in the format `class ExceptionClass: message` |

Message data is not modified.
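As a minimal sketch of the documented `error` format, the snippet below builds the string from an exception object. Python type names stand in for Java class names here, and the timeout message text is purely illustrative.

```python
# Hypothetical formatter matching the documented "class ExceptionClass: message"
# shape of the error metadata value.
def format_error(exc: Exception) -> str:
    return f"class {type(exc).__name__}: {exc}"

print(format_error(TimeoutError("delivery timed out")))  # class TimeoutError: delivery timed out
```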
## Message processing algorithm

1. Resolve the topic and key patterns from message data and metadata.
2. If metadata headers are enabled, encode all metadata entries as Kafka headers with the `tb_msg_md_` prefix.
3. Publish message data as the record value (UTF-8 string) to Kafka asynchronously.
4. On broker acknowledgment: add `offset`, `partition`, and `topic` to metadata; route via `Success`.
5. On error: add `error` to metadata; route via `Failure`.
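The steps above can be sketched end to end with a stand-in producer (the real node publishes asynchronously through a Kafka producer; every name below is illustrative, and pattern resolution is elided for brevity):

```python
# Illustrative sketch of the processing algorithm, not the node's actual code.
def process(msg, cfg, producer):
    topic = cfg["topicPattern"]           # step 1 (pattern resolution elided)
    key = cfg.get("keyPattern") or None
    headers = None
    if cfg.get("addMetadataKeyValuesAsKafkaHeaders"):  # step 2
        charset = cfg.get("kafkaHeadersCharset", "UTF-8")
        headers = [("tb_msg_md_" + k, str(v).encode(charset))
                   for k, v in msg["metadata"].items()]
    try:
        # Step 3: publish message data as the record value.
        record = producer.send(topic, key=key, value=msg["data"], headers=headers)
        # Step 4: enrich metadata from the broker acknowledgment.
        msg["metadata"].update(offset=str(record["offset"]),
                               partition=str(record["partition"]),
                               topic=record["topic"])
        return "Success"
    except Exception as e:
        # Step 5: record the error and route via Failure.
        msg["metadata"]["error"] = f"class {type(e).__name__}: {e}"
        return "Failure"

class FakeProducer:
    """Acknowledges every record on partition 0 at a fixed offset."""
    def send(self, topic, key=None, value=None, headers=None):
        return {"topic": topic, "partition": 0, "offset": 42}

msg = {"data": '{"temperature": 25.5}', "metadata": {}}
route = process(msg, {"topicPattern": "telemetry"}, FakeProducer())
print(route, msg["metadata"])  # Success {'offset': '42', 'partition': '0', 'topic': 'telemetry'}
```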
## Output connections

| Connection | Condition |
|---|---|
| `Success` | Record published and acknowledged. Metadata includes `offset`, `partition`, `topic`. |
| `Failure` | Broker rejected the message, topic missing, producer timeout, init error, or unexpected error. |
## Examples

### Example 1 — Publish telemetry to a topic

Data: `{ "temperature": 25.5, "humidity": 60.2 }` | Metadata: `{ "deviceType": "TH-Sensor", "deviceName": "Sensor-001" }`

Node configuration:

```json
{
  "topicPattern": "telemetry",
  "keyPattern": "",
  "bootstrapServers": "kafka.example.com:9092",
  "retries": 3,
  "batchSize": 16384,
  "linger": 10,
  "bufferMemory": 33554432,
  "acks": "-1",
  "otherProperties": {},
  "addMetadataKeyValuesAsKafkaHeaders": false,
  "kafkaHeadersCharset": "UTF-8"
}
```

Outgoing metadata adds: `{ "offset": "12345", "partition": "2", "topic": "telemetry" }`. Routes via `Success`.
## JSON schema

```json
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TbKafkaNodeConfiguration",
  "type": "object",
  "required": ["topicPattern", "bootstrapServers"],
  "additionalProperties": false,
  "properties": {
    "topicPattern": { "type": "string" },
    "keyPattern": { "type": ["string", "null"] },
    "bootstrapServers": { "type": "string" },
    "retries": { "type": "integer", "minimum": 0 },
    "batchSize": { "type": "integer", "minimum": 0 },
    "linger": { "type": "integer", "minimum": 0 },
    "bufferMemory": { "type": "integer", "minimum": 0 },
    "acks": { "type": "string", "enum": ["-1", "0", "1", "all"] },
    "otherProperties": { "type": "object", "additionalProperties": { "type": "string" } },
    "addMetadataKeyValuesAsKafkaHeaders": { "type": "boolean" },
    "kafkaHeadersCharset": { "type": "string" }
  }
}
```