Skip to main content

Adapter Development

This document provides a comprehensive guide to developing adapters for OctoMesh, covering both Edge and Mesh adapters.

Adapter Overview

Adapters are the integration layer of OctoMesh, connecting external systems to the data mesh platform.

Adapter Types

| Type | Deployment | Use Cases |
| --- | --- | --- |
| Edge Adapter | Near data sources (K3s) | Sensors, PLCs, local databases, file systems |
| Mesh Adapter | Cloud/Datacenter (K8s) | ERP systems, cloud services, data aggregation |

Adapter Roles

  • Plugs: Retrieve data INTO OctoMesh
  • Sockets: Provide data FROM OctoMesh

Data Pipeline Architecture

Adapters execute data pipelines that define how data flows through the system.

Pipeline Structure

| Stage | Purpose | Examples |
| --- | --- | --- |
| Trigger | Initiates pipeline execution | Schedule, HTTP request, message, entity change |
| Extract | Retrieves data from source | Query database, read file, call API |
| Transform | Processes and maps data | Filter, aggregate, format conversion |
| Load | Stores data in destination | Save to RT, write to stream, send notification |

Pipeline Nodes

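Pipelines consist of nodes that perform specific operations. Each node belongs to one of the stages above (trigger, extract, transform, or load); example implementations are shown under "Implementing Pipeline Nodes".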

Creating an Adapter Project

Prerequisites

  • .NET 8.0 SDK or later
  • Visual Studio, JetBrains Rider, or VS Code
  • Access to OctoMesh NuGet packages

Project Setup

  1. Create a new .NET project:
# Scaffold a console project named MyAdapter, then switch into its directory
dotnet new console -n MyAdapter
cd MyAdapter
  2. Add required NuGet packages:
# Core SDK packages
dotnet add package Meshmakers.Octo.Sdk.Common
dotnet add package Meshmakers.Octo.Sdk.ServiceClient

# For Mesh Adapter
# NOTE(review): presumably only needed when building a Mesh adapter — confirm
dotnet add package Meshmakers.Octo.Sdk.MeshAdapter

# For Edge Adapter
# NOTE(review): presumably only needed when building an Edge adapter — confirm
dotnet add package Meshmakers.Octo.Sdk.EdgeAdapter

Project Structure

MyAdapter/
├── Program.cs              # Entry point
├── appsettings.json        # Configuration
├── Nodes/
│   ├── Trigger/            # Trigger nodes
│   ├── Extract/            # Extract nodes
│   ├── Transform/          # Transform nodes
│   └── Load/               # Load nodes
├── Pipelines/
│   └── MyPipeline.cs       # Pipeline definitions
└── Services/
    └── MyService.cs        # Custom services

Implementing Pipeline Nodes

Trigger Node Example

A trigger node initiates pipeline execution:

using Meshmakers.Octo.Sdk.Common.Pipelines;

/// <summary>
/// Configuration values for the HTTP-request trigger node.
/// </summary>
public class FromHttpRequestNodeConfiguration : ITriggerNodeConfiguration
{
    /// <summary>Route of the trigger; defaults to <c>/api/trigger</c>.</summary>
    public string Route { get; set; } = "/api/trigger";

    /// <summary>HTTP method of the trigger; defaults to <see cref="HttpMethod.Post"/>.</summary>
    public HttpMethod Method { get; set; } = HttpMethod.Post;

    /// <summary>Authentication flag; defaults to <c>true</c>.</summary>
    public bool RequireAuthentication { get; set; } = true;
}

/// <summary>
/// Trigger node that seeds a new pipeline context with the body of an HTTP request.
/// </summary>
public class FromHttpRequestNode : TriggerNode<FromHttpRequestNodeConfiguration>
{
    /// <summary>
    /// Reads the HTTP request body and stores it under the <c>"request"</c> key
    /// of a freshly created <see cref="PipelineContext"/>.
    /// </summary>
    /// <param name="config">Node configuration (not read by this implementation).</param>
    /// <param name="cancellationToken">Cancellation token (not observed by this implementation).</param>
    /// <returns>The new context containing the request body.</returns>
    public override async Task<PipelineContext> ExecuteAsync(
        FromHttpRequestNodeConfiguration config,
        CancellationToken cancellationToken)
    {
        // Fetch the request payload first, then wrap it in a new context.
        var payload = Context.GetHttpRequestBody();

        var pipelineContext = new PipelineContext();
        pipelineContext.SetData("request", payload);
        return pipelineContext;
    }
}

Extract Node Example

An extract node retrieves data:

/// <summary>
/// Configuration values for <see cref="GetRtEntitiesByTypeNode"/>.
/// </summary>
public class GetRtEntitiesByTypeNodeConfiguration : IExtractNodeConfiguration
{
    /// <summary>Type identifier handed to the entity query.</summary>
    public string CkTypeId { get; set; }

    /// <summary>Result limit handed to the entity query; defaults to 100.</summary>
    public int MaxResults { get; set; } = 100;

    /// <summary>Filter handed to the entity query; no default is assigned.</summary>
    public string Filter { get; set; }
}

/// <summary>
/// Extract node that queries entities through <see cref="IAssetRepositoryClient"/>
/// and places them in the pipeline context.
/// </summary>
public class GetRtEntitiesByTypeNode : ExtractNode<GetRtEntitiesByTypeNodeConfiguration>
{
    private readonly IAssetRepositoryClient _repositoryClient;

    /// <summary>Creates the node with the client used for the entity query.</summary>
    /// <param name="client">Asset repository client supplied by the caller.</param>
    public GetRtEntitiesByTypeNode(IAssetRepositoryClient client) => _repositoryClient = client;

    /// <summary>
    /// Queries entities using the configured type id, filter and result limit,
    /// then stores the result under the <c>"entities"</c> key.
    /// </summary>
    /// <param name="config">Supplies <c>CkTypeId</c>, <c>Filter</c> and <c>MaxResults</c>.</param>
    /// <param name="context">Context that receives the query result.</param>
    /// <param name="cancellationToken">Forwarded to the query call.</param>
    /// <returns>The same <paramref name="context"/> instance.</returns>
    public override async Task<PipelineContext> ExecuteAsync(
        GetRtEntitiesByTypeNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        context.SetData(
            "entities",
            await _repositoryClient.QueryEntitiesAsync(
                config.CkTypeId,
                config.Filter,
                config.MaxResults,
                cancellationToken));

        return context;
    }
}

Transform Node Example

A transform node processes data:

/// <summary>
/// Configuration values for <see cref="DataMappingNode"/>.
/// </summary>
public class DataMappingNodeConfiguration : ITransformNodeConfiguration
{
    /// <summary>Mapping entries to apply; starts out as an empty list.</summary>
    public List<MappingEntry> Mappings { get; set; } = new List<MappingEntry>();
}

/// <summary>
/// One source-to-target mapping rule with an optional transform expression.
/// </summary>
public class MappingEntry
{
    /// <summary>Path of the value to read from the source data.</summary>
    public string SourcePath { get; set; }

    /// <summary>Path under which the value is written to the result.</summary>
    public string TargetPath { get; set; }

    /// <summary>Transform expression; when null or empty no transform is applied.</summary>
    public string TransformExpression { get; set; }
}

/// <summary>
/// Transform node that applies the configured mapping entries to the
/// <c>"source"</c> data and publishes the outcome as <c>"mapped"</c>.
/// </summary>
public class DataMappingNode : TransformNode<DataMappingNodeConfiguration>
{
    /// <summary>
    /// For every mapping entry: extracts the value at the source path, applies the
    /// transform expression when one is set, and writes the value to the target path
    /// of a new dictionary. The dictionary is stored under the <c>"mapped"</c> key.
    /// </summary>
    /// <param name="config">Supplies the list of mapping entries.</param>
    /// <param name="context">Context read for <c>"source"</c> and updated with <c>"mapped"</c>.</param>
    /// <param name="cancellationToken">Cancellation token (not observed by this implementation).</param>
    /// <returns>The same <paramref name="context"/> instance.</returns>
    public override async Task<PipelineContext> ExecuteAsync(
        DataMappingNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        var input = context.GetData<object>("source");
        var output = new Dictionary<string, object>();

        foreach (var entry in config.Mappings)
        {
            var current = ExtractValue(input, entry.SourcePath);

            // Entries without an expression pass the extracted value through unchanged.
            var hasTransform = !string.IsNullOrEmpty(entry.TransformExpression);
            if (hasTransform)
            {
                current = ApplyTransform(current, entry.TransformExpression);
            }

            SetValue(output, entry.TargetPath, current);
        }

        context.SetData("mapped", output);
        return context;
    }
}

Load Node Example

A load node stores data:

/// <summary>
/// Configuration values for <see cref="ApplyChangesNode"/>.
/// </summary>
public class ApplyChangesNodeConfiguration : ILoadNodeConfiguration
{
    /// <summary>Update kind passed to the apply-changes call; defaults to <c>CreateOrUpdate</c>.</summary>
    public UpdateKind UpdateKind { get; set; } = UpdateKind.CreateOrUpdate;

    /// <summary>Validation flag; defaults to <c>true</c>.</summary>
    public bool ValidateBeforeSave { get; set; } = true;
}

/// <summary>
/// Load node that submits the <c>"updateInfo"</c> payload through
/// <see cref="IAssetRepositoryClient"/> and records the outcome as <c>"result"</c>.
/// </summary>
public class ApplyChangesNode : LoadNode<ApplyChangesNodeConfiguration>
{
    private readonly IAssetRepositoryClient _repositoryClient;

    /// <summary>Creates the node with the client used to apply changes.</summary>
    /// <param name="client">Asset repository client supplied by the caller.</param>
    public ApplyChangesNode(IAssetRepositoryClient client) => _repositoryClient = client;

    /// <summary>
    /// Reads <c>"updateInfo"</c> from the context, applies it with the configured
    /// update kind, and stores the call's return value under <c>"result"</c>.
    /// </summary>
    /// <param name="config">Supplies the <c>UpdateKind</c> for the call.</param>
    /// <param name="context">Context read for <c>"updateInfo"</c> and updated with <c>"result"</c>.</param>
    /// <param name="cancellationToken">Forwarded to the apply-changes call.</param>
    /// <returns>The same <paramref name="context"/> instance.</returns>
    public override async Task<PipelineContext> ExecuteAsync(
        ApplyChangesNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        // NOTE(review): config.ValidateBeforeSave is not consulted here — confirm whether that is intended.
        var pendingChanges = context.GetData<UpdateInfo>("updateInfo");

        context.SetData(
            "result",
            await _repositoryClient.ApplyChangesAsync(
                pendingChanges,
                config.UpdateKind,
                cancellationToken));

        return context;
    }
}

Pipeline Configuration

Defining a Pipeline

Pipelines are configured using JSON or code:

{
"pipelineId": "energy-meter-sync",
"name": "Energy Meter Synchronization",
"description": "Syncs energy meter data from external API",
"nodes": [
{
"nodeType": "FromSchedule",
"config": {
"cronExpression": "0 */5 * * * *"
}
},
{
"nodeType": "MakeHttpRequest",
"config": {
"url": "https://api.external.com/meters",
"method": "GET",
"headers": {
"Authorization": "Bearer ${secrets.apiKey}"
}
}
},
{
"nodeType": "DataMapping",
"config": {
"mappings": [
{
"sourcePath": "$.data[*].id",
"targetPath": "serialNumber"
},
{
"sourcePath": "$.data[*].reading",
"targetPath": "currentValue"
}
]
}
},
{
"nodeType": "CreateUpdateInfo",
"config": {
"ckTypeId": "Industry.Energy/EnergyMeter",
"updateKind": "CreateOrUpdate"
}
},
{
"nodeType": "ApplyChanges",
"config": {
"validateBeforeSave": true
}
}
]
}

Pipeline Builder API

// Code-first definition of the "energy-meter-sync" pipeline; mirrors the JSON
// definition of the same pipeline shown earlier in this document.
// NOTE(review): the JSON variant additionally sets an Authorization header on the
// HTTP request and an updateKind on CreateUpdateInfo; the matching builder
// properties are not visible here — confirm their names before adding them.
var pipeline = PipelineBuilder.Create("energy-meter-sync")
    .WithTrigger<FromScheduleNode>(config =>
    {
        // Six-field cron expression, same value as in the JSON definition.
        config.CronExpression = "0 */5 * * * *";
    })
    .AddExtract<MakeHttpRequestNode>(config =>
    {
        config.Url = "https://api.external.com/meters";
        config.Method = HttpMethod.Get;
    })
    .AddTransform<DataMappingNode>(config =>
    {
        config.Mappings.Add(new MappingEntry
        {
            SourcePath = "$.data[*].id",
            TargetPath = "serialNumber"
        });

        // Second mapping from the JSON definition; without it "currentValue"
        // is never populated by the code-first pipeline.
        config.Mappings.Add(new MappingEntry
        {
            SourcePath = "$.data[*].reading",
            TargetPath = "currentValue"
        });
    })
    .AddTransform<CreateUpdateInfoNode>(config =>
    {
        config.CkTypeId = "Industry.Energy/EnergyMeter";
    })
    .AddLoad<ApplyChangesNode>(config =>
    {
        config.ValidateBeforeSave = true;
    })
    .Build();

Adapter Configuration

appsettings.json

{
"OctoMesh": {
"IdentityServiceUrl": "https://identity.octomesh.local",
"AssetRepositoryUrl": "https://asset-repo.octomesh.local",
"CommunicationControllerUrl": "https://com-controller.octomesh.local",
"TenantId": "my-tenant",
"ClientId": "my-adapter",
"ClientSecret": "${secrets.clientSecret}"
},
"Adapter": {
"PoolId": "edge-pool-01",
"AdapterId": "energy-adapter-01"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Meshmakers": "Debug"
}
}
}

Dependency Injection Setup

/// <summary>
/// Application entry point: wires up OctoMesh client services, the mesh adapter
/// and custom nodes, then runs the host.
/// </summary>
public class Program
{
    /// <summary>Builds the host from configuration and runs it until it stops.</summary>
    /// <param name="args">Command-line arguments passed to the host builder.</param>
    public static async Task Main(string[] args)
    {
        var builder = Host.CreateApplicationBuilder(args);
        var configuration = builder.Configuration;
        var services = builder.Services;

        // OctoMesh client: endpoints and tenant come from the "OctoMesh" section.
        // NOTE(review): "OctoMesh:ClientId" / "OctoMesh:ClientSecret" are not read here — confirm how they are supplied.
        services.AddOctoMeshClient(client =>
        {
            client.IdentityServiceUrl = configuration["OctoMesh:IdentityServiceUrl"];
            client.AssetRepositoryUrl = configuration["OctoMesh:AssetRepositoryUrl"];
            client.TenantId = configuration["OctoMesh:TenantId"];
        });

        // Adapter identity comes from the "Adapter" section.
        services.AddMeshAdapter(adapter =>
        {
            adapter.PoolId = configuration["Adapter:PoolId"];
            adapter.AdapterId = configuration["Adapter:AdapterId"];
        });

        // Custom nodes are registered as transient services.
        services.AddTransient<MyCustomNode>();

        await builder.Build().RunAsync();
    }
}

Logging in Nodes

Implement proper logging for debugging and monitoring:

/// <summary>
/// Example transform node that logs the start, completion and failure of its work.
/// </summary>
public class MyCustomNode : TransformNode<MyCustomNodeConfiguration>
{
    private readonly ILogger<MyCustomNode> _log;

    /// <summary>Creates the node with the logger it writes to.</summary>
    /// <param name="logger">Logger supplied by the caller.</param>
    public MyCustomNode(ILogger<MyCustomNode> logger) => _log = logger;

    /// <summary>
    /// Processes the context data, stores the outcome under the <c>"result"</c> key
    /// and logs progress. Failures are logged and rethrown.
    /// </summary>
    /// <param name="config">Node configuration; logged at the start of the run.</param>
    /// <param name="context">Context passed to processing and updated with <c>"result"</c>.</param>
    /// <param name="cancellationToken">Forwarded to the processing call.</param>
    /// <returns>The same <paramref name="context"/> instance.</returns>
    public override async Task<PipelineContext> ExecuteAsync(
        MyCustomNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        _log.LogInformation("Starting transformation with config: {@Config}", config);

        try
        {
            // Node logic here
            var processed = await ProcessDataAsync(context, cancellationToken);

            _log.LogInformation("Transformation completed. Processed {Count} items", processed.Count);
            context.SetData("result", processed);
        }
        catch (Exception ex)
        {
            // Log, then rethrow with the original stack trace intact.
            _log.LogError(ex, "Transformation failed");
            throw;
        }

        return context;
    }
}

Error Handling

Pipeline Error Handling

/// <summary>
/// Example load node demonstrating error handling: validation failures become
/// warnings, connection failures are retried with exponential backoff.
/// </summary>
public class MyLoadNode : LoadNode<MyLoadNodeConfiguration>
{
    // Number of retries after the initial attempt (delays of 1 s, 2 s and 4 s).
    private const int MaxRetries = 3;

    /// <summary>
    /// Saves the context data. A <see cref="ValidationException"/> is recorded as a
    /// warning and the pipeline continues. A <see cref="ConnectionException"/> is
    /// retried up to <c>MaxRetries</c> times; once retries are exhausted the
    /// exception propagates instead of being silently dropped.
    /// </summary>
    /// <param name="config">Node configuration (not read by this implementation).</param>
    /// <param name="context">Context passed to the save call; receives warnings.</param>
    /// <param name="cancellationToken">Forwarded to the save call and the backoff delay.</param>
    /// <returns>The same <paramref name="context"/> instance.</returns>
    /// <exception cref="ConnectionException">The save still fails after all retries.</exception>
    public override async Task<PipelineContext> ExecuteAsync(
        MyLoadNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await SaveDataAsync(context, cancellationToken);
                return context;
            }
            catch (ValidationException ex)
            {
                // Mark as partial failure, continue pipeline
                context.AddWarning($"Validation failed: {ex.Message}");
                return context;
            }
            catch (ConnectionException) when (attempt < MaxRetries)
            {
                // Only connection failures are retried; cancellation and any other
                // exception propagate immediately. When the filter is false (retries
                // exhausted) the ConnectionException itself propagates to the caller.
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)), cancellationToken);
            }
        }
    }
}

Testing Adapters

Unit Testing Nodes

/// <summary>
/// Unit tests for <see cref="DataMappingNode"/>.
/// </summary>
[TestClass]
public class DataMappingNodeTests
{
    /// <summary>
    /// A single mapping from <c>$.name</c> to <c>displayName</c> should copy the
    /// source value into the <c>"mapped"</c> dictionary.
    /// </summary>
    [TestMethod]
    public async Task ExecuteAsync_WithValidMapping_ShouldMapData()
    {
        // Arrange
        var nameMapping = new MappingEntry { SourcePath = "$.name", TargetPath = "displayName" };
        var configuration = new DataMappingNodeConfiguration
        {
            Mappings = new List<MappingEntry> { nameMapping }
        };
        var input = new PipelineContext();
        input.SetData("source", new { name = "Test" });
        var sut = new DataMappingNode();

        // Act
        var output = await sut.ExecuteAsync(configuration, input, CancellationToken.None);

        // Assert
        var mapped = output.GetData<Dictionary<string, object>>("mapped");
        Assert.AreEqual("Test", mapped["displayName"]);
    }
}

Integration Testing

/// <summary>
/// Integration test skeleton that builds a host with OctoMesh client services
/// before each test and disposes it afterwards.
/// </summary>
[TestClass]
public class EnergyAdapterIntegrationTests
{
    private IHost _host;

    /// <summary>Builds a host configured against the local test identity service.</summary>
    [TestInitialize]
    public void Setup()
    {
        _host = Host.CreateDefaultBuilder()
            .ConfigureServices((context, services) =>
            {
                services.AddOctoMeshClient(options =>
                {
                    options.IdentityServiceUrl = "https://localhost:5003";
                    // ... test configuration
                });
            })
            .Build();
    }

    /// <summary>
    /// Disposes the host created in <see cref="Setup"/> so each test releases
    /// the services it built instead of leaking them.
    /// </summary>
    [TestCleanup]
    public void Cleanup()
    {
        // Null-conditional: Setup may have thrown before assigning the field.
        _host?.Dispose();
        _host = null;
    }

    /// <summary>Placeholder for the end-to-end energy meter sync test.</summary>
    [TestMethod]
    public async Task Pipeline_ShouldSyncEnergyMeters()
    {
        // Test implementation
    }
}

Deployment

Docker Container

# Multi-stage build: the SDK image compiles and publishes the adapter,
# and only the published output is copied into the runtime image.

# Runtime stage: ASP.NET Core 8.0 runtime image that the final stage is based on.
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app

# Build stage: copy the project file and restore first, so the restore layer
# can be reused from cache as long as MyAdapter.csproj is unchanged.
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["MyAdapter.csproj", "."]
RUN dotnet restore
COPY . .
RUN dotnet build -c Release -o /app/build

# Publish stage: produce the deployable output in /app/publish.
FROM build AS publish
RUN dotnet publish -c Release -o /app/publish

# Final stage: runtime image plus the published files; starts MyAdapter.dll.
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "MyAdapter.dll"]

Kubernetes Deployment

# Deployment running a single replica of the energy adapter container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: energy-adapter
  namespace: octo-adapters
spec:
  replicas: 1
  selector:
    matchLabels:
      app: energy-adapter
  template:
    metadata:
      labels:
        # Must match spec.selector.matchLabels above.
        app: energy-adapter
    spec:
      containers:
        - name: adapter
          image: myregistry/energy-adapter:latest
          env:
            # Double underscore is the .NET environment-variable separator for
            # configuration sections, i.e. this overrides "OctoMesh:TenantId".
            - name: OctoMesh__TenantId
              value: "production"
            # The client secret is read from a Kubernetes Secret rather than
            # being written into the manifest.
            - name: OctoMesh__ClientSecret
              valueFrom:
                secretKeyRef:
                  name: adapter-secrets
                  key: client-secret

Next Steps