Skip to content
Stand with Ukraine flag

Serial connector example

This tutorial walks through building a complete custom connector for the IoT Gateway — the SerialConnector — which reads data from a serial port and forwards it to ThingsBoard. The same connector ships in the gateway’s built-in extensions folder, so you can use it as a reference or starting point for your own implementation.

What we’re building:

Serial device → bytes → SerialUplinkConverter → ConvertedData → ThingsBoard
ThingsBoard → RPC → SerialDownlinkConverter → bytes → Serial device

Sample device payload:

48\r2430947595\n
  • 48 — humidity value, terminated by \r
  • 2430947595 — device serial number, from byte offset 4 to end of message

Step 1. Create the connector configuration

Section titled “Step 1. Create the connector configuration”

Create custom_serial.json in the same folder as your tb_gateway.json:

Terminal window
touch custom_serial.json

Add the following configuration:

{
"name": "Custom serial connector",
"logLevel": "DEBUG",
"uplinkQueueSize": 100000,
"devices": [
{
"name": "SerialDevice1",
"type": "default",
"port": "/dev/ttyUSB0",
"baudrate": 9600,
"converter": "SerialUplinkConverter",
"downlink_converter": "SerialDownlinkConverter",
"telemetry": [
{
"type": "float",
"key": "humidity",
"untilDelimiter": "\r"
}
],
"attributes": [
{
"key": "SerialNumber",
"type": "string",
"fromByte": 4,
"toByte": -1
}
],
"attributeUpdates": [
{
"attributeOnPlatform": "attr1",
"stringToDevice": "value = ${attr1}\n"
}
],
"serverSideRpc": [
{
"method": "setValue",
"type": "int",
"withResponse": true,
"responseType": "string",
"responseUntilDelimiter": "\r",
"responseTimeoutSec": 5
},
{
"method": "getValue",
"type": "string",
"withResponse": false
}
]
}
]
}

Top-level fields:

FieldDescription
nameConnector name — must match the "name" entry in tb_gateway.json.
logLevelLog verbosity: TRACE, DEBUG, INFO, WARNING, ERROR, CRITICAL.
uplinkQueueSizeMaximum number of uplink data items to buffer before dropping.
devicesArray of device configurations.

Device fields:

FieldDescription
nameDevice name on the ThingsBoard platform.
typeDevice profile name on the platform.
portSerial port path.
baudrateSerial port baud rate.
converterClass name of the uplink converter.
downlink_converterClass name of the downlink converter.
telemetryArray of telemetry datapoint configurations.
attributesArray of attribute datapoint configurations.
attributeUpdatesArray of attribute update configurations (platform → device).
serverSideRpcArray of RPC method configurations (platform → device).

Place the connector and converter files inside the extensions folder for your installation type:

InstallationExtensions folder path
Docker Compose (default volume)tb-gw-extensions
Daemon/var/lib/thingsboard_gateway/extensions
pip (system-wide)/usr/lib/python3/site-packages/thingsboard_gateway/extensions
pip (user)/usr/local/lib/python3/dist-packages/thingsboard-gateway/extensions

Create a subfolder named serial inside the extensions folder. All connector and converter files go there.


Create extensions/serial/serial_connector.py with the following content. The connector manages SerialDevice worker threads and routes data to ThingsBoard.

from queue import Queue
from threading import Event, Thread, Lock
from typing import List, TYPE_CHECKING
import serial.tools
import serial.tools.list_ports
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
from time import monotonic, sleep
try:
import serial
except ImportError:
print("pyserial library not found - installing...")
TBUtility.install_package("pyserial")
import serial
from thingsboard_gateway.connectors.connector import Connector
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.tb_utility.tb_logger import init_logger
if TYPE_CHECKING:
from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService
class SerialDevice(Thread):
"""Represents one device connected to a serial port."""
def __init__(self, device_config, uplink_converter, stop_event: Event, logger, uplink_queue):
super().__init__()
self.__log = logger
self.uplink_queue = uplink_queue
self.daemon = True
self.stopped = True
self.__connector_stopped = stop_event
self.config = device_config
self.name = self.config.get('deviceName', self.config.get('name', 'SerialDevice'))
self.type = self.config.get('deviceType', self.config.get('type', 'default'))
self.uplink_converter = uplink_converter
self.downlink_converter = None
self.delimiter = self.config.get('delimiter', '\n')
self.__rpc_in_progress = Event()
self.__previous_connect = 0
self.port = self.config.get('port', '/dev/ttyUSB0')
self.baudrate = self.config.get('baudrate', 9600)
self.timeout = self.config.get('timeout', 1)
self.bytesize = self.config.get('bytesize', serial.EIGHTBITS)
self.stopbits = self.config.get('stopbits', serial.STOPBITS_ONE)
self.parity = self.config.get('parity', serial.PARITY_NONE)
self.dsrdtr = self.config.get('dsrdtr', False)
self.rtscts = self.config.get('rtscts', False)
self.xonxoff = self.config.get('xonxoff', False)
self.write_timeout = self.config.get('writeTimeout', None)
self.inter_byte_timeout = self.config.get('interByteTimeout', None)
self.exclusive = self.config.get('exclusive', None)
self.__serial_lock = Lock()
self.__serial = None
def get_serial(self):
with self.__serial_lock:
if self.__serial is None or not self.__serial.is_open:
try:
self.__serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=self.timeout,
bytesize=self.bytesize,
stopbits=self.stopbits,
parity=self.parity,
dsrdtr=self.dsrdtr,
rtscts=self.rtscts,
xonxoff=self.xonxoff,
write_timeout=self.write_timeout,
inter_byte_timeout=self.inter_byte_timeout,
exclusive=self.exclusive
)
self.__log.info("Connected to device %s", self.name)
except Exception as e:
self.__log.error("Failed to connect to device %s: %s", self.name, e)
self.__serial = None
return self.__serial
def run(self):
self.__log.info("Device %s started", self.name)
self.stopped = False
self.get_serial()
while not self.__connector_stopped.is_set() and not self.stopped:
try:
if not self.__rpc_in_progress.is_set():
data_from_device = self.__read_data_from_serial()
if data_from_device:
try:
converted_data = self.uplink_converter.convert(None, data_from_device)
self.uplink_queue.put(converted_data)
except Exception as e:
self.__log.error("Failed to convert data from device %s: %s", self.name, e)
except Exception as e:
self.__log.exception("Error in device %s: %s", self.name, e)
self.stop()
self.__log.info("Device %s stopped", self.name)
def handle_rpc_request(self, rpc_method, params):
result = {"success": True}
processed = False
for rpc_config in self.config.get("serverSideRpc", []):
if rpc_method == rpc_config.get("method"):
processed = True
self.__rpc_in_progress.set()
try:
if self.downlink_converter is not None:
converted_data = self.downlink_converter.convert(rpc_config, params)
if converted_data:
with_response = rpc_config.get("withResponse", False)
response_timeout = rpc_config.get("responseTimeoutSec", 5)
response = self.write(converted_data,
with_response=with_response,
response_timeout=response_timeout)
if with_response:
response_uplink_config = {}
if rpc_config.get("responseType"):
response_uplink_config["type"] = rpc_config.get("responseType")
if rpc_config.get("responseFromByte"):
response_uplink_config["fromByte"] = rpc_config.get("responseFromByte")
if rpc_config.get("responseToByte"):
response_uplink_config["toByte"] = rpc_config.get("responseToByte")
if rpc_config.get("responseUntilDelimiter"):
response_uplink_config["delimiter"] = rpc_config.get("responseUntilDelimiter")
if response_uplink_config and response:
result = self.uplink_converter.convert(response_uplink_config, response)
else:
result = {"error": "Cannot convert response with config: %r and response: %r" % (
response_uplink_config, response), "success": False}
else:
result = {"error": "No data to send", "success": False}
else:
result = {"error": "Downlink converter not defined", "success": False}
except Exception as e:
self.__log.error("Failed to process RPC method: %r, params: %r, config: %r — Error: %s",
rpc_method, params, rpc_config, e)
result = {"error": str(e), "success": False}
finally:
self.__rpc_in_progress.clear()
if not processed:
result = {"error": "Method not found", "success": False}
return result
def write(self, data, with_response=False, response_timeout=5):
try:
serial_conn = self.get_serial()
if serial_conn:
with self.__serial_lock:
serial_conn.write(data)
self.__log.debug("Written to device %s: %s", self.name, data)
if with_response:
return self.__read_data_from_serial(response_timeout)
except Exception as e:
self.__log.exception("Failed to write to device %s: %s", self.name, e)
return None
def __read_data_from_serial(self, timeout=1):
data_from_device = b''
serial_conn = None
try:
serial_conn = self.get_serial()
previous_timeout = serial_conn.timeout
if serial_conn:
while not data_from_device.endswith(self.delimiter.encode('utf-8')):
serial_conn.timeout = timeout
chunk = serial_conn.read(1)
if chunk:
data_from_device += chunk
if self.__connector_stopped.is_set() or not chunk or self.stopped:
break
except Exception as e:
self.__log.exception("Failed to read from device %s: %s", self.name, e)
finally:
if serial_conn:
serial_conn.timeout = previous_timeout
return data_from_device
def stop(self):
self.stopped = True
with self.__serial_lock:
if self.__serial:
self.__serial.close()
self.__serial = None
def is_connected_reconnect_if_needed(self):
if self.__serial is None or not self.__serial.isOpen():
if monotonic() - self.__previous_connect > 1:
self.__previous_connect = monotonic()
self.__log.info("Reconnecting to device %s", self.name)
self.get_serial()
return self.__serial is None or not self.__serial.isOpen()
else:
return True
class SerialConnector(Thread, Connector):
"""Manages serial port devices and routes their data to ThingsBoard."""
def __init__(self, gateway: 'TBGatewayService', config, connector_type):
super().__init__()
self._connector_type = connector_type
self.__config = config
self.__id = self.__config["id"]
self.__gateway = gateway
self.name = self.__config["name"]
self.__connected = False
self.__uplink_queue = Queue(self.__config.get('uplinkQueueSize', 100000))
self._log = init_logger(self.__gateway, self.name, level=self.__config.get('logLevel'),
enable_remote_logging=self.__config.get('enableRemoteLogging', False),
is_connector_logger=True)
self._converter_log = init_logger(self.__gateway, self.name, level=self.__config.get('logLevel'),
enable_remote_logging=self.__config.get('enableRemoteLogging', False),
is_converter_logger=True)
self._log.info("Starting %s connector", self.get_name())
self.daemon = True
self.stopped = Event()
self.stopped.set()
self.__devices: List[SerialDevice] = []
self._log.info('Connector %s initialization success.', self.get_name())
def __load_devices(self):
devices_config = self.__config.get('devices')
try:
if devices_config is not None:
for device_config in devices_config:
device = None
uplink_converter_class_name = device_config.get('converter', device_config.get('uplink_converter'))
if uplink_converter_class_name is not None:
converter_class = TBModuleLoader.import_module(self._connector_type,
uplink_converter_class_name)
uplink_converter = converter_class(device_config, self._log)
device = SerialDevice(device_config, uplink_converter, self.stopped,
self._log, self.__uplink_queue)
else:
self._log.error('Uplink converter not found for connector %s.', self.get_name())
if device_config.get('downlink_converter') is not None:
downlink_converter_class = TBModuleLoader.import_module(
self._connector_type, device_config.get('downlink_converter'))
if device is not None:
device.downlink_converter = downlink_converter_class(device_config, self._converter_log)
if device is not None:
self.__devices.append(device)
else:
self._log.error('Section "devices" not found in config. Stopping connector %s.', self.get_name())
self.close()
except Exception as e:
self._log.error('Failed to load devices: %s', e)
def __start_devices(self):
failed = len(self.__devices)
for device in self.__devices:
try:
device.start()
failed -= 1
except Exception as e:
self._log.exception("Failed to start device %s: %s", device.name, e)
self.__connected = failed == 0
def open(self):
self.stopped.clear()
self.start()
def get_name(self):
return self.name
def get_type(self):
return self._connector_type
def is_connected(self):
return self.__connected
def is_stopped(self):
return self.stopped.is_set()
def get_config(self):
return self.__config
def get_id(self):
return self.__id
def run(self):
try:
self.__load_devices()
self.__start_devices()
self._log.info("Devices: %s", '\n'.join(d.name for d in self.__devices))
while not self.stopped.is_set():
try:
connected = len(self.__devices)
for device in self.__devices:
if not device.stopped and not device.is_connected_reconnect_if_needed():
connected -= 1
self._log.error("Device %s disconnected — reconnecting", device.name)
device.stop()
device.join()
device = SerialDevice(device.config, device.uplink_converter, self.stopped,
self._log, self.__uplink_queue)
device.start()
self.__connected = connected == len(self.__devices)
if not self.__uplink_queue.empty():
data = self.__uplink_queue.get()
self.__gateway.send_to_storage(self.name, self.__id, data)
else:
sleep(0.05)
except Exception as e:
self._log.error("Error processing data: %s", e)
except Exception as e:
self._log.error("Connector error: %s", e)
def close(self):
self.stopped.set()
for device in self.__devices:
self.__gateway.del_device(device.name)
device.stop()
self._log.stop()
def on_attributes_update(self, content):
self._log.debug("Attribute update: %s", content)
device_name = content.get("device")
if device_name is None:
self._log.error("Device name missing in attribute update: %s", content)
return
for device in self.__devices:
if device_name == device.name:
request_config = device.config.get("attributeUpdates")
if request_config is None:
self._log.error("No attributeUpdates config for device %s", device_name)
return
found = False
for attr_config in request_config:
attribute = attr_config.get("attributeOnPlatform")
if attribute and attribute in content["data"]:
found = True
try:
value = content["data"][attribute]
str_to_send = str(
attr_config["stringToDevice"]
.replace("${" + attribute + "}", str(value))
.replace("${deviceName}", device_name)
.replace("${deviceType}", device.type)
).encode("UTF-8")
device.write(str_to_send)
except Exception as e:
self._log.error("Failed to send attribute update to %s: %s", device_name, e)
if not found:
self._log.error("Attribute update config not found for key %s on device %s",
list(content['data'].keys())[0], device_name)
def server_side_rpc_handler(self, content):
self._log.debug("RPC request: %s", content)
device_name = content.get("device")
rpc_data = content.get("data", {})
rpc_method = rpc_data.get("method")
req_id = rpc_data.get("id")
params = rpc_data.get("params")
if device_name is None:
self._log.error("Device name missing in RPC request: %s", content)
return
for device in self.__devices:
if device_name == device.name:
result = device.handle_rpc_request(rpc_method, params)
if "error" in result:
self._log.error("RPC error for device %s: %s", device_name, result["error"])
if result is not None:
self.__gateway.send_rpc_reply(device=device_name,
req_id=req_id,
content=result,
wait_for_publish=True,
quality_of_service=1)

Create extensions/serial/uplink_serial_converter.py. The uplink converter parses raw bytes from the device and produces a ConvertedData object.

from typing import Any, Tuple
from simplejson import loads
from thingsboard_gateway.connectors.converter import Converter
from thingsboard_gateway.gateway.constants import REPORT_STRATEGY_PARAMETER, TELEMETRY_PARAMETER, TIMESERIES_PARAMETER
from thingsboard_gateway.gateway.entities.converted_data import ConvertedData
from thingsboard_gateway.gateway.entities.datapoint_key import DatapointKey
from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig
from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
class SerialUplinkConverter(Converter):
"""
Converts incoming serial bytes to the ConvertedData format expected by ThingsBoard.
One converter instance is created per configured device.
"""
def __init__(self, config, logger):
self._log = logger
self.__config = config
self.__device_report_strategy = None
self.__device_name = self.__config.get('deviceName', self.__config.get('name', 'SerialDevice'))
self.__device_type = self.__config.get('deviceType', self.__config.get('type', 'default'))
try:
self.__device_report_strategy = ReportStrategyConfig(self.__config.get(REPORT_STRATEGY_PARAMETER))
except ValueError as e:
self._log.trace("No report strategy config for device %s: %s", self.__device_name, e)
def convert(self, config, data: bytes):
"""
Converts device bytes to ConvertedData.
When config is not None (RPC response), returns a dict with the converted result.
"""
self._log.debug("Data to convert: %s", data)
if config is not None:
return {"result": self.__convert_value_to_type(data, config)}
converted_data = ConvertedData(self.__device_name, self.__device_type)
for dp_config in self.__config.get(TIMESERIES_PARAMETER, self.__config.get(TELEMETRY_PARAMETER, [])):
try:
entry = self.__convert_telemetry_datapoint(data, dp_config)
if entry:
converted_data.add_to_telemetry(entry)
except Exception as e:
self._log.error("Error converting telemetry: %s", e)
for dp_config in self.__config.get('attributes', []):
try:
attr = self.__convert_attributes_datapoint(data, dp_config)
if attr:
converted_data.add_to_attributes(*attr)
except Exception as e:
self._log.error("Error converting attribute: %s", e)
self._log.debug("Converted data: %s", converted_data)
return converted_data
def __convert_telemetry_datapoint(self, data, dp_config) -> TelemetryEntry:
key = dp_config.get('key')
datapoint_key = TBUtility.convert_key_to_datapoint_key(
key, self.__device_report_strategy, dp_config, self._log)
value = self.__convert_value_to_type(data, dp_config)
if not datapoint_key or not value:
self._log.trace("Datapoint %s not found in: %s", key, data.hex())
return None
return TelemetryEntry({datapoint_key: value})
def __convert_attributes_datapoint(self, data, dp_config) -> Tuple[DatapointKey, Any]:
key = dp_config.get('key')
datapoint_key = TBUtility.convert_key_to_datapoint_key(
key, self.__device_report_strategy, dp_config, self._log)
value = self.__convert_value_to_type(data, dp_config)
if not datapoint_key or not value:
self._log.trace("Datapoint %s not found in: %s", key, data.hex())
return None
return (datapoint_key, value)
@staticmethod
def __convert_value_to_type(data, dp_config):
type_ = dp_config.get('type')
d = data
if dp_config.get("untilDelimiter") or dp_config.get("fromDelimiter"):
from_delim = dp_config.get("fromDelimiter")
until_delim = dp_config.get("untilDelimiter")
from_pos = d.find(from_delim.encode('UTF-8')) if from_delim else 0
until_pos = d.find(until_delim.encode('UTF-8')) if until_delim else -1
if from_pos != -1 and until_pos != -1 and from_pos < until_pos:
d = d[from_pos:until_pos]
elif from_pos != -1 and from_pos < len(d):
d = d[from_pos:]
elif until_pos != -1 and until_pos < len(d):
d = d[:until_pos]
elif dp_config.get("fromByte") or dp_config.get("toByte"):
from_byte = dp_config.get("fromByte")
to_byte = dp_config.get("toByte")
if from_byte and to_byte and from_byte < to_byte and len(d) > to_byte:
d = d[to_byte:from_byte]
else:
if from_byte and len(d) > from_byte:
d = d[from_byte:]
if to_byte and (len(d) > to_byte or to_byte == -1):
d = d[:to_byte]
if type_ == 'string':
return d.decode('UTF-8').strip()
elif type_ == 'json':
return loads(d.decode('UTF-8'))
elif type_ == 'int':
return int(d)
elif type_ in ('float', 'double'):
return float(d)
elif type_ == 'bool':
try:
return bool(int(d))
except ValueError:
return d.decode('UTF-8').strip().lower() == 'true'
else:
return d.hex()

After processing 48\r2430947595\n, the converter produces:

Device name: "SerialDevice1"
Device type: "default"
Telemetry: [{"humidity": 48.0}]
Attributes: {"SerialNumber": "2430947595"}

Create extensions/serial/downlink_serial_converter.py. The downlink converter turns ThingsBoard RPC payloads into raw bytes to send to the device.

from math import ceil
from struct import pack, unpack
from thingsboard_gateway.connectors.converter import Converter
class SerialDownlinkConverter(Converter):
"""
Converts RPC or attribute update payloads into bytes for the serial port.
One converter instance is created per configured device.
"""
def __init__(self, config, logger):
self._log = logger
self.__config = config
def convert(self, config, data) -> bytes:
"""Returns bytes to write to the serial port."""
self._log.debug("Data to convert: %s", data)
if data is None:
return None
byteorder = self.__config.get('byteorder', 'big').lower()
type_ = config.get("type")
if type_ == "int":
length = ceil(data.bit_length() / 8)
return data.to_bytes(length, byteorder=byteorder)
elif type_ in ("float", "double"):
fmt_single = ('>' if byteorder == 'big' else '<') + 'f'
single_bytes = pack(fmt_single, data)
if unpack(fmt_single, single_bytes)[0] == data:
return single_bytes
fmt_double = ('>' if byteorder == 'big' else '<') + 'd'
return pack(fmt_double, data)
return data.encode("UTF-8")

Step 6. Register the connector in tb_gateway.json

Section titled “Step 6. Register the connector in tb_gateway.json”

Add the following entry to the "connectors" array in tb_gateway.json:

{
"name": "Serial Connector",
"type": "serial",
"configuration": "custom_serial.json",
"class": "SerialConnector"
}
FieldDescription
nameConnector name — must match the "name" in custom_serial.json.
typeExtensions subfolder name (serial).
configurationPath to the connector config file, relative to the gateway config folder.
classConnector class name inside the connector file.

Terminal window
sudo systemctl restart thingsboard-gateway

Default log locations:

InstallationLog folder
Docker Composetb-gw-logs volume
Daemon/var/log/thingsboard-gateway/
Python module (pip)./logs/

Connect the serial device, then open Devices in the ThingsBoard UI. You should see a device named SerialDevice1. Open it and go to the Latest telemetry tab — the humidity key should appear with the value parsed from the serial stream.


The SerialConnector class implements all required methods from the Connector interface. Key methods used in this example:

MethodRole in this connector
__init__Initialises the uplink queue, logger, and device list from config.
openStarts the connector thread.
runMain loop: loads devices, starts threads, drains the uplink queue, handles reconnects.
closeStops all device threads and the logger.
on_attributes_updateFormats the attribute value as a UTF-8 string and writes it to the device’s serial port.
server_side_rpc_handlerDelegates RPC handling to SerialDevice.handle_rpc_request and sends the reply.

The SerialUplinkConverter and SerialDownlinkConverter classes implement the Converter interface.

Classconvert(config, data)
SerialUplinkConverterconfig=None for telemetry; config dict for RPC responses. data is the raw bytes from the port. Returns ConvertedData.
SerialDownlinkConverterconfig is the RPC config section. data is the value from ThingsBoard. Returns bytes.