Custom Python models

The Custom Model prediction method lets you write a Python class that Trendz executes server-side. Trendz handles data loading, segment iteration, state persistence between segments, and storing the resulting forecast in ThingsBoard — you only implement the model logic.

Custom models are useful when the built-in methods (Fourier, Prophet, ARIMA, Linear Regression) do not fit your data, or when you need to combine several input telemetry streams as regressors.

Every custom model must extend the abstract IModel class and implement all seven methods. Trendz injects this base class automatically — you do not need to import or define it.

Trendz lifecycle per segment
────────────────────────────

First use
─────────
init_state() ──▶ save_state(path) ──▶ [state file]

Per-segment training
────────────────────
[state file] ──▶ load_state(path)
                 train()            ← first segment
                 partial_fit()      ← each subsequent segment
                 save_state(path) ──▶ [updated state file]

Generating the forecast
───────────────────────
[state file] ──▶ load_state(path)
                 predict(timestamps) ──▶ [(ts, value), ...]

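
The call order above can be reproduced locally to smoke-test a model before pasting it into Trendz. The following is a minimal sketch, not the actual Trendz runner: MeanModel and run_lifecycle are hypothetical names, the model is a plain class because IModel only exists inside Trendz, and creating a fresh instance before every load_state call is an assumption.

```python
import os
import pickle
import tempfile


class MeanModel:
    """Hypothetical toy model: predicts the mean of all values seen so far."""

    def init_state(self):
        self.total = 0.0
        self.count = 0

    def train(self, data, additionalData=None):
        self.total = sum(value for _, value in data)
        self.count = len(data)

    def partial_fit(self, data, additionalData=None):
        self.total += sum(value for _, value in data)
        self.count += len(data)

    def predict(self, timestamps):
        mean = self.total / self.count if self.count else 0.0
        return [(ts, mean) for ts in timestamps]

    def save_state(self, file_path):
        with open(file_path, "wb") as fh:
            pickle.dump({"total": self.total, "count": self.count}, fh)

    def load_state(self, file_path):
        with open(file_path, "rb") as fh:
            state = pickle.load(fh)
        self.total = state["total"]
        self.count = state["count"]

    def name(self):
        return "MeanModel"


def run_lifecycle(model_cls, segments, timestamps):
    """Drive a model through the call order shown in the lifecycle diagram."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "state.bin")

        # First use: create the initial state file.
        model = model_cls()
        model.init_state()
        model.save_state(path)

        # Per-segment training: restore state, fit, persist again.
        for index, segment in enumerate(segments):
            model = model_cls()  # assumption: a fresh instance per call
            model.load_state(path)
            if index == 0:
                model.train(segment, {})
            else:
                model.partial_fit(segment, {})
            model.save_state(path)

        # Forecast: restore state, then predict.
        model = model_cls()
        model.load_state(path)
        return model.predict(timestamps)


segments = [
    [[1700000000000, 10.0], [1700086400000, 20.0]],
    [[1700172800000, 30.0]],
]
forecast = run_lifecycle(MeanModel, segments, [1702944000000, 1703030400000])
```

Because every step goes through the state file, this harness also exposes attributes that save_state forgets to persist: they would be missing on the next load_state.
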
| Method | When called | Purpose |
|---|---|---|
| init_state() | Once, before the first training segment | Initialize all internal model objects (sklearn estimators, buffers, scalers, etc.) |
| train(data, additionalData) | Once per model (first segment only) | Fit the model from scratch using the current segment's data |
| partial_fit(data, additionalData) | Once per additional segment | Update the model incrementally with new segment data without forgetting previous state |
| predict(timestamps) | After all segments are trained | Generate predicted values for the given future timestamps |
| save_state(file_path) | After init_state, train, and partial_fit | Serialize the full model state to a file so Trendz can persist it between calls |
| load_state(file_path) | Before train, partial_fit, and predict | Restore the model state from a file saved by a previous save_state call |
| name() | For logging and identification | Return a unique string name for this model |
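
Because Trendz injects IModel at runtime, the class is not available when a model is developed and tested locally. A stand-in can be declared with the standard abc module. The sketch below is an assumption derived only from the method table above, not the actual Trendz source, and it should be left out of the code that is pasted into Trendz.

```python
from abc import ABC, abstractmethod


class IModel(ABC):
    """Local stand-in for the base class that Trendz injects (assumed shape)."""

    @abstractmethod
    def init_state(self): ...

    @abstractmethod
    def train(self, data, additionalData): ...

    @abstractmethod
    def partial_fit(self, data, additionalData): ...

    @abstractmethod
    def predict(self, timestamps): ...

    @abstractmethod
    def save_state(self, file_path): ...

    @abstractmethod
    def load_state(self, file_path): ...

    @abstractmethod
    def name(self): ...


class Incomplete(IModel):
    """Implements only one of the seven methods."""

    def name(self):
        return "Incomplete"


# An abstract base class refuses to instantiate a subclass
# that leaves any abstract method unimplemented.
try:
    Incomplete()
    instantiation_error = None
except TypeError as exc:
    instantiation_error = exc
```

With this stand-in, a forgotten method surfaces locally as a TypeError at instantiation time rather than after the model has been deployed.
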

Both train and partial_fit receive a data argument — the primary telemetry series for the current segment, sorted chronologically:

data = [
    [1700000000000, 42.3],  # [timestamp_ms, value]
    [1700086400000, 43.1],
    [1700172800000, 41.8],
    # ...
]

Each element is a two-item list: a Unix timestamp in milliseconds and the numeric telemetry value.
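
Inside train or partial_fit it is often convenient to split this structure into parallel sequences and to view timestamps as datetimes. A minimal sketch using only the standard library follows; split_series and to_datetime are hypothetical helper names, not part of Trendz.

```python
from datetime import datetime, timezone


def split_series(data):
    """Split [[timestamp_ms, value], ...] into (timestamps_ms, values)."""
    timestamps = [int(point[0]) for point in data]
    values = [float(point[1]) for point in data]
    return timestamps, values


def to_datetime(timestamp_ms):
    """Convert a Unix timestamp in milliseconds to an aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)


data = [
    [1700000000000, 42.3],
    [1700086400000, 43.1],
    [1700172800000, 41.8],
]
timestamps, values = split_series(data)
first_point_time = to_datetime(timestamps[0])
step_ms = timestamps[1] - timestamps[0]  # spacing between the first two samples
```

In this sample the points are one day apart, so step_ms is 86,400,000.
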

When additional regressors are configured for a Multivariable model, additionalData is a dict keyed by telemetry_{field_id} (field UUID with hyphens replaced by underscores), each value being a list of [timestamp_ms, value] pairs:

additionalData = {
    "telemetry_123e4567_e89b_12d3_a456_426614174000": [
        [1700000000000, 18.5],
        [1700086400000, 19.0],
        # ...
    ],
    "telemetry_987fcdeb_51d3_a456_c0a8_426614174000": [
        [1700000000000, 0.82],
        # ...
    ],
}

For univariable models, additionalData is an empty dict {}.
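
A multivariable model usually needs the regressors aligned to the primary series as a feature matrix. The sketch below shows one possible approach under stated assumptions: regressor samples are matched to the primary series by exact timestamp, missing samples become NaN, and columns follow the sorted key order so that train and partial_fit see the same layout. build_feature_rows is a hypothetical helper, and whether Trendz guarantees matching timestamps is not stated here.

```python
import math


def build_feature_rows(data, additionalData):
    """Return (keys, rows), where each row is [timestamp_ms, regressor_1, ...]."""
    keys = sorted(additionalData)  # stable column order across segments
    lookups = [
        {ts: value for ts, value in additionalData[key]} for key in keys
    ]
    rows = []
    for ts, _ in data:
        # Exact-timestamp match; NaN marks a regressor sample that is absent.
        rows.append([ts] + [lookup.get(ts, math.nan) for lookup in lookups])
    return keys, rows


data = [[1700000000000, 42.3], [1700086400000, 43.1]]
additionalData = {
    "telemetry_987fcdeb_51d3_a456_c0a8_426614174000": [
        [1700000000000, 0.82],
    ],
    "telemetry_123e4567_e89b_12d3_a456_426614174000": [
        [1700000000000, 18.5],
        [1700086400000, 19.0],
    ],
}
keys, rows = build_feature_rows(data, additionalData)
```

Rows containing NaN then need an explicit policy (drop, forward-fill, or impute) before they reach an estimator; which one is appropriate depends on the data.
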

predict receives a list of future timestamps (milliseconds):

timestamps = [1702944000000, 1703030400000, 1703116800000]

It must return a list of (timestamp, value) tuples in the same order:

return [(1702944000000, 44.1), (1703030400000, 45.3), (1703116800000, 44.8)]

Trendz validates that the number of returned pairs equals the number of input timestamps and raises an error if they differ.
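
The same contract can be checked locally before deployment. The following is a sketch of such a check, not the validation code Trendz itself runs; check_prediction is a hypothetical helper, and the order check reflects the "same order" requirement above rather than a documented Trendz error.

```python
def check_prediction(timestamps, result):
    """Raise ValueError if result breaks the predict() output contract."""
    if len(result) != len(timestamps):
        raise ValueError(
            f"expected {len(timestamps)} pairs, got {len(result)}"
        )
    for expected_ts, pair in zip(timestamps, result):
        ts, _value = pair  # each element must unpack into exactly two items
        if ts != expected_ts:
            raise ValueError(f"timestamp {ts} is out of order or unexpected")
    return True


timestamps = [1702944000000, 1703030400000, 1703116800000]
good = [(1702944000000, 44.1), (1703030400000, 45.3), (1703116800000, 44.8)]
is_valid = check_prediction(timestamps, good)

# A result that drops one timestamp is rejected.
try:
    check_prediction(timestamps, good[:2])
    short_result_error = None
except ValueError as exc:
    short_result_error = exc
```
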

This example implements a univariable Linear Regression model with StandardScaler for timestamps and MinMaxScaler for values. It supports both full retraining (train) and incremental updates (partial_fit) by accumulating sufficient statistics rather than storing all raw data.

#####################################################
# Prediction Method: Linear Regression
import pickle

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import MinMaxScaler, StandardScaler


class CustomModel(IModel):
    def __init__(self, value_transformer=None, timestamp_transformer=None):
        self.model = None
        self.timestamp_transformer = (
            timestamp_transformer if timestamp_transformer is not None
            else StandardScaler()
        )
        self.value_transformer = (
            value_transformer if value_transformer is not None
            else MinMaxScaler()
        )
        # Sufficient statistics for incremental updates
        self.sum_x = 0
        self.sum_y = 0
        self.sum_xy = 0
        self.sum_xx = 0
        self.n = 0

    def init_state(self):
        self.model = LinearRegression()

    def train(self, data, additionalData=None):
        # Extract timestamps and values from [[ts, value], ...] pairs
        ts = np.array([point[0] for point in data]).reshape(-1, 1)
        values = np.array([point[1] for point in data]).reshape(-1, 1)
        # Fit scalers on the full segment
        self.timestamp_transformer.fit(ts)
        self.value_transformer.fit(values)
        ts_scaled = self.timestamp_transformer.transform(ts)
        values_scaled = self.value_transformer.transform(values)
        # Reset the sufficient statistics from this segment
        self.sum_x = np.sum(ts_scaled)
        self.sum_y = np.sum(values_scaled)
        self.sum_xy = np.sum(ts_scaled * values_scaled)
        self.sum_xx = np.sum(ts_scaled ** 2)
        self.n = len(ts_scaled)
        self.model.fit(ts_scaled, values_scaled)

    def partial_fit(self, data, additionalData=None):
        # Scalers are already fitted: transform only, do not refit
        ts = np.array([point[0] for point in data]).reshape(-1, 1)
        values = np.array([point[1] for point in data]).reshape(-1, 1)
        ts_scaled = self.timestamp_transformer.transform(ts)
        values_scaled = self.value_transformer.transform(values)
        # Update sufficient statistics with the new segment
        self.sum_x += np.sum(ts_scaled)
        self.sum_y += np.sum(values_scaled)
        self.sum_xy += np.sum(ts_scaled * values_scaled)
        self.sum_xx += np.sum(ts_scaled ** 2)
        self.n += len(ts_scaled)
        if self.n == 0:
            return
        # Recompute slope and intercept from accumulated statistics
        mean_x = self.sum_x / self.n
        mean_y = self.sum_y / self.n
        denominator = self.sum_xx - self.n * mean_x ** 2
        if denominator == 0:
            # All timestamps identical: slope is undefined, keep the previous fit
            return
        slope = (self.sum_xy - self.n * mean_x * mean_y) / denominator
        intercept = mean_y - slope * mean_x
        # Same shapes that LinearRegression.fit produces for a 2-D target
        self.model.coef_ = np.array([[slope]])
        self.model.intercept_ = np.array([intercept])

    def predict(self, timestamps):
        # timestamps is a list of Unix ms values
        ts = np.array(timestamps).reshape(-1, 1)
        ts_scaled = self.timestamp_transformer.transform(ts)
        predictions_scaled = self.model.predict(ts_scaled)
        predictions = self.value_transformer.inverse_transform(predictions_scaled)
        # tolist() converts NumPy scalars to plain Python floats
        return list(zip(timestamps, predictions.flatten().tolist()))

    def save_state(self, file_path):
        state = {
            'model': self.model,
            'value_transformer': self.value_transformer,
            'timestamp_transformer': self.timestamp_transformer,
            'sum_x': self.sum_x,
            'sum_y': self.sum_y,
            'sum_xy': self.sum_xy,
            'sum_xx': self.sum_xx,
            'n': self.n,
        }
        with open(file_path, 'wb') as file:
            pickle.dump(state, file)

    def load_state(self, file_path):
        with open(file_path, 'rb') as file:
            state = pickle.load(file)
        self.model = state['model']
        self.value_transformer = state['value_transformer']
        self.timestamp_transformer = state['timestamp_transformer']
        self.sum_x = state['sum_x']
        self.sum_y = state['sum_y']
        self.sum_xy = state['sum_xy']
        self.sum_xx = state['sum_xx']
        self.n = state['n']

    def name(self):
        return "LinearRegressionModel"
#####################################################
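
The incremental update in partial_fit works because the least-squares line depends on the data only through five running sums: n, Σx, Σy, Σxy, and Σx². The dependency-free sketch below isolates that idea and feeds it two segments taken from the line y = 2x + 1. accumulate and line_from_stats are hypothetical names used for illustration and are not part of the example model.

```python
def accumulate(stats, xs, ys):
    """Add one segment to the running sums (n, sum_x, sum_y, sum_xy, sum_xx)."""
    n, sum_x, sum_y, sum_xy, sum_xx = stats
    return (
        n + len(xs),
        sum_x + sum(xs),
        sum_y + sum(ys),
        sum_xy + sum(x * y for x, y in zip(xs, ys)),
        sum_xx + sum(x * x for x in xs),
    )


def line_from_stats(stats):
    """Closed-form least-squares slope and intercept from the running sums."""
    n, sum_x, sum_y, sum_xy, sum_xx = stats
    mean_x = sum_x / n
    mean_y = sum_y / n
    denominator = sum_xx - n * mean_x ** 2
    if denominator == 0:
        raise ValueError("x has zero variance; the slope is undefined")
    slope = (sum_xy - n * mean_x * mean_y) / denominator
    return slope, mean_y - slope * mean_x


stats = (0, 0.0, 0.0, 0.0, 0.0)
stats = accumulate(stats, [0.0, 1.0, 2.0], [1.0, 3.0, 5.0])  # first segment
stats = accumulate(stats, [3.0, 4.0], [7.0, 9.0])            # second segment
slope, intercept = line_from_stats(stats)
```

After both segments the recovered slope is 2 and the intercept is 1, the same line a single fit over all five points would give, without any raw points being retained.
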