Adapter Development
This guide explains how to develop adapters for OctoMesh, covering both Edge and Mesh adapters.
Adapter Overview
Adapters are the integration layer of OctoMesh, connecting external systems to the data mesh platform.
Adapter Types
| Type | Deployment | Use Cases |
|---|---|---|
| Edge Adapter | Near data sources (K3s) | Sensors, PLCs, local databases, file systems |
| Mesh Adapter | Cloud/Datacenter (K8s) | ERP systems, cloud services, data aggregation |
Adapter Roles
- Plugs: Bring data from external systems into OctoMesh
- Sockets: Provide data from OctoMesh to external systems
Data Pipeline Architecture
Adapters execute data pipelines that define how data flows through the system.
Pipeline Structure
| Stage | Purpose | Examples |
|---|---|---|
| Trigger | Initiates pipeline execution | Schedule, HTTP request, message, entity change |
| Extract | Retrieves data from source | Query database, read file, call API |
| Transform | Processes and maps data | Filter, aggregate, format conversion |
| Load | Stores data in destination | Save to RT, write to stream, send notification |
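To make the four stages concrete, here is a minimal, SDK-independent sketch in plain C#. The `MiniPipeline` class and its delegate-based stages are hypothetical and are not part of the OctoMesh SDK; they only illustrate how data could move from extract through transform to load once a trigger fires.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration only; not an OctoMesh SDK type.
public sealed class MiniPipeline<TRaw, TMapped>
{
    private readonly Func<IEnumerable<TRaw>> _extract;
    private readonly Func<TRaw, TMapped> _transform;
    private readonly Action<IReadOnlyList<TMapped>> _load;

    public MiniPipeline(
        Func<IEnumerable<TRaw>> extract,
        Func<TRaw, TMapped> transform,
        Action<IReadOnlyList<TMapped>> load)
    {
        _extract = extract;
        _transform = transform;
        _load = load;
    }

    // Called by whatever acts as the trigger (a timer, an HTTP handler, ...).
    // Returns the number of items handed to the load stage.
    public int Run()
    {
        var mapped = _extract().Select(_transform).ToList();
        _load(mapped);
        return mapped.Count;
    }
}
```

A caller would construct the pipeline with three lambdas, for example an extract stage that returns raw readings, a transform stage that formats each one, and a load stage that appends the results to a list, and then invoke `Run()` from the trigger.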
Pipeline Nodes
Pipelines consist of nodes, each of which performs one specific operation within a stage.
Creating an Adapter Project
Prerequisites
- .NET 8.0 SDK or later
- Visual Studio, JetBrains Rider, or VS Code
- Access to OctoMesh NuGet packages
Project Setup
- Create a new .NET project:
dotnet new console -n MyAdapter
cd MyAdapter
- Add required NuGet packages:
# Core SDK packages
dotnet add package Meshmakers.Octo.Sdk.Common
dotnet add package Meshmakers.Octo.Sdk.ServiceClient
# For Mesh Adapter
dotnet add package Meshmakers.Octo.Sdk.MeshAdapter
# For Edge Adapter
dotnet add package Meshmakers.Octo.Sdk.EdgeAdapter
Project Structure
MyAdapter/
├── Program.cs              # Entry point
├── appsettings.json        # Configuration
├── Nodes/
│   ├── Trigger/            # Trigger nodes
│   ├── Extract/            # Extract nodes
│   ├── Transform/          # Transform nodes
│   └── Load/               # Load nodes
├── Pipelines/
│   └── MyPipeline.cs       # Pipeline definitions
└── Services/
    └── MyService.cs        # Custom services
Implementing Pipeline Nodes
Trigger Node Example
A trigger node initiates pipeline execution:
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Meshmakers.Octo.Sdk.Common.Pipelines;

public class FromHttpRequestNodeConfiguration : ITriggerNodeConfiguration
{
    public string Route { get; set; } = "/api/trigger";
    public HttpMethod Method { get; set; } = HttpMethod.Post;
    public bool RequireAuthentication { get; set; } = true;
}

public class FromHttpRequestNode : TriggerNode<FromHttpRequestNodeConfiguration>
{
    // Nothing is awaited here, so the method returns a completed task
    // instead of being marked async.
    public override Task<PipelineContext> ExecuteAsync(
        FromHttpRequestNodeConfiguration config,
        CancellationToken cancellationToken)
    {
        // Access HTTP request data
        var requestBody = Context.GetHttpRequestBody();

        // Create pipeline context with data
        var context = new PipelineContext();
        context.SetData("request", requestBody);
        return Task.FromResult(context);
    }
}
Extract Node Example
An extract node retrieves data:
public class GetRtEntitiesByTypeNodeConfiguration : IExtractNodeConfiguration
{
    public string CkTypeId { get; set; } = string.Empty;
    public int MaxResults { get; set; } = 100;

    // Optional filter expression
    public string? Filter { get; set; }
}

public class GetRtEntitiesByTypeNode : ExtractNode<GetRtEntitiesByTypeNodeConfiguration>
{
    private readonly IAssetRepositoryClient _client;

    public GetRtEntitiesByTypeNode(IAssetRepositoryClient client)
    {
        _client = client;
    }

    public override async Task<PipelineContext> ExecuteAsync(
        GetRtEntitiesByTypeNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        // Query entities of the configured type and hand them to the next node
        var entities = await _client.QueryEntitiesAsync(
            config.CkTypeId,
            config.Filter,
            config.MaxResults,
            cancellationToken);

        context.SetData("entities", entities);
        return context;
    }
}
Transform Node Example
A transform node processes data:
public class DataMappingNodeConfiguration : ITransformNodeConfiguration
{
    public List<MappingEntry> Mappings { get; set; } = new();
}

public class MappingEntry
{
    public string SourcePath { get; set; } = string.Empty;
    public string TargetPath { get; set; } = string.Empty;

    // Optional; when null or empty, the extracted value is copied unchanged
    public string? TransformExpression { get; set; }
}

public class DataMappingNode : TransformNode<DataMappingNodeConfiguration>
{
    // ExtractValue, ApplyTransform, and SetValue are helper methods that are not shown here.
    // The mapping is synchronous, so the method returns a completed task
    // instead of being marked async.
    public override Task<PipelineContext> ExecuteAsync(
        DataMappingNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        var sourceData = context.GetData<object>("source");
        var result = new Dictionary<string, object>();

        foreach (var mapping in config.Mappings)
        {
            var value = ExtractValue(sourceData, mapping.SourcePath);
            if (!string.IsNullOrEmpty(mapping.TransformExpression))
            {
                value = ApplyTransform(value, mapping.TransformExpression);
            }
            SetValue(result, mapping.TargetPath, value);
        }

        context.SetData("mapped", result);
        return Task.FromResult(context);
    }
}
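The transform node calls `ExtractValue` and `SetValue` without showing them. The sketch below is one possible way to write such helpers with `System.Text.Json`; the `MappingHelpers` class is hypothetical, the SDK's own mapping functions may behave differently, and only simple dotted paths such as `$.device.name` are handled (no `[*]` wildcards or filters).

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helpers; not part of the OctoMesh SDK.
public static class MappingHelpers
{
    // Reads the value at a simple dotted path such as "$.device.name".
    // Returns null when any segment of the path is missing.
    public static object? ExtractValue(object source, string path)
    {
        var current = JsonSerializer.SerializeToElement(source);
        var segments = path.TrimStart('$').Split('.', StringSplitOptions.RemoveEmptyEntries);

        foreach (var segment in segments)
        {
            if (current.ValueKind != JsonValueKind.Object ||
                !current.TryGetProperty(segment, out var next))
            {
                return null;
            }
            current = next;
        }

        return current.ValueKind switch
        {
            JsonValueKind.String => current.GetString(),
            JsonValueKind.Number => current.TryGetInt64(out var whole) ? (object)whole : current.GetDouble(),
            JsonValueKind.True => true,
            JsonValueKind.False => false,
            JsonValueKind.Null => null,
            _ => current // objects and arrays are returned as JsonElement
        };
    }

    // Writes a value at a dotted target path such as "meta.displayName",
    // creating nested dictionaries for intermediate segments as needed.
    public static void SetValue(IDictionary<string, object?> target, string path, object? value)
    {
        var segments = path.Split('.', StringSplitOptions.RemoveEmptyEntries);
        if (segments.Length == 0)
        {
            throw new ArgumentException("Target path must not be empty.", nameof(path));
        }

        var current = target;
        for (var i = 0; i < segments.Length - 1; i++)
        {
            if (current.TryGetValue(segments[i], out var child) &&
                child is IDictionary<string, object?> existing)
            {
                current = existing;
            }
            else
            {
                var created = new Dictionary<string, object?>();
                current[segments[i]] = created;
                current = created;
            }
        }

        current[segments[^1]] = value;
    }
}
```

Serializing the source to a `JsonElement` first keeps the sketch independent of the source's concrete type, at the cost of one extra serialization per lookup; a production helper would likely cache the element or use a full JSONPath library.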
Load Node Example
A load node stores data:
public class ApplyChangesNodeConfiguration : ILoadNodeConfiguration
{
    public UpdateKind UpdateKind { get; set; } = UpdateKind.CreateOrUpdate;
    public bool ValidateBeforeSave { get; set; } = true;
}

public class ApplyChangesNode : LoadNode<ApplyChangesNodeConfiguration>
{
    private readonly IAssetRepositoryClient _client;

    public ApplyChangesNode(IAssetRepositoryClient client)
    {
        _client = client;
    }

    public override async Task<PipelineContext> ExecuteAsync(
        ApplyChangesNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        var updateInfo = context.GetData<UpdateInfo>("updateInfo");

        var result = await _client.ApplyChangesAsync(
            updateInfo,
            config.UpdateKind,
            cancellationToken);

        context.SetData("result", result);
        return context;
    }
}
Pipeline Configuration
Defining a Pipeline
Pipelines are configured using JSON or code:
{
  "pipelineId": "energy-meter-sync",
  "name": "Energy Meter Synchronization",
  "description": "Syncs energy meter data from external API",
  "nodes": [
    {
      "nodeType": "FromSchedule",
      "config": {
        "cronExpression": "0 */5 * * * *"
      }
    },
    {
      "nodeType": "MakeHttpRequest",
      "config": {
        "url": "https://api.external.com/meters",
        "method": "GET",
        "headers": {
          "Authorization": "Bearer ${secrets.apiKey}"
        }
      }
    },
    {
      "nodeType": "DataMapping",
      "config": {
        "mappings": [
          {
            "sourcePath": "$.data[*].id",
            "targetPath": "serialNumber"
          },
          {
            "sourcePath": "$.data[*].reading",
            "targetPath": "currentValue"
          }
        ]
      }
    },
    {
      "nodeType": "CreateUpdateInfo",
      "config": {
        "ckTypeId": "Industry.Energy/EnergyMeter",
        "updateKind": "CreateOrUpdate"
      }
    },
    {
      "nodeType": "ApplyChanges",
      "config": {
        "validateBeforeSave": true
      }
    }
  ]
}
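As a rough illustration of how a JSON definition with this shape could be read in code, the sketch below deserializes it with `System.Text.Json`. The `PipelineDefinition`, `NodeDefinition`, and `PipelineDefinitionLoader` types are hypothetical and are not SDK types; each node's `config` is kept as a raw `JsonElement` because its shape depends on the node type.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical DTOs mirroring the JSON shape; not OctoMesh SDK types.
public sealed record NodeDefinition(string NodeType, JsonElement Config);

public sealed record PipelineDefinition(
    string PipelineId,
    string Name,
    string? Description,
    List<NodeDefinition> Nodes);

public static class PipelineDefinitionLoader
{
    // The JSON uses camelCase names, so property matching is case-insensitive.
    private static readonly JsonSerializerOptions Options = new()
    {
        PropertyNameCaseInsensitive = true
    };

    public static PipelineDefinition Parse(string json)
    {
        var definition = JsonSerializer.Deserialize<PipelineDefinition>(json, Options)
            ?? throw new JsonException("Pipeline definition is empty.");

        if (definition.Nodes is null || definition.Nodes.Count == 0)
        {
            throw new JsonException("A pipeline needs at least one node.");
        }

        return definition;
    }
}
```

With this in place, a node-specific setting can be read on demand, for example `definition.Nodes[0].Config.GetProperty("cronExpression").GetString()`.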
Pipeline Builder API
var pipeline = PipelineBuilder.Create("energy-meter-sync")
    .WithTrigger<FromScheduleNode>(config =>
    {
        config.CronExpression = "0 */5 * * * *";
    })
    .AddExtract<MakeHttpRequestNode>(config =>
    {
        config.Url = "https://api.external.com/meters";
        config.Method = HttpMethod.Get;
    })
    .AddTransform<DataMappingNode>(config =>
    {
        config.Mappings.Add(new MappingEntry
        {
            SourcePath = "$.data[*].id",
            TargetPath = "serialNumber"
        });
    })
    .AddTransform<CreateUpdateInfoNode>(config =>
    {
        config.CkTypeId = "Industry.Energy/EnergyMeter";
    })
    .AddLoad<ApplyChangesNode>(config =>
    {
        config.ValidateBeforeSave = true;
    })
    .Build();
Adapter Configuration
appsettings.json
{
  "OctoMesh": {
    "IdentityServiceUrl": "https://identity.octomesh.local",
    "AssetRepositoryUrl": "https://asset-repo.octomesh.local",
    "CommunicationControllerUrl": "https://com-controller.octomesh.local",
    "TenantId": "my-tenant",
    "ClientId": "my-adapter",
    "ClientSecret": "${secrets.clientSecret}"
  },
  "Adapter": {
    "PoolId": "edge-pool-01",
    "AdapterId": "energy-adapter-01"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Meshmakers": "Debug"
    }
  }
}
Dependency Injection Setup
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class Program
{
    public static async Task Main(string[] args)
    {
        var builder = Host.CreateApplicationBuilder(args);

        // Configure OctoMesh services
        builder.Services.AddOctoMeshClient(options =>
        {
            options.IdentityServiceUrl = builder.Configuration["OctoMesh:IdentityServiceUrl"];
            options.AssetRepositoryUrl = builder.Configuration["OctoMesh:AssetRepositoryUrl"];
            options.TenantId = builder.Configuration["OctoMesh:TenantId"];
        });

        // Register adapter
        builder.Services.AddMeshAdapter(options =>
        {
            options.PoolId = builder.Configuration["Adapter:PoolId"];
            options.AdapterId = builder.Configuration["Adapter:AdapterId"];
        });

        // Register custom nodes
        builder.Services.AddTransient<MyCustomNode>();

        var host = builder.Build();
        await host.RunAsync();
    }
}
Logging in Nodes
Inject an ILogger into each node and log the start, the outcome, and any failure of an execution so that pipelines can be debugged and monitored:
public class MyCustomNode : TransformNode<MyCustomNodeConfiguration>
{
    private readonly ILogger<MyCustomNode> _logger;

    public MyCustomNode(ILogger<MyCustomNode> logger)
    {
        _logger = logger;
    }

    public override async Task<PipelineContext> ExecuteAsync(
        MyCustomNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Starting transformation with config: {@Config}", config);

        try
        {
            // Node logic here
            var result = await ProcessDataAsync(context, cancellationToken);

            _logger.LogInformation("Transformation completed. Processed {Count} items", result.Count);
            context.SetData("result", result);
            return context;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Transformation failed");
            throw;
        }
    }
}
Error Handling
Pipeline Error Handling
public class MyLoadNode : LoadNode<MyLoadNodeConfiguration>
{
    private const int MaxRetries = 3;

    public override async Task<PipelineContext> ExecuteAsync(
        MyLoadNodeConfiguration config,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await SaveDataAsync(context, cancellationToken);
                return context;
            }
            catch (ValidationException ex)
            {
                // Mark as partial failure, continue pipeline
                context.AddWarning($"Validation failed: {ex.Message}");
                return context;
            }
            catch (ConnectionException) when (attempt < MaxRetries)
            {
                // Transient failure: back off exponentially (1 s, 2 s, 4 s), then retry.
                // Once the retries are used up, the exception propagates to the caller
                // instead of being swallowed. Cancellation is never caught here.
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)), cancellationToken);
            }
        }
    }
}
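The retry-with-backoff logic in the load node can be factored into a reusable helper so that each node does not repeat the loop. The following is a minimal sketch using only the .NET base class library; the `Retry` class, its parameters, and the `isTransient` predicate are hypothetical names, not SDK APIs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; not part of the OctoMesh SDK.
public static class Retry
{
    // Runs 'operation' up to 'maxAttempts' times. After each transient failure
    // it waits, then doubles the delay. The last failure, any non-transient
    // exception, and cancellation all propagate to the caller.
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> operation,
        int maxAttempts,
        TimeSpan initialDelay,
        Func<Exception, bool> isTransient,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        var delay = initialDelay;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await operation(cancellationToken).ConfigureAwait(false);
                return;
            }
            catch (Exception ex) when (
                attempt < maxAttempts &&
                ex is not OperationCanceledException &&
                isTransient(ex))
            {
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

A load node could then wrap its save call, for example `await Retry.ExecuteAsync(ct => SaveDataAsync(context, ct), 4, TimeSpan.FromSeconds(1), ex => ex is ConnectionException, cancellationToken);`, which gives one initial attempt plus three retries with delays of 1, 2, and 4 seconds.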
Testing Adapters
Unit Testing Nodes
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class DataMappingNodeTests
{
    [TestMethod]
    public async Task ExecuteAsync_WithValidMapping_ShouldMapData()
    {
        // Arrange
        var node = new DataMappingNode();
        var config = new DataMappingNodeConfiguration
        {
            Mappings = new List<MappingEntry>
            {
                new() { SourcePath = "$.name", TargetPath = "displayName" }
            }
        };
        var context = new PipelineContext();
        context.SetData("source", new { name = "Test" });

        // Act
        var result = await node.ExecuteAsync(config, context, CancellationToken.None);

        // Assert
        var mapped = result.GetData<Dictionary<string, object>>("mapped");
        Assert.AreEqual("Test", mapped["displayName"]);
    }
}
Integration Testing
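using Microsoft.Extensions.Hosting;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class EnergyAdapterIntegrationTests
{
    private IHost _host = null!;

    [TestInitialize]
    public void Setup()
    {
        _host = Host.CreateDefaultBuilder()
            .ConfigureServices((context, services) =>
            {
                services.AddOctoMeshClient(options =>
                {
                    options.IdentityServiceUrl = "https://localhost:5003";
                    // ... test configuration
                });
            })
            .Build();
    }

    [TestCleanup]
    public void Cleanup()
    {
        // Dispose the host so each test starts with a fresh service container
        _host?.Dispose();
    }

    [TestMethod]
    public async Task Pipeline_ShouldSyncEnergyMeters()
    {
        // Test implementation
    }
}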
Deployment
Docker Container
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["MyAdapter.csproj", "."]
RUN dotnet restore
COPY . .
RUN dotnet build -c Release -o /app/build
FROM build AS publish
RUN dotnet publish -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "MyAdapter.dll"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: energy-adapter
  namespace: octo-adapters
spec:
  replicas: 1
  selector:
    matchLabels:
      app: energy-adapter
  template:
    metadata:
      labels:
        app: energy-adapter
    spec:
      containers:
        - name: adapter
          image: myregistry/energy-adapter:latest
          env:
            - name: OctoMesh__TenantId
              value: "production"
            - name: OctoMesh__ClientSecret
              valueFrom:
                secretKeyRef:
                  name: adapter-secrets
                  key: client-secret
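The `OctoMesh__TenantId` and `OctoMesh__ClientSecret` variables in the manifest rely on the .NET configuration convention that a double underscore in an environment variable name stands for the `:` section separator, so they override `OctoMesh:TenantId` and `OctoMesh:ClientSecret` from appsettings.json when the host loads environment variables. The sketch below only illustrates that name mapping; the `EnvConfigKeys` class is a hypothetical helper, and in a real adapter the built-in environment variable configuration provider does this work.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical helper that mimics the key mapping for illustration only.
public static class EnvConfigKeys
{
    // "__" in an environment variable name maps to the ":" section separator.
    public static string ToConfigurationKey(string environmentVariableName) =>
        environmentVariableName.Replace("__", ":");

    // Builds a case-insensitive key/value view of the given variables,
    // for example the result of Environment.GetEnvironmentVariables().
    public static IDictionary<string, string?> FromEnvironment(IDictionary variables)
    {
        var result = new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase);
        foreach (DictionaryEntry entry in variables)
        {
            result[ToConfigurationKey((string)entry.Key)] = entry.Value as string;
        }
        return result;
    }
}
```

Keeping the secret in a Kubernetes Secret and injecting it this way means the `ClientSecret` value never has to be stored in appsettings.json or baked into the container image.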
Next Steps
- Create Edge Adapter: Detailed edge adapter guide
- Create Mesh Adapter: Detailed mesh adapter guide
- Trigger Nodes: Creating custom triggers
- API Reference: Complete node reference