
Kafka

Use this node to publish incoming message data to an Apache Kafka topic — for example, streaming device telemetry to a data lake pipeline, forwarding alarms to a real-time analytics system, or integrating with event-driven microservices.

  • Topic pattern — required. Target Kafka topic. Supports templatization.
  • Key pattern — partition key for deterministic routing. Supports templatization. Leave empty for default distribution.
  • Automatically retry times if fails (retries) — number of retries on transient errors. 0 disables retries.
  • Produces batch size in bytes (batch.size) — maximum batch size for accumulation. 0 disables batching.
  • Time to buffer locally (ms) (linger.ms) — additional wait time to accumulate more records into a batch.
  • Client buffer max size in bytes (buffer.memory) — total producer buffer memory.
  • Number of acknowledgments (acks):
    Value | Meaning
    0 | No acknowledgment: lowest latency, messages may be lost
    1 | Leader write acknowledged: balanced durability
    -1 / all | All in-sync replicas acknowledged: strongest durability
  • Bootstrap servers — comma-separated list of Kafka broker addresses (host:port).
  • Other properties — custom Kafka producer key-value properties (e.g., SSL/TLS configuration).
  • Add Message metadata key-value pairs to Kafka record headers — when enabled, all metadata entries are added as Kafka record headers with the prefix tb_msg_md_.
  • Charset encoding — encoding for metadata header values (applies when headers are enabled).
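
As a rough illustration of how the settings above relate to Kafka producer properties, the sketch below maps a node configuration onto the property names given in parentheses (retries, batch.size, linger.ms, buffer.memory, acks) plus the standard bootstrap.servers property. The function name to_producer_properties is hypothetical, and the JSON field names are taken from the configuration example on this page. Applying "Other properties" last, so that they can override the named settings, is an assumption and not confirmed behavior of the node.

```python
def to_producer_properties(config: dict) -> dict:
    """Translate node settings into Kafka producer properties (illustrative only)."""
    props = {
        "bootstrap.servers": config["bootstrapServers"],
        "retries": config["retries"],
        "batch.size": config["batchSize"],
        "linger.ms": config["linger"],
        "buffer.memory": config["bufferMemory"],
        "acks": config["acks"],
    }
    # Assumption: custom properties are merged last and may override the values above.
    props.update(config.get("otherProperties") or {})
    return props


if __name__ == "__main__":
    node_config = {
        "bootstrapServers": "kafka.example.com:9092",
        "retries": 3,
        "batchSize": 16384,
        "linger": 10,
        "bufferMemory": 33554432,
        "acks": "-1",
        "otherProperties": {"security.protocol": "SSL"},
    }
    for name, value in to_producer_properties(node_config).items():
        print(f"{name}={value}")
```

The resulting dictionary could be handed to whichever Kafka client is in use; the topic and key patterns are deliberately left out because they are resolved per message and are not producer properties.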

The following fields are added to message metadata:

On success:

Key | Description
offset | Partition offset of the published record
partition | Partition number
topic | Topic name

On failure:

Key | Description
error | Exception class and message in the format class ExceptionClass: message

Message data is not modified.

  1. Resolve the topic and key patterns from message data and metadata.
  2. If metadata headers are enabled, encode all metadata entries as Kafka headers with the tb_msg_md_ prefix.
  3. Publish message data as the record value (UTF-8 string) to Kafka asynchronously.
  4. On broker acknowledgment: add offset, partition, and topic to metadata; route via Success.
  5. On error: add error to metadata; route via Failure.

Connection | Condition
Success | Record published and acknowledged. Metadata includes offset, partition, topic.
Failure | Broker rejected the message, topic missing, producer timeout, init error, or unexpected error.
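
The processing steps and connections above can be sketched in a few lines of Python. This is a simplified model, not the node's implementation: it runs synchronously, whereas the node publishes asynchronously, and the send callable is a hypothetical stand-in for a real Kafka producer that returns (partition, offset) or raises. The placeholder syntax (${key} for metadata, $[key] for message data) is an assumption about how templatization is written.

```python
import json
import re

HEADER_PREFIX = "tb_msg_md_"  # header prefix named in the node description


def resolve_pattern(pattern, data, metadata):
    """Substitute placeholders; unknown keys are left untouched."""
    # Assumed syntax: ${key} reads metadata, $[key] reads message data.
    pattern = re.sub(r"\$\{([^}]+)\}",
                     lambda m: str(metadata.get(m.group(1), m.group(0))), pattern)
    return re.sub(r"\$\[([^\]]+)\]",
                  lambda m: str(data.get(m.group(1), m.group(0))), pattern)


def process(data_json, metadata, config, send):
    """Return (connection, outgoing_metadata) for one message."""
    data = json.loads(data_json)
    # Step 1: resolve topic and key patterns.
    topic = resolve_pattern(config["topicPattern"], data, metadata)
    key_pattern = config.get("keyPattern")
    key = resolve_pattern(key_pattern, data, metadata) if key_pattern else None
    # Step 2: optionally encode metadata entries as prefixed headers.
    headers = []
    if config.get("addMetadataKeyValuesAsKafkaHeaders"):
        charset = config.get("kafkaHeadersCharset", "UTF-8")
        headers = [(HEADER_PREFIX + k, str(v).encode(charset))
                   for k, v in metadata.items()]
    out = dict(metadata)  # message data itself is never modified
    try:
        # Step 3: publish the message data as a UTF-8 record value.
        partition, offset = send(topic, key, data_json.encode("utf-8"), headers)
    except Exception as exc:
        # Step 5: record the error and route via Failure.
        out["error"] = f"class {type(exc).__name__}: {exc}"
        return "Failure", out
    # Step 4: record where the message landed and route via Success.
    out.update({"offset": str(offset), "partition": str(partition), "topic": topic})
    return "Success", out


if __name__ == "__main__":
    def fake_send(topic, key, value, headers):
        return 2, 12345  # pretend the broker acknowledged partition 2, offset 12345

    config = {"topicPattern": "telemetry", "keyPattern": ""}
    print(process('{"temperature": 25.5}', {"deviceName": "Sensor-001"},
                  config, fake_send))
```

Passing send in as a parameter keeps the sketch runnable without a broker and makes both the Success and Failure paths easy to exercise.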

Example 1 — Publish telemetry to a topic

Data: { "temperature": 25.5, "humidity": 60.2 } | Metadata: { "deviceType": "TH-Sensor", "deviceName": "Sensor-001" }.

{
  "topicPattern": "telemetry",
  "keyPattern": "",
  "bootstrapServers": "kafka.example.com:9092",
  "retries": 3,
  "batchSize": 16384,
  "linger": 10,
  "bufferMemory": 33554432,
  "acks": "-1",
  "otherProperties": {},
  "addMetadataKeyValuesAsKafkaHeaders": false,
  "kafkaHeadersCharset": "UTF-8"
}

Outgoing metadata adds: { "offset": "12345", "partition": "2", "topic": "telemetry" }. Routes via Success.

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TbKafkaNodeConfiguration",
  "type": "object",
  "required": ["topicPattern", "bootstrapServers"],
  "additionalProperties": false,
  "properties": {
    "topicPattern": { "type": "string" },
    "keyPattern": { "type": ["string", "null"] },
    "bootstrapServers": { "type": "string" },
    "retries": { "type": "integer", "minimum": 0 },
    "batchSize": { "type": "integer", "minimum": 0 },
    "linger": { "type": "integer", "minimum": 0 },
    "bufferMemory": { "type": "integer", "minimum": 0 },
    "acks": { "type": "string", "enum": ["-1", "0", "1", "all"] },
    "otherProperties": { "type": "object", "additionalProperties": { "type": "string" } },
    "addMetadataKeyValuesAsKafkaHeaders": { "type": "boolean" },
    "kafkaHeadersCharset": { "type": "string" }
  }
}
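
For a quick check of a configuration before importing it, the constraints in the schema above can be mirrored by hand. The sketch below uses only the Python standard library; validate_config is a hypothetical helper, not part of any product API, and it covers only the rules visible in the schema (required fields, no unknown fields, types, non-negative integers, and the acks enum). A full JSON Schema validator would be the more thorough option.

```python
INT_FIELDS = ("retries", "batchSize", "linger", "bufferMemory")
STRING_FIELDS = ("topicPattern", "bootstrapServers", "kafkaHeadersCharset")
REQUIRED = ("topicPattern", "bootstrapServers")
ALLOWED_ACKS = {"-1", "0", "1", "all"}
KNOWN = set(INT_FIELDS) | set(STRING_FIELDS) | {
    "keyPattern", "acks", "otherProperties", "addMetadataKeyValuesAsKafkaHeaders"}


def validate_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []
    for name in REQUIRED:
        if name not in config:
            errors.append(f"missing required field: {name}")
    for name in config:
        if name not in KNOWN:
            errors.append(f"unknown field: {name}")
    for name in STRING_FIELDS:
        if name in config and not isinstance(config[name], str):
            errors.append(f"{name} must be a string")
    key = config.get("keyPattern")
    if key is not None and not isinstance(key, str):
        errors.append("keyPattern must be a string or null")
    for name in INT_FIELDS:
        if name in config:
            value = config[name]
            # bool is a subclass of int in Python, so reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, int) or value < 0:
                errors.append(f"{name} must be a non-negative integer")
    if "acks" in config:
        acks = config["acks"]
        if not isinstance(acks, str) or acks not in ALLOWED_ACKS:
            errors.append("acks must be one of: -1, 0, 1, all")
    if "otherProperties" in config:
        other = config["otherProperties"]
        if not isinstance(other, dict) or not all(
                isinstance(v, str) for v in other.values()):
            errors.append("otherProperties must be an object with string values")
    flag = config.get("addMetadataKeyValuesAsKafkaHeaders")
    if flag is not None and not isinstance(flag, bool):
        errors.append("addMetadataKeyValuesAsKafkaHeaders must be a boolean")
    return errors


if __name__ == "__main__":
    print(validate_config({"topicPattern": "telemetry", "acks": "2"}))
```

Collecting all problems in a list, instead of raising on the first one, lets a caller report every issue in a configuration at once.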