
Rollups & recompute

This page is the engineer-facing companion to Stream Data Archives. It explains how a RollupArchive keeps its derived aggregations correct as the underlying data changes — the detection criterion, the dirty-windows ledger, the dependency graph, the periodic orchestrator, and the optimistic atomic swap that keeps readers on a consistent snapshot throughout.

If you only need the operator-facing walkthrough (the Studio buttons and when to use them), read the Studio: Archives — Rollup-Specific Actions section instead. This page is about the mechanics underneath those buttons.

Where this lives
  • CK model: System.StreamData (since 1.6.2). The rollup runtime-state attributes and the RecomputeJob entity are available from this model version onward.
  • Engine: octo-construction-kit-engine + octo-construction-kit-engine-mongodb (orchestrators, dirty detection, dependency graph).
  • API & persistence: octo-asset-repo-services (GraphQL/REST surface, CrateDB generation tables, MongoDB runtime state).
Terminology

The vocabulary used throughout this page (tick, bucket with bucket size and alignment, watermark with watermark lag and consumed watermark, dirty window, generation, source vs rollup archive, forward aggregation / recompute / rewind, chain propagation, and coalesce) is defined once in the central stream-data glossary. First uses below link straight to the relevant entry. The running example throughout is the voest energy case: 15-minute source datapoints rolled up into hourly, daily and coarser aggregations.

The three ways rollup data gets (re)computed

A rollup's stored rows can be produced or replaced by exactly three code paths. Everything else (the dirty ledger, chain propagation, the Studio buttons) ultimately drives one of these three.

  1. Forward aggregation. The RollupOrchestrator ticks and aggregates the next closed bucket in [watermark, now - watermarkLag), one bucket per tick. It upserts the result at generation 0 and advances the watermark (LastAggregatedBucketEnd). This is the steady-state path, and it only ever moves forward.
  2. Recompute (manual or periodic). A range-scoped [from, to) re-aggregation that runs as an optimistic atomic swap: it is reader-safe (queries keep returning a consistent snapshot throughout) and produces a RecomputeJob record. This is the path used to correct history without downtime.
  3. Rewind watermark. Moves the forward high-water mark backwards. The orchestrator then re-aggregates forward over the rewound range, overwriting the existing generation 0 rows in place. Because there is no second generation to fall back on, this is destructive and not reader-safe: readers can observe partially re-aggregated buckets.
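A minimal sketch of one forward-aggregation tick, assuming fixed-size buckets and a watermark that always sits on a bucket boundary. The function names next_closed_bucket and forward_tick are hypothetical, and the actual aggregation and generation 0 upsert are omitted; only the bucket selection and the watermark advance from path 1 are modelled.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple

Bucket = Tuple[datetime, datetime]  # half-open [start, end)


def next_closed_bucket(watermark: datetime, bucket_size: timedelta,
                       now: datetime, watermark_lag: timedelta) -> Optional[Bucket]:
    """Return the bucket starting at the watermark if it is fully closed, else None."""
    start, end = watermark, watermark + bucket_size
    # Closed means every timestamp in [start, end) is earlier than now - watermark_lag.
    if end <= now - watermark_lag:
        return (start, end)
    return None


def forward_tick(watermark: datetime, bucket_size: timedelta,
                 now: datetime, watermark_lag: timedelta) -> Tuple[Optional[Bucket], datetime]:
    """One tick: pick at most one closed bucket and return it with the new watermark."""
    bucket = next_closed_bucket(watermark, bucket_size, now, watermark_lag)
    if bucket is None:
        return None, watermark  # still open: nothing to do, watermark unchanged
    # (Aggregating the bucket and upserting it at generation 0 would happen here.)
    return bucket, bucket[1]    # advance LastAggregatedBucketEnd to the bucket end
```

With a one-hour bucket and a five-minute lag, the 10:00 to 11:00 bucket only becomes eligible once now reaches 11:05.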

In addition, an automatic dirty-detection mechanism decides when a recompute is needed. It does not itself rewrite any rows; it records intent (a dirty window) that the periodic orchestrator later turns into a recompute.

Open-bucket refresh (AB#4306)

Forward aggregation only closes a bucket once it is fully in the past (< now - watermarkLag), so a coarse partial period — this month so far, this year so far — would otherwise show no value until the period ends. To keep those partial-period totals live, the RollupOrchestrator additionally re-aggregates the current open (in-progress) bucket on each tick, writing it at generation 0 without advancing the watermark. It is controlled by the RefreshOpenBucket option (StreamData:Rollup:RefreshOpenBucket, default true) and skips a bucket that is frozen (see Freeze).
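The open-bucket selection can be sketched as follows, assuming calendar-aligned hour, day, month and year buckets in a single time zone. The helpers bucket_start, bucket_end and open_bucket_to_refresh are hypothetical; the refresh_open_bucket and frozen parameters stand in for the RefreshOpenBucket option and the Freeze state. Nothing in the sketch touches the watermark.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def bucket_start(ts: datetime, unit: str) -> datetime:
    """Floor ts to the start of its calendar bucket."""
    if unit == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    if unit == "day":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if unit == "month":
        return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if unit == "year":
        return ts.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported bucket unit: {unit}")


def bucket_end(start: datetime, unit: str) -> datetime:
    """Exclusive end of the calendar bucket that begins at start."""
    if unit == "hour":
        return start + timedelta(hours=1)
    if unit == "day":
        return start + timedelta(days=1)
    if unit == "month":  # December rolls over into January of the next year
        return start.replace(year=start.year + start.month // 12, month=start.month % 12 + 1)
    if unit == "year":
        return start.replace(year=start.year + 1)
    raise ValueError(f"unsupported bucket unit: {unit}")


def open_bucket_to_refresh(now: datetime, unit: str, refresh_open_bucket: bool = True,
                           frozen: bool = False) -> Optional[Tuple[datetime, datetime]]:
    """The in-progress bucket to re-aggregate at generation 0 this tick, or None.

    The watermark is deliberately not advanced: the bucket is refreshed, not closed.
    """
    if not refresh_open_bucket or frozen:
        return None
    start = bucket_start(now, unit)
    return (start, bucket_end(start, unit))
```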

Because the open bucket is the forward path's exclusive domain, recompute deliberately excludes it: a manual or periodic recompute caps its to bound at the start of the bucket containing now, so only fully closed buckets remain in range (a range that is entirely open becomes a Completed no-op). If a recompute did claim the open bucket, it would flip the genmap generation for that window to N+1, and the read path (highest generation wins) would then mask the refresh's generation 0 writes, freezing the partial-period total at the recomputed value instead of tracking new source data. Capping the range keeps the open bucket on the always-fresh generation 0 lane.

This matters most in a cascade (rollup-of-rollup): the open bucket of each coarse level (daily → monthly → yearly) is refreshed cheaply from a handful of rows in the level below, so the current period stays current all the way up the ladder.
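A sketch of the range cap, assuming fixed-size buckets aligned to the Unix epoch in UTC (the engine's real alignment rules may differ). cap_recompute_range is a hypothetical name; a None result corresponds to the Completed no-op case.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def bucket_start(ts: datetime, size: timedelta) -> datetime:
    """Floor ts to a fixed-size bucket aligned to the Unix epoch (assumed alignment)."""
    return EPOCH + ((ts - EPOCH) // size) * size


def cap_recompute_range(frm: datetime, to: datetime, now: datetime,
                        size: timedelta) -> Optional[Tuple[datetime, datetime]]:
    """Clamp [frm, to) so it stops at the open bucket; None means a Completed no-op."""
    open_start = bucket_start(now, size)
    capped_to = min(to, open_start)  # never reach into the bucket containing now
    if capped_to <= frm:
        return None                  # the whole range was open: nothing to recompute
    return (frm, capped_to)
```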

Comparison

| Aspect | Forward aggregation | Automatic dirty-detection | Recompute | Rewind watermark |
|---|---|---|---|---|
| What triggers it | RollupOrchestrator tick (steady state) | Every archive write, via RetroactiveWriteDetector | recomputeArchive (Manual) · RecomputeOrchestratorHostedService (Periodic) · chain (ChainPropagation) | rewindRollupWatermark (operator) |
| What it does | Aggregates the next closed bucket | Records a CkArchiveDirtyWindow; rewrites nothing | Re-aggregates [from, to) into a new generation, then flips the pointer | Moves LastAggregatedBucketEnd back; the forward path re-runs |
| Reader-safe | Yes (append/upsert at gen 0) | N/A (no row changes) | Yes: consistent old-or-new snapshot, never mixed | No: readers can see partial re-aggregation |
| Scope | One bucket per tick, forward only | A [min ts .. max ts + 1 tick) window | A bounded [from, to) range, optionally limited to one entity via rtIdScope | Everything from the rewind boundary forward |
| Observability | LastAggregatedBucketEnd | Archive.DirtyWindows / PendingRecomputeRanges | RecomputeJob history + Archive.LastRecompute* fields | LastAggregatedBucketEnd change |
| Destructive | No | No | No | Yes |
| When to use | Always on; nothing to do | Always on; nothing to do | Correct historical data while dashboards/pipelines stay live | Last resort / bulk backfills where readers can tolerate a gap |

Rule of thumb

Prefer Recompute for corrections. Reach for Rewind only when you are deliberately re-running a large forward range (for example after a bulk backfill into the source archive) and you can tolerate readers briefly seeing in-progress buckets.

How a change in a base archive is detected

This is the question engineers ask most often: given that nobody tells the rollup "row X changed", how does it know it has to recompute? The answer is a pure timestamp-versus-watermark comparison made at write time, and it is value-agnostic.

On every write to an archive, RetroactiveWriteDetector.TryBuildDirtyWindow runs. For the archive being written, it computes the consumed watermark:

consumedWatermark = the minimum LastAggregatedBucketEnd across the source archive's non-deleted dependent rollups — i.e. the earliest point that any dependent has already aggregated past.

Then, for each datapoint timestamp ts in the write:

  • If consumedWatermark is null (no dependent has consumed anything yet) → nothing is dirty. There is no aggregation to invalidate.
  • If ts >= consumedWatermark → the write is a forward Append and is ignored by the detector. The normal forward-aggregation path will pick it up.
  • If ts < consumedWatermark → the write is a RetroactiveModify: it lands inside a window a dependent has already aggregated, so that aggregation is now stale. The detector records a CkArchiveDirtyWindow spanning [min(retroactive ts) .. max(retroactive ts) + 1 tick).
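The detection rule fits in a few lines of Python. This is an illustrative sketch, not RetroactiveWriteDetector itself: DirtyWindow stands in for CkArchiveDirtyWindow, the caller is assumed to pass only the watermarks of non-deleted dependents (a dependent that has not aggregated anything yet is passed as None and ignored when taking the minimum, which is an assumption), and a "tick" is assumed to be the smallest time step, modelled here as one microsecond. Datapoint values are accepted but never read, matching the value-agnostic behavior, and the comparison is strictly ts < consumedWatermark.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, Optional, Sequence, Tuple

TICK = timedelta(microseconds=1)  # assumed: smallest representable time step


@dataclass(frozen=True)
class DirtyWindow:   # stand-in for CkArchiveDirtyWindow
    start: datetime  # inclusive
    end: datetime    # exclusive


def consumed_watermark(dependent_watermarks: Iterable[Optional[datetime]]) -> Optional[datetime]:
    """Minimum LastAggregatedBucketEnd over the (non-deleted) dependents passed in."""
    consumed = [w for w in dependent_watermarks if w is not None]
    return min(consumed) if consumed else None


def try_build_dirty_window(points: Sequence[Tuple[datetime, float]],
                           watermark: Optional[datetime],
                           tick: timedelta = TICK) -> Optional[DirtyWindow]:
    """Classify each datapoint and return the dirty window, or None if all are Appends."""
    if watermark is None:
        return None  # no dependent has aggregated anything: nothing to invalidate
    # Strict "<": a timestamp exactly at the watermark is a forward Append.
    # The value half of each point is never read, so identical re-ingests still count.
    retroactive = [ts for ts, _value in points if ts < watermark]
    if not retroactive:
        return None
    return DirtyWindow(min(retroactive), max(retroactive) + tick)
```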
Boundary nuance (< not <=)

Detection is strictly ts < consumedWatermark. A write whose timestamp lands exactly at the watermark is treated as forward (Append), not retroactive. Keep this in mind when reasoning about edge cases at a bucket boundary.

It does not diff values

The detector never compares values. An idempotent re-ingest of byte-for-byte identical data into an already-consumed window still marks that window dirty. This is a deliberate safe over-approximation: it can never miss a real change, at the cost of occasionally scheduling a recompute that produces identical output.

Future optimization — AB#4196

A no-op / threshold optimization (skip the recompute when the re-ingested values are unchanged, or below a delta threshold) is tracked separately as AB#4196. Until then, treat any retroactive write as a guaranteed dirty window.

The dirty-dependents ledger and chain propagation

Detection records intent; propagation and recompute act on it.

  • Ledger records. CkArchiveDirtyWindow (a detected stale window on a source archive) and CkArchiveRecomputeRange (a range queued for a specific dependent) are the two ledger record types. They live as runtime-state attributes on the archive entity (see State & observability), not as CrateDB columns.
  • Dependency graph. RollupDependencyGraph walks the transitive set of dependents, so a multi-level rollup-of-rollup chain is handled top-down. The walk is cycle-safe — a misconfigured cyclic dependency cannot send the propagation into an infinite loop.
  • Periodic orchestrator. RecomputeOrchestratorHostedService is a BackgroundService (default 60 s tick, registered in DI). On each tick it propagates outstanding dirty windows into each affected dependent's PendingRecomputeRanges, then recomputes only those ranges — never the whole archive. These runs carry the trigger Periodic.
  • Chain propagation. After a rollup's recompute commits successfully, that rollup's direct dependents are enqueued again, this time with trigger ChainPropagation. This is what rolls a correction forward through a rollup-of-rollup chain.
  • Coalescing. Overlapping triggers for the same archive coalesce — their ranges merge into a single active job and the superseded trigger is recorded as Coalesced. There is at most one active recompute per archive at a time.
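The graph walk and the coalescing step can be sketched independently of the engine. Both functions are hypothetical: transitive_dependents is a breadth-first walk over a plain adjacency map (rollup id to its direct dependents) whose visited set makes it terminate on a cyclic configuration, and for a simple chain it yields the levels in order; coalesce merges half-open [from, to) ranges. Merging ranges that merely touch (end equal to the next start) is a choice made for this sketch, not documented engine behavior.

```python
from collections import deque
from datetime import datetime
from typing import Dict, List, Tuple

Range = Tuple[datetime, datetime]  # half-open [from, to)


def transitive_dependents(graph: Dict[str, List[str]], source: str) -> List[str]:
    """All rollups reachable from source, nearest levels first; safe on cyclic graphs."""
    order: List[str] = []
    seen = {source}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for dep in graph.get(node, []):
            if dep not in seen:  # the visited set is what breaks cycles
                seen.add(dep)
                order.append(dep)
                queue.append(dep)
    return order


def coalesce(ranges: List[Range]) -> List[Range]:
    """Merge overlapping (or touching) [from, to) ranges into disjoint ones."""
    merged: List[Range] = []
    for start, end in sorted(ranges):
        if merged and start <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged
```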

End-to-end flow

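The full path from a late write to a fully propagated correction:

  1. A write lands with ts < consumedWatermark, so RetroactiveWriteDetector records a CkArchiveDirtyWindow on the source archive.
  2. On its next tick, RecomputeOrchestratorHostedService propagates the dirty window into the PendingRecomputeRanges of each affected dependent.
  3. It recomputes only those ranges (trigger Periodic); overlapping triggers for the same archive coalesce into a single active job.
  4. Each recompute commits through the optimistic atomic swap described below.
  5. Once a rollup's recompute commits, its direct dependents are enqueued with trigger ChainPropagation, and the correction moves one level further up the chain.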

Readers querying the rollup at any instant resolve their generation through the genmap (see below) and therefore see either the old snapshot or the new one — never a half-written mix.

Optimistic atomic swap

The recompute is described internally as "stable, optimistic": stable because readers are never blocked or shown a partial result, optimistic because it computes the new answer alongside the old and only commits with a single pointer flip.

The machinery:

  • Each rollup CrateDB table carries a generation BIGINT column that is part of the primary key. Forward aggregation always writes generation 0.

  • A side-table, archive_<rtId>__genmap, holds the active generation per window range.

  • A recompute computes its output into a per-job staging table at generation N+1, copies those rows into the live table (so both generations coexist briefly), then flips the genmap pointer to N+1. That single-row flip is the atomic commit point. Finally it sweeps the superseded generations.

  • The read path injects the active generation into every query:

    generation = CASE WHEN <range> THEN <gen> ELSE 0 END

    so a reader transparently picks up generation 0 outside the recomputed range and the active recomputed generation inside it.
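The swap can be modelled in memory to show why readers never see a mixed snapshot. This toy RollupTable is hypothetical: buckets are plain integers, the genmap maps non-overlapping [from, to) windows to their active generation, and the per-job staging table is folded into stage(). The three methods mirror the copy, flip and sweep steps; only commit() changes what read() returns.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple

Bucket = int                    # bucket index, e.g. a day number (illustrative)
Window = Tuple[Bucket, Bucket]  # half-open [from, to)


@dataclass
class RollupTable:
    """Toy rollup table whose primary key includes a generation column."""
    rows: Dict[Tuple[Bucket, int], float] = field(default_factory=dict)  # (bucket, gen) -> value
    genmap: Dict[Window, int] = field(default_factory=dict)              # window -> active gen

    def active_generation(self, bucket: Bucket) -> int:
        # Equivalent of: CASE WHEN <range> THEN <gen> ELSE 0 END
        for (frm, to), gen in self.genmap.items():
            if frm <= bucket < to:
                return gen
        return 0

    def read(self, bucket: Bucket) -> Optional[float]:
        return self.rows.get((bucket, self.active_generation(bucket)))

    def stage(self, window: Window, values: Dict[Bucket, float]) -> int:
        """Copy recomputed rows in at generation N+1; readers still resolve generation N."""
        frm, to = window
        new_gen = self.genmap.get(window, 0) + 1
        for bucket, value in values.items():
            if frm <= bucket < to:
                self.rows[(bucket, new_gen)] = value
        return new_gen

    def commit(self, window: Window, new_gen: int) -> None:
        """The atomic commit point: one genmap assignment switches every reader."""
        self.genmap[window] = new_gen

    def sweep(self, window: Window) -> None:
        """Delete every non-active generation inside the window."""
        frm, to = window
        active = self.genmap.get(window, 0)
        for key in [k for k in self.rows if frm <= k[0] < to and k[1] != active]:
            del self.rows[key]
```

Calling sweep() without a preceding commit() models the crash case: the orphaned generation N+1 rows are removed and readers keep the old values.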

Consequences:

  • Readers always see a consistent old-or-new snapshot, never mixed.
  • A crash before the flip leaves the previous values fully intact — the staging rows and unreferenced gen N+1 rows are just garbage to be swept; nothing the reader can see changed.
  • This is exactly why Recompute is reader-safe and Rewind is not: rewind overwrites generation 0 in place with no second generation to fall back on.

Reference enums

| Enum | Values | Meaning |
|---|---|---|
| CkRecomputeTrigger | Manual · Periodic · ChainPropagation | Why a recompute job exists: operator-requested, drained from the dirty ledger by the periodic orchestrator, or enqueued by a successful upstream recompute. |
| CkRecomputeJobState | Pending · Running · Swapping · Completed · Failed · Coalesced | Job lifecycle. Swapping is the genmap-flip window; Coalesced means the job was merged into an overlapping active job. |
| Change kind | Append · RetroactiveModify | Classification of an individual write by RetroactiveWriteDetector: forward (ignored) vs. retroactive (records a dirty window). |
| Change source | Manual · Pipeline · Import | Provenance of the write that triggered detection, surfaced for audit. |

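For code that consumes the job history, the enums translate directly. The Python classes below are an illustrative mirror of the documented values, not the engine's own types. ACTIVE_STATES (Pending, Running, Swapping) follows the lifecycle rules on this page, and active_job_state is a hypothetical helper that enforces the at-most-one-active-recompute-per-archive rule.

```python
from enum import Enum
from typing import Iterable, Optional


class CkRecomputeTrigger(Enum):
    MANUAL = "Manual"
    PERIODIC = "Periodic"
    CHAIN_PROPAGATION = "ChainPropagation"


class CkRecomputeJobState(Enum):
    PENDING = "Pending"
    RUNNING = "Running"
    SWAPPING = "Swapping"    # the genmap-flip window
    COMPLETED = "Completed"
    FAILED = "Failed"
    COALESCED = "Coalesced"  # merged into an overlapping active job


# States that count as "the active job" (the ones a disable/delete terminates).
ACTIVE_STATES = frozenset({CkRecomputeJobState.PENDING,
                           CkRecomputeJobState.RUNNING,
                           CkRecomputeJobState.SWAPPING})


def active_job_state(states: Iterable[CkRecomputeJobState]) -> Optional[CkRecomputeJobState]:
    """Return the state of the archive's single active job, or None; more than one is an error."""
    active = [s for s in states if s in ACTIVE_STATES]
    if len(active) > 1:
        raise ValueError(f"{len(active)} active recompute jobs; expected at most one")
    return active[0] if active else None
```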

State & observability

All rollup recompute state lives in runtime-state attributes on the archive entity in MongoDB and in the RecomputeJob entity history, not in CrateDB columns. The generation column and genmap are an implementation detail of the data plane, not observable state.

| Attribute on Archive | What it holds |
|---|---|
| Archive.DirtyWindows | Detected stale windows not yet recomputed |
| Archive.PendingRecomputeRanges | Ranges queued for this archive's next recompute |
| Archive.RecomputeInProgress | Whether a recompute is currently running |
| Archive.LastRecomputeStartedAt | Start of the most recent recompute |
| Archive.LastRecomputeSuccessAt | Completion of the most recent successful recompute |
| Archive.LastRecomputeFailureAt | Time of the most recent failure |
| Archive.LastRecomputeFailureReason | Reason string for the most recent failure |

These (and the per-job history) are surfaced through:

  • GraphQL: rollupsFor(archiveRtId) for the live per-rollup health fields, and recomputeJobsFor(archiveRtId) for the job history (newest first).
  • REST: GET /archives/{archiveRtId}/rollups and GET /archives/{archiveRtId}/recompute-jobs.
  • Refinery Studio: the rollups panel on the archive-detail view (the Recompute column and the recompute-job history table).

Triggers and commands

| Operation | GraphQL | octo-cli |
|---|---|---|
| Recompute a range | recomputeArchive(rtId, from, to, rtIdScope?) | RecomputeArchive |
| List recompute jobs | recomputeJobsFor(archiveRtId) | ListRecomputeJobs |
| Freeze / unfreeze | freezeRollupArchive / unfreezeRollupArchive | FreezeRollupArchive / UnfreezeRollupArchive |
| Rewind watermark | rewindRollupWatermark(rtId, toBucketEnd) | RewindRollupWatermark |

Activation does not backfill history

Activating a rollup does not retroactively aggregate the source archive's existing history — the watermark starts at now - bucketSize, so only data from activation forward is rolled up automatically. Populating history is an explicit operator action (a recomputeArchive over the historical range, or a rewind). This is by design: an implicit full-history scan on activation would be an unbounded, surprising operation.
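The history gap that activation leaves can be computed up front. A minimal sketch, assuming the watermark starts at exactly activation time minus one bucket size as described above; initial_watermark and history_to_backfill are hypothetical helpers, the latter returning the [from, to) range an operator would pass to recomputeArchive (or None when the source has nothing before the starting watermark).

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def initial_watermark(activated_at: datetime, bucket_size: timedelta) -> datetime:
    """Where forward aggregation starts after activation (now - bucketSize)."""
    return activated_at - bucket_size


def history_to_backfill(source_first_ts: datetime, activated_at: datetime,
                        bucket_size: timedelta) -> Optional[Tuple[datetime, datetime]]:
    """[from, to) of source history the forward path will never aggregate on its own."""
    start = initial_watermark(activated_at, bucket_size)
    if source_first_ts >= start:
        return None  # the source has no data before the starting watermark
    return (source_first_ts, start)
```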

Lifecycle interactions: activation, disable, delete (AB#4300)

Queued recompute work only ever drains for an Activated rollup — the periodic orchestrator skips any rollup that is not Activated. Two guards keep that from producing a job that can never make progress:

  • Fail-fast on a non-activated rollup. A backfillRollup or recomputeArchive against a rollup that is Created, Disabled or Failed immediately returns a Failed job with a reason such as "Archive is not activated (status Disabled) — activate it before backfilling.", instead of pre-creating a Pending job and range that the drain would silently skip forever. Activate the rollup first, then queue the work.
  • Purge on disable / delete. Disabling or deleting an archive purges its queued recompute work: the PendingRecomputeRanges are cleared and the single active (Pending/Running/Swapping) job is terminated as Failed with an explanatory reason. This prevents a lingering un-processable Pending job, and stops a delete-then-re-import that reuses the same runtime id from inheriting stale ranges/jobs.
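Both guards can be sketched against a toy archive record. Everything here is hypothetical (the Archive and Job classes, request_recompute, purge_on_disable_or_delete and the reason strings). The sketch assumes states are plain strings and that a request arriving while a job is already active is recorded as Coalesced, as described for overlapping triggers earlier on this page.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

ACTIVE_STATES = {"Pending", "Running", "Swapping"}


@dataclass
class Job:
    state: str
    reason: Optional[str] = None


@dataclass
class Archive:
    status: str  # "Created", "Activated", "Disabled" or "Failed"
    pending_ranges: List[Tuple[int, int]] = field(default_factory=list)
    jobs: List[Job] = field(default_factory=list)

    def active_job(self) -> Optional[Job]:
        return next((j for j in self.jobs if j.state in ACTIVE_STATES), None)


def request_recompute(archive: Archive, frm: int, to: int) -> Job:
    """Fail fast on a non-activated rollup instead of queueing work nobody drains."""
    if archive.status != "Activated":
        job = Job("Failed", f"Archive is not activated (status {archive.status})")
        archive.jobs.append(job)
        return job  # nothing is added to pending_ranges
    archive.pending_ranges.append((frm, to))
    active = archive.active_job()
    if active is not None:  # at most one active job: record this trigger as Coalesced
        archive.jobs.append(Job("Coalesced", "merged into the active job"))
        return active
    job = Job("Pending")
    archive.jobs.append(job)
    return job


def purge_on_disable_or_delete(archive: Archive, new_status: str) -> None:
    """Clear queued ranges and terminate the active job so nothing lingers as Pending."""
    archive.status = new_status
    archive.pending_ranges.clear()
    active = archive.active_job()
    if active is not None:
        active.state = "Failed"
        active.reason = f"Archive was {new_status.lower()}; queued recompute work purged"
```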
Existing ghosts self-heal

A Pending job that was queued before a rollup was disabled (i.e. under an earlier engine version, without these guards) is not rewritten retroactively — but it self-heals the moment the rollup is re-activated: the next orchestrator tick adopts that Pending job and drives it to Completed.

See Also