IStreamDataRepository

Namespace: Meshmakers.Octo.Runtime.Contracts.StreamData

Repository for stream data operations against a tenant-scoped, CrateDB-backed time-series store organised by CkArchive instances. Accessed via ITenantContext.GetStreamDataRepository(). Every operation is scoped to a single archive, addressed by its runtime id.

public interface IStreamDataRepository

Methods

EnsureDatabaseCreatedAsync()

Ensures the tenant's stream data namespace (e.g. CrateDB schema) exists. Idempotent. Called when a tenant opts into stream data; per-archive tables are created later via IStreamDataRepository.EnsureArchiveCreatedAsync(ArchiveSnapshot).

Task EnsureDatabaseCreatedAsync()

Returns

Task

DeleteDatabaseAsync()

Drops the tenant's entire stream data namespace including every archive table. Idempotent.

Task DeleteDatabaseAsync()

Returns

Task

EnsureArchiveCreatedAsync(ArchiveSnapshot)

Creates the storage table for the archive described by snapshot according to its current CkArchive definition. The snapshot carries the target CK type and the user-picked columns the data store needs to generate DDL; passing it directly avoids a round-trip through IArchiveRuntimeStore from inside the repository. Idempotent (uses CREATE TABLE IF NOT EXISTS), so retries after a transient Mongo update failure converge cleanly.

Task EnsureArchiveCreatedAsync(ArchiveSnapshot snapshot)

Parameters

snapshot ArchiveSnapshot

Returns

Task

DeleteArchiveAsync(OctoObjectId)

Drops the storage table for the archive identified by archiveRtId. Idempotent. Called from the lifecycle service when an archive is deleted.

Task DeleteArchiveAsync(OctoObjectId archiveRtId)

Parameters

archiveRtId OctoObjectId

Returns

Task

InsertAsync(OctoObjectId, StreamDataPoint)

Inserts a single data point into the archive. Throws ArchiveNotActivatedException if the archive's status is not Activated; throws RequiredAttributeMissingException on a missing required path.

Task InsertAsync(OctoObjectId archiveRtId, StreamDataPoint datapoint)

Parameters

archiveRtId OctoObjectId

datapoint StreamDataPoint

Returns

Task

InsertAsync(OctoObjectId, IEnumerable<StreamDataPoint>)

Inserts multiple data points into the archive. Pre-validates the entire batch before any SQL is sent; on first violation no row is written and the offending point's index is surfaced via the thrown exception.

Task InsertAsync(OctoObjectId archiveRtId, IEnumerable<StreamDataPoint> datapoints)

Parameters

archiveRtId OctoObjectId

datapoints IEnumerable<StreamDataPoint>

Returns

Task
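The "validate everything, then write" behaviour described for the batch overload can be illustrated with a minimal sketch. All types here (SketchPoint, SketchBatchValidationException, BatchPreValidation) are hypothetical stand-ins, not part of the Octo runtime; the real StreamDataPoint shape and the exception type that carries the index are not shown in this reference.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a data point: a timestamp plus attribute values by path.
public sealed record SketchPoint(DateTime Timestamp, IReadOnlyDictionary<string, object> Values);

// Hypothetical exception that surfaces the index of the first offending point.
public sealed class SketchBatchValidationException : Exception
{
    public int Index { get; }

    public SketchBatchValidationException(int index, string message) : base(message)
    {
        Index = index;
    }
}

public static class BatchPreValidation
{
    // Checks every point against the required paths before anything is written.
    // Returns the materialised batch only if all points pass; throws on the first violation.
    public static IReadOnlyList<SketchPoint> ValidateAll(
        IEnumerable<SketchPoint> points,
        IReadOnlyCollection<string> requiredPaths)
    {
        var batch = new List<SketchPoint>(points); // enumerate the input exactly once
        for (var i = 0; i < batch.Count; i++)
        {
            foreach (var path in requiredPaths)
            {
                if (!batch[i].Values.TryGetValue(path, out var value) || value is null)
                {
                    throw new SketchBatchValidationException(
                        i, $"Point {i} is missing required path '{path}'.");
                }
            }
        }

        return batch; // only now would a writer send any SQL
    }
}
```

Because validation completes before the write step starts, a failure at any index leaves the store untouched, which is the all-or-nothing property the description above states.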

InsertTimeRangeAsync(OctoObjectId, IEnumerable<TimeRangeStreamDataPoint>, CancellationToken)

Inserts externally pre-aggregated time-range data points into a TimeRangeArchive. Each point carries an explicit [from, to) window; the natural key (window_start, window_end, rtid, ckTypeId) handles re-deliveries via ON CONFLICT DO UPDATE, setting the row's was_updated flag to true on every upsert (see concept §3 / §5). Throws ArchiveNotActivatedException if the archive is not in the Activated state, and ArgumentException if any point has To <= From.

Task InsertTimeRangeAsync(OctoObjectId archiveRtId, IEnumerable<TimeRangeStreamDataPoint> datapoints, CancellationToken cancellationToken)

Parameters

archiveRtId OctoObjectId

datapoints IEnumerable<TimeRangeStreamDataPoint>

cancellationToken CancellationToken

Returns

Task
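Callers may want to reject invalid windows before handing a batch to the repository. The following is a minimal caller-side sketch of the To <= From rule; SketchWindowPoint and TimeRangeValidation are hypothetical names, and the real TimeRangeStreamDataPoint may expose its window differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a pre-aggregated point with a half-open [From, To) window.
public sealed record SketchWindowPoint(DateTime From, DateTime To, string RtId, double Value);

public static class TimeRangeValidation
{
    // Throws ArgumentException for the first point whose window is empty or inverted.
    public static void EnsureValidWindows(IEnumerable<SketchWindowPoint> points)
    {
        var index = 0;
        foreach (var p in points)
        {
            if (p.To <= p.From)
            {
                throw new ArgumentException(
                    $"Point {index}: To ({p.To:o}) must be later than From ({p.From:o}).",
                    nameof(points));
            }

            index++;
        }
    }
}
```

A window with To equal to From is rejected as well, since a half-open interval [from, to) of zero length contains no instant.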

ExecuteQueryAsync(OctoObjectId, StreamDataQueryOptions)

Executes a simple stream data query against the archive.

Task<StreamDataQueryResult> ExecuteQueryAsync(OctoObjectId archiveRtId, StreamDataQueryOptions options)

Parameters

archiveRtId OctoObjectId

options StreamDataQueryOptions

Returns

Task<StreamDataQueryResult>

ExecuteAggregationQueryAsync(OctoObjectId, StreamDataAggregationQueryOptions)

Executes an aggregation query (without grouping) against the archive.

Task<StreamDataQueryResult> ExecuteAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataAggregationQueryOptions options)

Parameters

archiveRtId OctoObjectId

options StreamDataAggregationQueryOptions

Returns

Task<StreamDataQueryResult>

ExecuteGroupedAggregationQueryAsync(OctoObjectId, StreamDataGroupedAggregationQueryOptions)

Executes a grouped aggregation query against the archive.

Task<StreamDataQueryResult> ExecuteGroupedAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataGroupedAggregationQueryOptions options)

Parameters

archiveRtId OctoObjectId

options StreamDataGroupedAggregationQueryOptions

Returns

Task<StreamDataQueryResult>

ExecuteDownsamplingQueryAsync(OctoObjectId, StreamDataDownsamplingQueryOptions)

Executes a downsampling query with time bins against the archive.

Task<StreamDataQueryResult> ExecuteDownsamplingQueryAsync(OctoObjectId archiveRtId, StreamDataDownsamplingQueryOptions options)

Parameters

archiveRtId OctoObjectId

options StreamDataDownsamplingQueryOptions

Returns

Task<StreamDataQueryResult>

AggregateBucketAsync(ArchiveSnapshot, RollupArchiveSnapshot, DateTime, DateTime, CancellationToken)

Aggregates one bucket from sourceArchive into rollup: reads source rows with timestamp ∈ [bucketStart, bucketEnd), groups by rtId, applies the CkRollupAggregationSpec aggregations, and upserts one row per entity into the rollup archive's table with timestamp = bucketEnd. See rollup-archives concept §5.

Task<int> AggregateBucketAsync(ArchiveSnapshot sourceArchive, RollupArchiveSnapshot rollup, DateTime bucketStart, DateTime bucketEnd, CancellationToken cancellationToken)

Parameters

sourceArchive ArchiveSnapshot

rollup RollupArchiveSnapshot

bucketStart DateTime

bucketEnd DateTime

cancellationToken CancellationToken

Returns

Task<Int32>

Remarks:

Idempotent on the natural key (timestamp, rtId): when the same bucket is re-aggregated (e.g. after a watermark rewind, or a crash before IRollupArchiveRuntimeStore.AdvanceWatermarkAsync(OctoObjectId, DateTime, Boolean) committed), the implementation must collapse duplicates via the data store's upsert primitive (CrateDB: ON CONFLICT (timestamp, rtId) DO UPDATE) so the orchestrator can always retry safely. Returns the number of upserted target rows.
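The bucket semantics and the retry-safety requirement can be sketched in memory. This is an illustration under stated assumptions, not the CrateDB implementation: SourceRow and InMemoryRollupTable are hypothetical, a dictionary keyed by (timestamp, rtId) stands in for ON CONFLICT DO UPDATE, and a plain average stands in for whatever CkRollupAggregationSpec configures.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical source row: one raw value per entity and timestamp.
public sealed record SourceRow(DateTime Timestamp, string RtId, double Value);

public sealed class InMemoryRollupTable
{
    // The dictionary key plays the role of the natural key (timestamp, rtId).
    private readonly Dictionary<(DateTime Timestamp, string RtId), double> _rows = new();

    public int RowCount => _rows.Count;

    public double Get(DateTime timestamp, string rtId) => _rows[(timestamp, rtId)];

    // Aggregates rows in [bucketStart, bucketEnd) per entity and upserts one row
    // per entity stamped with bucketEnd. Returns the number of upserted rows.
    public int AggregateBucket(IEnumerable<SourceRow> source, DateTime bucketStart, DateTime bucketEnd)
    {
        var perEntity = source
            .Where(r => r.Timestamp >= bucketStart && r.Timestamp < bucketEnd) // half-open window
            .GroupBy(r => r.RtId)
            .Select(g => (RtId: g.Key, Avg: g.Average(r => r.Value)))
            .ToList();

        foreach (var (rtId, avg) in perEntity)
        {
            _rows[(bucketEnd, rtId)] = avg; // overwrite on conflict, so retries converge
        }

        return perEntity.Count;
    }
}
```

Running AggregateBucket twice for the same bucket returns the same count both times and leaves RowCount unchanged, which is the property the orchestrator relies on when it retries after a crash or a watermark rewind.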

GetArchiveStatsAsync(IReadOnlyList<OctoObjectId>, CancellationToken)

Returns per-archive storage stats (row count, on-disk size, health) for each entry in archiveRtIds. This is a bulk call so the studio's archives list can render stats columns without an N+1 round-trip per row. Archives whose backing table doesn't exist yet (not activated, or post-delete) appear in the result with ArchiveStorageStats.TableExists set to false and zero counters; the caller is not expected to filter the input list beforehand.

Task<IReadOnlyDictionary<OctoObjectId, ArchiveStorageStats>> GetArchiveStatsAsync(IReadOnlyList<OctoObjectId> archiveRtIds, CancellationToken cancellationToken)

Parameters

archiveRtIds IReadOnlyList<OctoObjectId>

cancellationToken CancellationToken

Returns

Task<IReadOnlyDictionary<OctoObjectId, ArchiveStorageStats>>

Remarks:

Implementations may issue a single underlying query against their introspection surface (e.g. CrateDB sys.shards + sys.health) and return one entry per requested rtId. Order of returned entries is implementation-defined; callers must look up by rtId.
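Since the order of returned entries is implementation-defined, a caller that renders a list would typically iterate its own input order and look each id up in the result. A minimal sketch, assuming hypothetical stand-ins (string ids instead of OctoObjectId, SketchStats instead of ArchiveStorageStats):

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in carrying only two of the documented stats fields.
public sealed record SketchStats(bool TableExists, long RowCount);

public static class StatsLookup
{
    // Produces one display line per requested id, in the caller's order,
    // regardless of how the stats dictionary happens to be ordered.
    public static IReadOnlyList<string> RenderRows(
        IReadOnlyList<string> archiveRtIds,
        IReadOnlyDictionary<string, SketchStats> stats)
    {
        return archiveRtIds
            .Select(id => stats.TryGetValue(id, out var s) && s.TableExists
                ? $"{id}: {s.RowCount} rows"
                : $"{id}: no table")
            .ToList();
    }
}
```

The TryGetValue fallback is defensive only; per the description above, every requested id is expected to be present, with TableExists false when the table is missing.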