Stream Data Archives

A Stream Data Archive is the unit of configuration and storage for time-series data in OctoMesh. Each archive is a versioned, typed, per-tenant CrateDB table that captures a curated set of attribute paths from a Construction Kit type. Archives have a strict lifecycle (Created → Activated → Disabled / Failed), an immutable schema once activated, and a three-tier activation gate (instance → tenant → archive) that determines whether the data plane is open.

Archives are runtime entities — instances of the System.StreamData CK model — and can be created, activated, disabled, retried, and deleted through the Refinery Studio, GraphQL, REST, or the octo-cli.

What an Archive Is

An archive bundles three concerns into one entity:

  • Target CK type — the source-of-truth for the data shape. Inheritance is supported: an archive defined on type A applies to all derived types; one CrateDB table per concrete type.
  • Column list — the attribute paths captured at ingestion time, mapped to typed CrateDB columns. Required vs. optional and indexed vs. non-indexed are configured per column.
  • Status — the lifecycle state. Inserts and queries are accepted only when status is Activated.

Where a CK model defines the shape of data, an archive defines what slice of that data is persisted to time series. Many archives can target the same CK type — different retention, different columns, different downsampling.

Properties

| Property | Description |
| --- | --- |
| Per-tenant | Each tenant has its own CrateDB schema. No cross-tenant data. |
| Typed | Each captured attribute path becomes a real CrateDB column with a declared type. |
| Immutable schema | Once activated, TargetCkTypeId and Columns cannot change. New version = new archive. |
| Multi-archive | A CK type may be captured by any number of archives concurrently. |
| Status-gated | Inserts and queries are accepted only while the archive's status is Activated. |
| Soft-delete preserves data | Disabling an archive blocks reads and writes but preserves the underlying CrateDB table. |

Archive Subtypes

The abstract base Archive has three concrete subtypes (System.StreamData CK model, ≥ 1.4.0):

| Subtype | Time axis | Ingestion path | Use case |
| --- | --- | --- | --- |
| RawArchive | Single timestamp per row | External producers (SaveStreamDataInArchive@1, IStreamDataRepository.InsertAsync) | Instant measurements (sensor readings, machine telemetry, events) |
| RollupArchive | Bucketed [bucketStart, bucketEnd) | System rollup orchestrator only; no external inserts | Derived bucketed aggregations of another archive (raw or rollup-of-rollup) |
| TimeRangeArchive | Explicit [from, to) range per row | External producers (SaveTimeRangeStreamDataInArchive@1, IStreamDataRepository.InsertTimeRangeAsync) | Externally pre-aggregated data (EDA reports, smart meters, weather APIs) |

RawArchive and TimeRangeArchive are operator-defined and accept external data. RollupArchive columns are generated server-side from the configured aggregations — direct edits are rejected.

CK Model

Loaded only when StreamData is enabled at the instance level. Lives in System.StreamData (currently 1.4.0).

# Abstract base
ckType: Archive
isAbstract: true
attributes:
  - TargetCkTypeId            # CkId<CkTypeId>
  - Columns                   # array<CkArchiveColumn>
  - Status                    # CkArchiveStatus
  - RawRetentionMs            # int64, optional; partition-drop horizon

# Concrete subtypes
ckType: RawArchive (derivedFrom: Archive)

ckType: TimeRangeArchive (derivedFrom: Archive)
attributes:
  - Period                    # TimeSpan, optional; advisory hint

ckType: RollupArchive (derivedFrom: Archive)
attributes:
  - SourceArchiveRtId         # rtId of the archive this rollup reads from
  - BucketSizeMs              # int64
  - BucketAlignment           # FixedSize | CalendarDay | Iso8601Week | CalendarMonth | CalendarYear
  - WatermarkLagMs            # int64; how far behind real time the orchestrator stays
  - LastAggregatedBucketEnd   # DateTime, optional
  - Aggregations              # array<CkRollupAggregation>
  - FrozenUntil               # DateTime, optional

ckEnum: CkArchiveStatus { Created, Activated, Disabled, Failed }
ckEnum: CkRollupFunction { Avg, Min, Max, Sum, Count }
ckEnum: BucketAlignment { FixedSize, CalendarDay, Iso8601Week, CalendarMonth, CalendarYear }

ckRecord: CkArchiveColumn
attributes:
  - Path                      # attribute path, e.g. "sensor.reading.value" or "readings[*].value"
  - Required                  # bool
  - Indexed                   # bool (default true; opt out only when justified)

ckRecord: CkRollupAggregation
attributes:
  - SourcePath                # attribute path on the source archive's CK type
  - Function                  # Avg | Min | Max | Sum | Count
  - TargetColumnName          # optional override; otherwise derived from SourcePath + Function

Storage Layout

Naming

| Element | Convention |
| --- | --- |
| Schema | clean(tenantRtCkId.SemanticVersionedFullName), max 63 chars |
| Table | clean(targetCkType.SVF) + "_" + clean(archiveRtCkId.SVF), max 200 chars |
| Column | camelCase from CK attribute id; nested paths concatenated camelCase via ColumnNameMapper |

clean() strips non-alphanumerics. When a generated name exceeds the length limit, the helper truncates and appends a deterministic SHA-256 suffix ({truncated}_{hash16}). The truncation is logged at Information with both names for traceability.
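The naming rule above can be sketched in a few lines of Python. This is an illustration only, not the OctoMesh helper: it assumes that "non-alphanumerics" means anything outside ASCII letters and digits, that hash16 is the first 16 hex digits of a SHA-256 over the full untruncated name, and that the length limit applies to the final name including the suffix. The function names `clean`, `bounded`, and `table_name` are hypothetical.

```python
import hashlib
import re

SCHEMA_MAX = 63   # schema limit stated in the naming table
TABLE_MAX = 200   # table limit stated in the naming table


def clean(name: str) -> str:
    """Strip every character that is not an ASCII letter or digit (assumed rule)."""
    return re.sub(r"[^A-Za-z0-9]", "", name)


def bounded(name: str, max_len: int) -> str:
    """Return name unchanged if it fits, otherwise {truncated}_{hash16}."""
    if len(name) <= max_len:
        return name
    # Assumption: hash16 = first 16 hex digits of SHA-256 over the full name.
    hash16 = hashlib.sha256(name.encode("utf-8")).hexdigest()[:16]
    # Reserve 17 characters for the underscore plus the suffix.
    return f"{name[: max_len - 17]}_{hash16}"


def table_name(target_type_svf: str, archive_svf: str) -> str:
    """Join the two cleaned names with '_' and bound the result to TABLE_MAX."""
    return bounded(f"{clean(target_type_svf)}_{clean(archive_svf)}", TABLE_MAX)
```

Because the suffix is derived from the full name, two long names that share the same truncated prefix still map to different tables, and the same input always yields the same name.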

Standard columns (every archive table)

| Column | Type | Notes |
| --- | --- | --- |
| rtId | TEXT | Entity instance id |
| timestamp | TIMESTAMP WITH TIME ZONE | Datapoint timestamp (UTC) |
| ckTypeId | TEXT | Concrete type (relevant for inheritance) |
| rtCreationDateTime | TIMESTAMP WITH TIME ZONE | DEFAULT CURRENT_TIMESTAMP |
| rtChangedDateTime | TIMESTAMP WITH TIME ZONE | Updated on conflict |
| rtWellKnownName | TEXT | Optional |

Primary key: (timestamp, rtId, ckTypeId).

TimeRangeArchive replaces timestamp with bucketStart + bucketEnd; PK is (bucketStart, bucketEnd, rtId, ckTypeId).

Path → column mapping

| Path shape | CrateDB column type | Nullability |
| --- | --- | --- |
| Scalar (voltage) | mapped from CK attribute primitive type | NOT NULL if required |
| Record (sensor) | OBJECT(STRICT) with subfields from CkRecord | as above |
| Array of scalars (readings[*].value) | ARRAY(<scalar type>) | NOT NULL if required |
| Array of records (readings[*]) | ARRAY(OBJECT(STRICT)) | as above |

Each generated column is indexed by CrateDB's default rules unless CkArchiveColumn.Indexed = false, in which case the DDL emits INDEX OFF. Disable indexes only for columns that are read but never filtered or aggregated on.

Upsert semantics

Inserts run as multi-row INSERT … ON CONFLICT DO UPDATE. Required columns are always overwritten by the incoming value (the app-side validator guarantees they are present); optional columns merge via COALESCE(EXCLUDED.x, x) — multiple sources can contribute to the same (timestamp, rtId, ckTypeId) without overwriting each other.

rtChangedDateTime is bumped on every conflict; rtCreationDateTime is never modified. Idempotent re-inserts are safe.
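To make the merge rule concrete, here is a small in-memory model of what happens when a row with the same primary key arrives twice. It is a sketch of the described semantics, not the generated SQL: `merge_row` is a hypothetical name, and the bookkeeping columns (rtCreationDateTime, rtChangedDateTime) are left out for brevity.

```python
from typing import Any, Dict, Optional, Set

Row = Dict[str, Any]


def merge_row(existing: Optional[Row], incoming: Row, required: Set[str]) -> Row:
    """Model INSERT ... ON CONFLICT DO UPDATE for one primary key."""
    if existing is None:
        # No conflict: the incoming row is inserted as-is.
        return dict(incoming)
    merged = dict(existing)
    for column, value in incoming.items():
        if column in required:
            # Required columns are always overwritten by the incoming value.
            merged[column] = value
        elif value is not None:
            # Optional columns behave like COALESCE(EXCLUDED.x, x):
            # a NULL from the new source keeps the stored value.
            merged[column] = value
    return merged


required = {"voltage"}
row = merge_row(None, {"voltage": 230.0, "powerFactor": None}, required)
row = merge_row(row, {"voltage": 231.0, "powerFactor": 0.95}, required)
row = merge_row(row, {"voltage": 232.0, "powerFactor": None}, required)
```

After the three writes the row holds the latest voltage and still carries the power factor contributed by the second source, which is the "multiple sources without overwriting each other" behavior described above.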

Time semantics

  • Canonical timezone is UTC. Incoming timestamps are normalized; naive DateTime values are interpreted as UTC.
  • API responses serialize as ISO-8601 with explicit Z.
  • Late-arriving data is accepted via the standard upsert path.
  • Future timestamps are not rejected — drift detection belongs in the pipeline / monitoring layer.
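
A minimal Python sketch of the first two rules (UTC normalization and ISO-8601 serialization with an explicit Z) might look like this. The function names are hypothetical; the platform's own serializer may format fractional seconds differently.

```python
from datetime import datetime, timedelta, timezone


def normalize_utc(ts: datetime) -> datetime:
    """Interpret naive values as UTC; convert aware values to UTC."""
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def to_api(ts: datetime) -> str:
    """Serialize as ISO-8601 with an explicit Z suffix."""
    return normalize_utc(ts).isoformat().replace("+00:00", "Z")


naive = datetime(2024, 1, 1, 12, 0, 0)
berlin_summer = datetime(2024, 7, 1, 14, 0, 0, tzinfo=timezone(timedelta(hours=2)))
print(to_api(naive))          # 2024-01-01T12:00:00Z
print(to_api(berlin_summer))  # 2024-07-01T12:00:00Z
```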

Lifecycle

| Status | Table state | Inserts / Queries | Notes |
| --- | --- | --- | --- |
| Created | Not provisioned | Rejected | Fully editable. No CrateDB resources allocated. |
| Activated | Provisioned, frozen | Accepted | Schema immutable. Validation on update path. |
| Disabled | Provisioned, frozen | Rejected | Data preserved. Re-enable to resume. |
| Failed | May be partial | Rejected | Activation DDL failed. retryActivate is idempotent. |

Path validation runs on every transition that ends in Activated (Created/Disabled/Failed → Activated): each CkArchiveColumn.Path is re-validated against the current CK model. Failures throw ArchivePathInvalidException before any DDL runs.

Deletion is destructive: the CrateDB table is dropped (idempotent DROP TABLE IF EXISTS) and the entity is soft-deleted (rtState = Archived). Historical data is lost. Deletion is allowed from any status — except a RawArchive with active rollups, which is rejected with RollupSourceInUseException.

Activation Layers

Every data-plane operation is gated by a three-level AND-coupled check:

Instance enabled? ── no ──▶ DI does not register CrateDB stack; tenant enable throws.
yes

Tenant enabled? ── no ──▶ Archive activate throws.
yes

Archive Activated? ── no ──▶ Insert / query throws ArchiveNotActivatedException.
yes

OK

| Layer | Source |
| --- | --- |
| Instance | StreamData:Enabled in appsettings (default false) |
| Tenant | StreamDataGlobalSettings.IsEnabled (per-tenant flag) |
| Archive | CkArchive.Status == Activated |

The studio queries GET /api/v1/streamData/status on startup; if instanceEnabled = false the archive navigation is hidden. If instanceEnabled = true but tenantEnabled = false the user sees an empty state with an "Enable for tenant" action.
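
The AND-coupled gate and the studio's reaction to the status endpoint can be summarized in a short sketch. The function names and the returned labels ("hidden", "enable-for-tenant", "archive-list") are hypothetical and only stand in for the behavior described above.

```python
from typing import Optional


def blocking_layer(instance_enabled: bool, tenant_enabled: bool,
                   archive_status: str) -> Optional[str]:
    """Return the first layer that blocks a data-plane call, or None if it is open."""
    if not instance_enabled:
        return "instance"
    if not tenant_enabled:
        return "tenant"
    if archive_status != "Activated":
        return "archive"
    return None


def studio_view(instance_enabled: bool, tenant_enabled: bool) -> str:
    """Map the { instanceEnabled, tenantEnabled } status payload to a UI state."""
    if not instance_enabled:
        return "hidden"              # archive navigation is not shown
    if not tenant_enabled:
        return "enable-for-tenant"   # empty state with the enable action
    return "archive-list"
```

The checks are ordered from the outermost layer inward, so a disabled instance masks the tenant and archive state entirely.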

Authorization

Three roles, registered in the identity service alongside AdminPanelManagement:

| Role | Allowed operations |
| --- | --- |
| StreamDataAdmin | Full lifecycle: Create / Update / Delete archives; Activate / Disable / Enable / Retry; Tenant Enable / Disable; rollup lifecycle (Freeze / Unfreeze / Rewind). |
| StreamDataWriter | Insert into activated archives. Metadata read (List / Get archives). No lifecycle, no CRUD. |
| StreamDataReader | Time-series queries (simple / aggregation / grouped / downsampling). Metadata read. No insert, no lifecycle. |

AdminPanelManagement implies all three. Enforcement is server-side; the studio gates buttons by route guards only. A user without write rights who clicks an action gets a notification from the server-returned ArchiveNotActivatedException / Forbidden.

Rollup Archives

A rollup archive consumes another archive and writes one row per [bucketStart, bucketEnd) window. Buckets are sized by BucketSizeMs and aligned by BucketAlignment:

| Alignment | Bucket boundary |
| --- | --- |
| FixedSize | LastAggregatedBucketEnd + N · BucketSizeMs |
| CalendarDay | UTC midnight |
| Iso8601Week | Monday 00:00:00 UTC |
| CalendarMonth | First day of month 00:00:00 UTC |
| CalendarYear | January 1 00:00:00 UTC |

BucketSizeMs is informational only for the calendar alignments (month lengths vary). The orchestrator stays WatermarkLagMs behind real time before closing a bucket, so late-arriving raw data still lands in the right window.
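
As an illustration of the calendar alignments, the sketch below floors a timestamp to the start of its bucket and checks whether a bucket may be closed. It is not the orchestrator's code: `bucket_start` and `is_closable` are hypothetical names, FixedSize is omitted because it depends on LastAggregatedBucketEnd, and the closing rule (bucketEnd at or before now minus WatermarkLagMs) is one reading of the lag description above.

```python
from datetime import datetime, timedelta, timezone


def bucket_start(ts: datetime, alignment: str) -> datetime:
    """Floor ts to the start of its calendar-aligned bucket (UTC)."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)   # naive values are treated as UTC
    ts = ts.astimezone(timezone.utc)
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if alignment == "CalendarDay":
        return midnight
    if alignment == "Iso8601Week":
        return midnight - timedelta(days=midnight.weekday())  # Monday is 0
    if alignment == "CalendarMonth":
        return midnight.replace(day=1)
    if alignment == "CalendarYear":
        return midnight.replace(month=1, day=1)
    raise ValueError(f"unsupported alignment: {alignment}")


def is_closable(bucket_end: datetime, now: datetime, watermark_lag_ms: int) -> bool:
    """Assumed rule: close a bucket once it ends at or before now - lag."""
    return bucket_end <= now - timedelta(milliseconds=watermark_lag_ms)


ts = datetime(2024, 5, 15, 13, 45, tzinfo=timezone.utc)   # a Wednesday
print(bucket_start(ts, "Iso8601Week"))    # 2024-05-13 00:00:00+00:00
print(bucket_start(ts, "CalendarMonth"))  # 2024-05-01 00:00:00+00:00
```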

Aggregation functions

CkRollupFunction maps to one or two storage columns:

| Function | Stored columns | Read-time formula |
| --- | --- | --- |
| Sum | {base}_sum | as-is |
| Min | {base}_min | as-is |
| Max | {base}_max | as-is |
| Count | {base}_count (BIGINT) | as-is |
| Avg | {base}_sum, {base}_count (both stored) | sum / count; keeps chained rollups numerically correct |

The Columns[] projection on a rollup is generated from Aggregations[] via RollupColumnGenerator.Generate; direct edits to Columns are rejected by the validation hook.
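
The reason Avg is stored as a sum and a count becomes clear with a small worked example. The sketch below uses plain dictionaries as stand-ins for rollup rows; the helper names are hypothetical.

```python
from typing import Dict, List

Bucket = Dict[str, float]


def combine(buckets: List[Bucket]) -> Bucket:
    """Roll several (sum, count) buckets up into one coarser bucket."""
    return {
        "sum": sum(b["sum"] for b in buckets),
        "count": sum(b["count"] for b in buckets),
    }


def avg(bucket: Bucket) -> float:
    """Read-time formula for Avg: sum / count."""
    return bucket["sum"] / bucket["count"]


hour_a = {"sum": 10.0, "count": 1}   # one reading of 10
hour_b = {"sum": 60.0, "count": 3}   # three readings of 20
day = combine([hour_a, hour_b])

correct = avg(day)                               # 70 / 4 = 17.5
average_of_averages = (avg(hour_a) + avg(hour_b)) / 2   # (10 + 20) / 2 = 15.0
```

Averaging the two hourly averages gives 15.0, while the true mean of the four readings is 17.5. Carrying sum and count through each rollup level avoids that weighting error.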

Operator controls

| Operation | Mutation | Notes |
| --- | --- | --- |
| Create | createRollupArchive(input) | Server derives TargetCkTypeId and Columns from the source archive + aggregations. |
| Freeze a range | freezeRollupArchive(rtId, until) | Monotonic: the new until must be ≥ the current FrozenUntil. The orchestrator stops producing buckets whose bucketEnd falls in the frozen range. |
| Unfreeze | unfreezeRollupArchive(rtId, acceptGaps) | Idempotent. acceptGaps is recorded for audit but the gap-detection guard is a follow-up. |
| Rewind the watermark | rewindRollupWatermark(rtId, toBucketEnd) | Re-aggregates from the given bucket boundary forward. Destructive: previously committed rows in the rewound range are temporarily out of sync until the orchestrator catches up. |

A RawArchive referenced by an active rollup cannot be deleted — drop the rollup first.

Time-Range Archives

For externally pre-aggregated data with explicit [from, to) windows. There is no source archive and no orchestrator on this path; producers push rows directly:

  • GraphQL mutation: createTimeRangeArchive(input) — same admin-grade role guard as rollup create.
  • Pipeline node: SaveTimeRangeStreamDataInArchive@1.
  • REST: POST /api/v1/streamData/archives/{archiveRtId}/insertTimeRange.

Period on TimeRangeArchive is advisory only — it lets the studio render a default time range but is not enforced at insert time.

API Surface

GraphQL — lifecycle mutations

Under mutation.streamData:

| Mutation | Returns | Description |
| --- | --- | --- |
| activateArchive(rtId) | ArchiveTransitionResult | Provisions the CrateDB table, transitions to Activated. |
| disableArchive(rtId) | ArchiveTransitionResult | Activated → Disabled. Table is preserved. |
| enableArchive(rtId) | ArchiveTransitionResult | Disabled → Activated. Re-validates column paths. |
| retryArchiveActivation(rtId) | ArchiveTransitionResult | Failed → Activated. Idempotent (CREATE TABLE IF NOT EXISTS). |
| deleteArchive(rtId) | Boolean | Drops the CrateDB table and soft-deletes the entity. Rejected if active rollups reference it. |
| createRollupArchive(input) | OctoObjectId | Server-derived TargetCkTypeId + Columns. |
| createTimeRangeArchive(input) | OctoObjectId | Operator-defined target type and columns. |
| freezeRollupArchive(rtId, until) | ArchiveTransitionResult | Monotonic FrozenUntil set. |
| unfreezeRollupArchive(rtId, acceptGaps) | ArchiveTransitionResult | Clears FrozenUntil. |
| rewindRollupWatermark(rtId, toBucketEnd) | ArchiveTransitionResult | Re-aggregates from the given boundary. |

All mutations require StreamDataAdmin. Domain exceptions (see Exception Catalog below) surface as stable GraphQL error codes via error.extensions.OctoDetails.

CRUD on archives (List / Get / Update) is auto-generated from the CK model and lives under runtime.systemStreamDataArchives / …RawArchives / …RollupArchives / …TimeRangeArchives. The validation hook enforces immutability after activation.
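
The status transitions behind the four lifecycle mutations can be modeled as a small lookup table. This is a simplified sketch, not the lifecycle service: it is deliberately strict and does not model the idempotent return for a repeated Activate, the move to Failed when DDL breaks, or deletion. The Python exception class is a stand-in for InvalidArchiveStateTransitionException.

```python
from enum import Enum


class Status(Enum):
    CREATED = "Created"
    ACTIVATED = "Activated"
    DISABLED = "Disabled"
    FAILED = "Failed"


class InvalidArchiveStateTransition(Exception):
    """Stand-in for InvalidArchiveStateTransitionException."""


# mutation name -> (status the archive must be in, status it ends up in)
TRANSITIONS = {
    "activateArchive": (Status.CREATED, Status.ACTIVATED),
    "disableArchive": (Status.ACTIVATED, Status.DISABLED),
    "enableArchive": (Status.DISABLED, Status.ACTIVATED),
    "retryArchiveActivation": (Status.FAILED, Status.ACTIVATED),
}


def next_status(mutation: str, current: Status) -> Status:
    """Return the resulting status, or raise if the mutation does not apply."""
    expected, result = TRANSITIONS[mutation]
    if current is not expected:
        raise InvalidArchiveStateTransition(
            f"{mutation} requires {expected.value}, archive is {current.value}"
        )
    return result
```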

GraphQL — path enumeration

query availableArchivePaths(ckTypeId: ID!, maxDepth: Int = 5): [ArchivePathInfo!]!

type ArchivePathInfo {
  path: String!                  # "sensor.reading.value", "readings[*].value"
  primitiveType: String          # null when isRecord = true
  isRecord: Boolean!
  isArray: Boolean!
  recordTypeId: String
  inheritedFromCkTypeId: String  # null if attribute belongs to ckTypeId itself
}

The studio's AttributePathPicker consumes this for archive create/edit. Admin-only.

REST — /api/v1/streamData

| Method | Path | Description |
| --- | --- | --- |
| GET | /status | { instanceEnabled, tenantEnabled } for the current tenant. |
| POST | /enable / /disable | Tenant-level toggle. |
| POST | /archives/{archiveRtId}/activate | Same as GraphQL activateArchive. |
| POST | /archives/{archiveRtId}/disable / /enable / /retry | Status transitions. |
| DELETE | /archives/{archiveRtId} | Same as GraphQL deleteArchive. |
| POST | /archives/{rollupRtId}/freeze / /unfreeze / /rewind | Rollup-only controls. |
| POST | /archives/{archiveRtId}/insertTimeRange | Bulk time-range insert. |
| GET | /archives/{archiveRtId}/rollups | Active rollups referencing this raw archive. |

Pipeline node — SaveStreamDataInArchive@1

A PathNodeConfiguration node with a required property ArchiveRtId : OctoObjectId. The node routes each incoming StreamDataPoint to the configured archive via IStreamDataRepository.InsertAsync. The companion SaveTimeRangeStreamDataInArchive@1 handles TimeRangeArchive.

The mesh-adapter service account holds StreamDataWriter. Targeting an inactive archive surfaces a clear pipeline error (ArchiveNotActivatedException).

SDK — IStreamDataRepository

// Data plane
Task InsertAsync(OctoObjectId archiveRtId, StreamDataPoint point);
Task InsertAsync(OctoObjectId archiveRtId, IEnumerable<StreamDataPoint> points);
Task InsertTimeRangeAsync(OctoObjectId archiveRtId, IEnumerable<TimeRangeStreamDataPoint> points);

Task<StreamDataQueryResult> ExecuteQueryAsync(OctoObjectId archiveRtId, StreamDataQueryOptions options);
Task<StreamDataQueryResult> ExecuteAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataAggregationQueryOptions options);
Task<StreamDataQueryResult> ExecuteGroupedAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataGroupedAggregationQueryOptions options);
Task<StreamDataQueryResult> ExecuteDownsamplingQueryAsync(OctoObjectId archiveRtId, StreamDataDownsamplingQueryOptions options);

// Control plane (called by IArchiveLifecycleService)
Task EnsureArchiveCreatedAsync(CkArchive archive);
Task DeleteArchiveAsync(CkArchive archive);

Bulk inserts use all-or-nothing pre-validation: every point is validated before any SQL is sent; a violation throws RequiredAttributeMissingException carrying the index of the offending point, and no row is written. A DB-level failure after pre-validation surfaces as an exception.

Bulk Insert Semantics

  1. Resolve archive (single lookup), status check.
  2. Pre-validate every point in the batch — required-path coverage, type compatibility, path resolution. Stops at the first violation.
  3. On violation → throw with the offending index. No row written.
  4. On success → single multi-row INSERT … ON CONFLICT … execution.

Pipeline callers see batch success or full failure; partial-commit scenarios are limited to transient DB failures (operational, see Reconciliation below).
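
The four steps can be sketched as follows. This is a simplified model under stated assumptions: only required-path coverage is checked (type compatibility and path resolution are omitted), the archive lookup and status check are left out, and `execute` is a hypothetical callback standing in for the single multi-row statement.

```python
from typing import Any, Callable, Dict, List, Sequence

Point = Dict[str, Any]


class RequiredAttributeMissing(Exception):
    """Stand-in for RequiredAttributeMissingException; carries the point index."""

    def __init__(self, index: int, path: str) -> None:
        super().__init__(f"point {index} lacks required path '{path}'")
        self.index = index
        self.path = path


def bulk_insert(points: Sequence[Point], required_paths: Sequence[str],
                execute: Callable[[List[Point]], None]) -> int:
    """Validate the whole batch first, then write it in one call."""
    for index, point in enumerate(points):
        for path in required_paths:
            if point.get(path) is None:
                # Stop at the first violation; nothing has been written yet.
                raise RequiredAttributeMissing(index, path)
    if points:
        execute(list(points))   # one multi-row statement for the batch
    return len(points)


written: List[Point] = []
bulk_insert([{"voltage": 1.0}, {"voltage": 2.0}], ["voltage"], written.extend)
```

Because validation finishes before `execute` is called, a bad point in the middle of a batch never leaves earlier points half-committed.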

Retention

Retention is partition-based and per-archive (work in progress — design captured here; some attributes are not yet exposed in the create form).

  • DDL: archive tables are PARTITIONED BY DATE_TRUNC('day', timestamp). Drop-partition is metadata-only on CrateDB.
  • RawRetentionMs on the Archive base type defines the retention horizon. null = retain forever.
  • The ArchiveRetentionScheduler (IHostedService, hourly) drops partitions older than now() - RawRetentionMs per archive, per tenant. Failures are isolated; one bad archive does not block the rest.
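
The partition selection done by the scheduler could look roughly like the sketch below. It assumes day partitions identified by their UTC midnight and takes the conservative reading that a partition is dropped only once the whole day lies before the cutoff; the function name is hypothetical and the design is still marked as work in progress above.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


def partitions_to_drop(partition_days: List[datetime],
                       raw_retention_ms: Optional[int],
                       now: datetime) -> List[datetime]:
    """Return the day partitions that fall entirely before the retention horizon."""
    if raw_retention_ms is None:
        return []   # null retention means retain forever
    cutoff = now - timedelta(milliseconds=raw_retention_ms)
    return [
        day for day in sorted(partition_days)
        if day + timedelta(days=1) <= cutoff   # assumption: whole day must be expired
    ]


utc = timezone.utc
days = [datetime(2024, 5, d, tzinfo=utc) for d in (5, 6, 7, 8)]
now = datetime(2024, 5, 10, 12, 0, tzinfo=utc)
three_days_ms = 3 * 24 * 60 * 60 * 1000
expired = partitions_to_drop(days, three_days_ms, now)   # 5 May and 6 May
```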

For downsampling-style retention (raw rows folded into a coarser archive before deletion), use a RollupArchive pointing at the source archive — the rollup is the long-term store while the raw archive's retention can be aggressive.

Reconciliation

CkArchive entities live in MongoDB; archive tables live in CrateDB. There is no shared transaction. The lifecycle service keeps the two reconcilable without 2PC:

  1. All DDL is idempotent (IF EXISTS / IF NOT EXISTS).
  2. Operation order is Crate-first, Mongo-last: Activate runs CREATE TABLE, then flips Status in Mongo. Delete runs DROP TABLE, then soft-deletes the entity. Mongo is the source of truth.
  3. Every data-plane call checks Status == Activated before touching Crate. While the CREATE TABLE has run but the Status flip in Mongo has not, the API rejects with ArchiveNotActivatedException.
  4. On startup the ArchiveReconciler enumerates Activated archives per tenant and (re-)provisions any missing CrateDB tables. Drift in the other direction (table without entity) is logged but not auto-dropped.
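
The startup check in step 4 amounts to a set comparison between what Mongo says should exist and what CrateDB actually has. A minimal sketch, assuming both sides can be listed as table names (the function name and inputs are hypothetical):

```python
from typing import List, Set, Tuple


def reconcile(activated_tables: Set[str],
              existing_tables: Set[str]) -> Tuple[List[str], List[str]]:
    """Compare Mongo's Activated archives with the tables present in CrateDB."""
    # Activated in Mongo but missing in Crate: provision again (idempotent DDL).
    to_provision = sorted(activated_tables - existing_tables)
    # Present in Crate without an entity: log only, never auto-drop.
    orphaned = sorted(existing_tables - activated_tables)
    return to_provision, orphaned


to_provision, orphaned = reconcile({"meter_a", "meter_b"}, {"meter_b", "old_c"})
```

Here `meter_a` would be re-created and `old_c` would only be reported, matching the asymmetric handling described in step 4.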

Concurrency

| Scenario | Behavior |
| --- | --- |
| Two parallel Activate on the same archive | Serialized via RepositoryDistributedLockService. The second call observes Activated and returns idempotently. |
| Disable while inserts are in flight | In-flight inserts complete. New inserts after the flip raise ArchiveNotActivatedException. |
| Delete while inserts are in flight | Same as Disable, plus the DROP TABLE may retry once with a short backoff if the table is briefly locked. |
| Activate fails mid-DDL | Status → Failed. EnsureArchiveCreatedAsync is idempotent (CREATE TABLE IF NOT EXISTS), so retry is safe. |
| Bulk insert on disabled archive | Whole batch rejected on the single status lookup. |

Exception Catalog

All in Meshmakers.Octo.Runtime.Contracts.StreamData. Common base: StreamDataException (extends RuntimeRepositoryException). Surfaced via GraphQL error.extensions.OctoDetails and via REST status codes.

| Exception | Thrown when |
| --- | --- |
| StreamDataException | Base: generic StreamData failure. |
| StreamDataNotEnabledException | Instance or tenant flag is false on an operation that requires it. |
| ArchiveNotFoundException | archiveRtId does not resolve to an Archive entity. |
| ArchiveNotActivatedException | Archive exists but Status ≠ Activated on insert / query. |
| ArchiveSchemaImmutableException | Post-activation update tries to change schema-relevant fields. |
| ArchivePathInvalidException | A CkArchiveColumn.Path cannot be resolved against TargetCkTypeId. |
| ArchiveColumnTypeUnsupportedException | An attribute type cannot be mapped to a CrateDB column type. |
| RequiredAttributeMissingException | Insert lacks a value for a required: true scalar path. Carries the offending point index. |
| ArchiveActivationFailedException | DDL execution failed during activation. Wraps the underlying SQL error. |
| InvalidArchiveStateTransitionException | Lifecycle mutation called against an incompatible status. |
| RollupSourceInUseException | deleteArchive invoked on a raw archive that still has active rollups. |
| RollupSourceMissingException | createRollupArchive references a source archiveRtId that does not exist or is not yet activated. |

Observability

Metrics

| Metric | Type | Tags |
| --- | --- | --- |
| streamdata.archive.count | Gauge | tenant, status |
| streamdata.archive.status_transitions | Counter | tenant, archive, from, to |
| streamdata.archive.activation_duration_ms | Histogram | tenant, archive, outcome |
| streamdata.insert.duration_ms | Histogram | tenant, archive, batch_size_bucket |
| streamdata.insert.points | Counter | tenant, archive |
| streamdata.insert.required_violations | Counter | tenant, archive |
| streamdata.query.duration_ms | Histogram | tenant, archive, query_type |
| streamdata.query.rows_returned | Histogram | tenant, archive, query_type |
| streamdata.crate.connections.open | Gauge | (instance-wide) |
| streamdata.crate.retention_partitions_dropped | Counter | tenant, archive |
| streamdata.crate.rollup_rows_inserted | Counter | tenant, rollup_archive |

Traces

Activity sources: Meshmakers.Octo.StreamData (top-level operations) and Meshmakers.Octo.StreamData.Crate (DB calls). Span attributes: streamdata.archive.rtid, streamdata.archive.target_cktype, streamdata.tenant.

Audit trail

Status transitions are emitted as RtEvent records through IEventRepository with source = RtEventSourcesEnum.StreamData. The triggering user is encoded in the event message; auto-transitions use system:reconciliation. The studio's archive details view exposes the history via the standard event query path.

Creating an Archive

From the CLI

# Tenant-level enable (required once per tenant)
octo-cli -c EnableStreamData

# Archives are runtime entities — create them through the studio, GraphQL,
# or by rt-importing a runtime-model YAML. Then:
octo-cli -c ActivateArchive -id 69fda707d47638c68edc7fea

See octo-cli — Stream Data for the full command surface.

From the Studio

Navigate to Repository → Stream Data Archives. The list shows every archive on the active tenant with a status badge and per-row actions (Activate, Disable, Enable, Retry, Delete, Copy ID). The New Archive, New Rollup, and New Time-Range entry points walk through the create flow with the AttributePathPicker for column selection.

See Studio: Archives for the UI walkthrough.

From GraphQL

# 1. Create the archive via the auto-generated runtime mutation
mutation {
  runtime {
    systemStreamDataRawArchives {
      create(input: {
        rtWellKnownName: "EnergyMeterArchive",
        targetCkTypeId: "Industry.Energy/EnergyMeter",
        columns: [
          { path: "voltage", required: true, indexed: true },
          { path: "current", required: true, indexed: true },
          { path: "powerFactor", required: false, indexed: false }
        ]
      }) { item { rtId } }
    }
  }
}

# 2. Activate
mutation {
  streamData {
    activateArchive(rtId: "65d5c447b420da3fb1238201") {
      rtId
      status
    }
  }
}

Best Practices

  1. One archive per concern. Different retention, different query patterns, different consumers → different archives.
  2. Be explicit about required. Required scalars become NOT NULL columns and are validated app-side before insert.
  3. Index deliberately. Indexes cost storage and insert throughput. Keep the default (indexed) and opt out only for columns that are read but never filtered or aggregated on.
  4. Roll up early. A RollupArchive is far cheaper to query over long horizons than a raw scan.
  5. Pick alignment for the use case. Energy / EDA workflows want calendar alignments (monthly, weekly); engineering telemetry usually wants FixedSize.
  6. Freeze before bulk imports. When backfilling a source archive's history, freeze any dependent rollups, finish the import, then unfreeze with acceptGaps: false (or rewind the watermark) to re-aggregate cleanly.
  7. Soft-delete first. Disable an archive to confirm no consumer breaks before deleting.

See Also