Serial connector example
This tutorial walks through building a complete custom connector for the IoT Gateway — the SerialConnector — which reads data from a serial port and forwards it to ThingsBoard. The same connector ships in the gateway’s built-in extensions folder, so you can use it as a reference or starting point for your own implementation.
What we’re building:
    Serial device → bytes → SerialUplinkConverter → ConvertedData → ThingsBoard
    ThingsBoard → RPC → SerialDownlinkConverter → bytes → Serial device

Sample device payload:

    48\r2430947595\n

- 48: humidity value, terminated by \r
- 2430947595: device serial number, from byte offset 4 to end of message
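To make the payload layout concrete, here is a minimal standalone sketch that splits the sample message by hand. It is not the gateway's converter code: it simply cuts at the `\r` delimiter and drops the trailing `\n`, and the variable names are illustrative.

```python
# Illustrative only: split the sample payload by hand to show which bytes
# belong to which field. This is not the gateway's converter code.
payload = b"48\r2430947595\n"

# Everything before the first "\r" is the humidity reading.
humidity_raw, remainder = payload.split(b"\r", 1)
humidity = float(humidity_raw.decode("utf-8"))

# The rest, minus the trailing "\n" terminator, is the serial number.
serial_number = remainder.rstrip(b"\n").decode("utf-8")

print(humidity, serial_number)
```

The connector built in this tutorial extracts the same two fields from the telemetry and attributes rules in its configuration rather than from hard-coded splits.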
Step 1. Create the connector configuration
Create custom_serial.json in the same folder as your tb_gateway.json:
    touch custom_serial.json

Add the following configuration:

    {
      "name": "Custom serial connector",
      "logLevel": "DEBUG",
      "uplinkQueueSize": 100000,
      "devices": [
        {
          "name": "SerialDevice1",
          "type": "default",
          "port": "/dev/ttyUSB0",
          "baudrate": 9600,
          "converter": "SerialUplinkConverter",
          "downlink_converter": "SerialDownlinkConverter",
          "telemetry": [
            {
              "type": "float",
              "key": "humidity",
              "untilDelimiter": "\r"
            }
          ],
          "attributes": [
            {
              "key": "SerialNumber",
              "type": "string",
              "fromByte": 4,
              "toByte": -1
            }
          ],
          "attributeUpdates": [
            {
              "attributeOnPlatform": "attr1",
              "stringToDevice": "value = ${attr1}\n"
            }
          ],
          "serverSideRpc": [
            {
              "method": "setValue",
              "type": "int",
              "withResponse": true,
              "responseType": "string",
              "responseUntilDelimiter": "\r",
              "responseTimeoutSec": 5
            },
            {
              "method": "getValue",
              "type": "string",
              "withResponse": false
            }
          ]
        }
      ]
    }

Top-level fields:
| Field | Description |
|---|---|
| name | Connector name — must match the "name" entry in tb_gateway.json. |
| logLevel | Log verbosity: TRACE, DEBUG, INFO, WARNING, ERROR, CRITICAL. |
| uplinkQueueSize | Maximum number of uplink data items held in the connector's queue (the connector code defaults to 100000). |
| devices | Array of device configurations. |
Device fields:
| Field | Description |
|---|---|
| name | Device name on the ThingsBoard platform. |
| type | Device profile name on the platform. |
| port | Serial port path. |
| baudrate | Serial port baud rate. |
| converter | Class name of the uplink converter. |
| downlink_converter | Class name of the downlink converter. |
| telemetry | Array of telemetry datapoint configurations. |
| attributes | Array of attribute datapoint configurations. |
| attributeUpdates | Array of attribute update configurations (platform → device). |
| serverSideRpc | Array of RPC method configurations (platform → device). |
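Before starting the gateway, it can help to check that the configuration parses and contains the fields described in the tables above. The following is a rough sketch, not part of the gateway: `find_config_problems` and `REQUIRED_DEVICE_FIELDS` are hypothetical names, and the choice of fields to check is an assumption (the connector falls back to defaults for some of them).

```python
import json

# Trimmed copy of the configuration above; in practice you would read
# custom_serial.json from disk instead.
CONFIG_TEXT = r"""
{
  "name": "Custom serial connector",
  "devices": [
    {
      "name": "SerialDevice1",
      "port": "/dev/ttyUSB0",
      "baudrate": 9600,
      "converter": "SerialUplinkConverter",
      "telemetry": [{"type": "float", "key": "humidity", "untilDelimiter": "\r"}]
    }
  ]
}
"""

# Assumption for this sketch: fields worth setting explicitly per device.
REQUIRED_DEVICE_FIELDS = ("name", "port", "baudrate", "converter")


def find_config_problems(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    if not config.get("name"):
        problems.append("connector 'name' is missing")
    devices = config.get("devices")
    if not devices:
        problems.append("'devices' is missing or empty")
        return problems
    for index, device in enumerate(devices):
        for field in REQUIRED_DEVICE_FIELDS:
            if field not in device:
                problems.append(f"devices[{index}]: '{field}' is missing")
    return problems


config = json.loads(CONFIG_TEXT)
problems = find_config_problems(config)
print(problems or "configuration looks complete")
```

Note that the JSON escape `"\r"` becomes a single carriage-return character once parsed, which is what the delimiter search operates on.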
Step 2. Locate the extensions folder
Place the connector and converter files inside the extensions folder for your installation type:
| Installation | Extensions folder path |
|---|---|
| Docker Compose (default volume) | tb-gw-extensions |
| Daemon | /var/lib/thingsboard_gateway/extensions |
| pip (system-wide) | /usr/lib/python3/site-packages/thingsboard_gateway/extensions |
| pip (user) | /usr/local/lib/python3/dist-packages/thingsboard-gateway/extensions |
Create a subfolder named serial inside the extensions folder. All connector and converter
files go there.
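As a rough illustration of the resulting layout, the sketch below creates the serial subfolder and empty placeholder files with the names used in the later steps. The root path is a temporary directory chosen so the sketch runs anywhere; substitute the extensions folder for your installation type.

```python
import tempfile
from pathlib import Path

# Placeholder root so the sketch runs anywhere; replace it with the
# extensions folder path for your installation type.
extensions_root = Path(tempfile.mkdtemp()) / "extensions"

serial_dir = extensions_root / "serial"
serial_dir.mkdir(parents=True, exist_ok=True)

# File names used in the later steps of this tutorial.
for file_name in ("serial_connector.py",
                  "uplink_serial_converter.py",
                  "downlink_serial_converter.py"):
    (serial_dir / file_name).touch()

created = sorted(p.name for p in serial_dir.iterdir())
print(created)
```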
Step 3. Implement SerialConnector
Create extensions/serial/serial_connector.py with the following content.
The connector manages SerialDevice worker threads and routes data to ThingsBoard.
    from queue import Queue
    from threading import Event, Thread, Lock
    from time import monotonic, sleep
    from typing import List, TYPE_CHECKING

    from thingsboard_gateway.tb_utility.tb_utility import TBUtility

    # Install pyserial on first use if it is missing, then import it.
    # The serial.tools imports come afterwards so the fallback can run first.
    try:
        import serial
    except ImportError:
        print("pyserial library not found - installing...")
        TBUtility.install_package("pyserial")
        import serial

    import serial.tools
    import serial.tools.list_ports

    from thingsboard_gateway.connectors.connector import Connector
    from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
    from thingsboard_gateway.tb_utility.tb_logger import init_logger

    if TYPE_CHECKING:
        from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService


    class SerialDevice(Thread):
        """Represents one device connected to a serial port."""

        def __init__(self, device_config, uplink_converter, stop_event: Event, logger, uplink_queue):
            super().__init__()
            self.__log = logger
            self.uplink_queue = uplink_queue
            self.daemon = True
            self.stopped = True
            self.__connector_stopped = stop_event
            self.config = device_config
            self.name = self.config.get('deviceName', self.config.get('name', 'SerialDevice'))
            self.type = self.config.get('deviceType', self.config.get('type', 'default'))
            self.uplink_converter = uplink_converter
            self.downlink_converter = None
            self.delimiter = self.config.get('delimiter', '\n')
            self.__rpc_in_progress = Event()
            self.__previous_connect = 0

            # Serial port settings, with pyserial defaults where the config is silent.
            self.port = self.config.get('port', '/dev/ttyUSB0')
            self.baudrate = self.config.get('baudrate', 9600)
            self.timeout = self.config.get('timeout', 1)
            self.bytesize = self.config.get('bytesize', serial.EIGHTBITS)
            self.stopbits = self.config.get('stopbits', serial.STOPBITS_ONE)
            self.parity = self.config.get('parity', serial.PARITY_NONE)
            self.dsrdtr = self.config.get('dsrdtr', False)
            self.rtscts = self.config.get('rtscts', False)
            self.xonxoff = self.config.get('xonxoff', False)
            self.write_timeout = self.config.get('writeTimeout', None)
            self.inter_byte_timeout = self.config.get('interByteTimeout', None)
            self.exclusive = self.config.get('exclusive', None)
            self.__serial_lock = Lock()
            self.__serial = None

        def get_serial(self):
            # Open the port lazily and reuse the handle while it stays open.
            with self.__serial_lock:
                if self.__serial is None or not self.__serial.is_open:
                    try:
                        self.__serial = serial.Serial(
                            port=self.port,
                            baudrate=self.baudrate,
                            timeout=self.timeout,
                            bytesize=self.bytesize,
                            stopbits=self.stopbits,
                            parity=self.parity,
                            dsrdtr=self.dsrdtr,
                            rtscts=self.rtscts,
                            xonxoff=self.xonxoff,
                            write_timeout=self.write_timeout,
                            inter_byte_timeout=self.inter_byte_timeout,
                            exclusive=self.exclusive
                        )
                        self.__log.info("Connected to device %s", self.name)
                    except Exception as e:
                        self.__log.error("Failed to connect to device %s: %s", self.name, e)
                        self.__serial = None
                return self.__serial

        def run(self):
            self.__log.info("Device %s started", self.name)
            self.stopped = False
            self.get_serial()
            while not self.__connector_stopped.is_set() and not self.stopped:
                try:
                    # Leave the port to the RPC handler while a request is in progress.
                    if not self.__rpc_in_progress.is_set():
                        data_from_device = self.__read_data_from_serial()
                        if data_from_device:
                            try:
                                converted_data = self.uplink_converter.convert(None, data_from_device)
                                self.uplink_queue.put(converted_data)
                            except Exception as e:
                                self.__log.error("Failed to convert data from device %s: %s", self.name, e)
                except Exception as e:
                    self.__log.exception("Error in device %s: %s", self.name, e)
                    self.stop()
            self.__log.info("Device %s stopped", self.name)

        def handle_rpc_request(self, rpc_method, params):
            result = {"success": True}
            processed = False
            for rpc_config in self.config.get("serverSideRpc", []):
                if rpc_method == rpc_config.get("method"):
                    processed = True
                    self.__rpc_in_progress.set()
                    try:
                        if self.downlink_converter is not None:
                            converted_data = self.downlink_converter.convert(rpc_config, params)
                            if converted_data:
                                with_response = rpc_config.get("withResponse", False)
                                response_timeout = rpc_config.get("responseTimeoutSec", 5)
                                response = self.write(converted_data,
                                                      with_response=with_response,
                                                      response_timeout=response_timeout)
                                if with_response:
                                    # Map the response* RPC options onto the keys the uplink converter reads.
                                    response_uplink_config = {}
                                    if rpc_config.get("responseType"):
                                        response_uplink_config["type"] = rpc_config.get("responseType")
                                    if rpc_config.get("responseFromByte"):
                                        response_uplink_config["fromByte"] = rpc_config.get("responseFromByte")
                                    if rpc_config.get("responseToByte"):
                                        response_uplink_config["toByte"] = rpc_config.get("responseToByte")
                                    if rpc_config.get("responseUntilDelimiter"):
                                        # The uplink converter looks up "untilDelimiter", not "delimiter".
                                        response_uplink_config["untilDelimiter"] = rpc_config.get("responseUntilDelimiter")
                                    if response_uplink_config and response:
                                        result = self.uplink_converter.convert(response_uplink_config, response)
                                    else:
                                        result = {"error": "Cannot convert response with config: %r and response: %r" % (
                                            response_uplink_config, response), "success": False}
                            else:
                                result = {"error": "No data to send", "success": False}
                        else:
                            result = {"error": "Downlink converter not defined", "success": False}
                    except Exception as e:
                        self.__log.error("Failed to process RPC method: %r, params: %r, config: %r - Error: %s",
                                         rpc_method, params, rpc_config, e)
                        result = {"error": str(e), "success": False}
                    finally:
                        self.__rpc_in_progress.clear()
            if not processed:
                result = {"error": "Method not found", "success": False}
            return result

        def write(self, data, with_response=False, response_timeout=5):
            try:
                serial_conn = self.get_serial()
                if serial_conn:
                    with self.__serial_lock:
                        serial_conn.write(data)
                    self.__log.debug("Written to device %s: %s", self.name, data)
                    # Read outside the lock: reading calls get_serial(), which takes the lock itself.
                    if with_response:
                        return self.__read_data_from_serial(response_timeout)
            except Exception as e:
                self.__log.exception("Failed to write to device %s: %s", self.name, e)
            return None

        def __read_data_from_serial(self, timeout=1):
            data_from_device = b''
            serial_conn = None
            previous_timeout = None
            try:
                serial_conn = self.get_serial()
                if serial_conn:
                    # Remember the configured timeout only once the port is known to exist,
                    # so a missing port does not raise AttributeError on every read.
                    previous_timeout = serial_conn.timeout
                    serial_conn.timeout = timeout
                    # Read byte by byte until the delimiter arrives, the read times out,
                    # or the device or connector is stopped.
                    while not data_from_device.endswith(self.delimiter.encode('utf-8')):
                        chunk = serial_conn.read(1)
                        if chunk:
                            data_from_device += chunk
                        if self.__connector_stopped.is_set() or not chunk or self.stopped:
                            break
            except Exception as e:
                self.__log.exception("Failed to read from device %s: %s", self.name, e)
            finally:
                if serial_conn:
                    serial_conn.timeout = previous_timeout
            return data_from_device

        def stop(self):
            self.stopped = True
            with self.__serial_lock:
                if self.__serial:
                    self.__serial.close()
                    self.__serial = None

        def is_connected_reconnect_if_needed(self):
            """Return True if the port is open, trying to reopen it at most once per second."""
            if self.__serial is None or not self.__serial.is_open:
                if monotonic() - self.__previous_connect > 1:
                    self.__previous_connect = monotonic()
                    self.__log.info("Reconnecting to device %s", self.name)
                    self.get_serial()
                # Report the state after the reconnect attempt: True only if the port is open now.
                return self.__serial is not None and self.__serial.is_open
            return True


    class SerialConnector(Thread, Connector):
        """Manages serial port devices and routes their data to ThingsBoard."""

        def __init__(self, gateway: 'TBGatewayService', config, connector_type):
            super().__init__()
            self._connector_type = connector_type
            self.__config = config
            self.__id = self.__config["id"]
            self.__gateway = gateway
            self.name = self.__config["name"]
            self.__connected = False
            self.__uplink_queue = Queue(self.__config.get('uplinkQueueSize', 100000))
            self._log = init_logger(self.__gateway, self.name,
                                    level=self.__config.get('logLevel'),
                                    enable_remote_logging=self.__config.get('enableRemoteLogging', False),
                                    is_connector_logger=True)
            self._converter_log = init_logger(self.__gateway, self.name,
                                              level=self.__config.get('logLevel'),
                                              enable_remote_logging=self.__config.get('enableRemoteLogging', False),
                                              is_converter_logger=True)
            self._log.info("Starting %s connector", self.get_name())
            self.daemon = True
            self.stopped = Event()
            self.stopped.set()
            self.__devices: List[SerialDevice] = []
            self._log.info('Connector %s initialization success.', self.get_name())

        def __load_devices(self):
            devices_config = self.__config.get('devices')
            try:
                if devices_config is not None:
                    for device_config in devices_config:
                        device = None
                        uplink_converter_class_name = device_config.get('converter',
                                                                        device_config.get('uplink_converter'))
                        if uplink_converter_class_name is not None:
                            converter_class = TBModuleLoader.import_module(self._connector_type,
                                                                           uplink_converter_class_name)
                            uplink_converter = converter_class(device_config, self._log)
                            device = SerialDevice(device_config, uplink_converter, self.stopped,
                                                  self._log, self.__uplink_queue)
                        else:
                            self._log.error('Uplink converter not found for connector %s.', self.get_name())
                        if device_config.get('downlink_converter') is not None:
                            downlink_converter_class = TBModuleLoader.import_module(
                                self._connector_type, device_config.get('downlink_converter'))
                            if device is not None:
                                device.downlink_converter = downlink_converter_class(device_config,
                                                                                     self._converter_log)
                        if device is not None:
                            self.__devices.append(device)
                else:
                    self._log.error('Section "devices" not found in config. Stopping connector %s.',
                                    self.get_name())
                    self.close()
            except Exception as e:
                self._log.error('Failed to load devices: %s', e)

        def __start_devices(self):
            failed = len(self.__devices)
            for device in self.__devices:
                try:
                    device.start()
                    failed -= 1
                except Exception as e:
                    self._log.exception("Failed to start device %s: %s", device.name, e)
            self.__connected = failed == 0

        def open(self):
            self.stopped.clear()
            self.start()

        def get_name(self):
            return self.name

        def get_type(self):
            return self._connector_type

        def is_connected(self):
            return self.__connected

        def is_stopped(self):
            return self.stopped.is_set()

        def get_config(self):
            return self.__config

        def get_id(self):
            return self.__id

        def run(self):
            try:
                self.__load_devices()
                self.__start_devices()
                self._log.info("Devices: %s", '\n'.join(d.name for d in self.__devices))
                while not self.stopped.is_set():
                    try:
                        connected = len(self.__devices)
                        for device in self.__devices:
                            if not device.stopped and not device.is_connected_reconnect_if_needed():
                                connected -= 1
                                self._log.error("Device %s disconnected - reconnecting", device.name)
                                device.stop()
                                device.join()
                                device = SerialDevice(device.config, device.uplink_converter, self.stopped,
                                                      self._log, self.__uplink_queue)
                                device.start()
                        self.__connected = connected == len(self.__devices)
                        # Forward one queued item per iteration, or idle briefly when the queue is empty.
                        if not self.__uplink_queue.empty():
                            data = self.__uplink_queue.get()
                            self.__gateway.send_to_storage(self.name, self.__id, data)
                        else:
                            sleep(0.05)
                    except Exception as e:
                        self._log.error("Error processing data: %s", e)
            except Exception as e:
                self._log.error("Connector error: %s", e)

        def close(self):
            self.stopped.set()
            for device in self.__devices:
                self.__gateway.del_device(device.name)
                device.stop()
            self._log.stop()

        def on_attributes_update(self, content):
            self._log.debug("Attribute update: %s", content)
            device_name = content.get("device")
            if device_name is None:
                self._log.error("Device name missing in attribute update: %s", content)
                return
            for device in self.__devices:
                if device_name == device.name:
                    request_config = device.config.get("attributeUpdates")
                    if request_config is None:
                        self._log.error("No attributeUpdates config for device %s", device_name)
                        return
                    found = False
                    for attr_config in request_config:
                        attribute = attr_config.get("attributeOnPlatform")
                        if attribute and attribute in content["data"]:
                            found = True
                            try:
                                value = content["data"][attribute]
                                # Fill the placeholders in the template, then encode it for the port.
                                str_to_send = str(
                                    attr_config["stringToDevice"]
                                    .replace("${" + attribute + "}", str(value))
                                    .replace("${deviceName}", device_name)
                                    .replace("${deviceType}", device.type)
                                ).encode("UTF-8")
                                device.write(str_to_send)
                            except Exception as e:
                                self._log.error("Failed to send attribute update to %s: %s", device_name, e)
                    if not found:
                        self._log.error("Attribute update config not found for key %s on device %s",
                                        list(content['data'].keys())[0], device_name)

        def server_side_rpc_handler(self, content):
            self._log.debug("RPC request: %s", content)
            device_name = content.get("device")
            rpc_data = content.get("data", {})
            rpc_method = rpc_data.get("method")
            req_id = rpc_data.get("id")
            params = rpc_data.get("params")
            if device_name is None:
                self._log.error("Device name missing in RPC request: %s", content)
                return
            for device in self.__devices:
                if device_name == device.name:
                    result = device.handle_rpc_request(rpc_method, params)
                    # Check for None before inspecting the result for an error key.
                    if result is not None:
                        if "error" in result:
                            self._log.error("RPC error for device %s: %s", device_name, result["error"])
                        self.__gateway.send_rpc_reply(device=device_name, req_id=req_id, content=result,
                                                      wait_for_publish=True, quality_of_service=1)

Step 4. Implement SerialUplinkConverter
Create extensions/serial/uplink_serial_converter.py. The uplink converter parses raw bytes
from the device and produces a ConvertedData object.
    from typing import Any, Tuple

    from simplejson import loads

    from thingsboard_gateway.connectors.converter import Converter
    from thingsboard_gateway.gateway.constants import REPORT_STRATEGY_PARAMETER, TELEMETRY_PARAMETER, TIMESERIES_PARAMETER
    from thingsboard_gateway.gateway.entities.converted_data import ConvertedData
    from thingsboard_gateway.gateway.entities.datapoint_key import DatapointKey
    from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig
    from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry
    from thingsboard_gateway.tb_utility.tb_utility import TBUtility


    class SerialUplinkConverter(Converter):
        """
        Converts incoming serial bytes to the ConvertedData format expected by ThingsBoard.
        One converter instance is created per configured device.
        """

        def __init__(self, config, logger):
            self._log = logger
            self.__config = config
            self.__device_report_strategy = None
            self.__device_name = self.__config.get('deviceName', self.__config.get('name', 'SerialDevice'))
            self.__device_type = self.__config.get('deviceType', self.__config.get('type', 'default'))
            try:
                self.__device_report_strategy = ReportStrategyConfig(self.__config.get(REPORT_STRATEGY_PARAMETER))
            except ValueError as e:
                self._log.trace("No report strategy config for device %s: %s", self.__device_name, e)

        def convert(self, config, data: bytes):
            """
            Converts device bytes to ConvertedData.
            When config is not None (RPC response), returns a dict with the converted result.
            """
            self._log.debug("Data to convert: %s", data)
            if config is not None:
                return {"result": self.__convert_value_to_type(data, config)}

            converted_data = ConvertedData(self.__device_name, self.__device_type)
            for dp_config in self.__config.get(TIMESERIES_PARAMETER, self.__config.get(TELEMETRY_PARAMETER, [])):
                try:
                    entry = self.__convert_telemetry_datapoint(data, dp_config)
                    if entry:
                        converted_data.add_to_telemetry(entry)
                except Exception as e:
                    self._log.error("Error converting telemetry: %s", e)
            for dp_config in self.__config.get('attributes', []):
                try:
                    attr = self.__convert_attributes_datapoint(data, dp_config)
                    if attr:
                        converted_data.add_to_attributes(*attr)
                except Exception as e:
                    self._log.error("Error converting attribute: %s", e)
            self._log.debug("Converted data: %s", converted_data)
            return converted_data

        def __convert_telemetry_datapoint(self, data, dp_config) -> TelemetryEntry:
            key = dp_config.get('key')
            datapoint_key = TBUtility.convert_key_to_datapoint_key(
                key, self.__device_report_strategy, dp_config, self._log)
            value = self.__convert_value_to_type(data, dp_config)
            # Skip only missing or empty values so that readings such as 0 or False are kept.
            if not datapoint_key or value is None or value == '':
                self._log.trace("Datapoint %s not found in: %s", key, data.hex())
                return None
            return TelemetryEntry({datapoint_key: value})

        def __convert_attributes_datapoint(self, data, dp_config) -> Tuple[DatapointKey, Any]:
            key = dp_config.get('key')
            datapoint_key = TBUtility.convert_key_to_datapoint_key(
                key, self.__device_report_strategy, dp_config, self._log)
            value = self.__convert_value_to_type(data, dp_config)
            # Skip only missing or empty values so that values such as 0 or False are kept.
            if not datapoint_key or value is None or value == '':
                self._log.trace("Datapoint %s not found in: %s", key, data.hex())
                return None
            return (datapoint_key, value)

        @staticmethod
        def __convert_value_to_type(data, dp_config):
            type_ = dp_config.get('type')
            d = data

            # Step 1: cut the relevant bytes out of the message, by delimiter or by offset.
            if dp_config.get("untilDelimiter") or dp_config.get("fromDelimiter"):
                from_delim = dp_config.get("fromDelimiter")
                until_delim = dp_config.get("untilDelimiter")
                from_pos = d.find(from_delim.encode('UTF-8')) if from_delim else 0
                until_pos = d.find(until_delim.encode('UTF-8')) if until_delim else -1
                if from_pos != -1 and until_pos != -1 and from_pos < until_pos:
                    d = d[from_pos:until_pos]
                elif from_pos != -1 and from_pos < len(d):
                    d = d[from_pos:]
                elif until_pos != -1 and until_pos < len(d):
                    d = d[:until_pos]
            elif dp_config.get("fromByte") or dp_config.get("toByte"):
                from_byte = dp_config.get("fromByte")
                to_byte = dp_config.get("toByte")
                if from_byte and to_byte and from_byte < to_byte and len(d) > to_byte:
                    # Slice from the start offset to the end offset (not the other way round).
                    d = d[from_byte:to_byte]
                else:
                    if from_byte and len(d) > from_byte:
                        d = d[from_byte:]
                    if to_byte and (len(d) > to_byte or to_byte == -1):
                        d = d[:to_byte]

            # Step 2: interpret the selected bytes as the configured type.
            if type_ == 'string':
                return d.decode('UTF-8').strip()
            elif type_ == 'json':
                return loads(d.decode('UTF-8'))
            elif type_ == 'int':
                return int(d)
            elif type_ in ('float', 'double'):
                return float(d)
            elif type_ == 'bool':
                try:
                    return bool(int(d))
                except ValueError:
                    return d.decode('UTF-8').strip().lower() == 'true'
            else:
                return d.hex()

After processing 48\r2430947595\n, the converter produces:
    Device name: "SerialDevice1"
    Device type: "default"
    Telemetry: [{"humidity": 48.0}]
    Attributes: {"SerialNumber": "2430947595"}

Step 5. Implement SerialDownlinkConverter
Create extensions/serial/downlink_serial_converter.py. The downlink converter turns
ThingsBoard RPC payloads into raw bytes to send to the device.
    from math import ceil
    from struct import pack, unpack

    from thingsboard_gateway.connectors.converter import Converter


    class SerialDownlinkConverter(Converter):
        """
        Converts RPC or attribute update payloads into bytes for the serial port.
        One converter instance is created per configured device.
        """

        def __init__(self, config, logger):
            self._log = logger
            self.__config = config

        def convert(self, config, data) -> bytes:
            """Returns bytes to write to the serial port, or None when there is no data."""
            self._log.debug("Data to convert: %s", data)
            if data is None:
                return None
            byteorder = self.__config.get('byteorder', 'big').lower()
            type_ = config.get("type")

            if type_ == "int":
                # Use at least one byte so that the value 0 is still sent.
                length = max(1, ceil(data.bit_length() / 8))
                return data.to_bytes(length, byteorder=byteorder)
            elif type_ in ("float", "double"):
                # Prefer 4-byte floats when the value survives the round trip, else use 8 bytes.
                fmt_single = ('>' if byteorder == 'big' else '<') + 'f'
                single_bytes = pack(fmt_single, data)
                if unpack(fmt_single, single_bytes)[0] == data:
                    return single_bytes
                fmt_double = ('>' if byteorder == 'big' else '<') + 'd'
                return pack(fmt_double, data)

            return data.encode("UTF-8")

Step 6. Register the connector in tb_gateway.json
Add the following entry to the "connectors" array in tb_gateway.json:
    {
      "name": "Custom serial connector",
      "type": "serial",
      "configuration": "custom_serial.json",
      "class": "SerialConnector"
    }

| Field | Description |
|---|---|
| name | Connector name — must match the "name" in custom_serial.json. |
| type | Extensions subfolder name (serial). |
| configuration | Path to the connector config file, relative to the gateway config folder. |
| class | Connector class name inside the connector file. |
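Because the connector name has to agree between the two files, a quick cross-check can catch a mismatch before the gateway starts. The sketch below is illustrative only: it uses inline stand-ins for both files, and `find_entry` is a hypothetical helper, not a gateway function.

```python
import json

# Inline stand-ins for the two files; in practice load them from disk.
gateway_config = json.loads("""
{"connectors": [{"name": "Custom serial connector", "type": "serial",
                 "configuration": "custom_serial.json", "class": "SerialConnector"}]}
""")
connector_config = json.loads('{"name": "Custom serial connector", "devices": []}')


def find_entry(gateway_cfg, configuration_file):
    """Return the connectors entry that points at the given config file, or None."""
    for entry in gateway_cfg.get("connectors", []):
        if entry.get("configuration") == configuration_file:
            return entry
    return None


entry = find_entry(gateway_config, "custom_serial.json")
names_match = entry is not None and entry["name"] == connector_config["name"]
print("names match:", names_match)
```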
Step 7. Restart the gateway
Daemon:

    sudo systemctl restart thingsboard-gateway

Python module (pip):

    sudo python3 -c 'from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService; TBGatewayService("./tb_gateway.json")'

Default log locations:
| Installation | Log folder |
|---|---|
| Docker Compose | tb-gw-logs volume |
| Daemon | /var/log/thingsboard-gateway/ |
| Python module (pip) | ./logs/ |
Step 8. Verify the result
Connect the serial device, then open Devices in the ThingsBoard UI.
You should see a device named SerialDevice1. Open it and go to the Latest telemetry tab —
the humidity key should appear with the value parsed from the serial stream.
Connector method reference
The SerialConnector class implements all required methods from the
Connector interface.
Key methods used in this example:
| Method | Role in this connector |
|---|---|
| __init__ | Initialises the uplink queue, logger, and device list from config. |
| open | Starts the connector thread. |
| run | Main loop: loads devices, starts threads, drains the uplink queue, handles reconnects. |
| close | Stops all device threads and the logger. |
| on_attributes_update | Formats the attribute value as a UTF-8 string and writes it to the device’s serial port. |
| server_side_rpc_handler | Delegates RPC handling to SerialDevice.handle_rpc_request and sends the reply. |
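The division of work between device threads and the connector's main loop follows a producer/consumer pattern around a shared queue. The following standalone sketch shows that pattern with the standard library only. The names `device_worker`, `connector_loop`, and `forwarded` are illustrative, and the loop uses a blocking `get` with a timeout, whereas the connector above polls `empty()` and sleeps.

```python
from queue import Queue, Empty
from threading import Event, Thread

stop_event = Event()
uplink_queue = Queue(maxsize=100)
forwarded = []  # stand-in for the gateway's storage


def device_worker(name, messages):
    """Plays the role of a device thread: pushes readings into the queue."""
    for message in messages:
        if stop_event.is_set():
            break
        uplink_queue.put((name, message))


def connector_loop():
    """Plays the role of the connector's run(): drains the queue until stopped."""
    while not stop_event.is_set() or not uplink_queue.empty():
        try:
            forwarded.append(uplink_queue.get(timeout=0.05))
        except Empty:
            continue


worker = Thread(target=device_worker, args=("SerialDevice1", [b"48\r", b"49\r"]), daemon=True)
consumer = Thread(target=connector_loop, daemon=True)
consumer.start()
worker.start()
worker.join()      # all readings are queued
stop_event.set()   # mirrors close(): ask the loop to finish
consumer.join()    # the loop exits once the queue is drained
print(forwarded)
```

Sharing one stop event between the loop and the workers is the same idea the connector uses when it passes its `stopped` event to every SerialDevice.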
Converter method reference
The SerialUplinkConverter and SerialDownlinkConverter classes implement the
Converter interface.
| Class | convert(config, data) |
|---|---|
| SerialUplinkConverter | config=None for telemetry; config dict for RPC responses. data is the raw bytes from the port. Returns ConvertedData for telemetry, or a dict with a "result" key for RPC responses. |
| SerialDownlinkConverter | config is the RPC config section. data is the value from ThingsBoard. Returns bytes. |
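The two `convert` contracts can be illustrated with toy stand-ins that run without the gateway installed. These functions are simplified assumptions for illustration, not the real classes: the uplink stand-in returns plain dicts instead of ConvertedData, and the downlink stand-in handles only integers and strings in big-endian order.

```python
from math import ceil


def uplink_convert(config, data: bytes):
    """Toy stand-in for SerialUplinkConverter.convert (not the real class)."""
    if config is not None:
        # RPC response path: a plain dict with a "result" key.
        return {"result": data.decode("utf-8").strip()}
    # Telemetry path: the real converter returns ConvertedData; a dict is used here.
    return {"telemetry": {"humidity": float(data.decode("utf-8").strip())}}


def downlink_convert(config, value) -> bytes:
    """Toy stand-in for SerialDownlinkConverter.convert, for two types only."""
    if config.get("type") == "int":
        # At least one byte, so that 0 still produces output.
        length = max(1, ceil(value.bit_length() / 8))
        return value.to_bytes(length, byteorder="big")
    return str(value).encode("utf-8")


telemetry = uplink_convert(None, b"48")
rpc_reply = uplink_convert({"type": "string"}, b"OK\r")
int_bytes = downlink_convert({"type": "int"}, 300)
text_bytes = downlink_convert({"type": "string"}, "get")
print(telemetry, rpc_reply, int_bytes, text_bytes)
```

Here 300 needs nine bits, so it is written as two bytes (0x01, 0x2C); a device expecting a fixed-width integer would need a converter that pads to that width.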