Stream Data Archives
A Stream Data Archive is the unit of configuration and storage for time-series data in OctoMesh. Each archive is a versioned, typed, per-tenant CrateDB table that captures a curated set of attribute paths from a Construction Kit type. Archives have a strict lifecycle (Created → Activated → Disabled / Failed), an immutable schema once activated, and a three-tier activation gate (instance → tenant → archive) that determines whether the data plane is open.
Archives are runtime entities — instances of the System.StreamData CK model — and can be created, activated, disabled, retried, and deleted through the Refinery Studio, GraphQL, REST, or the octo-cli.
Archive rows can be backed up too. Use ExportArchiveData / ImportArchiveData to move a single archive's rows, or include all of a tenant's archives in a full tenant backup with dump --include-archive-data (see Repository Backup & Restore).
What an Archive Is
An archive bundles three concerns into one entity:
- Target CK type: the source of truth for the data shape. Inheritance is supported: an archive defined on type A applies to all derived types, with one CrateDB table per concrete type.
- Column list: the attribute paths captured at ingestion time, mapped to typed CrateDB columns. Required vs. optional and indexed vs. non-indexed are configured per column.
- Status: the lifecycle state. Inserts and queries are accepted only when status is Activated.
Where a CK model defines the shape of data, an archive defines what slice of that data is persisted to time series. Many archives can target the same CK type — different retention, different columns, different downsampling.
Properties
| Property | Description |
|---|---|
| Per-tenant | Each tenant has its own CrateDB schema. No cross-tenant data. |
| Typed | Each captured attribute path becomes a real CrateDB column with a declared type. |
| Immutable schema | Once activated, TargetCkTypeId and Columns cannot change. New version = new archive. |
| Multi-archive | A CK type may be captured by any number of archives concurrently. |
| Status-gated | Inserts and queries are accepted only while the archive's status is Activated. |
| Soft-delete preserves data | Disabling an archive blocks reads and writes but preserves the underlying CrateDB table. |
Archive Subtypes
The abstract base Archive has three concrete subtypes (System.StreamData CK model, ≥ 1.4.0):
| Subtype | Time axis | Ingestion path | Use case |
|---|---|---|---|
| RawArchive | Single timestamp per row | External producers (SaveStreamDataInArchive@1, IStreamDataRepository.InsertAsync) | Instant measurements (sensor readings, machine telemetry, events) |
| RollupArchive | Bucketed [bucketStart, bucketEnd) | System rollup orchestrator only; no external inserts | Derived bucketed aggregations of another archive (raw or rollup-of-rollup) |
| TimeRangeArchive | Explicit [from, to) range per row | External producers (SaveTimeRangeStreamDataInArchive@1, IStreamDataRepository.InsertTimeRangeAsync) | Externally pre-aggregated data (EDA reports, smart meters, weather APIs) |
RawArchive and TimeRangeArchive are operator-defined and accept external data. RollupArchive columns are generated server-side from the configured aggregations — direct edits are rejected.
CK Model
Loaded only when StreamData is enabled at the instance level. Lives in System.StreamData (currently 1.4.0).
# Abstract base
ckType: Archive
isAbstract: true
attributes:
- TargetCkTypeId # CkId<CkTypeId>
- Columns # array<CkArchiveColumn>
- Status # CkArchiveStatus
- RawRetentionMs # int64, optional — partition-drop horizon
# Concrete subtypes
ckType: RawArchive (derivedFrom: Archive)
ckType: TimeRangeArchive (derivedFrom: Archive)
attributes:
- Period # TimeSpan, optional — advisory hint
ckType: RollupArchive (derivedFrom: Archive)
attributes:
- SourceArchiveRtId # rtId of the archive this rollup reads from
- BucketSizeMs # int64
- BucketAlignment # FixedSize | CalendarDay | Iso8601Week | CalendarMonth | CalendarYear
- ReferenceTimeZone # IANA zone id (e.g. Europe/Vienna), optional — anchors calendar alignments (since 1.6.4)
- WatermarkLagMs # int64 — how far behind real time the orchestrator stays
- LastAggregatedBucketEnd # DateTime, optional
- Aggregations # array<CkRollupAggregation>
- FrozenUntil # DateTime, optional
ckEnum: CkArchiveStatus { Created, Activated, Disabled, Failed }
ckEnum: CkRollupFunction { Avg, Min, Max, Sum, Count }
ckEnum: BucketAlignment { FixedSize, CalendarDay, Iso8601Week, CalendarMonth, CalendarYear }
ckRecord: CkArchiveColumn
attributes:
- Path # attribute path, e.g. "sensor.reading.value" or "readings[*].value"
- Required # bool
- Indexed # bool (default true — opt-out only when justified)
ckRecord: CkRollupAggregation
attributes:
- SourcePath # attribute path on the source archive's CK type
- Function # Avg | Min | Max | Sum | Count
- TargetColumnName # optional override; otherwise derived from SourcePath + Function
Storage Layout
Naming
| Element | Convention |
|---|---|
| Schema | clean(tenantRtCkId.SemanticVersionedFullName), max 63 chars |
| Table | clean(targetCkType.SVF) + "_" + clean(archiveRtCkId.SVF), max 200 chars |
| Column | camelCase from CK attribute id; nested paths concatenated camelCase via ColumnNameMapper |
clean() strips non-alphanumerics. When a generated name exceeds the length limit, the helper truncates and appends a deterministic SHA-256 suffix ({truncated}_{hash16}). The truncation is logged at Information with both names for traceability.
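The naming rule can be sketched in a few lines of Python. This is an illustration, not the actual helper: the function names, the choice to hash the full cleaned name, and taking the first 16 hex characters of the SHA-256 digest as hash16 are all assumptions, as is treating "alphanumeric" as ASCII only.

```python
import hashlib
import re

SCHEMA_MAX = 63    # limits taken from the naming table
TABLE_MAX = 200


def clean(name: str) -> str:
    """Strip every character that is not an ASCII letter or digit."""
    return re.sub(r"[^A-Za-z0-9]", "", name)


def bounded_name(name: str, max_len: int) -> str:
    """Return the name unchanged if it fits, else '{truncated}_{hash16}'."""
    if len(name) <= max_len:
        return name
    # Assumption: hash16 is the first 16 hex chars of SHA-256 over the full name.
    hash16 = hashlib.sha256(name.encode("utf-8")).hexdigest()[:16]
    keep = max_len - len(hash16) - 1  # reserve room for '_' plus the suffix
    return f"{name[:keep]}_{hash16}"


def table_name(target_ck_type_svf: str, archive_svf: str) -> str:
    """Build clean(type) + '_' + clean(archive), bounded to the table limit."""
    joined = f"{clean(target_ck_type_svf)}_{clean(archive_svf)}"
    return bounded_name(joined, TABLE_MAX)
```

Because the suffix is derived from the full name, the same over-long input always maps to the same table name, which is what makes the truncation deterministic.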
Standard columns (every archive table)
| Column | Type | Notes |
|---|---|---|
| rtId | TEXT | Entity instance id |
| timestamp | TIMESTAMP WITH TIME ZONE | Datapoint timestamp (UTC) |
| ckTypeId | TEXT | Concrete type (relevant for inheritance) |
| rtCreationDateTime | TIMESTAMP WITH TIME ZONE | DEFAULT CURRENT_TIMESTAMP |
| rtChangedDateTime | TIMESTAMP WITH TIME ZONE | Updated on conflict |
| rtWellKnownName | TEXT | Optional |
Primary key: (timestamp, rtId, ckTypeId).
TimeRangeArchive replaces timestamp with bucketStart + bucketEnd; PK is (bucketStart, bucketEnd, rtId, ckTypeId).
Path → column mapping
| Path shape | CrateDB column type | Nullability |
|---|---|---|
| Scalar (voltage) | mapped from CK attribute primitive type | NOT NULL if required |
| Record (sensor) | OBJECT(STRICT) with subfields from CkRecord | as above |
| Array of scalars (readings[*].value) | ARRAY(<scalar type>) | NOT NULL if required |
| Array of records (readings[*]) | ARRAY(OBJECT(STRICT)) | as above |
Each generated column is indexed by CrateDB's default rules unless CkArchiveColumn.Indexed = false, in which case the DDL emits INDEX OFF. Disable indexes only for columns that are read but never filtered or aggregated on.
Upsert semantics
Inserts run as multi-row INSERT … ON CONFLICT DO UPDATE. Required columns are always overwritten by the incoming value (the app-side validator guarantees they are present); optional columns merge via COALESCE(EXCLUDED.x, x) — multiple sources can contribute to the same (timestamp, rtId, ckTypeId) without overwriting each other.
rtChangedDateTime is bumped on every conflict; rtCreationDateTime is never modified. Idempotent re-inserts are safe.
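The merge rule can be illustrated with an in-memory model of one archive table. This is a sketch of the semantics only, not the SQL the repository emits; the function name and the dict-based table are invented for the example, and the initial value of rtChangedDateTime on a fresh insert is not modelled.

```python
from datetime import datetime, timezone


def upsert(table, key, incoming, required, now):
    """Apply one row with INSERT ... ON CONFLICT DO UPDATE semantics.

    table maps (timestamp, rtId, ckTypeId) to a dict of column values.
    """
    existing = table.get(key)
    if existing is None:
        # Fresh row: plain insert, creation time is stamped once.
        table[key] = {**incoming, "rtCreationDateTime": now}
        return
    for column, value in incoming.items():
        if column in required:
            existing[column] = value   # required: always overwritten
        elif value is not None:
            existing[column] = value   # optional: COALESCE(EXCLUDED.x, x)
    existing["rtChangedDateTime"] = now  # bumped on every conflict


table = {}
key = ("2024-05-01T12:00:00Z", "meter-1", "Meter")
t0 = datetime(2024, 5, 1, 12, 0, 5, tzinfo=timezone.utc)
t1 = datetime(2024, 5, 1, 12, 0, 9, tzinfo=timezone.utc)

# Two sources contribute different optional columns to the same key.
upsert(table, key, {"voltage": 230.0, "current": 4.2, "temperature": None}, {"voltage"}, t0)
upsert(table, key, {"voltage": 231.0, "current": None, "temperature": 21.5}, {"voltage"}, t1)
```

After the second call the row holds the newer voltage, the current from the first source, and the temperature from the second, while rtCreationDateTime still carries the first insert time.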
Time semantics
- Canonical timezone is UTC. Incoming timestamps are normalized; naive DateTime values are interpreted as UTC.
- API responses serialize as ISO-8601 with explicit Z.
- Late-arriving data is accepted via the standard upsert path.
- Future timestamps are not rejected; drift detection belongs in the pipeline / monitoring layer.
Lifecycle
| Status | Table state | Inserts / Queries | Notes |
|---|---|---|---|
| Created | Not provisioned | Rejected | Fully editable. No CrateDB resources allocated. |
| Activated | Provisioned, frozen | Accepted | Schema immutable. Validation on update path. |
| Disabled | Provisioned, frozen | Rejected | Data preserved. Re-enable to resume. |
| Failed | May be partial | Rejected | Activation DDL failed. retryActivate is idempotent. |
Path validation runs on every transition that ends in Activated (Created/Disabled/Failed → Activated): each CkArchiveColumn.Path is re-validated against the current CK model. Failures throw ArchivePathInvalidException before any DDL runs.
Deletion is destructive: the CrateDB table is dropped (idempotent DROP TABLE IF EXISTS) and the entity is soft-deleted (rtState = Archived). Historical data is lost. Deletion is allowed from any status — except a RawArchive with active rollups, which is rejected with RollupSourceInUseException.
Activation Layers
Every data-plane operation is gated by a three-level AND-coupled check:
Instance enabled? ── no ──▶ DI does not register CrateDB stack; tenant enable throws.
yes
▼
Tenant enabled? ── no ──▶ Archive activate throws.
yes
▼
Archive Activated? ── no ──▶ Insert / query throws ArchiveNotActivatedException.
yes
▼
OK
| Layer | Source |
|---|---|
| Instance | StreamData:Enabled in appsettings (default false) |
| Tenant | StreamDataGlobalSettings.IsEnabled (per-tenant flag) |
| Archive | CkArchive.Status == Activated |
The studio queries GET /api/v1/streamData/status on startup; if instanceEnabled = false the archive navigation is hidden. If instanceEnabled = true but tenantEnabled = false the user sees an empty state with an "Enable for tenant" action.
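The three AND-coupled checks can be read as a single guard in front of every insert or query. The sketch below is illustrative: the Python exception classes stand in for StreamDataNotEnabledException and ArchiveNotActivatedException from the exception catalog, and the function name and parameters are hypothetical.

```python
class StreamDataNotEnabledError(Exception):
    """Stand-in for StreamDataNotEnabledException."""


class ArchiveNotActivatedError(Exception):
    """Stand-in for ArchiveNotActivatedException."""


def check_data_plane(instance_enabled: bool, tenant_enabled: bool, archive_status: str) -> None:
    """Raise unless all three layers allow a data-plane operation."""
    if not instance_enabled:
        raise StreamDataNotEnabledError("StreamData is disabled for this instance")
    if not tenant_enabled:
        raise StreamDataNotEnabledError("StreamData is disabled for this tenant")
    if archive_status != "Activated":
        raise ArchiveNotActivatedError(f"archive status is {archive_status}")
```

The checks run from the outermost layer inwards, so a disabled instance is reported before the archive status is even considered.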
Authorization
Three roles, registered in the identity service alongside AdminPanelManagement:
| Role | Allowed operations |
|---|---|
| StreamDataAdmin | Full lifecycle: Create / Update / Delete archives; Activate / Disable / Enable / Retry; Tenant Enable / Disable; rollup lifecycle (Freeze / Unfreeze / Rewind). |
| StreamDataWriter | Insert into activated archives. Metadata read (List / Get archives). No lifecycle, no CRUD. |
| StreamDataReader | Time-series queries (simple / aggregation / grouped / downsampling). Metadata read. No insert, no lifecycle. |
AdminPanelManagement implies all three. Enforcement is server-side; the studio gates buttons by route guards only. A user without write rights who clicks an action gets a notification from the server-returned ArchiveNotActivatedException / Forbidden.
Rollup Archives
A rollup archive consumes another archive and writes one row per [bucketStart, bucketEnd) window. Buckets are sized by BucketSizeMs and aligned by BucketAlignment:
| Alignment | Bucket boundary |
|---|---|
| FixedSize | LastAggregatedBucketEnd + N · BucketSizeMs (offset-invariant; ignores the zone) |
| CalendarDay | Local midnight in ReferenceTimeZone |
| Iso8601Week | Monday 00:00:00 local time in ReferenceTimeZone |
| CalendarMonth | First day of month 00:00:00 local time in ReferenceTimeZone |
| CalendarYear | January 1 00:00:00 local time in ReferenceTimeZone |
BucketSizeMs is informational only for the calendar alignments (month lengths vary). The orchestrator stays WatermarkLagMs behind real time before closing a bucket, so late-arriving raw data still lands in the right window.
Bucket interval vs source granularity
When the source archive declares a granularity (a TimeRangeArchive's Period, or a parent rollup's BucketSizeMs), the rollup's BucketSizeMs must be ≥ the source granularity and an integer multiple of it. Activation rejects a finer or non-aligned bucket with RollupBucketIntervalException — e.g. over a 15-minute source, 15 min / 30 min / 1 h / 1 d are accepted, while 5 min (finer) and 20 min (not a whole multiple) are refused. The constraint prevents a single source row from being split across two output buckets, which would silently distort Sum/Count and skew Avg/Min/Max. Raw sources that expose no declared granularity are unconstrained. The check lives in RollupValidator.ValidateForActivation, so a misconfigured rollup fails fast at Activate rather than producing wrong aggregates at query time.
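The rule reduces to two integer checks. The following is a minimal sketch of that rule, not the code of RollupValidator.ValidateForActivation; the function name and the Python exception class are stand-ins.

```python
from typing import Optional

MINUTE_MS = 60_000


class RollupBucketIntervalError(ValueError):
    """Stand-in for RollupBucketIntervalException."""


def validate_bucket_interval(bucket_size_ms: int, source_granularity_ms: Optional[int]) -> None:
    """Reject buckets finer than, or not a whole multiple of, the source granularity."""
    if source_granularity_ms is None:
        return  # raw source without declared granularity: unconstrained
    if bucket_size_ms < source_granularity_ms:
        raise RollupBucketIntervalError("bucket is finer than the source granularity")
    if bucket_size_ms % source_granularity_ms != 0:
        raise RollupBucketIntervalError("bucket is not a whole multiple of the source granularity")


# Over a 15-minute source: 15 min, 30 min, 1 h and 1 d pass the check.
for minutes in (15, 30, 60, 1440):
    validate_bucket_interval(minutes * MINUTE_MS, 15 * MINUTE_MS)
```

Calling the function with 5 or 20 minutes over the same 15-minute source raises, which matches the accepted and refused intervals listed above.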
Reference time zone (calendar alignments)
The calendar alignments snap each bucket to a local calendar boundary in ReferenceTimeZone — an IANA zone id such as Europe/Vienna — so the buckets are DST-correct: a CalendarDay bucket runs local-midnight to local-midnight, which is 22:00Z → 22:00Z under CEST (summer) and 23:00Z → 23:00Z under CET (winter). An empty zone falls back to UTC boundaries. FixedSize ignores the zone — a whole-hour window is offset-invariant for a whole-hour zone, so an hourly rollup needs no zone. Available since System.StreamData 1.6.4.
BucketAlignment and ReferenceTimeZone are frozen once the rollup is Activated. Switching a live rollup from FixedSize/UTC to a calendar alignment in a real zone (e.g. moving a daily rollup to CalendarDay + Europe/Vienna) requires deleting and re-creating the rollup, then re-activating and backfilling.
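The UTC offsets quoted for CalendarDay can be reproduced with Python's standard zoneinfo module. This sketch only illustrates the boundary arithmetic; the function name is hypothetical and it says nothing about how the orchestrator computes boundaries internally.

```python
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo


def calendar_day_bucket(day: date, zone_id: str):
    """Return (bucketStart, bucketEnd) in UTC for one local calendar day."""
    # An empty zone falls back to UTC boundaries.
    zone = ZoneInfo(zone_id) if zone_id else timezone.utc
    start_local = datetime.combine(day, time.min, tzinfo=zone)
    end_local = datetime.combine(day + timedelta(days=1), time.min, tzinfo=zone)
    return start_local.astimezone(timezone.utc), end_local.astimezone(timezone.utc)


summer = calendar_day_bucket(date(2024, 7, 1), "Europe/Vienna")   # CEST, UTC+2
winter = calendar_day_bucket(date(2024, 1, 15), "Europe/Vienna")  # CET, UTC+1
switch = calendar_day_bucket(date(2024, 3, 31), "Europe/Vienna")  # DST starts this day
```

The summer bucket starts at 22:00Z and the winter bucket at 23:00Z. The bucket for 31 March 2024 spans only 23 hours, which is why a fixed 24-hour window cannot stand in for a calendar day in a zone with DST.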
Aggregation functions
CkRollupFunction maps to one or two storage columns:
| Function | Stored columns | Read-time formula |
|---|---|---|
| Sum | {base}_sum | as-is |
| Min | {base}_min | as-is |
| Max | {base}_max | as-is |
| Count | {base}_count (BIGINT) | as-is |
| Avg | {base}_sum, {base}_count (both stored) | sum / count; keeps chained rollups numerically correct |
The Columns[] projection on a rollup is generated from Aggregations[] via RollupColumnGenerator.Generate; direct edits to Columns are rejected by the validation hook.
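Why Avg is stored as a sum and a count becomes clear with uneven buckets. The numbers below are made up for illustration, and the helper names are hypothetical.

```python
def rollup(rows):
    """Fold (sum, count) pairs into one (sum, count) pair."""
    total = sum(s for s, _ in rows)
    count = sum(c for _, c in rows)
    return total, count


def read_avg(pair):
    """Read-time formula: sum / count (None for an empty bucket)."""
    total, count = pair
    return total / count if count else None


# Hour 1 holds three readings, hour 2 only one.
hour1 = rollup([(10.0, 1), (20.0, 1), (30.0, 1)])
hour2 = rollup([(100.0, 1)])

# Rollup-of-rollup: the day bucket folds the hourly sums and counts.
day = rollup([hour1, hour2])
chained_avg = read_avg(day)

# Averaging the hourly averages instead weights both hours equally.
avg_of_avgs = (read_avg(hour1) + read_avg(hour2)) / 2
```

The chained value is 40.0, the true mean of the four readings, while the average of averages is 60.0 because it ignores how many readings each hour contained.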
Operator controls
| Operation | Mutation | Notes |
|---|---|---|
| Create | createRollupArchive(input) | Server derives TargetCkTypeId and Columns from the source archive + aggregations. |
| Freeze a range | freezeRollupArchive(rtId, until) | Monotonic — the new until must be ≥ the current FrozenUntil. The orchestrator stops producing buckets whose bucketEnd falls in the frozen range. |
| Unfreeze | unfreezeRollupArchive(rtId, acceptGaps) | Idempotent. acceptGaps is recorded for audit only; the gap-detection guard that would act on it is a planned follow-up and is not enforced yet. |
| Rewind the watermark | rewindRollupWatermark(rtId, toBucketEnd) | Re-aggregates from the given bucket boundary forward. Destructive: previously committed rows in the rewound range are temporarily out of sync until the orchestrator catches up. |
| Recompute a range | recomputeArchive(rtId, from, to, rtIdScope?) | Optimistic, no-downtime recompute of [from, to), optionally scoped to a single entity. Readers keep seeing a consistent snapshot throughout. See Rollup Recompute below. |
A RawArchive referenced by an active rollup cannot be deleted — drop the rollup first.
Rollup Recompute
When raw data changes after the fact — late values, corrections, re-ingests — the rollup rows derived from that window are stale. A recompute re-aggregates the affected windows. The hard requirement is that consumers (meshboards, pipelines, the API) keep working while a recompute runs: no 500, no aborted pipeline, and every read returns a consistent snapshot — either the previous values or the new ones, never a half-written mix.
For the engineer-facing internals — how a retroactive write is detected (RetroactiveWriteDetector, the consumed-watermark criterion), the dirty-windows ledger, the dependency-graph propagation, the periodic orchestrator, the optimistic atomic swap with an end-to-end sequence diagram, and a comparison of all four (re)compute paths — see Rollup Recompute Internals.
Triggers
| Trigger | Surface | Behaviour |
|---|---|---|
| Manual | recomputeArchive GraphQL mutation · POST /archives/{rollupRtId}/recompute · octo-cli RecomputeArchive | An operator forces a recompute over [from, to), optionally scoped to one entity (rtIdScope). Requires StreamDataAdmin. |
| Periodic | RecomputeOrchestratorHostedService (per-tenant tick) | Drains the dirty-windows ledger and recomputes only the windows actually marked dirty — not the whole archive. |
| Chain propagation | internal | After a rollup's recompute commits, its own dependents are marked dirty so a multi-level rollup-of-rollup chain rolls forward top-down. |
Event-driven automatic threshold reset is out of scope here (tracked separately); this feature only produces the dirty signal such a policy would consume.
Optimistic atomic swap (how consistency is guaranteed)
CrateDB has no multi-statement transaction, so the swap uses a per-window generation pointer:
- Each rollup table carries a generation column (BIGINT NOT NULL DEFAULT 0) that is part of the primary key. Normal forward aggregation always writes generation = 0.
- A recompute aggregates the range into a per-archive staging table, then copies those rows into the live table stamped with the next generation N+1. The previous generation stays in place and visible.
- A tiny per-rollup side-table (archive_<rtId>__genmap) holds the active generation per range. Flipping that pointer to N+1 is a single-row write, which is the atomic commit point.
- The read path injects generation = CASE WHEN <range> THEN <activeGen> … ELSE 0 END, so every query resolves exactly one consistent generation per window.
- After the flip a background sweep deletes the now-superseded generation's rows.
Failure semantics: a job that crashes before the flip leaves the staging table orphaned (reclaimed by name on the next run) and the live table + pointer untouched — readers keep seeing the previous values. A crash after the flip but before the sweep only leaves dead rows the next sweep reclaims. No partial state is ever observable.
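The generation-pointer idea can be modelled in memory to show why readers never see a mix. This is a toy model under stated assumptions, not the CrateDB implementation: the class and method names are invented, buckets are plain integers, the next generation is taken as the highest existing one plus one, and the staged range is assumed to cover every bucket it flips.

```python
class RollupTable:
    """Toy model of a rollup table with a per-range generation pointer."""

    def __init__(self):
        self.rows = {}    # (bucket, generation) -> value; generation is part of the key
        self.genmap = []  # side-table entries: (range_start, range_end, active_generation)

    def write_forward(self, bucket, value):
        self.rows[(bucket, 0)] = value  # normal aggregation writes generation 0

    def active_generation(self, bucket):
        for start, end, gen in reversed(self.genmap):  # latest pointer wins
            if start <= bucket < end:
                return gen
        return 0

    def read(self, bucket):
        return self.rows.get((bucket, self.active_generation(bucket)))

    def copy_staged(self, staged):
        """Copy recomputed rows into the live table under the next generation."""
        next_gen = max((gen for _, gen in self.rows), default=0) + 1
        for bucket, value in staged.items():
            self.rows[(bucket, next_gen)] = value
        return next_gen

    def flip(self, start, end, gen):
        self.genmap.append((start, end, gen))  # single write: the commit point

    def sweep(self, start, end):
        """Delete rows in the range that belong to a superseded generation."""
        for bucket, gen in list(self.rows):
            if start <= bucket < end and gen != self.active_generation(bucket):
                del self.rows[(bucket, gen)]
```

Between copy_staged and flip a reader still resolves generation 0 for every bucket, so an interrupted job changes nothing that is visible. Only the single flip write switches the whole range to the new values.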
Scoping
rtIdScope restricts the whole pipeline — staging aggregation, the generation pointer, and the post-flip sweep — to a single entity (metering point / stream). A late value for one stream therefore recomputes only that stream's windows and never touches unrelated entities sharing the same bucket. Omitting rtIdScope recomputes the entire range.
Coalescing
Overlapping triggers for the same archive coalesce: their ranges merge into one active job, and the superseded trigger is recorded with state Coalesced in the job history. There is at most one active recompute per archive at a time.
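Merging overlapping trigger ranges is a standard interval merge. The sketch below shows only that step: the function name is hypothetical, ranges are half-open [from, to) pairs of comparable values, and treating ranges that merely touch as separate is an assumption. How non-overlapping triggers are queued behind the single active job is not described here and is left out.

```python
def coalesce(ranges):
    """Merge overlapping [from, to) ranges into a sorted list of disjoint ranges."""
    merged = []
    for start, end in sorted(ranges):
        if merged and start < merged[-1][1]:
            # Overlaps the previous range: extend it instead of adding a new one.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


# Two overlapping triggers collapse into one range; the third stays separate.
jobs = coalesce([(5, 20), (0, 10), (30, 40)])
```

Here jobs contains (0, 20) and (30, 40), so the work for the overlap between the first two triggers is done once.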
Interaction with rewind
rewindRollupWatermark re-aggregates a range forward at generation = 0. If that range had previously been recomputed, the active-generation pointer would otherwise keep readers pinned to the stale recomputed generation. The rewind therefore reconciles the generation map: it drops the pointers reaching past the rewind boundary and deletes the orphaned generation > 0 rows there, so the forward re-aggregation becomes authoritative again. This is a no-op when stream data is disabled or nothing was recomputed.
Job history & observability
Every recompute is recorded as a job snapshot, queryable newest-first (capped at 50):
- GraphQL: runtime.streamData.recomputeJobsFor(rtId) returns [RecomputeJobInfo!]!.
- REST: GET /archives/{archiveRtId}/recompute-jobs.
- octo-cli: ListRecomputeJobs.
RecomputeJobInfo carries state (Running / Completed / Failed / Coalesced), rowsProcessed, windowsProcessed, startedAt, finishedAt, durationMs, and errorReason (set on failure). In addition, the per-rollup RollupArchiveInfo (via rollupsFor) exposes live health fields: recomputeInProgress, lastRecomputeStartedAt, lastRecomputeSuccessAt, lastRecomputeFailureAt, lastRecomputeFailureReason, dirtyWindowsPending, and pendingRecomputeRanges. Failures are also emitted through the archive audit trail.
Time-Range Archives
For externally pre-aggregated data with explicit [from, to) windows. There is no source archive and no orchestrator on this path; producers push rows directly:
- GraphQL mutation: createTimeRangeArchive(input), with the same admin-grade role guard as rollup create.
- Pipeline node: SaveTimeRangeStreamDataInArchive@1.
- REST: POST /api/v1/streamData/archives/{archiveRtId}/insertTimeRange.
Period on TimeRangeArchive is advisory only — it lets the studio render a default time range but is not enforced at insert time.
Computed Columns
A computed column on a raw or time-range archive is derived by a formula from one or more other columns of the same row, rather than supplied by the producer — e.g. powerFactor = activePower / apparentPower. The value is materialised as a real, typed CrateDB column, so it is natively aggregatable by rollups and visible to direct SQL consumers (Grafana).
- Where it is evaluated: server-side in the CrateDB repository on every ingest path (pipeline node, REST, SDK), so the value is identical regardless of producer. The formula uses the shared mXparser dialect (see the Formula Expressions reference under Communication).
- Result type: declared explicitly and cast back from the double mXparser computes: Boolean, Int, Int64, Double, or DateTime (ticks). String and non-scalar types are not supported. A formula / NaN / NULL-operand error stores NULL for that cell; ingest never fails.
- CK model: CkArchiveColumn carries Name + Formula + ResultType + the engine-managed ComputedState (since System.StreamData 1.5.0) and ComputedVersion / PendingFormula (since 1.6.1 / 1.6.2 respectively). A column with a Formula is computed; one with a Path is ingested. The two are mutually exclusive.
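The "error stores NULL, ingest never fails" rule for computed cells can be sketched as follows. A Python callable stands in for the mXparser formula, and the cast rules (truncation for Int / Int64, non-zero means true for Boolean) are assumptions made for the example. DateTime (ticks) is omitted.

```python
import math

# Assumed cast rules from the double result back to the declared ResultType.
CASTS = {
    "Boolean": lambda v: v != 0.0,
    "Int": int,
    "Int64": int,
    "Double": float,
}


def compute_cell(formula, row, result_type):
    """Evaluate one computed cell; any evaluation problem yields None (NULL)."""
    cast = CASTS[result_type]  # unsupported types such as String fail here
    try:
        value = float(formula(row))  # the formula engine computes a double
        if math.isnan(value):
            return None
        return cast(value)
    except (TypeError, KeyError, ZeroDivisionError, ValueError, OverflowError):
        return None  # NULL operand, missing column, or formula error


def power_factor(row):
    return row["activePower"] / row["apparentPower"]


ok = compute_cell(power_factor, {"activePower": 80.0, "apparentPower": 100.0}, "Double")
null_operand = compute_cell(power_factor, {"activePower": None, "apparentPower": 100.0}, "Double")
```

The first call yields 0.8 and the second yields None. Neither raises, so a bad operand in one row cannot abort the batch it arrives in.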
Active-archive lifecycle (optimistic / atomic)
Adding, re-formulating, or removing a computed column works on an Activated archive without blocking ingest or breaking readers, reusing the optimistic semantics of Rollup Recompute:
- Add: the column is persisted Pending, the physical column is added (ALTER TABLE ADD COLUMN), a backfill populates it across the existing rows while it stays hidden from readers, then it flips to Active atomically. A backfill failure leaves it Failed and the previous archive state intact.
- Change formula: the column keeps serving its previous values (version N) while the new formula is backfilled into a versioned physical column {base}__v{N+1}; ingest dual-writes both during the backfill; then a single store write swaps the version pointer atomically. A failure reverts to the previous formula.
- Remove: dropped from the logical projection; the physical column is left as a harmless nullable orphan the read path no longer projects.
Limitation (formula change). Because a formula change re-versions the column's physical name, changing the formula of a computed column that another computed column references is rejected (ComputedColumnInvalidException) — re-point or remove the dependent first. Direct-SQL / Grafana consumers that bind to the physical name must likewise follow the rename.
Rollups over computed columns
A CkRollupAggregation.SourcePath may target a source computed column by its Name — the rollup aggregates it natively like any ingested column (single- and multi-aggregate). A RollupArchive may also carry its own computed columns over its aggregated output columns.
API Surface
GraphQL — lifecycle mutations
Under mutation.streamData:
| Mutation | Returns | Description |
|---|---|---|
| activateArchive(rtId) | ArchiveTransitionResult | Provisions the CrateDB table, transitions to Activated. |
| disableArchive(rtId) | ArchiveTransitionResult | Activated → Disabled. Table is preserved. |
| enableArchive(rtId) | ArchiveTransitionResult | Disabled → Activated. Re-validates column paths. |
| retryArchiveActivation(rtId) | ArchiveTransitionResult | Failed → Activated. Idempotent (CREATE TABLE IF NOT EXISTS). |
| deleteArchive(rtId) | Boolean | Drops the CrateDB table and soft-deletes the entity. Rejected if active rollups reference it. |
| createRollupArchive(input) | OctoObjectId | Server-derived TargetCkTypeId + Columns. |
| createTimeRangeArchive(input) | OctoObjectId | Operator-defined target type and columns. |
| freezeRollupArchive(rtId, until) | ArchiveTransitionResult | Monotonic FrozenUntil set. |
| unfreezeRollupArchive(rtId, acceptGaps) | ArchiveTransitionResult | Clears FrozenUntil. |
| rewindRollupWatermark(rtId, toBucketEnd) | ArchiveTransitionResult | Re-aggregates from the given boundary. |
| recomputeArchive(rtId, from, to, rtIdScope?) | RecomputeJobInfo | Triggers/coalesces an optimistic recompute of [from, to), optionally per entity. Returns the job snapshot. |
| addComputedColumn(rtId, name, formula, resultType, indexed?) | ArchiveTransitionResult | Adds a computed column and backfills it (optimistic / atomic). |
| updateComputedColumnFormula(rtId, name, formula) | ArchiveTransitionResult | Re-formulates a computed column (versioned backfill + atomic swap). Rejected when another computed column references it. |
| removeComputedColumn(rtId, name) | ArchiveTransitionResult | Removes a computed column (physical column left as an orphan). |
The matching read query is runtime.streamData.recomputeJobsFor(rtId): [RecomputeJobInfo!]! (most recent jobs, newest first, capped at 50).
All mutations require StreamDataAdmin. Domain exceptions (see Exception Catalog below) surface as stable GraphQL error codes via error.extensions.OctoDetails.
CRUD on archives (List / Get / Update) is auto-generated from the CK model and lives under runtime.systemStreamDataArchives / …RawArchives / …RollupArchives / …TimeRangeArchives. The validation hook enforces immutability after activation.
GraphQL — path enumeration
query availableArchivePaths(ckTypeId: ID!, maxDepth: Int = 5): [ArchivePathInfo!]!
type ArchivePathInfo {
path: String! # "sensor.reading.value", "readings[*].value"
primitiveType: String # null when isRecord = true
isRecord: Boolean!
isArray: Boolean!
recordTypeId: String
inheritedFromCkTypeId: String # null if attribute belongs to ckTypeId itself
}
The studio's AttributePathPicker consumes this for archive create/edit. Admin-only.
REST — /api/v1/streamData
| Method | Path | Description |
|---|---|---|
| GET | /status | { instanceEnabled, tenantEnabled } for the current tenant. |
| POST | /enable / /disable | Tenant-level toggle. |
| POST | /archives/{archiveRtId}/activate | Same as GraphQL activateArchive. |
| POST | /archives/{archiveRtId}/disable / /enable / /retry | Status transitions. |
| DELETE | /archives/{archiveRtId} | Same as GraphQL deleteArchive. |
| POST | /archives/{rollupRtId}/freeze / /unfreeze / /rewind | Rollup-only controls. |
| POST | /archives/{rollupRtId}/recompute | Optimistic recompute of [from, to) (rtIdScope optional). Returns the job snapshot. |
| GET | /archives/{archiveRtId}/recompute-jobs | Recent recompute jobs (newest first) for debugging. |
| POST | /archives/{archiveRtId}/insertTimeRange | Bulk time-range insert. |
| GET | /archives/{archiveRtId}/rollups | Active rollups referencing this raw archive. |
Pipeline node — SaveStreamDataInArchive@1
A PathNodeConfiguration node with a required property ArchiveRtId : OctoObjectId. The node routes each incoming StreamDataPoint to the configured archive via IStreamDataRepository.InsertAsync. The companion SaveTimeRangeStreamDataInArchive@1 handles TimeRangeArchive.
The mesh-adapter service account holds StreamDataWriter. Targeting an inactive archive surfaces a clear pipeline error (ArchiveNotActivatedException).
SDK — IStreamDataRepository
// Data plane
Task InsertAsync(OctoObjectId archiveRtId, StreamDataPoint point);
Task InsertAsync(OctoObjectId archiveRtId, IEnumerable<StreamDataPoint> points);
Task InsertTimeRangeAsync(OctoObjectId archiveRtId, IEnumerable<TimeRangeStreamDataPoint> points);
Task<StreamDataQueryResult> ExecuteQueryAsync(OctoObjectId archiveRtId, StreamDataQueryOptions options);
Task<StreamDataQueryResult> ExecuteAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataAggregationQueryOptions options);
Task<StreamDataQueryResult> ExecuteGroupedAggregationQueryAsync(OctoObjectId archiveRtId, StreamDataGroupedAggregationQueryOptions options);
Task<StreamDataQueryResult> ExecuteDownsamplingQueryAsync(OctoObjectId archiveRtId, StreamDataDownsamplingQueryOptions options);
// Control plane (called by IArchiveLifecycleService)
Task EnsureArchiveCreatedAsync(CkArchive archive);
Task DeleteArchiveAsync(CkArchive archive);
Bulk inserts use all-or-nothing pre-validation: every point is validated before any SQL is sent; a violation throws RequiredAttributeMissingException carrying the index of the offending point, and no row is written. A DB-level failure after pre-validation surfaces as an exception.
Bulk Insert Semantics
- Resolve archive (single lookup), status check.
- Pre-validate every point in the batch — required-path coverage, type compatibility, path resolution. Stops at the first violation.
- On violation → throw with the offending index. No row written.
- On success → single multi-row INSERT … ON CONFLICT … execution.
Pipeline callers see batch success or full failure; partial-commit scenarios are limited to transient DB failures (operational, see Reconciliation below).
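The all-or-nothing flow above can be sketched as a validation pass followed by one write. Only the required-path check is modelled; type compatibility and path resolution are left out. The function and class names are hypothetical, and execute_sql stands in for the single multi-row statement.

```python
class ArchiveNotActivatedError(Exception):
    """Stand-in for ArchiveNotActivatedException."""


class RequiredAttributeMissingError(Exception):
    """Stand-in for RequiredAttributeMissingException; carries the point index."""

    def __init__(self, index, path):
        super().__init__(f"point {index} lacks required path '{path}'")
        self.index = index
        self.path = path


def bulk_insert(archive_status, points, required_paths, execute_sql):
    """Validate the whole batch first, then issue one multi-row write."""
    if archive_status != "Activated":  # single status lookup for the batch
        raise ArchiveNotActivatedError(archive_status)
    for index, point in enumerate(points):
        for path in sorted(required_paths):
            if point.get(path) is None:
                # Stop at the first violation; nothing has been written yet.
                raise RequiredAttributeMissingError(index, path)
    execute_sql(points)  # reached only when every point passed
```

Because the write happens after the loop, a violation at any index leaves the table exactly as it was before the call.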
Retention
Retention is partition-based and per-archive (work in progress — design captured here; some attributes are not yet exposed in the create form).
- DDL: archive tables are PARTITIONED BY DATE_TRUNC('day', timestamp). Drop-partition is metadata-only on CrateDB.
- RawRetentionMs on the Archive base type defines the retention horizon. null = retain forever.
- The ArchiveRetentionScheduler (IHostedService, hourly) drops partitions older than now() - RawRetentionMs per archive, per tenant. Failures are isolated; one bad archive does not block the rest.
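The horizon arithmetic for one archive can be sketched as below. Since retention is described as work in progress, treat this purely as an illustration: the function name is hypothetical, and dropping a day partition only once the whole day lies before the horizon is an assumption.

```python
from datetime import datetime, timedelta, timezone


def partitions_to_drop(partition_days, raw_retention_ms, now):
    """Return the day partitions that lie entirely before now - RawRetentionMs.

    partition_days holds the UTC midnight that starts each daily partition.
    """
    if raw_retention_ms is None:
        return []  # null retention: keep everything
    horizon = now - timedelta(milliseconds=raw_retention_ms)
    one_day = timedelta(days=1)
    return [day for day in partition_days if day + one_day <= horizon]


now = datetime(2024, 5, 10, 12, 0, tzinfo=timezone.utc)
days = [datetime(2024, 5, d, tzinfo=timezone.utc) for d in (5, 6, 7, 8)]
three_days_ms = 3 * 24 * 60 * 60 * 1000
expired = partitions_to_drop(days, three_days_ms, now)
```

With a three-day retention the horizon is 7 May 12:00 UTC, so the partitions for 5 and 6 May are returned while 7 May, which still contains rows newer than the horizon, is kept.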
For downsampling-style retention (raw rows folded into a coarser archive before deletion), use a RollupArchive pointing at the source archive — the rollup is the long-term store while the raw archive's retention can be aggressive.
Reconciliation
CkArchive entities live in MongoDB; archive tables live in CrateDB. There is no shared transaction. The lifecycle service keeps the two reconcilable without 2PC:
- All DDL is idempotent (IF EXISTS / IF NOT EXISTS).
- Operation order is Crate-first, Mongo-last: Activate runs CREATE TABLE, then flips Status in Mongo. Delete runs DROP TABLE, then soft-deletes the entity. Mongo is the source of truth.
- Every data-plane call checks Status == Activated before touching Crate. While the CREATE TABLE has run but the Status flip in Mongo has not, the API rejects with ArchiveNotActivatedException.
- On startup the ArchiveReconciler enumerates Activated archives per tenant and (re-)provisions any missing CrateDB tables. Drift in the other direction (table without entity) is logged but not auto-dropped.
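The startup pass amounts to two set comparisons between the Mongo view and the CrateDB view. The sketch below is not the ArchiveReconciler itself; the function name and the plain dict and set inputs are assumptions made for illustration.

```python
def reconcile(entities, crate_tables):
    """Compare archive entities with existing tables for one tenant.

    entities maps table name to archive Status (the Mongo view);
    crate_tables is the set of table names that exist in CrateDB.
    Returns (tables to provision, orphan tables to log).
    """
    to_provision = sorted(
        name for name, status in entities.items()
        if status == "Activated" and name not in crate_tables
    )
    # A table without any entity is reported, never dropped automatically.
    orphans = sorted(name for name in crate_tables if name not in entities)
    return to_provision, orphans


entities = {"meter_v1": "Activated", "meter_v2": "Activated", "pump_v1": "Disabled"}
crate_tables = {"meter_v1", "pump_v1", "legacy_v0"}
missing, orphaned = reconcile(entities, crate_tables)
```

In this example meter_v2 is re-provisioned because Mongo says it is Activated but its table is absent, and legacy_v0 is only logged.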
Concurrency
| Scenario | Behavior |
|---|---|
| Two parallel Activate on the same archive | Serialized via RepositoryDistributedLockService. The second call observes Activated and returns idempotently. |
| Disable while inserts are in flight | In-flight inserts complete. New inserts after the flip raise ArchiveNotActivatedException. |
| Delete while inserts are in flight | Same as Disable, plus the DROP TABLE may retry once with a short backoff if the table is briefly locked. |
| Activate fails mid-DDL | Status → Failed. EnsureArchiveCreatedAsync is idempotent (CREATE TABLE IF NOT EXISTS), so retry is safe. |
| Bulk insert on disabled archive | Whole batch rejected on the single status lookup. |
Exception Catalog
All in Meshmakers.Octo.Runtime.Contracts.StreamData. Common base: StreamDataException (extends RuntimeRepositoryException). Surfaced via GraphQL error.extensions.OctoDetails and via REST status codes.
| Exception | Thrown when |
|---|---|
| StreamDataException | Base: generic StreamData failure. |
| StreamDataNotEnabledException | Instance or tenant flag is false on an operation that requires it. |
| ArchiveNotFoundException | archiveRtId does not resolve to an Archive entity. |
| ArchiveNotActivatedException | Archive exists but Status ≠ Activated on insert / query. |
| ArchiveSchemaImmutableException | Post-activation update tries to change schema-relevant fields. |
| ArchivePathInvalidException | A CkArchiveColumn.Path cannot be resolved against TargetCkTypeId. |
| ArchiveColumnTypeUnsupportedException | An attribute type cannot be mapped to a CrateDB column type. |
| RequiredAttributeMissingException | Insert lacks a value for a required: true scalar path. Carries the offending point index. |
| ArchiveActivationFailedException | DDL execution failed during activation. Wraps the underlying SQL error. |
| InvalidArchiveStateTransitionException | Lifecycle mutation called against an incompatible status. |
| RollupSourceInUseException | deleteArchive invoked on a raw archive that still has active rollups. |
| RollupSourceMissingException | createRollupArchive references a source archiveRtId that does not exist or is not yet activated. |
Observability
Metrics
| Metric | Type | Tags |
|---|---|---|
| streamdata.archive.count | Gauge | tenant, status |
| streamdata.archive.status_transitions | Counter | tenant, archive, from, to |
| streamdata.archive.activation_duration_ms | Histogram | tenant, archive, outcome |
| streamdata.insert.duration_ms | Histogram | tenant, archive, batch_size_bucket |
| streamdata.insert.points | Counter | tenant, archive |
| streamdata.insert.required_violations | Counter | tenant, archive |
| streamdata.query.duration_ms | Histogram | tenant, archive, query_type |
| streamdata.query.rows_returned | Histogram | tenant, archive, query_type |
| streamdata.crate.connections.open | Gauge | (instance-wide) |
| streamdata.crate.retention_partitions_dropped | Counter | tenant, archive |
| streamdata.crate.rollup_rows_inserted | Counter | tenant, rollup_archive |
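The batch_size_bucket tag on streamdata.insert.duration_ms suggests that batch sizes are grouped into a small set of labels, which keeps tag cardinality bounded. The actual bucket boundaries are not documented here, so the boundaries and labels in the following sketch are purely hypothetical; it only illustrates the bucketing technique.

```python
import bisect

# Hypothetical inclusive upper bounds and labels; the real buckets may differ.
BUCKET_UPPER_BOUNDS = [1, 10, 100, 1000, 10000]
BUCKET_LABELS = ["1", "2-10", "11-100", "101-1000", "1001-10000", "10001+"]


def batch_size_bucket(batch_size: int) -> str:
    """Map a batch size to a coarse label suitable for use as a metric tag."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    # bisect_left finds the first bound that is >= batch_size.
    return BUCKET_LABELS[bisect.bisect_left(BUCKET_UPPER_BOUNDS, batch_size)]
```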
Traces
Activity sources: Meshmakers.Octo.StreamData (top-level operations) and Meshmakers.Octo.StreamData.Crate (DB calls). Span attributes: streamdata.archive.rtid, streamdata.archive.target_cktype, streamdata.tenant.
Audit trail
Status transitions are emitted as RtEvent records through IEventRepository with source = RtEventSourcesEnum.StreamData. The triggering user is encoded in the event message; auto-transitions use system:reconciliation. The studio's archive details view exposes the history via the standard event query path.
Creating an Archive
From the CLI
# Tenant-level enable (required once per tenant)
octo-cli -c EnableStreamData
# Archives are runtime entities — create them through the studio, GraphQL,
# or by rt-importing a runtime-model YAML. Then:
octo-cli -c ActivateArchive -id 69fda707d47638c68edc7fea
See octo-cli — ActivateArchive and related archive commands in the sidebar for the full command surface.
From the Studio
Navigate to Repository → Stream Data Archives. The list shows every archive on the active tenant with a status badge and per-row actions (Activate, Disable, Enable, Retry, Delete, Copy ID). The New Archive, New Rollup, and New Time-Range entry points walk through the create flow with the AttributePathPicker for column selection.
See Studio: Archives for the UI walkthrough.
From GraphQL
# 1. Create the archive via the auto-generated runtime mutation
mutation {
  runtime {
    systemStreamDataRawArchives {
      create(input: {
        rtWellKnownName: "EnergyMeterArchive",
        targetCkTypeId: "Industry.Energy/EnergyMeter",
        columns: [
          { path: "voltage", required: true, indexed: true },
          { path: "current", required: true, indexed: true },
          { path: "powerFactor", required: false, indexed: false }
        ]
      }) { item { rtId } }
    }
  }
}
# 2. Activate the archive, passing the rtId returned by step 1
mutation {
  streamData {
    activateArchive(rtId: "65d5c447b420da3fb1238201") {
      rtId
      status
    }
  }
}
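The two mutations are usually chained: the rtId returned by the create call is the one handed to activateArchive. The sketch below shows that chaining in Python. It is not an official client. The `execute` callable is a hypothetical transport that takes a GraphQL document and returns the decoded JSON response, and the response path is assumed to mirror the selection sets of the mutations above.

```python
import json
from typing import Callable

CREATE_MUTATION = """
mutation {
  runtime {
    systemStreamDataRawArchives {
      create(input: {
        rtWellKnownName: "EnergyMeterArchive",
        targetCkTypeId: "Industry.Energy/EnergyMeter",
        columns: [
          { path: "voltage", required: true, indexed: true },
          { path: "current", required: true, indexed: true },
          { path: "powerFactor", required: false, indexed: false }
        ]
      }) { item { rtId } }
    }
  }
}
"""


def build_activate_mutation(rt_id: str) -> str:
    # json.dumps yields a quoted, escaped string literal for the rtId.
    return (
        "mutation { streamData { activateArchive(rtId: "
        + json.dumps(rt_id)
        + ") { rtId status } } }"
    )


def create_and_activate(execute: Callable[[str], dict]) -> dict:
    """Create the archive, then activate it using the returned rtId."""
    created = execute(CREATE_MUTATION)
    # Assumed response shape, following the mutation's selection set.
    item = created["data"]["runtime"]["systemStreamDataRawArchives"]["create"]["item"]
    activated = execute(build_activate_mutation(item["rtId"]))
    return activated["data"]["streamData"]["activateArchive"]
```

Passing the transport in as a callable keeps the sketch independent of any particular HTTP library, endpoint URL, or authentication scheme, none of which are specified in this section.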
Best Practices
- One archive per concern. Different retention, different query patterns, different consumers → different archives.
- Be explicit about required. Required scalars become NOT NULL columns and are validated app-side before insert.
- Index sparingly. Indexes cost storage and insert throughput. Default to indexed; opt out for columns that are read but never filtered.
- Roll up early. A RollupArchive is far cheaper to query over long horizons than a raw scan.
- Pick alignment for the use case. Energy / EDA workflows want calendar alignments (monthly, weekly); engineering telemetry usually wants FixedSize.
- Freeze before bulk imports. When backfilling a source archive's history, freeze any dependent rollups, finish the import, then unfreeze with acceptGaps: false (or rewind the watermark) to re-aggregate cleanly.
- Soft-delete first. Disable an archive to confirm no consumer breaks before deleting.
See Also
- Rollups & recompute — detection criterion, dirty ledger, dependency graph, optimistic atomic swap
- Stream Data Access (Overview) — query surfaces (typed / transient / persisted)
- Simple Query — raw time-series retrieval
- Aggregation Queries — server-side aggregation
- Downsampling Query — bucketed downsampling
- octo-cli — EnableStreamData and related archive commands (Activate / Disable / Enable / Retry / Delete) — CLI commands for archive operations
- Studio: Archives — UI walkthrough in the Refinery Studio