Skip to content
Stand with Ukraine flag

MicroPython Client SDK

The ThingsBoard MicroPython Client SDK enables MicroPython devices to connect to ThingsBoard via MQTT. It supports telemetry upload, attribute management, server-side RPC, device claiming, and provisioning.

  • MQTT protocol connectivity
  • Telemetry data streaming
  • Attribute transmission and subscriptions
  • Server-side RPC handling
  • Client and shared attribute requests
  • Device claiming
  • Device provisioning
# Install the SDK from its GitHub repository with the mip package manager.
import mip

SDK_PACKAGE = 'github:thingsboard/thingsboard-micropython-client-sdk'
mip.install(SDK_PACKAGE)

Check for an existing installation before installing:

# Use the SDK if it is already on the device; otherwise install it with mip
# and import it again.
try:
    from thingsboard_sdk.tb_device_mqtt import TBDeviceMqttClient
    print("thingsboard-micropython-client-sdk package already installed.")
except ImportError:
    print("Installing thingsboard-micropython-client-sdk package...")
    # Imported here so this snippet works on its own.
    import mip
    mip.install('github:thingsboard/thingsboard-micropython-client-sdk')
    from thingsboard_sdk.tb_device_mqtt import TBDeviceMqttClient

Full example — connect to Wi-Fi, send telemetry, and handle RPC requests:

import time
import network
from thingsboard_sdk.tb_device_mqtt import TBDeviceMqttClient
# Enabling WLAN interface
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Establishing connection to the Wi-Fi
if not wlan.isconnected():
    print("Connecting to network...")
    wlan.connect("WIFI_SSID", "WIFI_PASSWORD")
    # Block until the interface reports that it is connected.
    while not wlan.isconnected():
        pass
# Reached both when already connected and after a fresh connection.
print("Connected! Network config:", wlan.ifconfig())
# This callback will be called when an RPC request is received from ThingsBoard.
def on_server_side_rpc_request(request_id, request_body):
    """Print the incoming RPC request and answer it with "ok".

    request_id: identifier of the request, passed back to send_rpc_reply.
    request_body: payload of the request; only printed here.

    Uses the module-level ``client``, which is resolved when the callback
    runs, so it may be assigned after this function is defined.
    """
    print("[RPC] id:", request_id, "body:", request_body)
    client.send_rpc_reply(request_id, "ok")
# Create the client that talks to ThingsBoard (host, port and access token).
client = TBDeviceMqttClient(
    "127.0.0.1",
    port=1883,
    access_token="ACCESS_TOKEN",
)
# The server-side RPC callback is registered before the main loop starts.
client.set_server_side_rpc_request_handler(on_server_side_rpc_request)
# Open the connection to ThingsBoard.
client.connect()
def safe_check_msg():
    """Non-blocking MQTT poll.

    Calls client.check_for_msg() and returns True when it completes.
    Any exception is printed instead of propagated, and False is returned,
    so a failed poll does not stop the caller's loop.
    """
    try:
        client.check_for_msg()
        return True
    except OSError as e:
        print("[MQTT] check_msg OSError:", e)
    except Exception as e:
        # Deliberate best-effort: report the error and keep the main loop alive.
        print("[MQTT] check_msg error:", e)
    return False
# Main loop (non-blocking)
while True:
    # Poll for incoming MQTT messages; errors are handled inside safe_check_msg.
    safe_check_msg()
    client.send_telemetry({"CPU": 12.0})
    # Pause 50 ms between iterations.
    time.sleep_ms(50)
# Default connecting
client.connect()
# Connecting with custom timeout
client.connect(timeout=20)
# Connecting with waiting for connection result
# NOTE(review): the value returned by connect() is stored in `result`; its
# type and meaning are not shown here — confirm against the SDK.
result = client.connect(timeout=20)
# Typical session: connect, do the work, then disconnect.
client.connect()
# some tasks with ThingsBoard
client.disconnect()
# Telemetry as a plain dictionary of key/value pairs
telemetry = {"temperature": 25.5, "humidity": 60}
client.send_telemetry(telemetry)
# Telemetry grouped by timestamp: a list of {"ts": ..., "values": {...}} entries
# NOTE(review): `time` is imported but not used below (the timestamps are
# hard-coded), and this rebinds the name `time` to the function.
from time import time
telemetry = [
    {"ts": 1451649600000, "values": {"temperature": 42.2, "humidity": 71}},
    {"ts": 1451649601000, "values": {"temperature": 42.3, "humidity": 72}},
]
client.send_telemetry(telemetry)
# Send device attributes as a dictionary
attributes = {
    "sensorModel": "DHT-22",
    "attribute_2": "value",
}
client.send_attributes(attributes)
def on_attributes_change(result, exception=None):
    """Print the outcome of an attribute request.

    result: the received value; printed when no exception was passed.
    exception: if not None, its string form is printed instead of result.
    """
    if exception is not None:
        print("Exception: " + str(exception))
    else:
        print(result)
# Request the client keys "atr1" and "atr2"; on_attributes_change is passed as the callback.
client.request_attributes(client_keys=["atr1", "atr2"], callback=on_attributes_change)
def callback(result, *args):
    """Print the data received for a subscribed attribute.

    result: the received data, shown with its repr.
    *args: any extra positional arguments; accepted and ignored.
    """
    # print() does not interpolate logging-style arguments, so format explicitly.
    # The one-element tuple keeps tuple-valued results from being unpacked by %.
    print("Received data: %r" % (result,))
# Subscribe `callback` to the "frequency" attribute; the returned value is kept as sub_id.
sub_id = client.subscribe_to_attribute("frequency", callback)

Subscribe to all attributes:

def callback(result, *args):
    """Print the data received for any subscribed attribute.

    result: the received data, shown with its repr.
    *args: any extra positional arguments; accepted and ignored.
    """
    # print() does not interpolate logging-style arguments, so format explicitly.
    # The one-element tuple keeps tuple-valued results from being unpacked by %.
    print("Received data: %r" % (result,))
# Subscribe `callback` to all attributes; the returned value is kept as sub_id.
sub_id = client.subscribe_to_all_attributes(callback)

Unsubscribe using the returned subscription ID:

def callback(result, *args):
    """Print the data received for the subscribed attribute.

    result: the received data, shown with its repr.
    *args: any extra positional arguments; accepted and ignored.
    """
    # print() does not interpolate logging-style arguments, so format explicitly.
    # The one-element tuple keeps tuple-valued results from being unpacked by %.
    print("Received data: %r" % (result,))
# Subscribe to "frequency" and keep the returned subscription ID...
sub_id = client.subscribe_to_attribute("frequency", callback)
# ...then pass that ID back to cancel the subscription.
client.unsubscribe_from_attribute(sub_id)

Set a handler for server-side RPC requests:

def handler(request_id, request_body):
    """Print a server-side RPC request and reply with a success status.

    request_id: identifier of the request, passed back to send_rpc_reply.
    request_body: payload of the request, shown with its repr.
    """
    # print() does not interpolate logging-style arguments, so format explicitly.
    print("Received RPC request with ID: %s and body: %r" % (request_id, request_body))
    client.send_rpc_reply(request_id, {"status": "success"})


# Register the handler so it is used for server-side RPC requests.
client.set_server_side_rpc_request_handler(handler)
# Claiming a device with a claim code that will be valid indefinitely
# (no duration_ms argument is passed)
client.claim_device("my_claim_code")
# Claiming a device with a claim code that will be valid for 60 seconds
# (duration_ms is given in milliseconds: 60000 ms = 60 s)
client.claim_device("my_claim_code", duration_ms=60000)
# Basic provision request: provision key and secret only
provision_request = TBDeviceMqttClient.get_provision_request(
    "my_provision_key",
    "my_provision_secret",
)
# Same request with an explicit device name
provision_request = TBDeviceMqttClient.get_provision_request(
    "my_provision_key",
    "my_provision_secret",
    device_name="My Device",
)
# Same request with an explicit access token
provision_request = TBDeviceMqttClient.get_provision_request(
    "my_provision_key",
    "my_provision_secret",
    access_token="my_access_token",
)
# Same request for a gateway device
provision_request = TBDeviceMqttClient.get_provision_request(
    "my_provision_key",
    "my_provision_secret",
    gateway=True,
)

Send the provisioning request:

# Build the provision request from the provision key and secret.
provision_request = TBDeviceMqttClient.get_provision_request(
    "my_provision_key",
    "my_provision_secret",
)
# Send it to 127.0.0.1 on port 1883 and keep whatever the call returns.
provisioned_credentials = client.provision(
    "127.0.0.1",
    1883,
    provision_request,
)

If the mip installation fails with `OSError: -202`, make sure the device has an active internet connection before installing:

import network
import mip
# Bring up the station interface and join the Wi-Fi network if needed.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
    print('Connecting to network...')
    wlan.connect("YOUR_WIFI_SSID", "YOUR_WIFI_PASSWORD")
    # Block until the interface reports that it is connected.
    while not wlan.isconnected():
        pass
# Install only once the network connection is up.
mip.install('github:thingsboard/thingsboard-micropython-client-sdk')

Additional examples are available in the thingsboard-micropython-client-sdk GitHub repository.