Apache Pulsar Integration
Apache Pulsar Integration connects ThingsBoard to an Apache Pulsar cluster. ThingsBoard subscribes to a Pulsar topic, consumes incoming messages, and decodes them via an uplink converter into device telemetry and attributes.
Prerequisites
Section titled “Prerequisites”Before creating the integration, prepare the following:
- A running Pulsar broker. Use a local Docker installation or a managed service such as StreamNative Cloud.
- A topic to consume from.
- A security token if your broker requires authentication.
ThingsBoard integration setup
Section titled “ThingsBoard integration setup”The Apache Pulsar integration wizard creates the uplink converter inline — you write the decoder function in step 2 of the wizard without leaving the dialog.
Uplink decoder
Section titled “Uplink decoder”The uplink converter decodes incoming Pulsar messages and maps them to the ThingsBoard data model. Apache Pulsar uses a generic uplink converter.
The decoder function receives:
payload— raw Pulsar message body as a UTF-8 byte arraymetadata—integrationNameand any key-value pairs configured in the integration’s Metadata settings
Sample payload:
{ "deviceName": "Sensor A1", "deviceType": "thermostat", "temperature": 23.5, "humidity": 60}The decoder function used in this tutorial:
// Decode an uplink message from a buffer// payload - array of bytes// metadata - key/value object
/** Decoder **/
var data = decodeToJson(payload);
var deviceName = data.deviceName != null ? data.deviceName : 'Unknown Device';var deviceType = data.deviceType != null ? data.deviceType : 'default';
// Timestamp: use ts (Unix ms), timestamp (ISO 8601), or server timevar timestamp = -1;if (data.ts != null) { timestamp = data.ts;} else if (data.timestamp != null) { timestamp = new Date(data.timestamp).getTime();}if (timestamp == -1) { timestamp = Date.now(); }
var telemetry = {};var attributes = { integrationName: metadata['integrationName']};
var excludeFromTelemetryList = ["deviceName", "deviceType", "ts", "timestamp"];telemetry.putAll(toFlatMap(data, excludeFromTelemetryList, true));
var result = { deviceName: deviceName, deviceType: deviceType, attributes: attributes, telemetry: { ts: timestamp, values: telemetry }};
/** Helper functions 'decodeToString', 'decodeToJson' and 'toFlatMap' are already built-in **/
return result;// Decode an uplink message from a buffer// payload - array of bytes// metadata - key/value object
/** Decoder **/
var data = decodeToJson(payload);
var deviceName = data.deviceName != null ? data.deviceName : 'Unknown Device';var deviceType = data.deviceType != null ? data.deviceType : 'default';
var timestamp = -1;if (data.ts != null) { timestamp = data.ts;} else if (data.timestamp != null) { timestamp = new Date(data.timestamp).getTime();}if (timestamp == -1) { timestamp = Date.now(); }
var telemetry = {};var attributes = { integrationName: metadata['integrationName']};
var excludeFromTelemetryList = ["deviceName", "deviceType", "ts", "timestamp"];toFlatMap(data, telemetry, excludeFromTelemetryList);
var result = { deviceName: deviceName, deviceType: deviceType, attributes: attributes, telemetry: { ts: timestamp, values: telemetry }};
/** Helper functions **/
function decodeToString(payload) { return String.fromCharCode.apply(String, payload);}
function decodeToJson(payload) { return JSON.parse(decodeToString(payload));}
function toFlatMap(obj, result, excludeList, prefix) { prefix = prefix || ''; for (var key in obj) { if (excludeList.indexOf(key) !== -1) continue; var value = obj[key]; var fullKey = prefix ? prefix + '.' + key : key; if (typeof value === 'object' && value !== null && !Array.isArray(value)) { toFlatMap(value, result, excludeList, fullKey); } else { result[fullKey] = value; } }}
return result;To adapt this converter to your device:
- Different device name / type fields — replace
data.deviceNameanddata.deviceTypewith the field names your device sends. - Timestamp — name the field
tsfor Unix milliseconds ortimestampfor ISO 8601. Without either, the converter falls back to server receive time. - Telemetry fields — all JSON fields not in
excludeFromTelemetryListare mapped to telemetry automatically. Add field names to the list to exclude them. - Static attributes — add device properties (e.g. firmware version, location) to the
attributesobject. - Assets instead of devices — use
assetName/assetTypeinstead ofdeviceName/deviceType.
Create the integration
Section titled “Create the integration”- Go to Integrations center ⇾ Integrations and click + Add integration.
- Basic settings:
- Set Integration type to Apache Pulsar and enter a name (e.g.
Apache Pulsar Integration). - Enable integration and Allow create devices or assets are on by default.
- Click Next.
- Set Integration type to Apache Pulsar and enter a name (e.g.
- Uplink data converter:
- Select Create new. A name is pre-filled — change it if needed.
- Replace the default decoder with the function from the Uplink decoder section above.
- Click Next.
- Connection:
- Fill in Service URL, Topic, Subscription name, and the remaining parameters.
- Select Credentials type and provide a token if required.
- See Connection settings for a full description.
- Click Add to save the integration.
Connection settings
Section titled “Connection settings”Service URL
The Pulsar broker endpoint. Use pulsar:// for plain connections (default port 6650) and pulsar+ssl:// for TLS-encrypted connections (default port 6651).
Examples:
- Local:
pulsar://localhost:6650 - TLS / managed:
pulsar+ssl://broker.example.com:6651
Topic
The Pulsar topic to consume messages from (e.g. my-topic). Pulsar topics follow the format {persistent|non-persistent}://{tenant}/{namespace}/{topic-name}. The broker resolves a short name like my-topic to persistent://public/default/my-topic on a standalone instance.
Subscription name
A string identifier for this consumer subscription (e.g. my-subscription). The subscription tracks the last acknowledged message offset and resumes from the same position after a restart. Use a consistent name across integration restarts.
Max number of messages
Maximum number of messages to fetch in a single poll. Default: 1000.
Max number of bytes
Maximum total payload size to fetch per poll, in bytes. Default: 10485760 (10 MB).
Timeout in milliseconds
Consumer receive timeout — how long the client waits for a message before returning an empty result. Default: 100.
Credentials type
| Option | Description |
|---|---|
| Anonymous | No authentication. Suitable for local or open brokers. |
| Security token | JWT bearer token. Required for StreamNative Cloud and other managed Pulsar services that enforce token authentication. |
Execute remotely
When enabled, ThingsBoard generates an Integration key and Integration secret that allow the integration to run as a separate process outside the ThingsBoard cluster — useful when the broker is only reachable from a restricted network. See Remote Integration.
Send a test uplink
Section titled “Send a test uplink”Publish a test message to the Pulsar topic using the pulsar-client CLI tool.
If your broker is running in Docker, exec into the container and use the built-in CLI:
docker exec -it pulsar bin/pulsar-client produce my-topic \ --num-produce 1 \ --messages '{"deviceName":"Sensor A1","deviceType":"thermostat","temperature":23.5,"humidity":60}'If Pulsar binaries are installed locally:
bin/pulsar-client produce my-topic \ --num-produce 1 \ --messages '{"deviceName":"Sensor A1","deviceType":"thermostat","temperature":23.5,"humidity":60}'For a remote broker, add --url pulsar://your-broker-host:6650.
pip install pulsar-clientimport pulsar, json
client = pulsar.Client('pulsar://localhost:6650')producer = client.create_producer('my-topic')payload = json.dumps({ "deviceName": "Sensor A1", "deviceType": "thermostat", "temperature": 23.5, "humidity": 60})producer.send(payload.encode('utf-8'))client.close()print("Message sent.")Verify in ThingsBoard
Section titled “Verify in ThingsBoard”Device — go to Entities ⇾ Devices. The device Sensor A1 is automatically provisioned on the first message. Open it and check the Latest telemetry tab — temperature and humidity should reflect the published values.
Integration events — go to Integrations center ⇾ Integrations, open Apache Pulsar Integration, and check the Events tab. An Uplink event with Status: OK confirms the message was processed. Click … in the Message column to inspect the raw payload.
Converter events — go to Integrations center ⇾ Data converters, open the uplink converter, and click the Events tab. Inspect the In (raw payload), Out (decoded result), and Metadata columns to verify the decoder output.
See also
Section titled “See also”Was this helpful?