Kafka Integration
Kafka Integration connects ThingsBoard to Apache Kafka message brokers. It subscribes to one or more Kafka topics, consumes incoming messages, and transforms them into ThingsBoard telemetry and attributes via an uplink converter.
Prerequisites
A running Kafka broker is required before creating the integration.
Use a local installation or a cloud-managed cluster such as Confluent Cloud.
ThingsBoard integration setup
Create an uplink converter
The uplink converter decodes the Kafka message body and maps it to the ThingsBoard data model. Kafka uses a generic uplink converter.
The decoder function receives:
- payload: raw bytes of the Kafka message value
- metadata: integrationName and any key-value pairs configured in the integration’s Metadata settings
It must return deviceName, deviceType, telemetry, and attributes. Both telemetry and attributes are flat key-value maps — nested objects are not supported.
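Because nested objects are not supported, a payload that contains them has to be flattened before it is returned. The following is a minimal sketch of one way to do that. The helper name `flatten` and the dot-separated key convention are assumptions made for illustration; they are not part of ThingsBoard.

```javascript
// Hypothetical helper (not a ThingsBoard built-in): copies every leaf value
// of a nested object into a single-level map with dot-separated keys.
function flatten(obj, prefix, out) {
  out = out || {};
  Object.keys(obj).forEach(function (key) {
    var value = obj[key];
    var path = prefix ? prefix + '.' + key : key;
    if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
      flatten(value, path, out); // recurse into nested objects
    } else {
      out[path] = value;         // scalars (and arrays) are copied unchanged
    }
  });
  return out;
}

var flat = flatten({ rssi: -130, radio: { port: 10, freq: 24300 } });
console.log(flat);
```

Here the nested `radio` object becomes the two keys `radio.port` and `radio.freq`. Arrays are copied unchanged in this sketch, so handle them separately if your payload contains any.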
In this example, device 43T1YH-REE sends a JSON payload where four telemetry readings are packed into the data field as an 8-character hex string (4 bytes, one byte per reading):
    {
      "EUI": "43T1YH-REE",
      "ts": 1638876127000,
      "data": "3d1f0059",
      "port": 10,
      "freq": 24300,
      "rssi": -130,
      "serial": "230165HRT"
    }

EUI is used as the device name. Each 2-character hex segment of data encodes one sensor reading:
- 3d ⇾ Temperature
- 1f ⇾ Humidity
- 00 ⇾ Fan speed
- 59 ⇾ Pressure
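As a worked example of this mapping, the sketch below splits the sample data value into 2-character segments and parses each one as a base-16 integer. The function name `decodeReadings` is hypothetical and used only for illustration.

```javascript
// Hypothetical helper: decodes the four one-byte readings from the hex string.
function decodeReadings(hex) {
  var names = ['Temperature', 'Humidity', 'Fan', 'Pressure'];
  var values = {};
  for (var i = 0; i < names.length; i++) {
    // Each reading occupies two hex characters (one byte).
    values[names[i]] = parseInt(hex.substring(i * 2, i * 2 + 2), 16);
  }
  return values;
}

var readings = decodeReadings('3d1f0059');
console.log(readings);
// { Temperature: 61, Humidity: 31, Fan: 0, Pressure: 89 }
```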
    /** Decoder **/
    var payloadJson = decodeToJson(payload);
    var deviceName = payloadJson.EUI;
    var deviceType = 'Monitoring-sensor';

    var result = {
      deviceName: deviceName,
      deviceType: deviceType,
      attributes: {},
      telemetry: {
        ts: payloadJson.ts,
        values: {
          Temperature: hexToInt(payloadJson.data.substring(0, 2)),
          Humidity: hexToInt(payloadJson.data.substring(2, 4)),
          Fan: hexToInt(payloadJson.data.substring(4, 6)),
          Port: payloadJson.port,
          Freq: payloadJson.freq,
          Pressure: hexToInt(payloadJson.data.substring(6, 8)),
          rssi: payloadJson.rssi,
          serial: payloadJson.serial
        }
      }
    };

    // Parses a hex string as a base-16 integer.
    function hexToInt(hex) {
      return parseInt(hex, 16);
    }

    /** Helper function 'decodeToJson' is already built-in **/

    return result;

A variant of the decoder that defines its helper functions in the script itself:

    /** Decoder **/
    var payloadJson = decodeToJson(payload);
    var deviceName = payloadJson.EUI;
    var deviceType = 'Monitoring-sensor';

    var result = {
      deviceName: deviceName,
      deviceType: deviceType,
      attributes: {},
      telemetry: {
        ts: payloadJson.ts,
        values: {
          Temperature: hexToInt(payloadJson.data.substring(0, 2)),
          Humidity: hexToInt(payloadJson.data.substring(2, 4)),
          Fan: hexToInt(payloadJson.data.substring(4, 6)),
          Port: payloadJson.port,
          Freq: payloadJson.freq,
          Pressure: hexToInt(payloadJson.data.substring(6, 8)),
          rssi: payloadJson.rssi,
          serial: payloadJson.serial
        }
      }
    };

    /** Helper functions **/

    // Converts the raw payload bytes to a string.
    function decodeToString(payload) {
      return String.fromCharCode.apply(String, payload);
    }

    // Parses the payload bytes as JSON.
    function decodeToJson(payload) {
      var str = decodeToString(payload);
      return JSON.parse(str);
    }

    // Reverses the byte order before parsing as hex. For the one-byte
    // segments used here, the result equals parseInt(value, 16).
    function hexToInt(value) {
      return parseInt('0x' + value.match(/../g).reverse().join(''));
    }

    return result;

Create the Kafka integration
- Go to Integrations center → Integrations and click + Add integration.
- Basic settings:
  - Set Integration type to Kafka and enter a name (e.g. “Kafka integration”).
  - Enable integration and Allow create devices or assets are on by default.
  - Click Next.
- Uplink data converter:
  - Click Create new, enter a name, and paste the decoder script from the section above. Alternatively, click Select existing to reuse a previously created converter.
  - Click Next.
- Connection:
  - Fill in Group ID, Client ID, Bootstrap servers, Poll interval, and Topics.
  - Toggle Auto create topics if the topic does not exist yet.
  - For Confluent Cloud or other secured brokers, expand Advanced settings and add SASL/SSL properties under Other properties.
  - Click Add.
Connection settings
Group ID
A string you define — it identifies the consumer group this integration belongs to. You can use any value (e.g. tb-kafka-group). All integration instances sharing the same Group ID form a consumer group and split the partition load, so each message is delivered to only one instance.
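To illustrate how a consumer group splits the partition load, here is a deliberately simplified sketch that hands out partitions round-robin. It is a toy model, not Kafka's actual assignment logic, and the function name `assignPartitions` and the instance names are hypothetical.

```javascript
// Toy model only: Kafka's real partition assignors are more involved.
// Each partition is owned by exactly one instance in the group.
function assignPartitions(partitionCount, instances) {
  var assignment = {};
  instances.forEach(function (name) {
    assignment[name] = [];
  });
  for (var p = 0; p < partitionCount; p++) {
    assignment[instances[p % instances.length]].push(p);
  }
  return assignment;
}

var assignment = assignPartitions(6, ['tb-integration-1', 'tb-integration-2']);
console.log(assignment);
```

With six partitions and two instances sharing one Group ID, each instance in this model owns three partitions, and no partition is read by both.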
Client ID
A string you define — it labels this consumer client for monitoring and logging purposes (e.g. tb-kafka-client). Must be unique per running consumer if you need to distinguish instances in Kafka metrics.
Bootstrap servers
Comma-separated list of Kafka broker addresses in host:port format, e.g. localhost:9092. Used to establish the initial connection to the cluster.
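A malformed address in this field is an easy mistake to make. The sketch below shows one way to check a value locally before pasting it into the form. The function `parseBootstrapServers` is a hypothetical helper, and the checks it performs are assumptions about what a sensible value looks like, not ThingsBoard's own validation.

```javascript
// Hypothetical local check: splits "host:port,host:port" and validates each entry.
function parseBootstrapServers(value) {
  return value.split(',').map(function (entry) {
    var trimmed = entry.trim();
    var idx = trimmed.lastIndexOf(':');
    var host = trimmed.substring(0, idx);
    var port = Number(trimmed.substring(idx + 1));
    // Require a non-empty host and a port in the valid TCP range.
    if (idx <= 0 || !Number.isInteger(port) || port < 1 || port > 65535) {
      throw new Error('Invalid broker address: "' + trimmed + '"');
    }
    return { host: host, port: port };
  });
}

var brokers = parseBootstrapServers('localhost:9092, broker2:9093');
console.log(brokers);
```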
Poll interval (ms)
How often the integration polls Kafka for new messages. Default: 5000 ms.
Topics
Comma-separated list of Kafka topics to subscribe to, e.g. my-topic.
Auto create topics
When enabled, ThingsBoard automatically creates any subscribed topics that do not already exist on the broker.
Other properties
Additional Kafka consumer properties as key-value pairs. Use this for security configuration such as SASL/SSL when connecting to Confluent Cloud or other secured brokers.
For Confluent Cloud, add the following four entries:
| Key | Value |
|---|---|
| ssl.endpoint.identification.algorithm | https |
| sasl.mechanism | PLAIN |
| sasl.jaas.config | org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET"; |
| security.protocol | SASL_SSL |
Replace CLUSTER_API_KEY and CLUSTER_API_SECRET with the API key and secret from your Confluent Cloud cluster’s Data integration → API keys section.
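The sasl.jaas.config value is easy to mistype because of its embedded quotes and trailing semicolon. As a sketch, the four entries can be generated from the key and secret and then copied into the form. The function name `confluentCloudProperties` is hypothetical; the keys and values are the ones from the table above.

```javascript
// Hypothetical helper: builds the four Other properties entries for Confluent Cloud.
function confluentCloudProperties(apiKey, apiSecret) {
  return {
    'ssl.endpoint.identification.algorithm': 'https',
    'sasl.mechanism': 'PLAIN',
    'sasl.jaas.config':
      'org.apache.kafka.common.security.plain.PlainLoginModule required ' +
      'username="' + apiKey + '" password="' + apiSecret + '";',
    'security.protocol': 'SASL_SSL'
  };
}

// Placeholders only; substitute your own key and secret.
var props = confluentCloudProperties('CLUSTER_API_KEY', 'CLUSTER_API_SECRET');
Object.keys(props).forEach(function (key) {
  console.log(key + ' = ' + props[key]);
});
```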
Execute remotely
When enabled, ThingsBoard generates an Integration key and Integration secret. These credentials allow the integration to run as a separate process outside the ThingsBoard cluster — useful for deployments where the integration must reside in a DMZ or on-premises gateway.
Confluent Cloud setup
1. Get the Bootstrap server address. Open your Confluent Cloud environment, click the cluster to enter it, then open Cluster settings → General. Copy the Bootstrap server value (YOUR_DOMAIN.confluent.cloud:9092) and paste it into the Bootstrap servers field in ThingsBoard.
2. Create an API key. Go to Data integration → API keys and click Create key. Select Global access (development) or Granular access (production), complete the wizard, then copy the generated key and secret. Use them in sasl.jaas.config:

       org.apache.kafka.common.security.plain.PlainLoginModule required username="YOUR_KEY" password="YOUR_SECRET";

3. Create a topic. Go to Topics and click Create topic. Enter a topic name (e.g. my-topic) and click Create with defaults. Use this name in the ThingsBoard integration Topics field.
4. Add SASL/SSL properties in ThingsBoard. In the integration Connection step, expand Other properties and add the four entries from the table above, replacing CLUSTER_API_KEY and CLUSTER_API_SECRET with the values from step 2.
Send uplink message
Use kafka-console-producer.sh to publish a test message. Replace URL_OF_YOUR_BOOTSTRAP_SERVER with your broker address and my-topic with your configured topic:

    echo "{\"EUI\":\"43T1YH-REE\",\"ts\":1638876127000,\"data\":\"3d1f0059\",\"port\":10,\"freq\":24300,\"rssi\":-130,\"serial\":\"230165HRT\"}" | \
      /usr/local/kafka/bin/kafka-console-producer.sh \
      --broker-list URL_OF_YOUR_BOOTSTRAP_SERVER:9092 \
      --topic my-topic > /dev/null

In the Confluent Cloud web UI, you can instead open your topic and use the Produce a new message button on the Messages tab.
Device 43T1YH-REE is auto-created on the first uplink. Go to Entities ⇾ Devices, open the device, and check the Latest telemetry tab to confirm all decoded fields arrived correctly.
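To try other readings, a test message in the same format can be generated rather than written by hand. The sketch below packs four one-byte readings into the data field and prints the JSON. The helper names `toHexByte` and `buildUplink` are hypothetical, and the fixed port, freq, rssi, and serial values are simply copied from the sample message.

```javascript
// Hypothetical helper: encodes an integer 0..255 as two lowercase hex characters.
function toHexByte(n) {
  if (!Number.isInteger(n) || n < 0 || n > 255) {
    throw new RangeError('Reading must be an integer between 0 and 255: ' + n);
  }
  return (n < 16 ? '0' : '') + n.toString(16);
}

// Hypothetical helper: builds an uplink message in the sample format.
function buildUplink(eui, ts, readings) {
  var data = [readings.temperature, readings.humidity, readings.fan, readings.pressure]
    .map(toHexByte)
    .join('');
  return JSON.stringify({
    EUI: eui, ts: ts, data: data,
    port: 10, freq: 24300, rssi: -130, serial: '230165HRT'
  });
}

var message = buildUplink('43T1YH-REE', 1638876127000,
  { temperature: 61, humidity: 31, fan: 0, pressure: 89 });
console.log(message);
```

With these inputs the data field is 3d1f0059, matching the sample message. The printed line can be piped to the console producer or pasted into the Confluent Cloud message form.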
Downlink
The Kafka Integration does not have a dedicated downlink converter. To publish messages from ThingsBoard back to a Kafka topic, use the Kafka Rule Node in the rule engine.
Configure the Kafka Rule Node with the same bootstrap servers and security properties as the integration, but point it to a separate downlink topic.
Enable Debug mode on the node. To verify messages are being published, open the node’s Events tab — each processed message appears as a Debug entry. Expand an entry to inspect the payload. The Confluent Cloud topic Metrics will show a Production throughput spike when messages arrive.