Rule Node Development
In this guide, you will learn how to create custom rule nodes and add them to your ThingsBoard instance. We will review rule nodes of three different types: Filter, Enrichment, and Transformation.
Prerequisites
We assume you have completed the following:
You also need the following tools installed:
| Tool | Version |
|---|---|
| OpenJDK | 17 |
| Maven | 3.6.0+ |
| Java IDE | IntelliJ IDEA recommended |
| Lombok plugin | Optional, for your IDE |
Step 1. Download and Build the Sample Project
Clone the repository and navigate to the project folder:

    git clone -b release-4.3 https://github.com/thingsboard/rule-node-examples
    cd rule-node-examples

By default, the sample project uses ThingsBoard Community Edition APIs, making your rule nodes compatible with both Community and Professional editions.
To use APIs that are exclusive to the Professional Edition (such as Entity Groups), change the thingsboard.version property in pom.xml:

    nano pom.xml

Set the version to the Professional Edition release:

    ...
    <properties>
        ...
        <thingsboard.version>4.3.1.1PE</thingsboard.version>
        ...
    </properties>
    ...

Build the project:

    mvn clean install

Expected output:

    [INFO] BUILD SUCCESS

Step 2. Import Project to the IDE
Make sure the Lombok plugin is installed in your IDE. Import the project as a Maven project.
Step 3. Create Your Rule Node
Implement the TbNode interface and annotate it with the @RuleNode annotation.
Here is an example rule node that filters incoming messages based on whether a specified key exists in the message payload:

    @RuleNode(
            type = ComponentType.FILTER,
            name = "check key",
            relationTypes = {"True", "False"},
            configClazz = TbKeyFilterNodeConfiguration.class,
            nodeDescription = "Checks the existence of the selected key in the message payload.",
            nodeDetails = "If the selected key exists - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
            uiResources = {"static/rulenode/custom-nodes-config.js"},
            configDirective = "tbFilterNodeCheckKeyConfig")
    public class TbKeyFilterNode implements TbNode {

        private static final ObjectMapper mapper = new ObjectMapper();

        private TbKeyFilterNodeConfiguration config;
        private String key;

        @Override
        public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
            this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
            key = config.getKey();
        }

        @Override
        public void onMsg(TbContext ctx, TbMsg msg) {
            try {
                ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");
            } catch (IOException e) {
                ctx.tellFailure(msg, e);
            }
        }

        @Override
        public void destroy() {
        }
    }

The @RuleNode Annotation
The @RuleNode annotation defines the node type, name, description, UI form, and outbound relations.
| Parameter | Description |
|---|---|
| type | One of the available Rule Node Types. Determines which section of the Rule Chain Editor contains your node. |
| name | Display name used in the Rule Chain Editor and debug messages. |
| nodeDescription | Short description visible in the Rule Chain Editor. |
| nodeDetails | Full description with HTML support, visible in the Rule Chain Editor. |
| configClazz | Class that describes the configuration JSON, given as a class literal (for example, TbKeyFilterNodeConfiguration.class). |
| relationTypes | Array of pre-defined relation types. Must match the values used in TbContext.tellNext. |
| customRelations | Set to true if you use custom relation types in TbContext.tellNext. |
| configDirective | Name of the Angular UI directive for editing node configuration. Optional. If empty, users see a raw JSON editor. |
| uiResources | Path to the Angular UI file containing the configuration directive. Optional. |
| icon | Icon name from the Angular Material icon set. |
| iconUrl | Full URL to a custom icon for the Rule Chain Editor. |
| docUrl | Link to the documentation page for this rule node. |
Rule Node Lifecycle
The init method is called when a new rule node is created, either when someone adds it to a rule chain or when the system restarts.
Use it to parse configuration or obtain a local copy of TbContext.
The destroy method is called when the rule node is removed from a rule chain or the system stops.
When a user changes the configuration of an existing rule node, the rule engine calls destroy followed by init.
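The lifecycle rules above can be modeled in a few lines. The sketch below does not use the ThingsBoard API: SketchNode, LoggingNode, and NodeHost are hypothetical stand-ins invented for illustration, and the only behavior taken from the text is that a configuration change triggers destroy followed by init.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a rule node; not the real TbNode interface. */
interface SketchNode {
    void init(String config);
    void destroy();
}

/** Records lifecycle calls so their order can be inspected. */
final class LoggingNode implements SketchNode {
    final List<String> events = new ArrayList<>();
    private String key;

    @Override
    public void init(String config) {
        key = config;                 // parse configuration once, up front
        events.add("init:" + key);
    }

    @Override
    public void destroy() {
        events.add("destroy:" + key);
        key = null;                   // release per-configuration state
    }
}

/** Hypothetical host that mimics how the rule engine drives a node. */
final class NodeHost {
    private final SketchNode node;

    NodeHost(SketchNode node, String initialConfig) {
        this.node = node;
        node.init(initialConfig);
    }

    /** A configuration change is modeled as destroy followed by init. */
    void updateConfig(String newConfig) {
        node.destroy();
        node.init(newConfig);
    }

    /** Removing the node from the chain only calls destroy. */
    void remove() {
        node.destroy();
    }
}
```

One consequence worth keeping in mind: any state derived from the configuration is best rebuilt in init and released in destroy, because a node may go through this cycle many times during its life.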
Processing Incoming Messages
Your rule node must call one of the following methods to signal successful processing:

    // Send message via "Success" relation
    void tellSuccess(TbMsg msg);

    // Send message via a specific relation type
    void tellNext(TbMsg msg, String relationType);

    // Send message via one of the specified relation types
    void tellNext(TbMsg msg, Set<String> relationTypes);

If processing fails, call tellFailure:

    void tellFailure(TbMsg msg, Throwable th);

Using ThingsBoard Services
TbContext provides getters for platform services. Use “Download Sources” in your IDE to browse service interfaces.
| Service | Getter | Purpose |
|---|---|---|
| AttributesService | getAttributesService() | Get and save entity attributes |
| CustomerService | getCustomerService() | CRUD operations on customers |
| UserService | getUserService() | CRUD operations on users |
| AssetService | getAssetService() | CRUD operations on assets |
| DeviceService | getDeviceService() | CRUD operations on devices |
| EntityViewService | getEntityViewService() | CRUD operations on entity views |
| DashboardService | getDashboardService() | Create and manage dashboards |
| RuleEngineAlarmService | getAlarmService() | Create and clear alarms |
| RuleChainService | getRuleChainService() | Create and manage rule chains |
| RuleEngineRpcService | getRpcService() | Send RPC commands to devices |
| RuleEngineTelemetryService | getTelemetryService() | Save telemetry and push WebSocket notifications |
| TimeseriesService | getTimeseriesService() | Query and save telemetry without dashboard notifications |
| RelationService | getRelationService() | Query and manage entity relations |
Professional Edition Services
PE users can access additional services via TbContext.getPeContext():
| Service | Getter | Purpose |
|---|---|---|
| IntegrationService | getIntegrationService() | Manage integrations |
| EntityGroupService | getEntityGroupService() | Manage entity groups |
| ReportService | getReportService() | Create reports |
| BlobEntityService | getBlobEntityService() | Manage blob entities |
| GroupPermissionService | getGroupPermissionService() | Manage group permissions |
| RoleService | getRoleService() | Manage roles |
TbPeContext also provides owner management methods:

    // Get entity owner (TenantId or CustomerId)
    EntityId getOwner(TenantId tenantId, EntityId entityId);

    // Change entity owner
    void changeDeviceOwner(TenantId tenantId, EntityId targetOwnerId, Device device);
    void changeAssetOwner(TenantId tenantId, EntityId targetOwnerId, Asset asset);
    void changeDashboardOwner(TenantId tenantId, EntityId targetOwnerId, Dashboard dashboard);
    void changeEntityOwner(TenantId tenantId, EntityId targetOwnerId, EntityId entityId, EntityType entityType);

    // Push downlink message to an integration
    void pushToIntegration(IntegrationId integrationId, TbMsg tbMsg, FutureCallback<Void> callback);

Creating New Messages from the Rule Node
You can create and push derived messages to the Rule Engine. For example, this code duplicates a message from a customer to all of that customer’s devices:

    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        EntityId msgOriginator = msg.getOriginator();
        if (EntityType.CUSTOMER.equals(msgOriginator.getEntityType())) {
            CustomerId customerId = new CustomerId(msgOriginator.getId());
            boolean hasNext = true;
            // Page through the customer's devices, 1024 at a time
            PageLink pageLink = new PageLink(1024);
            while (hasNext) {
                PageData<Device> devices = ctx.getDeviceService()
                        .findDevicesByTenantIdAndCustomerId(ctx.getTenantId(), customerId, pageLink);
                hasNext = devices.hasNext();
                pageLink = pageLink.nextPageLink();
                for (Device device : devices.getData()) {
                    // Copy the message, replacing the originator with the device
                    TbMsg newMsg = TbMsg.newMsg(msg.getQueueName(), msg.getType(),
                            device.getId(), msg.getMetaData(), msg.getData());
                    ctx.enqueueForTellNext(newMsg, "Success");
                }
            }
            // Acknowledge the original message once all copies are enqueued
            ctx.ack(msg);
        } else {
            ctx.tellFailure(msg, new IllegalArgumentException("Msg originator is not Customer!"));
        }
    }

Use enqueueForTellNext to push new messages to connected rule nodes by relation type. Alternatively, use enqueue to push a message to the beginning of the root rule chain:

    void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);

enqueueForTellNext also has a variant with delivery confirmation callbacks:

    void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);

Multithreading
The Rule Engine uses the actor model: it invokes TbNode.onMsg sequentially for every message in the rule node mailbox. If you process the message in the same thread, your implementation is thread-safe.
However, most API calls execute in separate threads. Use callbacks to acknowledge or forward messages:

    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        try {
            ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
            double temperatureF = json.get("temperature").asDouble();
            double temperatureC = (temperatureF - 32) * 5 / 9;
            TsKvEntry tsKvEntry = new BasicTsKvEntry(
                    System.currentTimeMillis(),
                    new DoubleDataEntry("temperature", temperatureC));
            ctx.getTelemetryService().saveAndNotify(
                    ctx.getTenantId(),
                    msg.getOriginator(),
                    Collections.singletonList(tsKvEntry),
                    new FutureCallback<Void>() {
                        @Override
                        public void onSuccess(@Nullable Void aVoid) {
                            ctx.tellSuccess(msg);
                        }

                        @Override
                        public void onFailure(Throwable throwable) {
                            ctx.tellFailure(msg, throwable);
                        }
                    });
        } catch (JsonProcessingException e) {
            ctx.tellFailure(msg, e);
        }
    }

Notice that tellSuccess and tellFailure are called in the callback thread, not in the main thread.
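The same callback discipline can be tried outside ThingsBoard with only the JDK. The following is a minimal sketch under stated assumptions: saveAsync is a hypothetical stand-in for an asynchronous platform call, and the strings "Success" and "Failure" stand in for tellSuccess and tellFailure. It is not how the rule engine is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

final class AsyncAckSketch {

    /** Converts degrees Fahrenheit to degrees Celsius. */
    static double toCelsius(double fahrenheit) {
        return (fahrenheit - 32) * 5 / 9;
    }

    /**
     * Hypothetical stand-in for an asynchronous save call.
     * The work runs on a thread from the given executor, not on the caller's thread.
     */
    static CompletableFuture<Void> saveAsync(ExecutorService io, double value) {
        return CompletableFuture.runAsync(() -> {
            if (Double.isNaN(value)) {
                throw new IllegalArgumentException("value is NaN");
            }
            // A real node would persist the value here.
        }, io);
    }

    /**
     * Starts the save and returns immediately. The outcome is decided in the
     * completion callback, once the save has finished or failed.
     */
    static CompletableFuture<String> process(ExecutorService io, double fahrenheit) {
        return saveAsync(io, toCelsius(fahrenheit))
                .handle((ignored, error) -> error == null ? "Success" : "Failure");
    }
}
```

The point of the design is that process never blocks the calling thread. Exactly one outcome is reported per message, and it is reported from the completion callback.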
Clustering Mode
A single rule node instance runs on each rule-engine microservice. Messages are partitioned by originator ID (device or asset), so messages from one device always go to the same rule node instance.
When nodes are added or removed, a repartition event occurs. Override TbNode.onPartitionChangeMsg to react to topology changes — useful for stateful nodes that cache data by originator ID. Use TbContext.isLocalEntity to check if an entity is assigned to the current partition.
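To make the partitioning idea concrete, here is a small JDK-only sketch. It is illustrative only: partitionOf uses UUID.hashCode and a modulo, which is an assumption made for this example and not ThingsBoard's actual hash function, and the isLocal predicate is a hypothetical stand-in for TbContext.isLocalEntity.

```java
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

final class PartitionSketch {

    /**
     * Maps an originator ID to one of {@code partitions} buckets.
     * The same ID always yields the same bucket, which is the property
     * that keeps one device's messages on one rule node instance.
     */
    static int partitionOf(UUID originatorId, int partitions) {
        return Math.floorMod(originatorId.hashCode(), partitions);
    }

    /** Drops cached entries whose originator is no longer local. */
    static <V> void evictNonLocal(Map<UUID, V> cache, Predicate<UUID> isLocal) {
        cache.keySet().removeIf(id -> !isLocal.test(id));
    }
}
```

After a repartition, a stateful node would call something like evictNonLocal with the new ownership check, so that stale entries are not served if the entity later returns to this partition.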
Full cache example (TbCacheExampleNode)

    @Slf4j
    @RuleNode(
            type = ComponentType.FILTER,
            name = "Cache example",
            relationTypes = {"True", "False"},
            configClazz = EmptyNodeConfiguration.class,
            nodeDescription = "Checks that the incoming value exceeds certain threshold",
            nodeDetails = "If temperature is too high - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
            uiResources = {"static/rulenode/rulenode-core-config.js"},
            configDirective = "tbNodeEmptyConfig")
    public class TbCacheExampleNode implements TbNode {

        private static final ObjectMapper mapper = new ObjectMapper();
        private ConcurrentMap<EntityId, Double> cache;

        @Override
        public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
            this.cache = new ConcurrentHashMap<>();
        }

        @Override
        public void onMsg(TbContext ctx, TbMsg msg) {
            try {
                ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
                double temperature = json.get("temperature").asDouble();
                Double temperatureThreshold = getCacheValue(
                        ctx, msg.getOriginator(), "temperatureThreshold", 42);
                ctx.tellNext(msg, temperature > temperatureThreshold ? "True" : "False");
            } catch (JsonProcessingException e) {
                ctx.tellFailure(msg, e);
            }
        }

        @Override
        public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
            // Remove entries for entities no longer assigned to this partition
            cache.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
        }

        private double getCacheValue(TbContext ctx, EntityId entityId, String attributeKey, double defaultValue) {
            return cache.computeIfAbsent(entityId, id -> {
                try {
                    Optional<AttributeKvEntry> attr = ctx.getAttributesService()
                            .find(ctx.getTenantId(), entityId, DataConstants.SERVER_SCOPE, attributeKey).get();
                    return attr.isPresent() ? attr.get().getDoubleValue().orElse(defaultValue) : defaultValue;
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
        }

        @Override
        public void destroy() {
            cache.clear();
        }
    }

Step 4. Import Custom Rule Nodes to Your ThingsBoard Instance
Build the project again:

    mvn clean install

The JAR file is at target/rule-engine-1.0.0-custom-nodes.jar.
Step 4.1. Add JAR File to a ThingsBoard Service Installation
- Copy the JAR to the ThingsBoard extensions directory:

      sudo cp rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/

- Change the file owner:

      sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*

- Restart ThingsBoard:

      sudo service thingsboard restart

Step 5. Add Custom Package Name to thingsboard.yml
If you changed the package name from org.thingsboard.rule.engine to your own (e.g., com.example.rule.engine), add it to the plugins.scan_packages property in thingsboard.yml, or set the PLUGINS_SCAN_PACKAGES environment variable, which overrides the default shown here:

    plugins:
      scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine,com.example.rule.engine}"

Step 6. Troubleshoot Your Rule Node
Create a Generator rule node and connect it to your custom rule node. This produces a configurable stream of test messages. Enable debug mode on your custom rule node to inspect its output and errors.