Rollups & recompute
This page is the engineer-facing companion to Stream Data Archives. It explains how a RollupArchive keeps its derived aggregations correct as the underlying data changes — the detection criterion, the dirty-windows ledger, the dependency graph, the periodic orchestrator, and the optimistic atomic swap that keeps readers on a consistent snapshot throughout.
If you only need the operator-facing walkthrough (the Studio buttons and when to use them), read the Studio: Archives — Rollup-Specific Actions section instead. This page is about the mechanics underneath those buttons.
- CK model: System.StreamData (since 1.6.2); the rollup runtime-state attributes and the RecomputeJob entity are available since this model version.
- Engine: octo-construction-kit-engine + octo-construction-kit-engine-mongodb (orchestrators, dirty-detection, dependency graph).
- API & persistence: octo-asset-repo-services (GraphQL/REST surface, CrateDB generation tables, MongoDB runtime state).
The vocabulary used throughout this page is defined once in the central stream-data glossary: tick, bucket (and bucket size / alignment), watermark (and watermark lag / consumed watermark), dirty window, generation, source vs rollup archive, forward aggregation / recompute / rewind, chain propagation, and coalesce. First uses below link straight to the relevant entry. The running example throughout is the voest energy case: 15-minute source datapoints rolled up into hourly, daily and coarser aggregations.
The three ways rollup data gets (re)computed
A rollup's stored rows can be produced or replaced by exactly three code paths. Everything else (the dirty ledger, chain propagation, the Studio buttons) ultimately drives one of these three.
- Forward aggregation. The RollupOrchestrator ticks and aggregates the next closed bucket in [watermark, now - watermarkLag), one bucket per tick. It writes an upsert at generation 0 and advances the watermark (LastAggregatedBucketEnd). This is the steady-state path; it only ever moves forward.
- Recompute (manual or periodic). A range-scoped [from, to) re-aggregation that runs as an optimistic atomic swap: it is reader-safe (queries keep returning a consistent snapshot throughout) and produces a RecomputeJob record. This is the path used to correct history without downtime.
- Rewind watermark. Moves the forward high-water mark backwards. The orchestrator then re-aggregates forward over the rewound range, overwriting the generation 0 rows in place. This is not reader-safe (readers can observe partially re-aggregated buckets), and it is destructive: the previous generation 0 values are overwritten with no older generation to fall back on.
In addition, an automatic dirty-detection mechanism decides when a recompute is needed. It does not itself rewrite any rows; it records intent (a dirty window) that the periodic orchestrator later turns into a recompute.
Forward aggregation only closes a bucket once it is fully in the past (< now - watermarkLag), so a coarse partial period — this month so far, this year so far — would otherwise show no value until the period ends. To keep those partial-period totals live, the RollupOrchestrator additionally re-aggregates the current open (in-progress) bucket on each tick, writing it at generation 0 without advancing the watermark. It is controlled by the RefreshOpenBucket option (StreamData:Rollup:RefreshOpenBucket, default true) and skips a bucket that is frozen (see Freeze).
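The forward tick and the open-bucket refresh can be sketched as below. This is a minimal in-memory sketch, not the engine's code: RollupState, forward_tick, bucket_start and aggregate are hypothetical names, timestamps are integer seconds, buckets are fixed-size and aligned to multiples of bucket_size, the aggregate is a plain sum, and Freeze is modelled as a single archive-level flag.

```python
from dataclasses import dataclass, field


@dataclass
class RollupState:
    """Hypothetical in-memory stand-in for one rollup archive."""
    bucket_size: int                  # seconds, e.g. 3600 for an hourly rollup
    watermark_lag: int                # seconds
    last_aggregated_bucket_end: int   # the watermark (LastAggregatedBucketEnd)
    refresh_open_bucket: bool = True  # mirrors StreamData:Rollup:RefreshOpenBucket
    frozen: bool = False              # assumption: Freeze as one archive-level flag
    rows: dict = field(default_factory=dict)  # (bucket_start, generation) -> value


def bucket_start(ts: int, size: int) -> int:
    # Assumes fixed-size buckets aligned to multiples of `size`.
    return ts - ts % size


def aggregate(source: dict, start: int, end: int) -> float:
    # Assumed aggregate: a plain sum over the half-open bucket [start, end).
    return sum(v for ts, v in source.items() if start <= ts < end)


def forward_tick(state: RollupState, source: dict, now: int) -> None:
    """One tick: close at most one bucket, then refresh the open bucket."""
    start = state.last_aggregated_bucket_end
    end = start + state.bucket_size
    # A bucket is closed once it lies entirely before now - watermarkLag.
    if end <= now - state.watermark_lag:
        state.rows[(start, 0)] = aggregate(source, start, end)
        state.last_aggregated_bucket_end = end  # advance the watermark
    # Open-bucket refresh: rewrite the in-progress bucket at generation 0
    # without touching the watermark.
    if state.refresh_open_bucket and not state.frozen:
        open_start = bucket_start(now, state.bucket_size)
        state.rows[(open_start, 0)] = aggregate(
            source, open_start, open_start + state.bucket_size)
```

Calling forward_tick twice within the same open bucket rewrites that bucket's generation 0 row each time while LastAggregatedBucketEnd stays put, which is exactly the "live partial period" behavior described above.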
Because the open bucket is the forward path's exclusive domain, recompute deliberately excludes it: a manual or periodic recompute caps the upper bound (to) of its range at the start of the bucket containing now, so only fully closed buckets stay in range (a range containing no closed bucket becomes a Completed no-op). If a recompute did claim the open bucket, it would flip the genmap generation for that window to N+1, and the read path (highest generation wins) would then mask the refresh's generation 0 writes, freezing the partial-period total at the value the recompute produced instead of tracking new source data. Capping the range keeps the open bucket on the always-fresh generation 0 lane. This matters most in a cascade (rollup-of-rollup): the open bucket of each coarse level (daily → monthly → yearly) is refreshed cheaply from a handful of rows in the level below, so the current period stays current all the way up the ladder.
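The range cap reduces to a small helper. It is a hypothetical illustration, assuming integer-second timestamps on a fixed-size, epoch-aligned bucket grid; calendar-aligned levels such as monthly or yearly would need a calendar-aware bucket start.

```python
from typing import Optional, Tuple


def cap_recompute_range(frm: int, to: int, now: int,
                        bucket_size: int) -> Optional[Tuple[int, int]]:
    """Cap [frm, to) at the start of the open bucket (hypothetical helper).

    Returns the capped range, or None when no closed bucket remains,
    which corresponds to the Completed no-op.
    """
    # Start of the bucket containing now; assumes an epoch-aligned grid.
    open_start = now - now % bucket_size
    capped_to = min(to, open_start)
    if capped_to <= frm:
        return None
    return (frm, capped_to)
```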
Comparison
| Aspect | Forward aggregation | Automatic dirty-detection | Recompute | Rewind watermark |
|---|---|---|---|---|
| What triggers it | RollupOrchestrator tick (steady state) | Every archive write, via RetroactiveWriteDetector | recomputeArchive (Manual) · RecomputeOrchestratorHostedService (Periodic) · chain (ChainPropagation) | rewindRollupWatermark (operator) |
| What it does | Aggregates the next closed bucket | Records a CkArchiveDirtyWindow; rewrites nothing | Re-aggregates [from, to) into a new generation, then flips the pointer | Moves LastAggregatedBucketEnd back; forward path re-runs |
| Reader-safe | Yes (append/upsert at gen 0) | N/A (no row changes) | Yes — consistent old-or-new snapshot, never mixed | No — readers can see partial re-aggregation |
| Scope | One bucket per tick, forward only | A [min ts .. max ts + 1 tick) window | A bounded [from, to) range, optionally rtIdScope-limited to one entity | Everything from the rewind boundary forward |
| Observability | LastAggregatedBucketEnd | Archive.DirtyWindows / PendingRecomputeRanges | RecomputeJob history + Archive.LastRecompute* fields | LastAggregatedBucketEnd change |
| Destructive | No | No | No | Yes |
| When to use | Always on; nothing to do | Always on; nothing to do | Correct historical data while dashboards/pipelines stay live | Last resort / bulk backfills where readers can tolerate a gap |
Prefer Recompute for corrections. Reach for Rewind only when you are deliberately re-running a large forward range (for example after a bulk backfill into the source archive) and you can tolerate readers briefly seeing in-progress buckets.
How a change in a base archive is detected
This is the question engineers ask most often: given that nobody tells the rollup "row X changed", how does it know it has to recompute? The answer is purely timestamp-vs-watermark at insert time, and it is value-agnostic.
On every write to an archive, RetroactiveWriteDetector.TryBuildDirtyWindow runs. For the archive being written, it computes the consumed watermark:
consumedWatermark = the minimum LastAggregatedBucketEnd across the source archive's non-deleted dependent rollups, i.e. the earliest watermark among them: every dependent has aggregated at least up to this point.
Then, for each datapoint timestamp ts in the write:
- If consumedWatermark is null (no dependent has consumed anything yet) → nothing is dirty. There is no aggregation to invalidate.
- If ts >= consumedWatermark → the write is a forward Append and is ignored by the detector. The normal forward-aggregation path will pick it up.
- If ts < consumedWatermark → the write is a RetroactiveModify: it lands inside a window a dependent has already aggregated, so that aggregation is now stale. The detector records a CkArchiveDirtyWindow spanning [min(retroactive ts) .. max(retroactive ts) + 1 tick).
Boundary rule (< not <=). Detection is strictly ts < consumedWatermark. A write whose timestamp lands exactly at the watermark is treated as forward (Append), not retroactive. Keep this in mind when reasoning about edge cases at a bucket boundary.
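The detection rule can be sketched in a few lines. This is a hypothetical illustration of the criterion, not RetroactiveWriteDetector's actual signature: Dependent, consumed_watermark and try_build_dirty_window are illustrative names, timestamps are integers, and one tick is assumed to be one timestamp unit. Note that only timestamps are inspected; values never are.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Sequence, Tuple

TICK = 1  # assumption: integer timestamps, one tick = one unit


@dataclass(frozen=True)
class Dependent:
    """Hypothetical view of a dependent rollup."""
    last_aggregated_bucket_end: Optional[int]
    deleted: bool = False


def consumed_watermark(dependents: Sequence[Dependent]) -> Optional[int]:
    # Minimum LastAggregatedBucketEnd across non-deleted dependents;
    # None when no dependent has consumed anything yet.
    marks = [d.last_aggregated_bucket_end for d in dependents
             if not d.deleted and d.last_aggregated_bucket_end is not None]
    return min(marks) if marks else None


def try_build_dirty_window(timestamps: Iterable[int],
                           dependents: Sequence[Dependent]
                           ) -> Optional[Tuple[int, int]]:
    """Return the dirty window [start, end) for one write, or None."""
    watermark = consumed_watermark(dependents)
    if watermark is None:
        return None  # nothing aggregated yet, so nothing can be stale
    # Strict <: a timestamp exactly at the watermark is a forward Append.
    retroactive = [ts for ts in timestamps if ts < watermark]
    if not retroactive:
        return None  # every datapoint in the write is an Append
    return (min(retroactive), max(retroactive) + TICK)
```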
It does not diff values
The detector never compares values. An idempotent re-ingest of byte-for-byte identical data into an already-consumed window still marks that window dirty. This is a deliberate safe over-approximation: it can never miss a real change, at the cost of occasionally scheduling a recompute that produces identical output.
A no-op / threshold optimization (skip the recompute when the re-ingested values are unchanged, or below a delta threshold) is tracked separately as AB#4196. Until then, treat any retroactive write as a guaranteed dirty window.
The dirty-dependents ledger and chain propagation
Detection records intent; propagation and recompute act on it.
- Ledger records. CkArchiveDirtyWindow (a detected stale window on a source archive) and CkArchiveRecomputeRange (a range queued for a specific dependent) are the two ledger record types. They live as runtime-state attributes on the archive entity (see State & observability), not as CrateDB columns.
- Dependency graph. RollupDependencyGraph walks the transitive set of dependents, so a multi-level rollup-of-rollup chain is handled top-down. The walk is cycle-safe: a misconfigured cyclic dependency cannot send the propagation into an infinite loop.
- Periodic orchestrator. RecomputeOrchestratorHostedService is a BackgroundService (default 60 s tick, registered in DI). On each tick it propagates outstanding dirty windows into each affected dependent's PendingRecomputeRanges, then recomputes only those ranges, never the whole archive. These runs carry the trigger Periodic.
- Chain propagation. After a rollup's recompute commits successfully, that rollup's direct dependents are enqueued again, this time with trigger ChainPropagation. This is what rolls a correction forward through a rollup-of-rollup chain.
- Coalescing. Overlapping triggers for the same archive coalesce: their ranges merge into a single active job, and the superseded trigger is recorded as Coalesced. There is at most one active recompute per archive at a time.
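Range coalescing is essentially an interval merge over half-open ranges. This is an illustration only: coalesce is a hypothetical helper, integer timestamps are assumed, and merging ranges that merely touch (one ends where the next starts) is a choice made here, since the text only specifies that overlapping ranges merge.

```python
from typing import Iterable, List, Tuple

Range = Tuple[int, int]  # half-open [from, to)


def coalesce(ranges: Iterable[Range]) -> List[Range]:
    """Merge overlapping [from, to) ranges into a sorted minimal list.

    Touching ranges merge too (a choice made in this sketch).
    """
    merged: List[Range] = []
    for frm, to in sorted(ranges):
        if merged and frm <= merged[-1][1]:
            # Overlaps (or touches) the previous range: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], to))
        else:
            merged.append((frm, to))
    return merged
```

In this model, the merged list plays the role of a dependent's PendingRecomputeRanges before the next recompute drains it.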
End-to-end flow
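The full path from a late write to a fully propagated correction:
1. A write lands with ts < consumedWatermark, so RetroactiveWriteDetector records a CkArchiveDirtyWindow on the source archive.
2. On its next tick (default 60 s), RecomputeOrchestratorHostedService propagates the dirty window into the PendingRecomputeRanges of each affected dependent.
3. It then recomputes only those ranges (trigger Periodic) as an optimistic atomic swap: output is staged at generation N+1, copied into the live table and committed by a single genmap flip, after which superseded generations are swept.
4. Once a rollup's recompute commits, its direct dependents are enqueued with trigger ChainPropagation and recomputed the same way, level by level up a rollup-of-rollup chain. Overlapping triggers for the same archive coalesce into its single active job.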
Readers querying the rollup at any instant resolve their generation through the genmap (see below) and therefore see either the old snapshot or the new one — never a half-written mix.
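Which rollups a correction eventually reaches can be sketched as a cycle-safe walk over the dependency graph. This is a hypothetical illustration, not RollupDependencyGraph itself: the graph is a plain dict from an archive id to its direct dependents, and the ids are made up for a voest-style ladder. It returns the transitive set in discovery order; it does not claim to reproduce the engine's recompute order, which chain propagation drives one committed level at a time.

```python
from typing import Dict, List


def transitive_dependents(root: str, direct: Dict[str, List[str]]) -> List[str]:
    """Every archive that depends on `root`, directly or transitively.

    Hypothetical stand-in for the dependency-graph walk. Each archive is
    visited at most once, so a misconfigured cycle (A -> B -> A)
    terminates instead of looping forever.
    """
    seen = {root}
    found: List[str] = []
    stack = list(reversed(direct.get(root, [])))
    while stack:
        archive = stack.pop()
        if archive in seen:
            continue  # already visited: a cycle or a shared (diamond) dependent
        seen.add(archive)
        found.append(archive)
        stack.extend(reversed(direct.get(archive, [])))
    return found
```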
Optimistic atomic swap
The recompute is described internally as "stable, optimistic": stable because readers are never blocked or shown a partial result, optimistic because it computes the new answer alongside the old and only commits with a single pointer flip.
The machinery:
- Each rollup CrateDB table carries a generation BIGINT column that is part of the primary key. Forward aggregation always writes generation 0.
- A side-table, archive_<rtId>__genmap, holds the active generation per window range.
- A recompute computes its output into a per-job staging table at generation N+1, copies those rows into the live table (so both generations coexist briefly), then flips the genmap pointer to N+1. That single-row flip is the atomic commit point. Finally it sweeps the superseded generations.
- The read path injects the active generation into every query, generation = CASE WHEN <range> THEN <gen> ELSE 0 END, so a reader transparently picks up generation 0 outside the recomputed range and the active recomputed generation inside it.
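The swap can be modelled in memory to show why readers never see a mix. This is a hypothetical sketch, not the CrateDB implementation: RollupTable stands in for the live table plus archive_<rtId>__genmap, rows are keyed by (bucket_start, generation) to mirror the primary key, the staging table is a plain dict, and the genmap is a list searched newest-first. The crash_before_flip flag simulates a crash between the copy and the flip.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class RollupTable:
    """Hypothetical in-memory stand-in for a rollup table plus its genmap."""
    rows: Dict[Tuple[int, int], float] = field(default_factory=dict)
    # (from, to, generation) entries, newest first.
    genmap: List[Tuple[int, int, int]] = field(default_factory=list)

    def active_generation(self, bucket: int) -> int:
        # Equivalent of CASE WHEN <range> THEN <gen> ELSE 0 END.
        for frm, to, gen in self.genmap:
            if frm <= bucket < to:
                return gen
        return 0

    def read(self, bucket: int) -> Optional[float]:
        return self.rows.get((bucket, self.active_generation(bucket)))

    def recompute(self, frm: int, to: int, values: Dict[int, float],
                  crash_before_flip: bool = False) -> None:
        # N = highest generation currently mapped anywhere in [frm, to).
        n = max((g for f, t, g in self.genmap if f < to and frm < t), default=0)
        gen = n + 1
        # 1. Compute into a staging table at N+1, then copy into the live table.
        staging = {(b, gen): v for b, v in values.items() if frm <= b < to}
        self.rows.update(staging)  # both generations now coexist
        if crash_before_flip:
            return  # genmap untouched: readers still resolve the old generation
        # 2. Single-entry genmap flip: the atomic commit point.
        self.genmap.insert(0, (frm, to, gen))
        # 3. Sweep superseded generations inside the range.
        for key in [k for k in self.rows if frm <= k[0] < to and k[1] != gen]:
            del self.rows[key]
```

Rewind has no equivalent of step 1 in this model: it writes generation 0 directly, so there is nothing for a reader to resolve to while it runs.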
Consequences:
- Readers always see a consistent old-or-new snapshot, never mixed.
- A crash before the flip leaves the previous values fully intact: the staging rows and unreferenced gen N+1 rows are just garbage to be swept; nothing the reader can see has changed.
- This is exactly why Recompute is reader-safe and Rewind is not: rewind overwrites generation 0 in place, with no second generation to fall back on.
Reference enums
| Enum | Values | Meaning |
|---|---|---|
| CkRecomputeTrigger | Manual · Periodic · ChainPropagation | Why a recompute job exists: operator-requested, drained from the dirty ledger by the periodic orchestrator, or enqueued by a successful upstream recompute. |
| CkRecomputeJobState | Pending · Running · Swapping · Completed · Failed · Coalesced | Job lifecycle. Swapping is the genmap-flip window; Coalesced means the job was merged into an overlapping active job. |
| Change kind | Append · RetroactiveModify | Classification of an individual write by RetroactiveWriteDetector: forward (ignored) vs. retroactive (records a dirty window). |
| Change source | Manual · Pipeline · Import | Provenance of the write that triggered detection — surfaced for audit. |
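For scripts that post-process recompute-job history (for example the output of recomputeJobsFor), the two CK enums can be mirrored as below. This is a hypothetical Python mirror, not a published client library; the string values are the enum names from the table, and treating Pending, Running and Swapping as the active states follows the disable/delete rules described under Lifecycle interactions.

```python
from enum import Enum
from typing import Iterable


class CkRecomputeTrigger(Enum):
    """Hypothetical Python mirror of the CK enum."""
    MANUAL = "Manual"                       # operator-requested
    PERIODIC = "Periodic"                   # drained from the dirty ledger
    CHAIN_PROPAGATION = "ChainPropagation"  # enqueued by an upstream recompute


class CkRecomputeJobState(Enum):
    """Hypothetical Python mirror of the CK enum."""
    PENDING = "Pending"
    RUNNING = "Running"
    SWAPPING = "Swapping"    # the genmap-flip window
    COMPLETED = "Completed"
    FAILED = "Failed"
    COALESCED = "Coalesced"  # merged into an overlapping active job


ACTIVE_STATES = frozenset({CkRecomputeJobState.PENDING,
                           CkRecomputeJobState.RUNNING,
                           CkRecomputeJobState.SWAPPING})


def is_active(state: CkRecomputeJobState) -> bool:
    return state in ACTIVE_STATES


def single_active_invariant_holds(states: Iterable[CkRecomputeJobState]) -> bool:
    # At most one active recompute per archive at a time.
    return sum(1 for s in states if is_active(s)) <= 1
```

Parsing a state string from a job record is then a plain value lookup, e.g. CkRecomputeJobState("Swapping").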
State & observability
All rollup recompute state is runtime-state attributes on the archive entity in MongoDB and the RecomputeJob entity history — not CrateDB columns (the generation column and genmap are an implementation detail of the data plane, not the observable state).
| Attribute on Archive | What it holds |
|---|---|
Archive.DirtyWindows | Detected stale windows not yet recomputed |
Archive.PendingRecomputeRanges | Ranges queued for this archive's next recompute |
Archive.RecomputeInProgress | Whether a recompute is currently running |
Archive.LastRecomputeStartedAt | Start of the most recent recompute |
Archive.LastRecomputeSuccessAt | Completion of the most recent successful recompute |
Archive.LastRecomputeFailureAt | Time of the most recent failure |
Archive.LastRecomputeFailureReason | Reason string for the most recent failure |
These (and the per-job history) are surfaced through:
- GraphQL: rollupsFor(archiveRtId) for the live per-rollup health fields, and recomputeJobsFor(archiveRtId) for the job history (newest first).
- REST: GET /archives/{archiveRtId}/rollups and GET /archives/{archiveRtId}/recompute-jobs.
- Refinery Studio: the rollups panel on the archive-detail view (the Recompute column and the recompute-job history table).
Triggers and commands
| Operation | GraphQL | octo-cli |
|---|---|---|
| Recompute a range | recomputeArchive(rtId, from, to, rtIdScope?) | RecomputeArchive |
| List recompute jobs | recomputeJobsFor(archiveRtId) | ListRecomputeJobs |
| Freeze / unfreeze | freezeRollupArchive / unfreezeRollupArchive | FreezeRollupArchive / UnfreezeRollupArchive |
| Rewind watermark | rewindRollupWatermark(rtId, toBucketEnd) | RewindRollupWatermark |
Activating a rollup does not retroactively aggregate the source archive's existing history — the watermark starts at now - bucketSize, so only data from activation forward is rolled up automatically. Populating history is an explicit operator action (a recomputeArchive over the historical range, or a rewind). This is by design: an implicit full-history scan on activation would be an unbounded, surprising operation.
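Planning that explicit backfill amounts to computing one range. The sketch below is hypothetical: plan_history_backfill is not part of octo-cli or the API, timestamps are integer seconds on a fixed, epoch-aligned bucket grid, and watermark is the rollup's current LastAggregatedBucketEnd (for example as read from rollupsFor). The resulting [from, to) is what you would pass to recomputeArchive, on a rollup that is already Activated.

```python
from typing import Optional, Tuple


def plan_history_backfill(first_source_ts: int, watermark: int,
                          bucket_size: int) -> Optional[Tuple[int, int]]:
    """Range of buckets that predate the rollup's watermark (hypothetical helper).

    Assumes an epoch-aligned, fixed-size bucket grid. Returns None when
    the source holds no data before the watermark.
    """
    frm = first_source_ts - first_source_ts % bucket_size  # align down
    if frm >= watermark:
        return None
    return (frm, watermark)
```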
Lifecycle interactions: activation, disable, delete (AB#4300)
Queued recompute work only ever drains for an Activated rollup — the periodic orchestrator skips any rollup that is not Activated. Two guards keep that from producing a job that can never make progress:
- Fail-fast on a non-activated rollup. A backfillRollup or recomputeArchive against a rollup that is Created, Disabled or Failed returns a Failed job immediately, instead of pre-creating a Pending job + range that the drain would silently skip forever. The job carries a reason such as "Archive is not activated (status Disabled) — activate it before backfilling." Activate the rollup first, then queue the work.
- Purge on disable / delete. Disabling or deleting an archive purges its queued recompute work: the PendingRecomputeRanges are cleared and the single active (Pending/Running/Swapping) job is terminated as Failed with an explanatory reason. This prevents a lingering un-processable Pending job, and stops a delete-then-re-import that reuses the same runtime id from inheriting stale ranges/jobs.
A Pending job that was queued before a rollup was disabled (i.e. under an earlier engine version, without these guards) is not rewritten retroactively — but it self-heals the moment the rollup is re-activated: the next orchestrator tick adopts that Pending job and drives it to Completed.
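The two guards and the self-heal path can be sketched together. This is a hypothetical in-memory model, not the engine's API: Rollup, Job, queue_recompute, purge_queued_work and orchestrator_tick are illustrative names, statuses and states are plain strings, the failure reasons are paraphrases rather than the engine's exact messages, and the recompute itself is elided (an adopted Pending job simply completes).

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

ACTIVE_STATES = {"Pending", "Running", "Swapping"}


@dataclass
class Job:
    state: str
    reason: Optional[str] = None


@dataclass
class Rollup:
    """Hypothetical model of a rollup archive's queued recompute work."""
    status: str  # "Created", "Activated", "Disabled" or "Failed"
    pending_ranges: List[Tuple[int, int]] = field(default_factory=list)
    jobs: List[Job] = field(default_factory=list)


def queue_recompute(rollup: Rollup, frm: int, to: int) -> Job:
    """Queue [frm, to), failing fast when the rollup is not Activated."""
    if rollup.status != "Activated":
        # Paraphrased reason; no Pending job and no range are created.
        job = Job("Failed", f"Archive is not activated (status {rollup.status})")
        rollup.jobs.append(job)
        return job
    rollup.pending_ranges.append((frm, to))
    active = next((j for j in rollup.jobs if j.state in ACTIVE_STATES), None)
    if active is not None:
        return active  # at most one active job; range joins it (coalescing elided)
    job = Job("Pending")
    rollup.jobs.append(job)
    return job


def purge_queued_work(rollup: Rollup, why: str) -> None:
    """On disable or delete: clear ranges and fail the active job."""
    rollup.pending_ranges.clear()
    for job in rollup.jobs:
        if job.state in ACTIVE_STATES:
            job.state = "Failed"
            job.reason = f"Archive {why}; queued recompute work purged"


def orchestrator_tick(rollup: Rollup) -> None:
    """Drain only Activated rollups; adopt any Pending job found there."""
    if rollup.status != "Activated":
        return  # non-activated rollups are skipped
    for job in rollup.jobs:
        if job.state == "Pending":
            job.state = "Completed"  # recompute elided in this sketch
    rollup.pending_ranges.clear()
```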
See Also
- Stream Data Archives (Tech Guide) — the full archive model, schema, lifecycle and API surface (this page expands its Rollup Recompute section)
- Studio: Archives — Rollup-Specific Actions — the operator-facing UI walkthrough
- Stream Data Access (Overview) — querying the captured data
- Stream-data terminology glossary — tick, bucket, watermark, generation, and the rest
- octo-cli — RecomputeArchive and related rollup commands