Skip to main content

IRollupOrchestrator

Namespace: Meshmakers.Octo.Runtime.Contracts.StreamData

Tenant-scoped orchestrator that drives CkRollupArchive aggregation. On each tick the orchestrator enumerates activated, non-frozen rollups via IRollupArchiveRuntimeStore and processes due buckets: for each bucket it issues an INSERT INTO target (...) SELECT ... FROM source WHERE timestamp ∈ bucket GROUP BY rtId against the stream-data store, then advances the watermark. DB-neutral: SQL emission lives in the data-store provider (IStreamDataRepository).

public interface IRollupOrchestrator

Remarks:

Concept §5. Operation order per bucket: upsert rows first, advance watermark last. On crash between the two, the next tick re-aggregates the same bucket; the upsert collapses duplicates via the natural key (timestamp, rtId). After maxFailures consecutive failures on the same rollup, status is flipped to CkArchiveStatus.Failed and an alert is recorded via IArchiveAuditTrail (§8).

Methods

TickAsync(CancellationToken)

Runs one orchestrator tick: enumerates all activated, non-frozen rollups for the tenant and processes any buckets that are due (i.e. bucketEnd ≤ now - WatermarkLag). Returns the number of buckets that were successfully committed in this tick. Safe to call concurrently with itself only when the underlying watermark store enforces single-writer semantics per rollup; in practice the background host invokes it sequentially per tenant.

Task<int> TickAsync(CancellationToken cancellationToken)

Parameters

cancellationToken CancellationToken

Returns

Task<Int32>

ProcessRollupAsync(OctoObjectId, CancellationToken)

Processes due buckets for a single rollup. Exposed for targeted runs (e.g. after a manual rewindRollupWatermark) and for tests. Returns the number of buckets committed.

Task<int> ProcessRollupAsync(OctoObjectId rollupRtId, CancellationToken cancellationToken)

Parameters

rollupRtId OctoObjectId

cancellationToken CancellationToken

Returns

Task<Int32>

RewindWatermarkAsync(OctoObjectId, DateTime)

Resets the rollup's watermark to (truncated down to the bucket boundary). Destructive: rows in the rewound range will be re-aggregated and may temporarily disagree with previously committed values until the orchestrator catches up. Backs the rewindRollupWatermark GraphQL mutation.

Task RewindWatermarkAsync(OctoObjectId rollupRtId, DateTime toBucketEnd)

Parameters

rollupRtId OctoObjectId

toBucketEnd DateTime

Returns

Task