Skip to main content

Write your first Edge Adapter

This tutorial will help you to write your first Edge Adapter for OctoMesh.

For brevity, we will assume that you have already set up your development environment and have a basic understanding of C# and .NET Core.

info

We will also assume that you have set up a C# project as documented in the Creating Adapter Project section.

note

For a reference implementation you can check the following repository

Overview

We will execute the following steps:

  1. Add configuration to run the adapter
  2. Implement adapter logic
  3. Implement pipeline logic

1. Add configuration to run the adapter

Navigate to the AdminUI and create a new adapter:

Create a new Edge Adapter

After creating the adapter, we need the runtime identifier (rtId) to configure the adapter in the code.

Copy RtId

With that information in the clipboard, we can move to the next part.

2. Implement adapter logic

Update the launch profile to include the adapter configuration.

{
"$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"AdapterEdgeDemo": {
"commandName": "Project",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"OCTO_ADAPTER__TENANTID": meshtest,
"OCTO_ADAPTER__ADAPTERRTID": 6760711ec4ff02221e0b532d
}
}
}
}

The OCTO_ADAPTER__TENANTID is the tenant id of the adapter. The OCTO_ADAPTER__ADAPTERRTID is the runtime identifier of the adapter, which is used to identify the adapter in the system. Replace the values with the ones you copied from the AdminUI.

In Program.cs start with configuring the adapter.

using Meshmakers.Octo.Communication.EdgeAdapter.Demo.Services;
using Meshmakers.Octo.Sdk.Common.Adapters;
using Meshmakers.Octo.Sdk.Common.EtlDataPipeline;
using Meshmakers.Octo.Sdk.Common.Services;

// AdapterBuilder is used for active adapters (connecting to external systems)
var adapterBuilder = new AdapterBuilder();

adapterBuilder.Run(args, (_, services) =>
{
// Register data pipeline services
services.AddDataPipeline()
// Register the ETL context to access meta data of the execution (e.g. tenant id, pipeline id, ...)
.RegisterEtlContext<IEtlContext>();
services.AddSingleton<IAdapterService, AdapterEdgeDemoService>();
});

Next we have to implement the IAdapterService interface, create a new class AdapterEdgeDemoService and implement the interface IAdapterServiceas shown below.

using Meshmakers.Octo.Common.DistributionEventHub.Services;
using Meshmakers.Octo.Communication.Contracts.DataTransferObjects;
using Meshmakers.Octo.Sdk.Common.Adapters;
using Meshmakers.Octo.Sdk.Common.Services;

namespace Meshmakers.Octo.Communication.EdgeAdapter.Demo.Services;

/// <summary>
/// This is the main service for the plug. It is responsible for starting and stopping the plug.
/// Shutdown is called when the plug is stopped or new configuration is received.
/// Startup is called when the plug starts or receives a new configuration.
/// </summary>
/// <param name="pipelineRegistryService">Service for registering and starting pipelines</param>
/// <param name="eventHubControl">Event hub control service</param>
public class AdapterEdgeDemoService(
ILogger<AdapterEdgeDemoService> logger,
IPipelineRegistryService pipelineRegistryService,
IEventHubControl eventHubControl)
: IAdapterService
{
public Task<bool> StartupAsync(AdapterStartup adapterStartup, List<DeploymentUpdateErrorMessageDto> errorMessages, CancellationToken stoppingToken)
{
logger.LogInformation("Startup");

try
{
return Task.FromResult(true);
}
catch (Exception e)
{
logger.LogError(e, "Error while startup");
throw;
}
}

public Task ShutdownAsync(AdapterShutdown adapterShutdown, CancellationToken stoppingToken)
{
try
{
logger.LogInformation("Shutdown");

logger.LogInformation("Shutdown complete");
return Task.CompletedTask;
}
catch (Exception e)
{
logger.LogError(e, "Error while shutdown");
throw;
}
}
}

After these steps, you can run the adapter and see the log output.

In the Admin UI, you should see the adapter to be online!

Adapter online

Now we proceed to the next step.

3. Implement node logic

First, we have to create a node configuration record. This record is used to serialize and deserialize configuration data for the node.

[NodeName("Demo", 1)]
public record DemoNodeConfiguration : SourceTargetPathNodeConfiguration
{
}

NodeName defines the name and the version of the node. An adapter may have multiple nodes with the same name but different versions. Typically, a node has input and output data. The SourceTargetPathNodeConfiguration is used to define the input and output data path (Path, TargetPath) for the node.

Extract and load nodes may have only input data or only output data. In this case, the PathNodeConfiguration or TargetPathNodeConfiguration is used.

Second, we need to create the node itself.

[NodeConfiguration(typeof(DemoNodeConfiguration))]
public class DemoNode(NodeDelegate next) : IPipelineNode
{
public async Task ProcessObjectAsync(IDataContext dataContext, INodeContext nodeContext)
{
// Continue with next node in pipeline
await next(dataContext, nodeContext);
}
}

The NodeConfiguration attribute is used to define the configuration record for the node.

Third, we have to modify the startup configuration to include the node.

// AdapterBuilder is used for active adapters (connecting to external systems)
var adapterBuilder = new AdapterBuilder();

adapterBuilder.Run(args, (_, services) =>
{
// Register data pipeline services
services.AddDataPipeline()
.RegisterNode<DemoNode>() // Sample to register a node
// Register the ETL context to access meta data of the execution (e.g. tenant id, pipeline id, ...)
.RegisterEtlContext<IEtlContext>();
services.AddTransient<IPollingService, PollingService>();
services.AddSingleton<IAdapterService, AdapterEdgeDemoService>();
});

Next, we have to implement the node logic. We extend the node configuration with a new property MyMessage.

[NodeName("Demo", 1)]
public record DemoNodeConfiguration : TargetPathNodeConfiguration
{
public required string MyMessage { get; set; } = "Hello, World!";
}

We want to write the message to the target path.

[NodeConfiguration(typeof(DemoNodeConfiguration))]
public class DemoNode(NodeDelegate next) : IPipelineNode
{
public async Task ProcessObjectAsync(IDataContext dataContext, INodeContext nodeContext)
{
// Get configuration
var c = nodeContext.GetNodeConfiguration<DemoNodeConfiguration>();

// set value
dataContext.SetValueByPath(c.TargetPath, c.DocumentMode, c.TargetValueKind, c.TargetValueWriteMode,
c.MyMessage);

// Continue with next node in pipeline
await next(dataContext, nodeContext);
}
}

Now we have to update the adapter service to include:

  • Registration of pipelines that needs to be executed
  • Start of trigger nodes that execute the pipeline at a specific event
public class AdapterEdgeDemoService(
ILogger<AdapterEdgeDemoService> logger,
IPipelineRegistryService pipelineRegistryService,
IEventHubControl eventHubControl)
: IAdapterService
{
public async Task<bool> StartupAsync(AdapterStartup adapterStartup, List<DeploymentUpdateErrorMessageDto> errorMessages, CancellationToken stoppingToken)
{
logger.LogInformation("Startup");

try
{
// adapterStartup contains configuration:
// Adapter configuration (optional) and a list of pipelines to be executed by this adapter.
// Pipelines are a sequence of nodes that process data.
// The pipeline is registered with the pipeline registry service.

// Register pipelines
var success = await pipelineRegistryService.RegisterPipelinesAsync(adapterStartup.TenantId,
adapterStartup.Configuration.Pipelines, errorMessages);

// If success is false, at least one pipeline failed to register, and the errorMessages list contains the error messages.
// The adapter should start to execute the rest of the pipelines.

// Start triggers. Triggers are special nodes that start the pipeline execution based on some event.
await pipelineRegistryService.StartTriggerPipelineNodesAsync(adapterStartup.TenantId);

// Start connection to rabbitmq event hub
await eventHubControl.StartAsync(stoppingToken);

return success;
}
catch (Exception e)
{
logger.LogError(e, "Error while startup");
throw;
}
}

public async Task ShutdownAsync(AdapterShutdown adapterShutdown, CancellationToken stoppingToken)
{
try
{
logger.LogInformation("Shutdown");

// Stop triggers
await pipelineRegistryService.StopTriggerPipelineNodesAsync(adapterShutdown.TenantId);

// Unregister pipelines
pipelineRegistryService.UnregisterAllPipelines(adapterShutdown.TenantId);

// Stop connection rabbitmq event hub
await eventHubControl.StopAsync(stoppingToken);

logger.LogInformation("Shutdown complete");
}
catch (Exception e)
{
logger.LogError(e, "Error while shutdown");
throw;
}
}
}

Your adapter is now ready to execute the pipeline. The last step in the puzzle is to configure the data pipeline in the AdminUI. We need to create a new data pipeline.

Create the pipeline

A data pipeline consists of two sub-pipelines: Edge and Mesh. Edge pipelines are executed on the edge, while Mesh pipelines are executed in the cloud. Edge pipelines are optional and can be used to preprocess data before sending it to the cloud.

With the checkbox, you can decide if the pipeline should be executed on the edge and/or in the cloud.

Now we have told the adapter to execute the pipeline, but we have to configure the pipeline, which can also be done in the AdminUI.

By hitting the Edit/Debug definitions button after saving the data pipeline, you can see the Pipeline Builder UI where to enter the pipeline definition.

A basic pipeline definition could look like this:

  triggers:
- type: FromPolling@1
interval: 00:00:10
transformations:
- type: Demo@1
description: Simulates data
myMessage: Hello, Mars!

This pipeline definition contains a trigger node that polls every 10 seconds and a transformation node that writes the message "Hello, Mars!" to the target path.

Save the pipeline and deploy (download) the pipeline definition to the adapter.

Deploy the pipeline

After 10 seconds, you should see the message in the log output of the adapter.

DEBUG|[meshtest] Running pipeline for pipeline System.Communication/EdgePipeline@67e132d6477e78e980bbb512 as run with execution id e2d7e1fd-444f-40da-917c-a00afb6a71a2
DEBUG|Connected: guest@localhost:5672/ (address: amqp://localhost:5672, local: 61536)
DEBUG|Endpoint Ready: rabbitmq://localhost/MacBookProvonG_MeshmakersOcto_bus_65jyyyyc8i3jzmg1bdqsixornw?temporary=true
INFO|Bus started: rabbitmq://localhost/
INFO|PipelineExecution: Executing pipeline
DEBUG|PipelineExecution: Node completed
DEBUG|PipelineExecution/Demo@1: Forward Executing
DEBUG|PipelineExecution/Demo@1: Node completed
DEBUG|PipelineExecution/Demo@1: Reverse completed
INFO|PipelineExecution: Pipeline completed
warning

For a reference implementation you can check the following repository. This repository contains a more advanced adapter implementation using Containers including a OctoMesh ETL pipeline and CI/CD pipeline based on Azure DevOps