Stream Data Archives
A Stream Data Archive is the unit of configuration and storage for time-series data in OctoMesh. Each archive is a versioned, typed, per-tenant CrateDB table that captures a curated set of attribute paths from a Construction Kit type. Archives have a strict lifecycle (Created → Activated ⇄ Disabled, with Failed when activation DDL fails), an immutable schema once activated, and a three-tier activation gate (instance → tenant → archive) that determines whether the data plane is open.
Archives are runtime entities — instances of the System.StreamData CK model — and can be created, activated, disabled, retried, and deleted through the Refinery Studio, GraphQL, REST, or the octo-cli.
What an Archive Is
An archive bundles three concerns into one entity:
- Target CK type: the source of truth for the data shape. Inheritance is supported: an archive defined on type A applies to all derived types, with one CrateDB table per concrete type.
- Column list: the attribute paths captured at ingestion time, mapped to typed CrateDB columns. Required vs. optional and indexed vs. non-indexed are configured per column.
- Status: the lifecycle state. Inserts and queries are accepted only when the status is Activated.
Where a CK model defines the shape of data, an archive defines what slice of that data is persisted to time series. Many archives can target the same CK type — different retention, different columns, different downsampling.
Properties
| Property | Description |
|---|---|
| Per-tenant | Each tenant has its own CrateDB schema. No cross-tenant data. |
| Typed | Each captured attribute path becomes a real CrateDB column with a declared type. |
| Immutable schema | Once activated, TargetCkTypeId and Columns cannot change. New version = new archive. |
| Multi-archive | A CK type may be captured by any number of archives concurrently. |
| Status-gated | Inserts and queries are accepted only while the archive's status is Activated. |
| Disabling preserves data | Disabling an archive blocks reads and writes but preserves the underlying CrateDB table. Only deletion drops it. |
Archive Subtypes
The abstract base Archive has three concrete subtypes (System.StreamData CK model, ≥ 1.4.0):
| Subtype | Time axis | Ingestion path | Use case |
|---|---|---|---|
RawArchive | Single timestamp per row | External producers (SaveStreamDataInArchive@1, IStreamDataRepository.InsertAsync) | Instant measurements (sensor readings, machine telemetry, events) |
RollupArchive | Bucketed [bucketStart, bucketEnd) | System rollup orchestrator only — no external inserts | Derived bucketed aggregations of another archive (raw or rollup-of-rollup) |
TimeRangeArchive | Explicit [from, to) range per row | External producers (SaveTimeRangeStreamDataInArchive@1, IStreamDataRepository.InsertTimeRangeAsync) | Externally pre-aggregated data (EDA reports, smart meters, weather APIs) |
RawArchive and TimeRangeArchive are operator-defined and accept external data. RollupArchive columns are generated server-side from the configured aggregations — direct edits are rejected.
CK Model
Loaded only when StreamData is enabled at the instance level. Lives in System.StreamData (currently 1.4.0).
```yaml
# Abstract base
ckType: Archive
isAbstract: true
attributes:
  - TargetCkTypeId          # CkId<CkTypeId>
  - Columns                 # array<CkArchiveColumn>
  - Status                  # CkArchiveStatus
  - RawRetentionMs          # int64, optional; partition-drop horizon
---
# Concrete subtypes
ckType: RawArchive
derivedFrom: Archive
---
ckType: TimeRangeArchive
derivedFrom: Archive
attributes:
  - Period                  # TimeSpan, optional; advisory hint
---
ckType: RollupArchive
derivedFrom: Archive
attributes:
  - SourceArchiveRtId       # rtId of the archive this rollup reads from
  - BucketSizeMs            # int64
  - BucketAlignment         # FixedSize | CalendarDay | Iso8601Week | CalendarMonth | CalendarYear
  - WatermarkLagMs          # int64; how far behind real time the orchestrator stays
  - LastAggregatedBucketEnd # DateTime, optional
  - Aggregations            # array<CkRollupAggregation>
  - FrozenUntil             # DateTime, optional
---
ckEnum: CkArchiveStatus { Created, Activated, Disabled, Failed }
---
ckEnum: CkRollupFunction { Avg, Min, Max, Sum, Count }
---
ckEnum: BucketAlignment { FixedSize, CalendarDay, Iso8601Week, CalendarMonth, CalendarYear }
---
ckRecord: CkArchiveColumn
attributes:
  - Path                    # attribute path, e.g. "sensor.reading.value" or "readings[*].value"
  - Required                # bool
  - Indexed                 # bool (default true; opt out only when justified)
---
ckRecord: CkRollupAggregation
attributes:
  - SourcePath              # attribute path on the source archive's CK type
  - Function                # Avg | Min | Max | Sum | Count
  - TargetColumnName        # optional override; otherwise derived from SourcePath + Function
```
Storage Layout
Naming
| Element | Convention |
|---|---|
| Schema | clean(tenantRtCkId.SemanticVersionedFullName), max 63 chars |
| Table | clean(targetCkType.SVF) + "_" + clean(archiveRtCkId.SVF), max 200 chars |
| Column | camelCase from CK attribute id; nested paths concatenated camelCase via ColumnNameMapper |
clean() strips non-alphanumeric characters, and SVF abbreviates SemanticVersionedFullName. When a generated name exceeds its length limit, the helper truncates it and appends a deterministic SHA-256 suffix ({truncated}_{hash16}). The truncation is logged at Information level with both the original and the shortened name for traceability.
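A minimal Python sketch of these naming rules follows. It rests on three assumptions that are not stated above. First, "non-alphanumeric" means anything outside ASCII letters and digits. Second, hash16 is the first 16 hex characters of the SHA-256 digest of the full, untruncated name. Third, the suffix counts toward the length limit. The names clean, fit, schema_name and table_name are illustrative, not the server's helper.

```python
import hashlib
import re

SCHEMA_MAX = 63   # schema name limit from the table above
TABLE_MAX = 200   # table name limit from the table above


def clean(name: str) -> str:
    # Assumption: keep only ASCII letters and digits.
    return re.sub(r"[^A-Za-z0-9]", "", name)


def fit(name: str, max_len: int) -> str:
    # Names within the limit are returned unchanged.
    if len(name) <= max_len:
        return name
    # Assumption: hash16 = first 16 hex chars of SHA-256 over the full name,
    # so the same long name always shortens to the same result.
    hash16 = hashlib.sha256(name.encode("utf-8")).hexdigest()[:16]
    keep = max_len - len(hash16) - 1  # leave room for "_" + hash16
    return f"{name[:keep]}_{hash16}"


def schema_name(tenant_svf: str) -> str:
    return fit(clean(tenant_svf), SCHEMA_MAX)


def table_name(target_svf: str, archive_svf: str) -> str:
    # The "_" separator is added after cleaning, so it survives.
    return fit(clean(target_svf) + "_" + clean(archive_svf), TABLE_MAX)
```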
Standard columns (every archive table)
| Column | Type | Notes |
|---|---|---|
rtId | TEXT | Entity instance id |
timestamp | TIMESTAMP WITH TIME ZONE | Datapoint timestamp (UTC) |
ckTypeId | TEXT | Concrete type (relevant for inheritance) |
rtCreationDateTime | TIMESTAMP WITH TIME ZONE | DEFAULT CURRENT_TIMESTAMP |
rtChangedDateTime | TIMESTAMP WITH TIME ZONE | Updated on conflict |
rtWellKnownName | TEXT | Optional |
Primary key: (timestamp, rtId, ckTypeId).
TimeRangeArchive replaces timestamp with bucketStart + bucketEnd; PK is (bucketStart, bucketEnd, rtId, ckTypeId).
Path → column mapping
| Path shape | CrateDB column type | Nullability |
|---|---|---|
Scalar (voltage) | mapped from CK attribute primitive type | NOT NULL if required |
Record (sensor) | OBJECT(STRICT) with subfields from CkRecord | as above |
Array of scalars (readings[*].value) | ARRAY(<scalar type>) | NOT NULL if required |
Array of records (readings[*]) | ARRAY(OBJECT(STRICT)) | as above |
Each generated column is indexed by CrateDB's default rules unless CkArchiveColumn.Indexed = false, in which case the DDL emits INDEX OFF. Disable indexes only for columns that are read but never filtered or aggregated on.
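The sketch below shows how a single CkArchiveColumn could become a CrateDB column clause. It is hypothetical in several ways. It assumes ColumnNameMapper drops [*] markers and joins the path segments in camelCase. It takes the CrateDB type as an input instead of deriving it from the CK model. The names ArchiveColumn, column_name and column_clause are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class ArchiveColumn:
    path: str             # e.g. "sensor.reading.value" or "readings[*].value"
    required: bool
    indexed: bool = True  # indexed by default; opt out explicitly


def column_name(path: str) -> str:
    # Assumption: strip array markers, then camelCase-join the segments.
    segments = [s.replace("[*]", "") for s in path.split(".")]
    first, rest = segments[0], segments[1:]
    return first[:1].lower() + first[1:] + "".join(s[:1].upper() + s[1:] for s in rest)


def column_clause(col: ArchiveColumn, crate_type: str) -> str:
    parts = [f'"{column_name(col.path)}"', crate_type]
    if col.required:
        parts.append("NOT NULL")   # required paths become NOT NULL columns
    if not col.indexed:
        parts.append("INDEX OFF")  # read-only columns skip the index
    return " ".join(parts)
```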
Upsert semantics
Inserts run as multi-row INSERT … ON CONFLICT DO UPDATE. Required columns are always overwritten by the incoming value (the app-side validator guarantees they are present); optional columns merge via COALESCE(EXCLUDED.x, x) — multiple sources can contribute to the same (timestamp, rtId, ckTypeId) without overwriting each other.
rtChangedDateTime is bumped on every conflict; rtCreationDateTime is never modified. Idempotent re-inserts are safe.
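The Python sketch below renders the statement for a given set of required and optional columns, which makes the merge rule concrete. It assumes quoted camelCase column names and ? placeholders. The SQL the server actually emits may differ in detail, and upsert_sql is a hypothetical helper.

```python
def upsert_sql(table: str, required: list[str], optional: list[str], rows: int) -> str:
    def q(col: str) -> str:
        return f'"{col}"'  # quote to preserve camelCase column names

    key = ["timestamp", "rtId", "ckTypeId"]  # raw-archive primary key
    cols = key + required + optional
    row = "(" + ", ".join("?" for _ in cols) + ")"
    values = ", ".join(row for _ in range(rows))
    # Required columns: the incoming value always wins.
    updates = [f"{q(c)} = excluded.{q(c)}" for c in required]
    # Optional columns: keep the stored value when the incoming one is NULL.
    updates += [f"{q(c)} = COALESCE(excluded.{q(c)}, {q(c)})" for c in optional]
    # rtCreationDateTime is never updated; rtChangedDateTime is bumped on every conflict.
    updates.append(f"{q('rtChangedDateTime')} = CURRENT_TIMESTAMP")
    return (
        f"INSERT INTO {table} ({', '.join(q(c) for c in cols)}) VALUES {values} "
        f"ON CONFLICT ({', '.join(q(c) for c in key)}) DO UPDATE SET {', '.join(updates)}"
    )
```

With this rule, two sources that each supply a different optional column for the same (timestamp, rtId, ckTypeId) both end up stored. A NULL from one source never erases the other source's value.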
Time semantics
- Canonical timezone is UTC. Incoming timestamps are normalized to UTC; naive DateTime values (no offset) are interpreted as UTC.
- API responses serialize timestamps as ISO-8601 with an explicit Z suffix.
- Late-arriving data is accepted via the standard upsert path.
- Future timestamps are not rejected — drift detection belongs in the pipeline / monitoring layer.
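The normalization rules map directly onto Python's datetime. In this sketch, a naive datetime stands in for a .NET DateTime without an offset. It illustrates the rules and is not the server code.

```python
from datetime import datetime, timezone


def to_utc(ts: datetime) -> datetime:
    # Naive values carry no offset: interpret them as UTC, not local time.
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    # Aware values are converted to UTC.
    return ts.astimezone(timezone.utc)


def to_api_string(ts: datetime) -> str:
    # ISO-8601 with an explicit "Z" instead of "+00:00".
    return to_utc(ts).isoformat().replace("+00:00", "Z")
```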
Lifecycle
| Status | Table state | Inserts / Queries | Notes |
|---|---|---|---|
Created | Not provisioned | Rejected | Fully editable. No CrateDB resources allocated. |
Activated | Provisioned, frozen | Accepted | Schema immutable. Validation on update path. |
Disabled | Provisioned, frozen | Rejected | Data preserved. Re-enable to resume. |
Failed | May be partial | Rejected | Activation DDL failed. retryActivate is idempotent. |
Path validation runs on every transition that ends in Activated (Created/Disabled/Failed → Activated): each CkArchiveColumn.Path is re-validated against the current CK model. Failures throw ArchivePathInvalidException before any DDL runs.
Deletion is destructive: the CrateDB table is dropped (idempotent DROP TABLE IF EXISTS) and the entity is soft-deleted (rtState = Archived). Historical data is lost. Deletion is allowed from any status — except a RawArchive with active rollups, which is rejected with RollupSourceInUseException.
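The lifecycle table can be read as a small state machine. The sketch below encodes only the transitions named in this document. Repeating activate on an Activated archive is treated as a no-op, which matches the concurrency behavior described later. Deletion is left out. The operation names and the InvalidArchiveStateTransition class are illustrative stand-ins, not the server's types.

```python
from enum import Enum


class Status(Enum):
    CREATED = "Created"
    ACTIVATED = "Activated"
    DISABLED = "Disabled"
    FAILED = "Failed"


class InvalidArchiveStateTransition(Exception):
    """Stand-in for InvalidArchiveStateTransitionException."""


# (operation, current status) -> status reached on success.
TRANSITIONS = {
    ("activate", Status.CREATED): Status.ACTIVATED,
    ("activate", Status.ACTIVATED): Status.ACTIVATED,  # idempotent repeat
    ("disable", Status.ACTIVATED): Status.DISABLED,
    ("enable", Status.DISABLED): Status.ACTIVATED,
    ("retry", Status.FAILED): Status.ACTIVATED,
}


def transition(op: str, current: Status, ddl_ok: bool = True) -> Status:
    target = TRANSITIONS.get((op, current))
    if target is None:
        raise InvalidArchiveStateTransition(f"{op} not allowed from {current.value}")
    # activate (from Created) and retry (from Failed) run DDL; a DDL error lands in Failed.
    if op in ("activate", "retry") and current is not Status.ACTIVATED and not ddl_ok:
        return Status.FAILED
    return target


def accepts_data(status: Status) -> bool:
    # Inserts and queries are accepted only while Activated.
    return status is Status.ACTIVATED
```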
Activation Layers
Every data-plane operation is gated by a three-level AND-coupled check:
```text
Instance enabled?  ── no ──▶ DI does not register CrateDB stack; tenant enable throws.
        │ yes
        ▼
Tenant enabled?    ── no ──▶ Archive activate throws.
        │ yes
        ▼
Archive Activated? ── no ──▶ Insert / query throws ArchiveNotActivatedException.
        │ yes
        ▼
        OK
```
| Layer | Source |
|---|---|
| Instance | StreamData:Enabled in appsettings (default false) |
| Tenant | StreamDataGlobalSettings.IsEnabled (per-tenant flag) |
| Archive | CkArchive.Status == Activated |
The studio queries GET /api/v1/streamData/status on startup; if instanceEnabled = false the archive navigation is hidden. If instanceEnabled = true but tenantEnabled = false the user sees an empty state with an "Enable for tenant" action.
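The gate and the studio's reaction to /status can be sketched in a few lines of Python. One simplification applies: in the real service, a disabled instance means the CrateDB stack is never registered. The sketch models that layer as an exception so the three checks look alike. The exception classes stand in for the catalog entries, and the view labels returned by studio_view are hypothetical.

```python
class StreamDataNotEnabled(Exception):
    """Stand-in for StreamDataNotEnabledException."""


class ArchiveNotActivated(Exception):
    """Stand-in for ArchiveNotActivatedException."""


def check_data_plane(instance_enabled: bool, tenant_enabled: bool, archive_status: str) -> None:
    # The three layers are AND-coupled and checked outermost first.
    if not instance_enabled:
        raise StreamDataNotEnabled("StreamData:Enabled is false for this instance")
    if not tenant_enabled:
        raise StreamDataNotEnabled("StreamData is not enabled for this tenant")
    if archive_status != "Activated":
        raise ArchiveNotActivated(f"archive status is {archive_status}")


def studio_view(status: dict) -> str:
    # Mirrors how the studio reacts to GET /api/v1/streamData/status.
    if not status["instanceEnabled"]:
        return "hidden"             # archive navigation is not shown
    if not status["tenantEnabled"]:
        return "enable-for-tenant"  # empty state with an "Enable for tenant" action
    return "archives"
```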
Authorization
Three roles, registered in the identity service alongside AdminPanelManagement:
| Role | Allowed operations |
|---|---|
StreamDataAdmin | Full lifecycle: Create / Update / Delete archives; Activate / Disable / Enable / Retry; Tenant Enable / Disable; rollup lifecycle (Freeze / Unfreeze / Rewind). |
StreamDataWriter | Insert into activated archives. Metadata read (List / Get archives). No lifecycle, no CRUD. |
StreamDataReader | Time-series queries (simple / aggregation / grouped / downsampling). Metadata read. No insert, no lifecycle. |
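AdminPanelManagement implies all three. Enforcement is server-side. The studio protects its routes with route guards but does not hide individual action buttons, so a user without the required role can still trigger an action. The server then rejects it, either with Forbidden or with a domain error such as ArchiveNotActivatedException, and the studio shows the returned error as a notification.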
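The role table can be read as a permission matrix. In this Python sketch the operation names are hypothetical labels. StreamDataAdmin gets exactly the operations listed for it; the table does not say whether that role alone also covers insert, query, or metadata read, so the sketch leaves them out.

```python
ADMIN_OPS = {
    "create", "update", "delete",
    "activate", "disable", "enable", "retry",
    "tenant_enable", "tenant_disable",
    "freeze", "unfreeze", "rewind",
}
METADATA_READ = {"list", "get"}

ROLE_OPS = {
    "StreamDataAdmin": ADMIN_OPS,
    "StreamDataWriter": {"insert"} | METADATA_READ,
    "StreamDataReader": {"query"} | METADATA_READ,
}
# AdminPanelManagement implies all three StreamData roles.
ROLE_OPS["AdminPanelManagement"] = set().union(*ROLE_OPS.values())


def is_allowed(roles: set[str], op: str) -> bool:
    # Server-side check: any granted role that covers the operation suffices.
    return any(op in ROLE_OPS.get(role, set()) for role in roles)
```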
Rollup Archives
A rollup archive consumes another archive and writes one row per [bucketStart, bucketEnd) window. Buckets are sized by BucketSizeMs and aligned by BucketAlignment:
| Alignment | Bucket boundary |
|---|---|
FixedSize | LastAggregatedBucketEnd + N · BucketSizeMs |
CalendarDay | UTC midnight |
Iso8601Week | Monday 00:00:00 UTC |
CalendarMonth | First day of month 00:00:00 UTC |
CalendarYear | January 1 00:00:00 UTC |
BucketSizeMs is informational only for the calendar alignments (month lengths vary). The orchestrator stays WatermarkLagMs behind real time before closing a bucket, so late-arriving raw data still lands in the right window.
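The Python sketch below computes the [bucketStart, bucketEnd) window for each alignment and applies the watermark rule. For FixedSize, the anchor argument plays the role of LastAggregatedBucketEnd. The function names are illustrative, and the real orchestrator may handle edge cases such as DST-free UTC arithmetic or leap handling differently.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def bucket_for(ts: datetime, alignment: str, bucket_size_ms: Optional[int] = None,
               anchor: Optional[datetime] = None) -> tuple[datetime, datetime]:
    """Return the [bucketStart, bucketEnd) window containing ts (ts must be timezone-aware)."""
    ts = ts.astimezone(timezone.utc)
    if alignment == "FixedSize":
        # Boundaries are anchor + N * BucketSizeMs (anchor ~ LastAggregatedBucketEnd).
        size = timedelta(milliseconds=bucket_size_ms)
        start = anchor + ((ts - anchor) // size) * size
        return start, start + size
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if alignment == "CalendarDay":
        return midnight, midnight + timedelta(days=1)
    if alignment == "Iso8601Week":
        start = midnight - timedelta(days=midnight.weekday())  # weekday(): Monday == 0
        return start, start + timedelta(days=7)
    if alignment == "CalendarMonth":
        start = midnight.replace(day=1)
        if start.month == 12:
            return start, start.replace(year=start.year + 1, month=1)
        return start, start.replace(month=start.month + 1)
    if alignment == "CalendarYear":
        start = midnight.replace(month=1, day=1)
        return start, start.replace(year=start.year + 1)
    raise ValueError(f"unknown alignment {alignment!r}")


def can_close(bucket_end: datetime, now: datetime, watermark_lag_ms: int) -> bool:
    # The orchestrator waits until real time is WatermarkLagMs past the bucket end.
    return bucket_end + timedelta(milliseconds=watermark_lag_ms) <= now
```

The calendar branches ignore bucket_size_ms entirely, which reflects the note above that BucketSizeMs is informational for those alignments.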
Aggregation functions
CkRollupFunction maps to one or two storage columns:
| Function | Stored columns | Read-time formula |
|---|---|---|
Sum | {base}_sum | as-is |
Min | {base}_min | as-is |
Max | {base}_max | as-is |
Count | {base}_count (BIGINT) | as-is |
Avg | {base}_sum, {base}_count (both stored) | sum / count — keeps chained rollups numerically correct |
The Columns[] projection on a rollup is generated from Aggregations[] via RollupColumnGenerator.Generate; direct edits to Columns are rejected by the validation hook.
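Storing both {base}_sum and {base}_count is what keeps chained rollups correct. A small worked example shows why. Here, Partial is a hypothetical container for one bucket's stored columns, and combine shows how a rollup-of-rollup could merge them.

```python
from dataclasses import dataclass


@dataclass
class Partial:
    # Stored columns for one bucket: {base}_sum, {base}_count, {base}_min, {base}_max.
    total: float
    count: int
    low: float
    high: float


def from_values(values: list[float]) -> Partial:
    return Partial(sum(values), len(values), min(values), max(values))


def combine(parts: list[Partial]) -> Partial:
    # Rolling up rollups: sums and counts add; min/max take the extremes.
    return Partial(
        sum(p.total for p in parts),
        sum(p.count for p in parts),
        min(p.low for p in parts),
        max(p.high for p in parts),
    )


def avg(p: Partial) -> float:
    # Read-time formula for Avg: sum / count.
    return p.total / p.count


hour1 = from_values([1.0, 2.0, 3.0])  # avg 2.0 over 3 points
hour2 = from_values([10.0])           # avg 10.0 over 1 point
day = combine([hour1, hour2])
```

Here avg(day) is 16 / 4 = 4.0, the true mean of all four points. Averaging the two hourly averages directly would give (2.0 + 10.0) / 2 = 6.0, because it ignores how many points each hour contained.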
Operator controls
| Operation | Mutation | Notes |
|---|---|---|
| Create | createRollupArchive(input) | Server derives TargetCkTypeId and Columns from the source archive + aggregations. |
| Freeze a range | freezeRollupArchive(rtId, until) | Monotonic — the new until must be ≥ the current FrozenUntil. The orchestrator stops producing buckets whose bucketEnd falls in the frozen range. |
| Unfreeze | unfreezeRollupArchive(rtId, acceptGaps) | Idempotent. acceptGaps is currently recorded for audit only; the gap-detection guard is planned but not yet implemented. |
| Rewind the watermark | rewindRollupWatermark(rtId, toBucketEnd) | Re-aggregates from the given bucket boundary forward. Destructive: previously committed rows in the rewound range are temporarily out of sync until the orchestrator catches up. |
A RawArchive referenced by an active rollup cannot be deleted — drop the rollup first.
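The freeze rules can be sketched as two checks. One assumption is not confirmed above: a bucket counts as "in the frozen range" when its bucketEnd is at or before FrozenUntil. FreezeNotMonotonic is a hypothetical error name.

```python
from datetime import datetime
from typing import Optional


class FreezeNotMonotonic(Exception):
    """Hypothetical error for a freeze that would move FrozenUntil backwards."""


def freeze(current: Optional[datetime], until: datetime) -> datetime:
    # Monotonic: the new bound must be >= the current FrozenUntil.
    if current is not None and until < current:
        raise FreezeNotMonotonic(f"{until} is earlier than current FrozenUntil {current}")
    return until


def should_produce(bucket_end: datetime, frozen_until: Optional[datetime]) -> bool:
    # Assumption: buckets ending at or before FrozenUntil are skipped by the orchestrator.
    return frozen_until is None or bucket_end > frozen_until
```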
Time-Range Archives
For externally pre-aggregated data with explicit [from, to) windows. There is no source archive and no orchestrator on this path. The archive itself is created with the GraphQL mutation createTimeRangeArchive(input), which requires StreamDataAdmin just like rollup create. Producers then push rows directly through:
- Pipeline node: SaveTimeRangeStreamDataInArchive@1.
- REST: POST /api/v1/streamData/archives/{archiveRtId}/insertTimeRange.
- SDK: IStreamDataRepository.InsertTimeRangeAsync.
Period on TimeRangeArchive is advisory only — it lets the studio render a default time range but is not enforced at insert time.
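The sketch below builds, but does not send, a request to the REST insert endpoint. Several parts are hypothetical. The JSON body shape (an array of objects with from, to and value fields) is a placeholder, since the payload format is not documented here. So are the bearer-token header, the host, and the client-side check that from is earlier than to. Only the URL path comes from this page.

```python
import json
import urllib.request
from datetime import datetime, timezone


def build_insert_time_range_request(base_url: str, archive_rt_id: str,
                                    points: list[dict], token: str) -> urllib.request.Request:
    for i, p in enumerate(points):
        # [from, to) is half-open; an empty or inverted window is rejected client-side (assumption).
        if not p["from"] < p["to"]:
            raise ValueError(f"point {i}: 'from' must be earlier than 'to'")
    # Hypothetical body shape: every field of the point, with timestamps as ISO-8601 strings.
    body = [{**p, "from": p["from"].isoformat(), "to": p["to"].isoformat()} for p in points]
    return urllib.request.Request(
        f"{base_url}/api/v1/streamData/archives/{archive_rt_id}/insertTimeRange",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
        method="POST",
    )


if __name__ == "__main__":
    utc = timezone.utc
    req = build_insert_time_range_request(
        "https://octo.example.com",  # hypothetical host
        "65d5c447b420da3fb1238201",
        [{"rtId": "<entity rtId>", "from": datetime(2024, 1, 1, tzinfo=utc),
          "to": datetime(2024, 2, 1, tzinfo=utc), "energyKwh": 412.5}],  # hypothetical field
        token="<access token>",
    )
    print(req.get_method(), req.full_url)
```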
API Surface
GraphQL — lifecycle mutations
Under mutation.streamData:
| Mutation | Returns | Description |
|---|---|---|
activateArchive(rtId) | ArchiveTransitionResult | Provisions the CrateDB table, transitions to Activated. |
disableArchive(rtId) | ArchiveTransitionResult | Activated → Disabled. Table is preserved. |
enableArchive(rtId) | ArchiveTransitionResult | Disabled → Activated. Re-validates column paths. |
retryArchiveActivation(rtId) | ArchiveTransitionResult | Failed → Activated. Idempotent (CREATE TABLE IF NOT EXISTS). |
deleteArchive(rtId) | Boolean | Drops the CrateDB table and soft-deletes the entity. Rejected if active rollups reference it. |
createRollupArchive(input) | OctoObjectId | Server-derived TargetCkTypeId + Columns. |
createTimeRangeArchive(input) | OctoObjectId | Operator-defined target type and columns. |
freezeRollupArchive(rtId, until) | ArchiveTransitionResult | Monotonic FrozenUntil set. |
unfreezeRollupArchive(rtId, acceptGaps) | ArchiveTransitionResult | Clears FrozenUntil. |
rewindRollupWatermark(rtId, toBucketEnd) | ArchiveTransitionResult | Re-aggregates from the given boundary. |
All mutations require StreamDataAdmin. Domain exceptions (see Exception Catalog below) surface as stable GraphQL error codes via error.extensions.OctoDetails.
CRUD on archives (List / Get / Update) is auto-generated from the CK model and lives under runtime.systemStreamDataArchives / …RawArchives / …RollupArchives / …TimeRangeArchives. The validation hook enforces immutability after activation.
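A client calls these mutations through a standard GraphQL-over-HTTP body ({"query": ...}) and reads domain errors from extensions.OctoDetails. The sketch below builds that body for activateArchive and extracts the error details. It does not show the GraphQL endpoint URL or the internal structure of OctoDetails; both are returned or passed through as-is. The rtId is inlined with json.dumps, which yields a valid GraphQL string literal for plain hex ids.

```python
import json

ACTIVATE = """
mutation Activate {
  streamData {
    activateArchive(rtId: %s) {
      rtId
      status
    }
  }
}
"""


def activate_payload(rt_id: str) -> bytes:
    # json.dumps quotes the id; for plain hex ids this is also a valid GraphQL string.
    return json.dumps({"query": ACTIVATE % json.dumps(rt_id)}).encode("utf-8")


def octo_details(response: dict) -> list:
    # Domain exceptions arrive as GraphQL errors; the stable code lives in extensions.OctoDetails.
    return [e.get("extensions", {}).get("OctoDetails") for e in response.get("errors", [])]
```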
GraphQL — path enumeration
```graphql
query availableArchivePaths(ckTypeId: ID!, maxDepth: Int = 5): [ArchivePathInfo!]!

type ArchivePathInfo {
  path: String!                  # "sensor.reading.value", "readings[*].value"
  primitiveType: String          # null when isRecord = true
  isRecord: Boolean!
  isArray: Boolean!
  recordTypeId: String
  inheritedFromCkTypeId: String  # null if attribute belongs to ckTypeId itself
}
```
The studio's AttributePathPicker consumes this for archive create/edit. Admin-only.
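The Python sketch below enumerates paths over a made-up miniature type model. It shows how record paths ("sensor.reading.value"), array-of-record paths ("readings[*]", "readings[*].value") and the maxDepth cut-off relate. It is not the server's resolver: it ignores inheritance and recordTypeId, and it assumes that maxDepth counts path segments.

```python
# Hypothetical miniature CK model: attribute -> (type name, is array).
RECORDS = {
    "Reading": {"value": ("Double", False), "unit": ("String", False)},
    "Sensor": {"reading": ("Reading", False)},
}
METER = {"voltage": ("Double", False), "sensor": ("Sensor", False), "readings": ("Reading", True)}


def enumerate_paths(attrs: dict, max_depth: int = 5, prefix: str = "", depth: int = 1) -> list[dict]:
    out = []
    for name, (type_name, is_array) in attrs.items():
        segment = f"{name}[*]" if is_array else name
        path = f"{prefix}.{segment}" if prefix else segment
        is_record = type_name in RECORDS
        out.append({
            "path": path,
            "primitiveType": None if is_record else type_name,
            "isRecord": is_record,
            "isArray": is_array,
        })
        # Descend into records until max_depth segments have been used (assumption).
        if is_record and depth < max_depth:
            out.extend(enumerate_paths(RECORDS[type_name], max_depth, path, depth + 1))
    return out
```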
REST — /api/v1/streamData
| Method | Path | Description |
|---|---|---|
| GET | /status | { instanceEnabled, tenantEnabled } for the current tenant. |
| POST | /enable / /disable | Tenant-level toggle. |
| POST | /archives/{archiveRtId}/activate | Same as GraphQL activateArchive. |
| POST | /archives/{archiveRtId}/disable / /enable / /retry | Status transitions. |
| DELETE | /archives/{archiveRtId} | Same as GraphQL deleteArchive. |
| POST | /archives/{rollupRtId}/freeze / /unfreeze / /rewind | Rollup-only controls. |
| POST | /archives/{archiveRtId}/insertTimeRange | Bulk time-range insert. |
| GET | /archives/{archiveRtId}/rollups | Active rollups referencing this raw archive. |
Pipeline node — SaveStreamDataInArchive@1
A PathNodeConfiguration node with a required property ArchiveRtId : OctoObjectId. The node routes each incoming StreamDataPoint to the configured archive via IStreamDataRepository.InsertAsync. The companion SaveTimeRangeStreamDataInArchive@1 handles TimeRangeArchive.
The mesh-adapter service account holds StreamDataWriter. Targeting an inactive archive surfaces a clear pipeline error (ArchiveNotActivatedException).
SDK — IStreamDataRepository
```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Excerpt; OctoObjectId, StreamDataPoint, CkArchive and the query option types come from the Octo SDK.
public interface IStreamDataRepository
{
    // Data plane
    Task InsertAsync(OctoObjectId archiveRtId, StreamDataPoint point);
    Task InsertAsync(OctoObjectId archiveRtId, IEnumerable<StreamDataPoint> points);
    Task InsertTimeRangeAsync(OctoObjectId archiveRtId, IEnumerable<TimeRangeStreamDataPoint> points);
    Task<StreamDataQueryResult> ExecuteQueryAsync(OctoObjectId archiveRtId, StreamDataQueryOptions options);
    Task<StreamDataQueryResult> ExecuteAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataAggregationQueryOptions options);
    Task<StreamDataQueryResult> ExecuteGroupedAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataGroupedAggregationQueryOptions options);
    Task<StreamDataQueryResult> ExecuteDownsamplingQueryAsync(OctoObjectId archiveRtId, StreamDataDownsamplingQueryOptions options);

    // Control plane (called by IArchiveLifecycleService)
    Task EnsureArchiveCreatedAsync(CkArchive archive);
    Task DeleteArchiveAsync(CkArchive archive);
}
```
Bulk inserts use all-or-nothing pre-validation: every point is validated before any SQL is sent. A violation throws RequiredAttributeMissingException carrying the index of the offending point, and no row is written. A DB-level failure after pre-validation surfaces as an exception.
Bulk Insert Semantics
- Resolve archive (single lookup), status check.
- Pre-validate every point in the batch — required-path coverage, type compatibility, path resolution. Stops at the first violation.
- On violation → throw with the offending index. No row written.
- On success → single multi-row INSERT … ON CONFLICT … execution.
Pipeline callers see batch success or full failure; partial-commit scenarios are limited to transient DB failures (operational, see Reconciliation below).
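The all-or-nothing flow can be sketched in Python. This sketch makes simplifying assumptions. Points are flat dicts keyed by attribute path. Only required-path coverage is checked; type compatibility and path resolution are omitted. The execute callback stands in for the single multi-row INSERT. RequiredAttributeMissing mirrors the catalog exception by carrying the offending index.

```python
from typing import Callable


class RequiredAttributeMissing(Exception):
    """Stand-in for RequiredAttributeMissingException; carries the offending point index."""

    def __init__(self, index: int, path: str):
        super().__init__(f"point {index}: required path '{path}' has no value")
        self.index = index
        self.path = path


def prevalidate(points: list[dict], required_paths: list[str]) -> None:
    # Every point is checked before any SQL is built; the first violation aborts the batch.
    for index, point in enumerate(points):
        for path in required_paths:
            if point.get(path) is None:
                raise RequiredAttributeMissing(index, path)


def insert_batch(points: list[dict], required_paths: list[str],
                 execute: Callable[[list[dict]], None]) -> None:
    prevalidate(points, required_paths)  # all-or-nothing: nothing is written on failure
    execute(points)                      # one multi-row INSERT ... ON CONFLICT
```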
Retention
Retention is partition-based and per-archive (work in progress — design captured here; some attributes are not yet exposed in the create form).
- DDL: archive tables are partitioned by day on DATE_TRUNC('day', timestamp). Dropping a partition is a metadata-only operation on CrateDB.
- RawRetentionMs on the Archive base type defines the retention horizon; null means retain forever.
- The ArchiveRetentionScheduler (an IHostedService that runs hourly) drops partitions older than now() - RawRetentionMs, per archive and per tenant. Failures are isolated: one bad archive does not block the rest.
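One hourly sweep could look like the Python sketch below. It assumes a daily partition is dropped only once its whole day lies before now() - RawRetentionMs, which is a conservative reading of "older than". The drop callback and the archives mapping are hypothetical stand-ins for the CrateDB call and the per-tenant archive list.

```python
from datetime import date, datetime, timedelta, timezone
from typing import Callable, Optional


def partitions_to_drop(partition_days: list[date], raw_retention_ms: Optional[int],
                       now: datetime) -> list[date]:
    if raw_retention_ms is None:
        return []  # null retention: keep everything
    cutoff = now - timedelta(milliseconds=raw_retention_ms)
    drop = []
    for day in partition_days:
        # Assumption: drop a daily partition only when its whole day lies before the cutoff.
        day_end = datetime(day.year, day.month, day.day, tzinfo=timezone.utc) + timedelta(days=1)
        if day_end <= cutoff:
            drop.append(day)
    return drop


def run_retention(archives: dict, drop: Callable[[str, date], None], now: datetime) -> dict:
    """archives: archive id -> (partition days, RawRetentionMs). Returns failures per archive."""
    failures = {}
    for archive_id, (days, retention_ms) in archives.items():
        try:
            for day in partitions_to_drop(days, retention_ms, now):
                drop(archive_id, day)
        except Exception as exc:  # one bad archive must not stop the sweep
            failures[archive_id] = exc
    return failures
```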
For downsampling-style retention (raw rows folded into a coarser archive before deletion), use a RollupArchive pointing at the source archive — the rollup is the long-term store while the raw archive's retention can be aggressive.
Reconciliation
CkArchive entities live in MongoDB; archive tables live in CrateDB. There is no shared transaction. The lifecycle service keeps the two reconcilable without two-phase commit (2PC):
- All DDL is idempotent (IF EXISTS / IF NOT EXISTS).
- Operation order is Crate-first, Mongo-last: Activate runs CREATE TABLE, then flips Status in Mongo; Delete runs DROP TABLE, then soft-deletes the entity. Mongo is the source of truth.
- Every data-plane call checks Status == Activated before touching Crate. If the Crate step has run but the Mongo step has not, the API rejects calls with ArchiveNotActivatedException.
- On startup the ArchiveReconciler enumerates Activated archives per tenant and (re-)provisions any missing CrateDB tables. Drift in the other direction (a table without an entity) is logged but not auto-dropped.
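The startup pass for one tenant reduces to two set comparisons. In the Python sketch below, tables that belong to Disabled archives still count as having an entity, so they are not reported as drift. The input shapes and the create_table callback are assumptions for illustration.

```python
import logging
from typing import Callable

log = logging.getLogger("archive_reconciler")


def reconcile(activated: dict[str, str], entity_tables: set[str], crate_tables: set[str],
              create_table: Callable[[str], None]) -> tuple[list[str], list[str]]:
    """activated: archive id -> table (Mongo, Activated only); entity_tables: tables of all
    archive entities; crate_tables: tables present in CrateDB. Returns (re-provisioned, orphans)."""
    provisioned = []
    for archive_id, table in activated.items():
        if table not in crate_tables:
            create_table(table)  # CREATE TABLE IF NOT EXISTS, safe to repeat
            provisioned.append(archive_id)
    # A table with no archive entity at all is drift in the other direction: log, never drop.
    orphans = sorted(crate_tables - entity_tables)
    for table in orphans:
        log.warning("CrateDB table %s has no archive entity; not dropping it", table)
    return provisioned, orphans
```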
Concurrency
| Scenario | Behavior |
|---|---|
Two parallel Activate on the same archive | Serialized via RepositoryDistributedLockService. The second call observes Activated and returns idempotently. |
Disable while inserts are in flight | In-flight inserts complete. New inserts after the flip raise ArchiveNotActivatedException. |
Delete while inserts are in flight | Same as Disable, plus the DROP TABLE may retry once with a short backoff if the table is briefly locked. |
| Activate fails mid-DDL | Status → Failed. EnsureArchiveCreatedAsync is idempotent (CREATE TABLE IF NOT EXISTS), so retry is safe. |
| Bulk insert on disabled archive | Whole batch rejected on the single status lookup. |
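The first row of the table can be illustrated with a local stand-in for RepositoryDistributedLockService. The Python sketch below uses one threading.Lock per archive rtId. Two concurrent activate calls then run the DDL step once, and the second caller sees Activated and returns. A real distributed lock spans processes; this in-process version only shows the check-then-act pattern.

```python
import threading


class Archive:
    def __init__(self, rt_id: str):
        self.rt_id = rt_id
        self.status = "Created"
        self.ddl_runs = 0  # counts how often the CREATE TABLE step ran


# In-process stand-in for RepositoryDistributedLockService: one lock per archive rtId.
_locks: dict[str, threading.Lock] = {}
_locks_guard = threading.Lock()


def _lock_for(rt_id: str) -> threading.Lock:
    with _locks_guard:
        return _locks.setdefault(rt_id, threading.Lock())


def activate(archive: Archive) -> str:
    with _lock_for(archive.rt_id):
        if archive.status == "Activated":
            return "already-activated"  # the second caller returns idempotently
        archive.ddl_runs += 1           # Crate first: CREATE TABLE IF NOT EXISTS
        archive.status = "Activated"    # Mongo last: flip Status
        return "activated"
```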
Exception Catalog
All in Meshmakers.Octo.Runtime.Contracts.StreamData. Common base: StreamDataException (extends RuntimeRepositoryException). Surfaced via GraphQL error.extensions.OctoDetails and via REST status codes.
| Exception | Thrown when |
|---|---|
StreamDataException | Base — generic StreamData failure. |
StreamDataNotEnabledException | Instance or tenant flag is false on an operation that requires it. |
ArchiveNotFoundException | archiveRtId does not resolve to an Archive entity. |
ArchiveNotActivatedException | Archive exists but Status ≠ Activated on insert / query. |
ArchiveSchemaImmutableException | Post-activation update tries to change schema-relevant fields. |
ArchivePathInvalidException | A CkArchiveColumn.Path cannot be resolved against TargetCkTypeId. |
ArchiveColumnTypeUnsupportedException | An attribute type cannot be mapped to a CrateDB column type. |
RequiredAttributeMissingException | Insert lacks a value for a required: true scalar path. Carries the offending point index. |
ArchiveActivationFailedException | DDL execution failed during activation. Wraps the underlying SQL error. |
InvalidArchiveStateTransitionException | Lifecycle mutation called against an incompatible status. |
RollupSourceInUseException | deleteArchive invoked on a raw archive that still has active rollups. |
RollupSourceMissingException | createRollupArchive references a source archiveRtId that does not exist or is not yet activated. |
Observability
Metrics
| Metric | Type | Tags |
|---|---|---|
streamdata.archive.count | Gauge | tenant, status |
streamdata.archive.status_transitions | Counter | tenant, archive, from, to |
streamdata.archive.activation_duration_ms | Histogram | tenant, archive, outcome |
streamdata.insert.duration_ms | Histogram | tenant, archive, batch_size_bucket |
streamdata.insert.points | Counter | tenant, archive |
streamdata.insert.required_violations | Counter | tenant, archive |
streamdata.query.duration_ms | Histogram | tenant, archive, query_type |
streamdata.query.rows_returned | Histogram | tenant, archive, query_type |
streamdata.crate.connections.open | Gauge | (instance-wide) |
streamdata.crate.retention_partitions_dropped | Counter | tenant, archive |
streamdata.crate.rollup_rows_inserted | Counter | tenant, rollup_archive |
Traces
Activity sources: Meshmakers.Octo.StreamData (top-level operations) and Meshmakers.Octo.StreamData.Crate (DB calls). Span attributes: streamdata.archive.rtid, streamdata.archive.target_cktype, streamdata.tenant.
Audit trail
Status transitions are emitted as RtEvent records through IEventRepository with source = RtEventSourcesEnum.StreamData. The triggering user is encoded in the event message; auto-transitions use system:reconciliation. The studio's archive details view exposes the history via the standard event query path.
Creating an Archive
From the CLI
```shell
# Tenant-level enable (required once per tenant)
octo-cli -c EnableStreamData

# Archives are runtime entities: create them through the studio, GraphQL,
# or by rt-importing a runtime-model YAML. Then:
octo-cli -c ActivateArchive -id 69fda707d47638c68edc7fea
```
See octo-cli — Stream Data for the full command surface.
From the Studio
Navigate to Repository → Stream Data Archives. The list shows every archive on the active tenant with a status badge and per-row actions (Activate, Disable, Enable, Retry, Delete, Copy ID). The New Archive, New Rollup, and New Time-Range entry points walk through the create flow with the AttributePathPicker for column selection.
See Studio: Archives for the UI walkthrough.
From GraphQL
```graphql
# 1. Create the archive via the auto-generated runtime mutation
mutation CreateEnergyMeterArchive {
  runtime {
    systemStreamDataRawArchives {
      create(input: {
        rtWellKnownName: "EnergyMeterArchive",
        targetCkTypeId: "Industry.Energy/EnergyMeter",
        columns: [
          { path: "voltage", required: true, indexed: true },
          { path: "current", required: true, indexed: true },
          { path: "powerFactor", required: false, indexed: false }
        ]
      }) { item { rtId } }
    }
  }
}

# 2. Activate, passing the rtId returned by step 1
mutation ActivateEnergyMeterArchive {
  streamData {
    activateArchive(rtId: "65d5c447b420da3fb1238201") {
      rtId
      status
    }
  }
}
```
Best Practices
- One archive per concern. Different retention, different query patterns, different consumers → different archives.
- Be explicit about required. Required scalars become NOT NULL columns and are validated app-side before insert.
- Opt out of indexing deliberately. Indexes cost storage and insert throughput, but columns are indexed by default; disable indexing only for columns that are read but never filtered or aggregated on.
- Roll up early. A RollupArchive is far cheaper to query over long horizons than a raw scan.
- Pick alignment for the use case. Energy / EDA workflows want calendar alignments (monthly, weekly); engineering telemetry usually wants FixedSize.
- Freeze before bulk imports. When backfilling a source archive's history, freeze any dependent rollups, finish the import, then unfreeze with acceptGaps: false (or rewind the watermark) to re-aggregate cleanly.
- Disable before deleting. Disabling keeps the table, so you can confirm that no consumer breaks before running the destructive delete.
See Also
- Stream Data Access (Intro) — query surfaces (typed / transient / persisted)
- Simple Query — raw time-series retrieval
- Aggregation Queries — server-side aggregation
- Downsampling Query — bucketed downsampling
- octo-cli — Stream Data — CLI commands for archive operations
- Studio: Archives — UI walkthrough in the Refinery Studio