Skip to main content

Write your first Trigger Node

This tutorial will help you to write your first Trigger Node for OctoMesh.

For brevity, we will assume that you have already set up your development environment and have a basic understanding of C# and .NET Core.

info

We will also assume that you have set up an Edge or Mesh Adapter using the tutorial Write your first Edge Adapter or Write your first Mesh Adapter.

note

For a reference implementation you can check the following repository

Overview

Trigger Nodes are used to trigger the transformation part in the OctoMesh system. Triggers are based on events (internally or externally). A pipeline definition may contain multiple triggers, but needs at least one trigger to start the pipeline execution.

Examples of triggers are:

  • FromExecutePipelineCommand: A trigger that starts a pipeline execution from a command from AdminUI or CLI.
  • FromHttpRequest: A trigger that starts a pipeline execution from an HTTP request.
  • FromPipelineTriggerEvent: A trigger that starts a pipeline execution from an event (from BotService).

Trigger start the execution of the transformation part of the pipeline. They are able to pass input data to the transformation part. Also, the output of the transformation part can be passed back to the trigger and further to be passed to an external system.

In this tutorial, we will create a trigger that starts a pipeline execution based on a TCP request. For this, we need to do following steps:

  1. Create a new node configuration
  2. Create the trigger node
  3. Implement the trigger logic

1. Create a new node configuration

Create a new class that extends the TriggerNodeConfiguration class from the OctoMesh SDK.

using Meshmakers.Octo.Sdk.Common;

[NodeName("DemoTrigger", 1)]
public record DemoTriggerNodeConfiguration : TriggerNodeConfiguration
{
/// <summary>
/// Port the sample TCP listener listens to
/// </summary>
public required ushort Port { get; set; } = 8000;
}

This node configuration class is used to configure the trigger node. The NodeName attribute is used to define the name of the node and the version of the node. It is important to define the version of the node, because the version is used to distinguish between different versions of the same node.

We define a property Port that is used to configure the port the TCP listener listens to. The default value is 8000.

2. Create the trigger node

Create a new class that implements the ITriggerNode interface from the OctoMesh SDK.

[NodeConfiguration(typeof(DemoTriggerNodeConfiguration))]
// ReSharper disable once ClassNeverInstantiated.Global
public class DemoTriggerNode : ITriggerPipelineNode
{
public Task StartAsync(ITriggerContext context)
{
return Task.CompletedTask;
}

public Task StopAsync(ITriggerContext context)
{
return Task.CompletedTask;
}
}

The NodeConfiguration attribute is used to define the configuration class for the node. The DemoTriggerNode class implements the ITriggerPipelineNode interface. The StartAsync method is called when the trigger node is started. The StopAsync method is called when the trigger node stops.

Trigger nodes are started and stopped by the OctoMesh adapter when the adapter is started and stopped. This is done by the IAdapterService implementation of the adapter using the call StartTriggerPipelineNodesAsyncof the IPipelineRegistryService interface. The start and stop for that reason also happens if a new configuration is deployed to the adapter.

3. Implement the trigger logic

The trigger logic needs to open a TCP listener and listen for incoming connections. When a connection is established, the trigger node needs to start the pipeline execution. The input data is passed to the pipeline execution, and the output is passed back as a result of the TCP request.

Let's open the TCP listener and listen for incoming connections in the StartAsync method. In the StopAsync method, we stop the TCP listener.


```sharp
[NodeConfiguration(typeof(DemoTriggerNodeConfiguration))]
// ReSharper disable once ClassNeverInstantiated.Global
public class DemoTriggerNode( /* you can inject services here */) : ITriggerPipelineNode
{
private TcpListener? _tcpListener;
private CancellationTokenSource? _cts;


public Task StartAsync(ITriggerContext context)
{
var c = context.NodeContext.GetNodeConfiguration<DemoTriggerNodeConfiguration>();

// Listen to TCP port for incoming messages at all interfaces
_cts = new CancellationTokenSource();
_tcpListener = new TcpListener(IPAddress.Any, c.Port);
_tcpListener.Start();

context.NodeContext.Info("TCP listener started and waiting for connections...");

Task.Run(async () =>
{
// Wait for incoming connections
while (!_cts.Token.IsCancellationRequested)
{
try
{
var tcpClient = await _tcpListener.AcceptTcpClientAsync();
_ = ProcessClientAsync(tcpClient, context, _cts.Token); // Execute in a separate task
}
catch (ObjectDisposedException)
{
// The Listener was stopped
break;
}
}
});

return Task.CompletedTask;
}

public Task StopAsync(ITriggerContext context)
{
try
{
_cts?.Cancel();
_tcpListener?.Stop();
context.NodeContext.Info("TCP listener stopped.");
}
catch (Exception ex)
{
context.NodeContext.Error($"Error stopping: {ex.Message}");
throw DemoPipelineExecutionException.StopFailed(ex);
}

return Task.CompletedTask;
}

private async Task ProcessClientAsync(TcpClient client, ITriggerContext context,
CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}

We need to parse the incoming message as JSON and start the pipeline execution in the ProcessClientAsync method. We do some exception handling for JSON parsing and pipeline execution.

using Meshmakers.Octo.Sdk.Common.Services;
using Newtonsoft.Json;

namespace Meshmakers.Octo.Communication.MeshAdapter.Demo;

public class DemoPipelineExecutionException : PipelineExecutionException
{
public DemoPipelineExecutionException()
{
}

public DemoPipelineExecutionException(string message) : base(message)
{
}

public DemoPipelineExecutionException(string message, Exception inner) : base(message, inner)
{
}

public static Exception StopFailed(Exception exception)
{
return new DemoPipelineExecutionException("Stop failed", exception);
}

public static Exception PipelineExecutionFailed(Exception exception)
{
return new DemoPipelineExecutionException("Execution of pipeline failed", exception);
}

public static Exception MessageDeserializationFailed(JsonReaderException jsonReaderException)
{
return new DemoPipelineExecutionException("Message deserialization failed", jsonReaderException);
}
}

The next step is to implement the ProcessClientAsync method. This method reads incoming messages from the client, starts the pipeline execution, and sends the output back to the client.

We need to parse the incoming message as JSON and start the pipeline execution with the message as input. We do some exception handling for JSON parsing and pipeline execution.

  private async Task ProcessClientAsync(TcpClient client, ITriggerContext context, CancellationToken cancellationToken)
{
try
{
// Read incoming messages from the client
await using var networkStream = client.GetStream();
var buffer = new byte[1024];
int bytesRead;
var messageBuilder = new StringBuilder();
while ((bytesRead = await networkStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
{
context.NodeContext.Info("Client connected.");

// Process the incoming message
messageBuilder.Append(Encoding.UTF8.GetString(buffer, 0, bytesRead));
if (networkStream.DataAvailable)
{
continue;
}

var message = messageBuilder.ToString();
context.NodeContext.Info($"Received message: {message}");

// Parse the incoming message as JSON and start the pipeline execution with the message as input
var input = JToken.Parse(message);
var output = await context.ExecuteAsync(
new ExecutePipelineOptions(DateTime.UtcNow)
{
ExternalReceivedDateTime = DateTime.UtcNow
},
input);

// Serialize the output as JSON and send it back to the client
var outputString = JsonConvert.SerializeObject(output);
byte[] utf8Bytes = Encoding.UTF8.GetBytes(outputString + Environment.NewLine);
await networkStream.WriteAsync(utf8Bytes, cancellationToken);
await networkStream.FlushAsync(cancellationToken);

messageBuilder.Clear();
}
}
catch (JsonReaderException ex)
{
context.NodeContext.Error($"Error parsing input: {ex.Message}");
throw DemoPipelineExecutionException.MessageDeserializationFailed(ex);
}
catch (Exception ex)
{
context.NodeContext.Error($"Error processing message: {ex.Message}");
throw DemoPipelineExecutionException.PipelineExecutionFailed(ex);
}
finally
{
client.Close();
}
}

4. Create pipeline definition with the trigger node

The last step is to create a pipeline definition that contains the trigger node. The pipeline definition is a YAML file that defines the pipeline structure.

We create a new pipeline in AdminUI and add the trigger node to the pipeline definition.

triggers:
- type: DemoTrigger@1
port: 8000
transformations:
- type: Demo@1
description: Simulates data
myMessage: Hello, Mars!

Start the adapter and deploy the pipeline definition to the adapter. The trigger node will start listening to incoming TCP connections on port 8000.

To test the trigger node, you can use a TCP client like netcat or telnet to send a message to the trigger node.

echo '{"message": "Hello, Earth!"}' | nc localhost 8000

You can also use the prepared PowerShell script to send a message to the trigger node.

When the trigger node receives a message, it starts the pipeline execution with the message as input. The output of the pipeline execution is sent back to the client. you can see the output in the console where the adapter is running.

Hello, Mars!