IStreamDataRepository
Namespace: Meshmakers.Octo.Runtime.Contracts.StreamData
Repository for stream data operations against a tenant-scoped, CrateDB-backed time-series store
organised by CkArchive instances. Accessed via
ITenantContext.GetStreamDataRepository(). Every operation is scoped to a single archive,
addressed by its runtime id.
public interface IStreamDataRepository
Methods
EnsureDatabaseCreatedAsync()
Ensures the tenant's stream data namespace (e.g. CrateDB schema) exists. Idempotent. Called when a tenant opts into stream data; per-archive tables are created later via IStreamDataRepository.EnsureArchiveCreatedAsync(ArchiveSnapshot).
Task EnsureDatabaseCreatedAsync()
Returns
DeleteDatabaseAsync()
Drops the tenant's entire stream data namespace including every archive table. Idempotent.
Task DeleteDatabaseAsync()
Returns
EnsureArchiveCreatedAsync(ArchiveSnapshot)
Creates the storage table for the archive described by according
to its current CkArchive definition. The snapshot carries the target CK type and
user-picked columns the data store needs to generate DDL — passing it directly avoids a
round-trip through IArchiveRuntimeStore from inside the repository.
Idempotent (uses CREATE TABLE IF NOT EXISTS) so retries after a transient Mongo
update failure converge cleanly.
Task EnsureArchiveCreatedAsync(ArchiveSnapshot snapshot)
Parameters
snapshot ArchiveSnapshot
Returns
DeleteArchiveAsync(OctoObjectId)
Drops the storage table for the archive identified by . Idempotent. Called from the lifecycle service when an archive is deleted.
Task DeleteArchiveAsync(OctoObjectId archiveRtId)
Parameters
archiveRtId OctoObjectId
Returns
InsertAsync(OctoObjectId, StreamDataPoint)
Inserts a single data point into the archive. Throws
ArchiveNotActivatedException if the archive's status is not Activated;
throws RequiredAttributeMissingException on a missing required path.
Task InsertAsync(OctoObjectId archiveRtId, StreamDataPoint datapoint)
Parameters
archiveRtId OctoObjectId
datapoint StreamDataPoint
Returns
InsertAsync(OctoObjectId, IEnumerable<StreamDataPoint>)
Inserts multiple data points into the archive. Pre-validates the entire batch before any SQL is sent; on first violation no row is written and the offending point's index is surfaced via the thrown exception.
Task InsertAsync(OctoObjectId archiveRtId, IEnumerable<StreamDataPoint> datapoints)
Parameters
archiveRtId OctoObjectId
datapoints IEnumerable<StreamDataPoint>
Returns
InsertTimeRangeAsync(OctoObjectId, IEnumerable<TimeRangeStreamDataPoint>, CancellationToken)
Inserts externally pre-aggregated time-range data points into a TimeRangeArchive.
Each point carries an explicit [from, to) window; the natural key
(window_start, window_end, rtid, ckTypeId) handles re-deliveries via
ON CONFLICT DO UPDATE, setting the row's was_updated flag to true on every
upsert. Concept §3 / §5. Throws ArchiveNotActivatedException if the archive is not
in Activated state, and ArgumentException if any point has To <= From.
Task InsertTimeRangeAsync(OctoObjectId archiveRtId, IEnumerable<TimeRangeStreamDataPoint> datapoints, CancellationToken cancellationToken)
Parameters
archiveRtId OctoObjectId
datapoints IEnumerable<TimeRangeStreamDataPoint>
cancellationToken CancellationToken
Returns
ExecuteQueryAsync(OctoObjectId, StreamDataQueryOptions)
Executes a simple stream data query against the archive.
Task<StreamDataQueryResult> ExecuteQueryAsync(OctoObjectId archiveRtId, StreamDataQueryOptions options)
Parameters
archiveRtId OctoObjectId
options StreamDataQueryOptions
Returns
ExecuteAggregationQueryAsync(OctoObjectId, StreamDataAggregationQueryOptions)
Executes an aggregation query (without grouping) against the archive.
Task<StreamDataQueryResult> ExecuteAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataAggregationQueryOptions options)
Parameters
archiveRtId OctoObjectId
options StreamDataAggregationQueryOptions
Returns
ExecuteGroupedAggregationQueryAsync(OctoObjectId, StreamDataGroupedAggregationQueryOptions)
Executes a grouped aggregation query against the archive.
Task<StreamDataQueryResult> ExecuteGroupedAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataGroupedAggregationQueryOptions options)
Parameters
archiveRtId OctoObjectId
options StreamDataGroupedAggregationQueryOptions
Returns
ExecuteDownsamplingQueryAsync(OctoObjectId, StreamDataDownsamplingQueryOptions)
Executes a downsampling query with time bins against the archive.
Task<StreamDataQueryResult> ExecuteDownsamplingQueryAsync(OctoObjectId archiveRtId, StreamDataDownsamplingQueryOptions options)
Parameters
archiveRtId OctoObjectId
options StreamDataDownsamplingQueryOptions
Returns
AggregateBucketAsync(ArchiveSnapshot, RollupArchiveSnapshot, DateTime, DateTime, CancellationToken)
Aggregates one bucket from into :
reads source rows with timestamp ∈ [bucketStart, bucketEnd), groups by rtId,
applies the CkRollupAggregationSpec aggregations, and upserts one row per
entity into the rollup archive's table with timestamp = bucketEnd. Rollup-archives
concept §5.
Task<int> AggregateBucketAsync(ArchiveSnapshot sourceArchive, RollupArchiveSnapshot rollup, DateTime bucketStart, DateTime bucketEnd, CancellationToken cancellationToken)
Parameters
sourceArchive ArchiveSnapshot
rollup RollupArchiveSnapshot
bucketStart DateTime
bucketEnd DateTime
cancellationToken CancellationToken
Returns
Remarks:
Idempotent on the natural key (timestamp, rtId): when the same bucket is
re-aggregated (e.g. after a watermark rewind, or a crash before
IRollupArchiveRuntimeStore.AdvanceWatermarkAsync(OctoObjectId, DateTime, Boolean) committed), the
implementation must collapse duplicates via the data store's upsert primitive
(CrateDB: ON CONFLICT (timestamp, rtId) DO UPDATE) so the orchestrator can
always retry safely. Returns the number of upserted target rows.
GetArchiveStatsAsync(IReadOnlyList<OctoObjectId>, CancellationToken)
Returns per-archive storage stats (row count, on-disk size, health) for each entry. Bulk call so the studio's archives list can render stats columns without an N+1 round-trip per row. Archives whose backing table doesn't exist yet (not activated, or post-delete) appear in the result with ArchiveStorageStats.TableExists false and zero counters — the caller is not expected to filter the input list beforehand.
Task<IReadOnlyDictionary<OctoObjectId, ArchiveStorageStats>> GetArchiveStatsAsync(IReadOnlyList<OctoObjectId> archiveRtIds, CancellationToken cancellationToken)
Parameters
archiveRtIds IReadOnlyList<OctoObjectId>
cancellationToken CancellationToken
Returns
Task<IReadOnlyDictionary<OctoObjectId, ArchiveStorageStats>>
Remarks:
Implementations may issue a single underlying query against their introspection surface
(e.g. CrateDB sys.shards + sys.health) and return one entry per requested
rtId. Order of returned entries is implementation-defined; callers must look up by rtId.