Custom Python models
The Custom Model prediction method lets you write a Python class that Trendz executes server-side. Trendz handles data loading, segment iteration, state persistence between segments, and storing the resulting forecast in ThingsBoard — you only implement the model logic.
Custom models are useful when the built-in methods (Fourier, Prophet, ARIMA, Linear Regression) do not fit your data, or when you need to combine several input telemetry streams as regressors.
The IModel interface
Section titled “The IModel interface”Every custom model must extend the abstract IModel class and implement all seven methods. Trendz injects this base class automatically — you do not need to import or define it.
Trendz lifecycle per segment ────────────────────────────
First use ───────── init_state() ──▶ save_state(path) │ ▼ [state file]
Per-segment training ──────────────────── [state file] ──▶ load_state(path) │ ▼ train() ← first segment partial_fit() ← each subsequent segment │ ▼ save_state(path) ──▶ [updated state file]
Generating the forecast ─────────────────────── [state file] ──▶ load_state(path) │ ▼ predict(timestamps) ──▶ [(ts, value), ...]Method reference
Section titled “Method reference”| Method | When called | Purpose |
|---|---|---|
init_state() | Once, before the first training segment | Initialize all internal model objects (sklearn estimators, buffers, scalers, etc.) |
train(data, additionalData) | Once per model (first segment only) | Fit the model from scratch using the current segment’s data |
partial_fit(data, additionalData) | Once per additional segment | Update the model incrementally with new segment data without forgetting previous state |
predict(timestamps) | After all segments are trained | Generate predicted values for the given future timestamps |
save_state(file_path) | After init_state, train, and partial_fit | Serialize the full model state to a file so Trendz can persist it between calls |
load_state(file_path) | Before train, partial_fit, and predict | Restore the model state from a file saved by a previous save_state call |
name() | For logging and identification | Return a unique string name for this model |
data parameter
Section titled “data parameter”Both train and partial_fit receive a data argument — the primary telemetry series for the current segment, sorted chronologically:
data = [ [1700000000000, 42.3], # [timestamp_ms, value] [1700086400000, 43.1], [1700172800000, 41.8], # ...]Each element is a two-item list: a Unix timestamp in milliseconds and the numeric telemetry value.
additionalData parameter
Section titled “additionalData parameter”When additional regressors are configured for a Multivariable model, additionalData is a dict keyed by telemetry_{field_id} (field UUID with hyphens replaced by underscores), each value being a list of [timestamp_ms, value] pairs:
additionalData = { "telemetry_123e4567_e89b_12d3_a456_426614174000": [ [1700000000000, 18.5], [1700086400000, 19.0], # ... ], "telemetry_987fcdeb_51d3_a456_426614174000_c0a8": [ [1700000000000, 0.82], # ... ],}For univariable models, additionalData is an empty dict {}.
predict input and output
Section titled “predict input and output”predict receives a list of future timestamps (milliseconds):
timestamps = [1702944000000, 1703030400000, 1703116800000]It must return a list of (timestamp, value) tuples in the same order:
return [(1702944000000, 44.1), (1703030400000, 45.3), (1703116800000, 44.8)]Trendz validates that the number of returned pairs equals the number of input timestamps and raises an error if they differ.
Example: Linear Regression
Section titled “Example: Linear Regression”This example implements a univariable Linear Regression model with StandardScaler for timestamps and MinMaxScaler for values. It supports both full retraining (train) and incremental updates (partial_fit) by accumulating sufficient statistics rather than storing all raw data.
###################################################### Prediction Method: Linear Regression
from sklearn.preprocessing import MinMaxScaler, StandardScalerfrom sklearn.linear_model import LinearRegressionimport pickleimport numpy as npimport os
class CustomModel(IModel):
def __init__(self, value_transformer=None, timestamp_transformer=None): self.model = None self.timestamp_transformer = ( timestamp_transformer if timestamp_transformer else StandardScaler() ) self.value_transformer = ( value_transformer if value_transformer else MinMaxScaler() ) # Sufficient statistics for incremental updates self.sum_x = 0 self.sum_y = 0 self.sum_xy = 0 self.sum_xx = 0 self.n = 0
def init_state(self): self.model = LinearRegression()
def train(self, data, additionalData=None): # Extract timestamps and values from [[ts, value], ...] pairs ts = np.array([point[0] for point in data]).reshape(-1, 1) values = np.array([point[1] for point in data]).reshape(-1, 1)
# Fit scalers on the full segment self.timestamp_transformer.fit(ts) self.value_transformer.fit(values) ts_scaled = self.timestamp_transformer.transform(ts) values_scaled = self.value_transformer.transform(values)
# Accumulate sufficient statistics self.sum_x = np.sum(ts_scaled) self.sum_y = np.sum(values_scaled) self.sum_xy = np.sum(ts_scaled * values_scaled) self.sum_xx = np.sum(ts_scaled ** 2) self.n = len(ts_scaled)
self.model.fit(ts_scaled, values_scaled)
def partial_fit(self, data, additionalData=None): # Scalers already fitted — transform only, do not refit ts = np.array([point[0] for point in data]).reshape(-1, 1) values = np.array([point[1] for point in data]).reshape(-1, 1) ts_scaled = self.timestamp_transformer.transform(ts) values_scaled = self.value_transformer.transform(values)
# Update sufficient statistics with new segment self.sum_x += np.sum(ts_scaled) self.sum_y += np.sum(values_scaled) self.sum_xy += np.sum(ts_scaled * values_scaled) self.sum_xx += np.sum(ts_scaled ** 2) self.n += len(ts_scaled)
# Recompute slope and intercept from accumulated statistics if self.n > 0: mean_x = self.sum_x / self.n mean_y = self.sum_y / self.n slope = ( (self.sum_xy - self.n * mean_x * mean_y) / (self.sum_xx - self.n * mean_x ** 2) ) intercept = mean_y - slope * mean_x self.model.coef_ = np.array([[slope]]) self.model.intercept_ = np.array([intercept])
def predict(self, timestamps): # timestamps is a list of Unix ms values ts = np.array(timestamps).reshape(-1, 1) ts_scaled = self.timestamp_transformer.transform(ts) predictions_scaled = self.model.predict(ts_scaled) predictions = self.value_transformer.inverse_transform(predictions_scaled) return list(zip(timestamps, predictions.flatten()))
def save_state(self, file_path): state = { 'model': self.model, 'value_transformer': self.value_transformer, 'timestamp_transformer': self.timestamp_transformer, 'sum_x': self.sum_x, 'sum_y': self.sum_y, 'sum_xy': self.sum_xy, 'sum_xx': self.sum_xx, 'n': self.n, } with open(file_path, 'wb') as file: pickle.dump(state, file)
def load_state(self, file_path): with open(file_path, 'rb') as file: state = pickle.load(file) self.model = state['model'] self.value_transformer = state['value_transformer'] self.timestamp_transformer = state['timestamp_transformer'] self.sum_x = state['sum_x'] self.sum_y = state['sum_y'] self.sum_xy = state['sum_xy'] self.sum_xx = state['sum_xx'] self.n = state['n']
def name(self): return "LinearRegressionModel"
#####################################################