Apache Pulsar Integration

Apache Pulsar Integration connects ThingsBoard to an Apache Pulsar cluster. ThingsBoard subscribes to a Pulsar topic, consumes incoming messages, and decodes them via an uplink converter into device telemetry and attributes.

Before creating the integration, prepare the following:

The Apache Pulsar integration wizard creates the uplink converter inline — you write the decoder function in step 2 of the wizard without leaving the dialog.

The uplink converter decodes incoming Pulsar messages and maps them to the ThingsBoard data model. Apache Pulsar uses a generic uplink converter.

The decoder function receives:

  • payload: raw Pulsar message body as a UTF-8 byte array
  • metadata: integrationName and any key-value pairs configured in the integration's Metadata settings

Sample payload:

{
  "deviceName": "Sensor A1",
  "deviceType": "thermostat",
  "temperature": 23.5,
  "humidity": 60
}

The decoder function used in this tutorial:

// Decode an uplink message from a buffer
// payload - array of bytes
// metadata - key/value object
/** Decoder **/
var data = decodeToJson(payload);
var deviceName = data.deviceName != null ? data.deviceName : 'Unknown Device';
var deviceType = data.deviceType != null ? data.deviceType : 'default';

// Timestamp: use ts (Unix ms), timestamp (ISO 8601), or server time
var timestamp = -1;
if (data.ts != null) {
    timestamp = data.ts;
} else if (data.timestamp != null) {
    timestamp = new Date(data.timestamp).getTime();
}
if (timestamp == -1) {
    timestamp = Date.now();
}

var telemetry = {};
var attributes = {
    integrationName: metadata['integrationName']
};

// Every field not listed here is copied into telemetry
var excludeFromTelemetryList = ["deviceName", "deviceType", "ts", "timestamp"];
telemetry.putAll(toFlatMap(data, excludeFromTelemetryList, true));

var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: attributes,
    telemetry: { ts: timestamp, values: telemetry }
};

/** Helper functions 'decodeToString', 'decodeToJson' and 'toFlatMap' are already built-in **/
return result;
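
To see what this decoder produces for the sample payload without opening ThingsBoard, the same logic can be approximated in plain Node.js. This is only a sketch: the decodeToJson and topLevelFields functions below are local stand-ins written for this example, not the built-in ThingsBoard helpers, and the stand-in copies top-level fields only.

```javascript
// Stand-in for the built-in decodeToJson: bytes -> UTF-8 string -> object.
function decodeToJson(payload) {
  return JSON.parse(Buffer.from(payload).toString('utf8'));
}

// Simplified stand-in for toFlatMap (assumption): copies the top-level
// fields that are not in the exclusion list. Nested objects are not handled.
function topLevelFields(data, excluded) {
  const out = {};
  for (const [key, value] of Object.entries(data)) {
    if (!excluded.includes(key)) out[key] = value;
  }
  return out;
}

// Same decision logic as the tutorial decoder; `now` is injectable for testing.
function decode(payload, metadata, now = Date.now()) {
  const data = decodeToJson(payload);

  let timestamp = now;
  if (data.ts != null) {
    timestamp = data.ts;
  } else if (data.timestamp != null) {
    timestamp = new Date(data.timestamp).getTime();
  }

  const excluded = ['deviceName', 'deviceType', 'ts', 'timestamp'];
  return {
    deviceName: data.deviceName != null ? data.deviceName : 'Unknown Device',
    deviceType: data.deviceType != null ? data.deviceType : 'default',
    attributes: { integrationName: metadata.integrationName },
    telemetry: { ts: timestamp, values: topLevelFields(data, excluded) },
  };
}

const samplePayload = Buffer.from(JSON.stringify({
  deviceName: 'Sensor A1',
  deviceType: 'thermostat',
  temperature: 23.5,
  humidity: 60,
}), 'utf8');

const sampleResult = decode(
  samplePayload,
  { integrationName: 'Apache Pulsar Integration' },
  1700000000000 // fixed "server time" so the run is repeatable
);
console.log(JSON.stringify(sampleResult, null, 2));
```

For the sample payload, the telemetry values contain only temperature and humidity, because deviceName and deviceType are on the exclusion list.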

To adapt this converter to your device:

  • Different device name / type fields — replace data.deviceName and data.deviceType with the field names your device sends.
  • Timestamp — name the field ts for Unix milliseconds or timestamp for ISO 8601. Without either, the converter falls back to server receive time.
  • Telemetry fields — all JSON fields not in excludeFromTelemetryList are mapped to telemetry automatically. Add field names to the list to exclude them.
  • Static attributes — add device properties (e.g. firmware version, location) to the attributes object.
  • Assets instead of devices — use assetName / assetType instead of deviceName / deviceType.
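
As an illustration of these adjustments, the sketch below adapts the mapping to a hypothetical device that sends serial, model, and fw fields. Those field names, the firmwareVersion attribute, and the sample values are assumptions made up for this example. It is written as plain Node.js so it can be run locally; inside ThingsBoard the same changes would be made in the decoder body itself.

```javascript
// Hypothetical message shape: { serial, model, fw, ts, flow }
function decodeCustom(data, metadata) {
  // Identity and timestamp fields are kept out of telemetry.
  const excluded = ['serial', 'model', 'fw', 'ts', 'timestamp'];
  const values = {};
  for (const key of Object.keys(data)) {
    if (!excluded.includes(key)) values[key] = data[key];
  }

  return {
    // Different name / type fields: serial and model replace deviceName and deviceType.
    deviceName: data.serial != null ? data.serial : 'Unknown Device',
    deviceType: data.model != null ? data.model : 'default',
    // Static attribute: the firmware version is stored as an attribute, not telemetry.
    attributes: {
      integrationName: metadata.integrationName,
      firmwareVersion: data.fw,
    },
    telemetry: { ts: data.ts != null ? data.ts : Date.now(), values: values },
  };
}

const customResult = decodeCustom(
  { serial: 'SN-001', model: 'flow-meter', fw: '1.2.0', ts: 1700000000000, flow: 12.5 },
  { integrationName: 'Apache Pulsar Integration' }
);
console.log(JSON.stringify(customResult, null, 2));
```
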

To create the integration:

  1. Go to Integrations center ⇾ Integrations and click + Add integration.
  2. Basic settings:
    • Set Integration type to Apache Pulsar and enter a name (e.g. Apache Pulsar Integration).
    • Enable integration and Allow create devices or assets are on by default.
    • Click Next.
  3. Uplink data converter:
    • Select Create new. A name is pre-filled — change it if needed.
    • Replace the default decoder with the function from the Uplink decoder section above.
    • Click Next.
  4. Connection:
    • Fill in Service URL, Topic, Subscription name, and the remaining parameters.
    • Select Credentials type and provide a token if required.
    • See Connection settings for a full description.
    • Click Add to save the integration.

Service URL

The Pulsar broker endpoint. Use pulsar:// for plain connections (default port 6650) and pulsar+ssl:// for TLS-encrypted connections (default port 6651).

Examples:

  • Local: pulsar://localhost:6650
  • TLS / managed: pulsar+ssl://broker.example.com:6651
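
A quick way to sanity-check a Service URL before pasting it into the wizard is to parse it and apply the default ports named above. The sketch below uses Node's built-in URL class; parseServiceUrl is a helper name invented for this example and is not part of ThingsBoard or the Pulsar client. It only recognizes the two schemes described in this section.

```javascript
// Default ports for the two schemes described above.
const DEFAULT_PORTS = { 'pulsar:': 6650, 'pulsar+ssl:': 6651 };

// Hypothetical helper: splits a Pulsar service URL into host, port, and TLS flag.
function parseServiceUrl(serviceUrl) {
  const url = new URL(serviceUrl);
  const defaultPort = DEFAULT_PORTS[url.protocol];
  if (defaultPort === undefined) {
    throw new Error('Unsupported scheme: ' + url.protocol);
  }
  return {
    tls: url.protocol === 'pulsar+ssl:',
    host: url.hostname,
    // Fall back to the scheme's default port when none is given.
    port: url.port ? Number(url.port) : defaultPort,
  };
}

console.log(parseServiceUrl('pulsar://localhost:6650'));
console.log(parseServiceUrl('pulsar+ssl://broker.example.com'));
```
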

Topic

The Pulsar topic to consume messages from (e.g. my-topic). Pulsar topics follow the format {persistent|non-persistent}://{tenant}/{namespace}/{topic-name}. The broker resolves a short name like my-topic to persistent://public/default/my-topic on a standalone instance.
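
The short-name expansion described above can be sketched as a small function. toFullTopicName is a name chosen for this example; it mirrors the naming format given here (a bare topic name, or tenant/namespace/topic) and makes no claim about how the Pulsar client implements the lookup internally.

```javascript
// Hypothetical helper: expands a short topic name to the full Pulsar form.
function toFullTopicName(topic) {
  // Already fully qualified, e.g. non-persistent://tenant/ns/topic.
  if (topic.includes('://')) return topic;

  const parts = topic.split('/');
  if (parts.length === 1) {
    // Bare name: default tenant "public" and namespace "default".
    return 'persistent://public/default/' + parts[0];
  }
  if (parts.length === 3) {
    // tenant/namespace/topic without the domain prefix.
    return 'persistent://' + parts.join('/');
  }
  throw new Error('Unrecognized topic name: ' + topic);
}

console.log(toFullTopicName('my-topic'));
console.log(toFullTopicName('acme/sensors/my-topic'));
```
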

Subscription name

A string identifier for this consumer subscription (e.g. my-subscription). The subscription tracks the last acknowledged message offset and resumes from the same position after a restart. Use a consistent name across integration restarts.

Max number of messages

Maximum number of messages to fetch in a single poll. Default: 1000.

Max number of bytes

Maximum total payload size to fetch per poll, in bytes. Default: 10485760 (10 MB).

Timeout in milliseconds

Consumer receive timeout — how long the client waits for a message before returning an empty result. Default: 100.
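
To illustrate how the message-count and byte limits bound a single poll, the sketch below drains an in-memory queue until either limit is reached. It is a simplified model, not the Pulsar client's actual batching code: takeBatch and the queue are invented for this example, and the receive timeout is left out because nothing here waits on a network.

```javascript
// Hypothetical model of one poll: stop at maxMessages or maxBytes, whichever comes first.
function takeBatch(queue, { maxMessages = 1000, maxBytes = 10485760 } = {}) {
  const batch = [];
  let bytes = 0;
  while (queue.length > 0 && batch.length < maxMessages) {
    const size = Buffer.byteLength(queue[0], 'utf8');
    // Always admit the first message so an oversized message cannot stall the poll.
    if (batch.length > 0 && bytes + size > maxBytes) break;
    batch.push(queue.shift());
    bytes += size;
  }
  return batch;
}

const pending = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']; // 4 bytes each
const byCount = takeBatch(pending.slice(), { maxMessages: 3 });
const byBytes = takeBatch(pending.slice(), { maxBytes: 10 });
console.log(byCount.length, byBytes.length);
```

In this model, lowering either limit yields smaller batches per poll; the remaining messages stay in the queue for the next poll.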

Credentials type

Option          Description
Anonymous       No authentication. Suitable for local or open brokers.
Security token  JWT bearer token. Required for StreamNative Cloud and other managed Pulsar services that enforce token authentication.

Execute remotely

When enabled, ThingsBoard generates an Integration key and Integration secret that allow the integration to run as a separate process outside the ThingsBoard cluster — useful when the broker is only reachable from a restricted network. See Remote Integration.

Publish a test message to the Pulsar topic using the pulsar-client CLI tool.

If your broker is running in Docker, exec into the container and use the built-in CLI:

docker exec -it pulsar bin/pulsar-client produce my-topic \
--num-produce 1 \
--messages '{"deviceName":"Sensor A1","deviceType":"thermostat","temperature":23.5,"humidity":60}'

Device — go to Entities ⇾ Devices. The device Sensor A1 is automatically provisioned on the first message. Open it and check the Latest telemetry tab — temperature and humidity should reflect the published values.

Integration events — go to Integrations center ⇾ Integrations, open Apache Pulsar Integration, and check the Events tab. An Uplink event with Status: OK confirms the message was processed. Click in the Message column to inspect the raw payload.

Converter events — go to Integrations center ⇾ Data converters, open the uplink converter, and click the Events tab. Inspect the In (raw payload), Out (decoded result), and Metadata columns to verify the decoder output.