
AWS Kinesis Integration

AWS Kinesis Data Streams is a managed service for collecting, processing, and analyzing data streams in real time. After integrating AWS Kinesis with ThingsBoard, you can process and visualize data from Kinesis streams in the ThingsBoard IoT platform.

Obtain AWS access keys. In the AWS console, go to IAM ⇾ Users, open your user, and go to the Security credentials tab. Under Access keys, click Create access key. Note down your Access Key ID, Secret Access Key, and your AWS region name — you will need all three below.

Install the AWS CLI. Follow the official AWS CLI installation guide for your operating system.

Configure your credentials:

aws configure

Enter the values when prompted:

Prompt                    Value
AWS Access Key ID         Your access key ID
AWS Secret Access Key     Your secret access key
Default region name       e.g. us-east-1
Default output format     e.g. json

Verify the installation:

aws kinesis help

A list of available Kinesis commands is displayed. Press q to close.

AWS Kinesis stores records as raw bytes. When sending data via the CLI with --data, the value is Base64-encoded automatically. ThingsBoard decodes Base64-encoded payloads to a UTF-8 string before passing them to the uplink converter.
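This round trip can be sketched in Node.js; the Buffer calls below stand in for what the CLI and ThingsBoard do internally:

```javascript
// Sketch of the Base64 round trip: the AWS CLI encodes the --data value,
// and ThingsBoard decodes the record bytes back to a UTF-8 string.
const payload = '{"devName": "kitchen_thermostat", "temperature": 22}';

// What the CLI does before calling PutRecord:
const encoded = Buffer.from(payload, 'utf-8').toString('base64');

// What ThingsBoard does before invoking the uplink converter:
const decoded = Buffer.from(encoded, 'base64').toString('utf-8');

console.log(decoded === payload); // true
```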

This tutorial uses two streams: one for incoming device data (uplink) and one for outgoing commands (downlink).

Create the uplink stream:

aws kinesis create-stream --stream-name my-thingsboard --shard-count 1
aws kinesis describe-stream --stream-name my-thingsboard

Create the downlink stream:

aws kinesis create-stream --stream-name tb-test-downlink --shard-count 1
aws kinesis describe-stream --stream-name tb-test-downlink

Wait until StreamStatus shows ACTIVE before proceeding.
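Instead of polling describe-stream manually, the AWS CLI's built-in waiter can block until each stream reaches ACTIVE (it calls DescribeStream under the hood and uses the same credentials):

```shell
aws kinesis wait stream-exists --stream-name my-thingsboard
aws kinesis wait stream-exists --stream-name tb-test-downlink
```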

The uplink converter transforms incoming Kinesis records into the ThingsBoard data format. The AWS Kinesis integration uses a generic uplink converter: Kinesis records arrive as raw bytes, ThingsBoard decodes them to a UTF-8 string, and the result is passed to the decoder as payload.

Sample payload:

{
  "devName": "kitchen_thermostat",
  "devType": "thermostat",
  "temperature": 22
}

The devName field maps to deviceName, devType to deviceType, and temperature is stored as telemetry.

  1. Download the uplink converter file:
  2. Go to Integrations center ⇾ Data converters.
  3. Click + Add data converter ⇾ Import converter.
  4. Drag and drop the downloaded JSON file into the Import converter window and click Import.

The decoder function used in this tutorial:

var data = decodeToJson(payload);
var deviceName = data.devName;
var deviceType = data.devType;
var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    telemetry: {
        temperature: data.temperature
    }
};
return result;

To adapt this converter to your device:

  • Different field names — replace data.devName, data.devType, and data.temperature with the field names in your payload.
  • Additional telemetry — add more keys to the telemetry object (e.g. humidity: data.humidity).
  • Attributes — add an attributes key alongside telemetry to store static device properties.
  • Nested payload — if your record contains a nested JSON string in the data field, call decodeToJson(data.data) to parse the inner object.
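Putting several of these adaptations together, a hypothetical extended decoder might look like the following runnable Node.js sketch. The decodeToJson helper is provided by ThingsBoard and is stubbed here with JSON.parse; the humidity and fwVersion fields are assumptions for illustration:

```javascript
// Hypothetical extended decoder: extra telemetry key plus an attributes
// section. In ThingsBoard, decodeToJson() is provided by the platform;
// JSON.parse stands in for it here.
function decodeToJson(s) { return JSON.parse(s); }

function decode(payload) {
  var data = decodeToJson(payload);
  return {
    deviceName: data.devName,
    deviceType: data.devType,
    attributes: {
      fwVersion: data.fwVersion        // static device property
    },
    telemetry: {
      temperature: data.temperature,
      humidity: data.humidity          // additional telemetry key
    }
  };
}

var sample = '{"devName":"kitchen_thermostat","devType":"thermostat",' +
             '"fwVersion":"1.0","temperature":22,"humidity":40}';
console.log(JSON.stringify(decode(sample)));
```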

The downlink converter encodes ThingsBoard commands into Kinesis-compatible messages. For the full encoder reference, see Downlink data converter. The encoder output must include:

  • contentType — how data is encoded: TEXT, JSON, or BINARY
  • data — the payload to write to the Kinesis stream
  • metadata.streamName — the target Kinesis stream name
  • metadata.partitionKey — the partition key used to route the record to a shard

Expected output structure:

{
  "contentType": "JSON",
  "data": "{\"devName\":\"kitchen_thermostat\",\"version\":\"0.11\"}",
  "metadata": {
    "streamName": "tb-test-downlink",
    "partitionKey": "123"
  }
}

  1. Download the downlink converter file:
  2. Go to Integrations center ⇾ Data converters.
  3. Click + Add data converter ⇾ Import converter.
  4. Drag and drop the downloaded JSON file into the Import converter window and click Import.

The encoder used in this tutorial:

var data = {
    devName: 'kitchen_thermostat',
    version: msg.version
};
var result = {
    contentType: 'JSON',
    data: JSON.stringify(data),
    metadata: {
        streamName: 'tb-test-downlink',
        partitionKey: '123'
    }
};
return result;

To adapt this encoder to your device:

  • Different stream — replace tb-test-downlink in streamName with your target Kinesis stream name.
  • Different partition key — replace '123' with a meaningful partition key (e.g. device ID) to control which shard receives the record.
  • Different payload — replace msg.version with whichever attribute or RPC parameter you want to forward.
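As a combined example, here is a hypothetical adapted encoder that routes by device name and forwards a target temperature instead of a firmware version, as a runnable Node.js sketch. In ThingsBoard, msg and metadata are supplied by the rule engine; the metadata key deviceName and the targetTemp field are assumptions:

```javascript
// Hypothetical adapted encoder: per-device partition key, different payload.
function encode(msg, metadata) {
  return {
    contentType: 'JSON',
    data: JSON.stringify({
      devName: metadata.deviceName,     // assumed metadata key
      targetTemp: msg.targetTemp        // forwarded attribute (assumption)
    }),
    metadata: {
      streamName: 'tb-test-downlink',   // your downlink stream name
      partitionKey: metadata.deviceName // keeps each device on a stable shard
    }
  };
}

var out = encode({ targetTemp: 21 }, { deviceName: 'kitchen_thermostat' });
console.log(out.metadata.partitionKey); // kitchen_thermostat
```

Using the device name as the partition key preserves per-device record ordering, since all records with the same key land on the same shard.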
Create the integration:

  1. Go to Integrations center ⇾ Integrations and click + Add integration.
  2. Basic settings:
    • Set Integration type to AWS Kinesis and enter a name (e.g. AWS Kinesis integration).
    • The Enable integration and Allow create devices or assets options are enabled by default.
    • Click Next.
  3. Uplink data converter:
    • Click Select existing and choose the previously imported AWS Kinesis Uplink Converter.
    • Click Next.
  4. Downlink data converter:
    • Click Select existing and choose the previously imported AWS Kinesis Downlink Converter.
    • Click Next.
  5. Connection settings:
    • Stream name — my-thingsboard
    • Region — your AWS region (e.g. eu-west-1)
    • Initial position in stream — Latest (reads only new records; use Trim horizon to process all existing records first)
    • Access Key ID and Secret Access Key — your AWS IAM credentials
    • Leave Use Consumers with Enhanced Fan-Out disabled for this tutorial.
    • Read more about each parameter in Connection settings.
  6. Click Add to save the integration.
Connection settings

Stream name

The name of the AWS Kinesis stream ThingsBoard reads from for uplink messages (e.g. my-thingsboard). The stream must exist in your AWS account before the integration can connect.

Region

The AWS region where the stream is deployed (e.g. us-east-1, eu-west-1). Must match the region used when the stream was created.

Initial position in stream

Determines where in the stream ThingsBoard starts reading when the integration is first started or restarted without a saved checkpoint.

Value          Description
Latest         Read only new records produced after the integration starts. Previously produced records are skipped. Use for live data ingestion.
Trim horizon   Read from the oldest available record in each shard. Use to process all existing data before switching to live ingestion.
At timestamp   Read from a specific point in time. An additional timestamp field appears when this option is selected.

Use credentials from the Amazon EC2 Instance Metadata Service

When enabled, ThingsBoard retrieves AWS credentials automatically from the EC2 Instance Metadata Service (IMDS) instead of the static key fields. Enable this only when ThingsBoard is deployed on an EC2 instance with an attached IAM role that grants the required Kinesis permissions. When active, the Access Key ID and Secret Access Key fields are ignored.

Access Key ID

The AWS IAM access key ID used to authenticate API requests. The associated IAM user or role must have the following permissions on the stream:

  • kinesis:GetRecords
  • kinesis:GetShardIterator
  • kinesis:DescribeStream
  • kinesis:ListShards
  • kinesis:PutRecord (required for downlink)
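
A minimal IAM policy granting these permissions might look like the following sketch. The account ID and region are placeholders; include the downlink stream ARN if you use downlink, and note that the Kinesis Client Library may additionally need DynamoDB and CloudWatch permissions for its checkpoint table:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListShards",
        "kinesis:PutRecord"
      ],
      "Resource": [
        "arn:aws:kinesis:us-east-1:111122223333:stream/my-thingsboard",
        "arn:aws:kinesis:us-east-1:111122223333:stream/tb-test-downlink"
      ]
    }
  ]
}
```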
Secret Access Key

The AWS IAM secret access key corresponding to the Access Key ID above.

Use Consumers with Enhanced Fan-Out

When enabled, ThingsBoard registers as an Enhanced Fan-Out (EFO) consumer, which receives records via a dedicated HTTP/2 push subscription at 2 MB/s per shard — independent of other consumers on the same shard. Without EFO, all standard consumers share a single 2 MB/s polling limit per shard. Recommended for high-throughput workloads or when multiple consumers read the same stream concurrently.

Enabling this option exposes two additional parameters:

Parameter              Default   Description
Max records            10000     Maximum number of records to retrieve per request. AWS Kinesis caps this at 10,000 per GetRecords call.
Request timeout (sec)  30        Timeout in seconds for the SubscribeToShard HTTP/2 request. If the connection is not established within this period, it is retried.

Execute remotely

When enabled, ThingsBoard generates an Integration key and Integration secret that allow the integration to run as a separate process outside the ThingsBoard cluster — useful when the Kinesis endpoint is only reachable from a restricted network.

Advanced settings
Parameter         Default         Description
Application name  (stream name)   The Kinesis Client Library (KCL) application name. Used to identify the consumer and track per-shard checkpoints in a dedicated DynamoDB table. Defaults to the stream name if left empty. Set a unique name when multiple independent integrations consume the same stream.
Description       (none)          Optional text description for the integration.
Metadata          (none)          Key-value pairs injected into every uplink message as integrationMetadata in the converter script.
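
As a sketch of how such metadata could be used, the uplink decoder receives a metadata argument alongside payload; the deploymentZone key below is a purely illustrative, user-defined metadata entry:

```javascript
// Illustrative only: reading integration metadata in the uplink decoder.
// ThingsBoard passes `payload` and `metadata` to the decoder; JSON.parse
// stands in for the platform's decodeToJson() helper.
function decode(payload, metadata) {
  var data = JSON.parse(payload);
  return {
    deviceName: data.devName,
    deviceType: data.devType,
    telemetry: { temperature: data.temperature },
    attributes: { zone: metadata['deploymentZone'] } // assumed metadata key
  };
}

var result = decode(
  '{"devName":"kitchen_thermostat","devType":"thermostat","temperature":22}',
  { deploymentZone: 'eu-west' }
);
console.log(result.attributes.zone); // eu-west
```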

Send a test record to the uplink stream using the AWS CLI:

aws kinesis put-record \
--stream-name my-thingsboard \
--partition-key 123 \
--data '{"devName": "kitchen_thermostat", "devType": "thermostat", "temperature": 22}'

Go to Entities ⇾ Devices. The device kitchen_thermostat is automatically provisioned by the integration. Open it and check the Latest Telemetry tab — the temperature value should equal 22.

To route messages from ThingsBoard to the Kinesis downlink stream, connect the Integration Downlink node to the Root Rule Chain.

  1. Open Rule Chains ⇾ Root Rule Chain and click the edit icon.
  2. Find the Integration Downlink node in the node panel, drag it onto the canvas, enter a name (e.g. send downlink), select the AWS Kinesis integration, and click Add.
  3. Connect the Message Type Switch node to the Integration Downlink node using the Attributes Updated relation type.
  4. Click Apply changes to save the rule chain.

Trigger a downlink by adding a shared attribute to the provisioned device:

  1. Go to Entities ⇾ Devices and open kitchen_thermostat.
  2. Open the Attributes tab, switch to Shared attributes, and click +.
  3. Enter key version and value v.0.11, then click Add.

Adding the attribute triggers the Attributes Updated rule chain relation, which routes the message to the Integration Downlink node. The downlink encoder writes a record to the tb-test-downlink stream.

To verify the record was written, get a shard iterator and read from the stream:

aws kinesis get-shard-iterator \
--shard-id shardId-000000000000 \
--shard-iterator-type TRIM_HORIZON \
--stream-name tb-test-downlink

Use the returned ShardIterator value in the following command:

aws kinesis get-records --shard-iterator <ShardIterator>

Sample response:

{
  "Records": [
    {
      "Data": "eyJkZXZOYW1lIjoia2l0Y2hlbl90aGVybW9zdGF0IiwidmVyc2lvbiI6InYuMC4xMSJ9",
      "PartitionKey": "123",
      "ApproximateArrivalTimestamp": 1569609612.27,
      "SequenceNumber": "49599912710236940383450082324919185009278025474345271298"
    }
  ],
  "NextShardIterator": "AAAAAAAAAAFQlgSyxBdpKxlRrocJCYT9YDrCi/vxl0sstJgg4CM+pttVsK4AjjQwJ/QJsags5vdpQdopaqk9aKefAUOWobgwHVaZvhI4tdkmHBr45uO0Hq9AxUlKDxfiYbM0qgN33+5SvGxU8gJBUihYFY4ydPWOWdVTf2lOxp0a9X6DFrjsUqwMXR9skLw8/lQkBmHVFBlFURPy+z/AMuYHga5mDch/",
  "MillisBehindLatest": 0
}

The Data field is Base64-encoded. Decoded, it contains:

{
  "devName": "kitchen_thermostat",
  "version": "v.0.11"
}
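
The Data field can be decoded with any Base64 tool, e.g. in Node.js:

```javascript
// Decode the Base64 Data field from the get-records response.
const data = 'eyJkZXZOYW1lIjoia2l0Y2hlbl90aGVybW9zdGF0IiwidmVyc2lvbiI6InYuMC4xMSJ9';
console.log(Buffer.from(data, 'base64').toString('utf-8'));
// {"devName":"kitchen_thermostat","version":"v.0.11"}
```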