Kafka Integration

Kafka Integration connects ThingsBoard to Apache Kafka message brokers. It subscribes to one or more Kafka topics, consumes incoming messages, and transforms them into ThingsBoard telemetry and attributes via an uplink converter.

Figure: Kafka Integration architecture — devices and IoT platforms publish messages to Kafka brokers; the Kafka Integration consumes the configured topics, passes payloads through an uplink converter, and pushes telemetry and attributes to ThingsBoard; a separate Kafka Rule Node handles downlink.

A running Kafka broker is required before creating the integration.

Use a local installation or a cloud-managed cluster such as Confluent Cloud.
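For local testing, one quick option (assuming Docker is installed; the image tag is only an example) is the official Apache Kafka image, which starts a single-node KRaft broker on localhost:9092:

# Single-node Kafka broker for local testing, listening on localhost:9092
docker run -p 9092:9092 apache/kafka:3.7.0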

Uplink data converter

The uplink converter decodes the Kafka message body and maps it to the ThingsBoard data model. The Kafka Integration uses a generic uplink converter.

The decoder function receives:

  • payload — raw bytes of the Kafka message value
  • metadata — integrationName and any key-value pairs configured in the integration’s Metadata settings

It must return deviceName, deviceType, telemetry, and attributes. Both telemetry and attributes are flat key-value maps — nested objects are not supported.
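As a minimal sketch, a converter that satisfies this contract could look like the following; the payload fields (serial, temp) are hypothetical and would need to match your actual message format:

// Minimal uplink converter sketch; the payload field names are illustrative only
var msg = decodeToJson(payload);                  // built-in: parses message bytes as JSON
return {
    deviceName: msg.serial,                       // unique device identifier
    deviceType: 'default',
    attributes: { integration: metadata.integrationName },
    telemetry: { temperature: msg.temp }          // flat key-value map, no nesting
};

The worked example below shows a realistic decoder for a packed hex payload.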

In this example, device 43T1YH-REE sends a JSON payload where telemetry readings are packed as a 4-byte hex string in the data field:

{
  "EUI": "43T1YH-REE",
  "ts": 1638876127000,
  "data": "3d1f0059",
  "port": 10,
  "freq": 24300,
  "rssi": -130,
  "serial": "230165HRT"
}

EUI is used as the device name. Each 2-character hex segment of data encodes one sensor reading:

  • 3d ⇾ Temperature
  • 1f ⇾ Humidity
  • 00 ⇾ Fan speed
  • 59 ⇾ Pressure
/** Decoder **/
// decodeToJson is a built-in helper that parses the raw message bytes into JSON
var payloadJson = decodeToJson(payload);
var deviceName = payloadJson.EUI;
var deviceType = 'Monitoring-sensor';

// Each 2-character hex segment of 'data' holds one sensor reading
function hexToInt(hex) {
    return parseInt(hex, 16);
}

var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {},
    telemetry: {
        ts: payloadJson.ts,
        values: {
            Temperature: hexToInt(payloadJson.data.substring(0, 2)),
            Humidity: hexToInt(payloadJson.data.substring(2, 4)),
            Fan: hexToInt(payloadJson.data.substring(4, 6)),
            Pressure: hexToInt(payloadJson.data.substring(6, 8)),
            Port: payloadJson.port,
            Freq: payloadJson.freq,
            rssi: payloadJson.rssi,
            serial: payloadJson.serial
        }
    }
};

return result;
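
For the sample message above, this decoder returns:

{
  "deviceName": "43T1YH-REE",
  "deviceType": "Monitoring-sensor",
  "attributes": {},
  "telemetry": {
    "ts": 1638876127000,
    "values": {
      "Temperature": 61,
      "Humidity": 31,
      "Fan": 0,
      "Pressure": 89,
      "Port": 10,
      "Freq": 24300,
      "rssi": -130,
      "serial": "230165HRT"
    }
  }
}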
Create the integration

  1. Go to Integrations center → Integrations and click + Add integration.
  2. Basic settings:
    • Set Integration type to Kafka and enter a name (e.g. “Kafka integration”).
    • Enable integration and Allow create devices or assets are on by default.
    • Click Next.
  3. Uplink data converter:
    • Click Create new, enter a name, and paste the decoder script from the section above. Alternatively, click Select existing to reuse a previously created converter.
    • Click Next.
  4. Connection:
    • Fill in Group ID, Client ID, Bootstrap servers, Poll interval, and Topics.
    • Toggle Auto create topics if the topic does not exist yet.
    • For Confluent Cloud or other secured brokers, expand Advanced settings and add SASL/SSL properties under Other properties.
    Read more about each parameter in connection settings.
  5. Click Add.

Connection settings

Group ID

A string you define — it identifies the consumer group this integration belongs to. You can use any value (e.g. tb-kafka-group). All integration instances sharing the same Group ID form a consumer group and split the partition load, so each message is delivered to only one instance.

Client ID

A string you define — it labels this consumer client for monitoring and logging purposes (e.g. tb-kafka-client). Make it unique per running instance if you need to distinguish instances in Kafka metrics.

Bootstrap servers

Comma-separated list of Kafka broker addresses in host:port format, e.g. localhost:9092. Used to establish the initial connection to the cluster.

Poll interval (ms)

How often the integration polls Kafka for new messages. Default: 5000 ms.

Topics

Comma-separated list of Kafka topics to subscribe to, e.g. my-topic.

Auto create topics

When enabled, ThingsBoard automatically creates any subscribed topics that do not already exist on the broker.

Other properties

Additional Kafka consumer properties as key-value pairs. Use this for security configuration such as SASL/SSL when connecting to Confluent Cloud or other secured brokers.

For Confluent Cloud, add the following four entries:

  • ssl.endpoint.identification.algorithm = https
  • sasl.mechanism = PLAIN
  • sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET";
  • security.protocol = SASL_SSL

Replace CLUSTER_API_KEY and CLUSTER_API_SECRET with the API key and secret from your Confluent Cloud cluster’s Data integration → API keys section.

Execute remotely

When enabled, ThingsBoard generates an Integration key and Integration secret. These credentials allow the integration to run as a separate process outside the ThingsBoard cluster — useful for deployments where the integration must reside in a DMZ or on-premises gateway.
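
A sketch of running the integration remotely with Docker follows; the image name and environment variables are assumptions modeled on ThingsBoard's other remote integrations, so verify both against your platform version:

# Sketch only: image and variable names are assumptions, not confirmed by this page
docker run -d --name tb-kafka-integration \
  -e "RPC_HOST=my.thingsboard.host" \
  -e "RPC_PORT=9090" \
  -e "INTEGRATION_ROUTING_KEY=YOUR_INTEGRATION_KEY" \
  -e "INTEGRATION_SECRET=YOUR_INTEGRATION_SECRET" \
  thingsboard/tb-pe-kafka-integration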

Confluent Cloud setup

  1. Get the Bootstrap server address. Open your Confluent Cloud environment, click the cluster to enter it, then open Cluster settings → General. Copy the Bootstrap server value (YOUR_DOMAIN.confluent.cloud:9092) — paste it into the Bootstrap servers field in ThingsBoard.
  2. Create an API key. Go to Data integration → API keys and click Create key. Select Global access (development) or Granular access (production), complete the wizard, then copy the generated key and secret. Use them in sasl.jaas.config:

    org.apache.kafka.common.security.plain.PlainLoginModule required username="YOUR_KEY" password="YOUR_SECRET";
  3. Create a topic. Go to Topics and click Create topic. Enter a topic name (e.g. my-topic) and click Create with defaults. Use this name in the ThingsBoard integration Topics field.
  4. Add SASL/SSL properties in ThingsBoard. In the integration Connection step, expand Other properties and add the four entries listed above, replacing CLUSTER_API_KEY and CLUSTER_API_SECRET with the values from step 2.

Send an uplink message

Use kafka-console-producer.sh to publish a test message. Replace URL_OF_YOUR_BOOTSTRAP_SERVER with your broker address and my-topic with your configured topic:

echo "{\"EUI\":\"43T1YH-REE\",\"ts\":1638876127000,\"data\":\"3d1f0059\",\"port\":10,\"freq\":24300,\"rssi\":-130,\"serial\":\"230165HRT\"}" | \
/usr/local/kafka/bin/kafka-console-producer.sh \
--broker-list URL_OF_YOUR_BOOTSTRAP_SERVER:9092 \
--topic my-topic > /dev/null
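
To confirm the message reached the broker independently of ThingsBoard, you can read the topic back with the stock console consumer:

/usr/local/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server URL_OF_YOUR_BOOTSTRAP_SERVER:9092 \
  --topic my-topic \
  --from-beginning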

Device 43T1YH-REE is auto-created on the first uplink. Go to Entities → Devices, open the device, and check the Latest telemetry tab to confirm all decoded fields arrived correctly.

Downlink

The Kafka Integration does not have a dedicated downlink converter. To publish messages from ThingsBoard back to a Kafka topic, use the Kafka Rule Node in the rule engine.

Configure the Kafka Rule Node with the same bootstrap servers and security properties as the integration, but point it to a separate downlink topic.
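
As a sketch, a minimal node configuration could look like this when viewed as JSON; the field names follow the node's configuration form and should be treated as assumptions, and the downlink topic name is an example:

{
  "topicPattern": "tb-downlink",
  "bootstrapServers": "YOUR_DOMAIN.confluent.cloud:9092",
  "otherProperties": {
    "ssl.endpoint.identification.algorithm": "https",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";"
  }
}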

Enable Debug mode on the node. To verify messages are being published, open the node’s Events tab — each processed message appears as a Debug entry; expand an entry to inspect the payload. In Confluent Cloud, the topic’s Metrics view will show a spike in Production throughput when messages arrive.