
Incremental DAG Caching for Cohort Generation
Source: vignettes/a08_dag-caching.Rmd
Overview
When iterating on cohort definitions, you often change one part of a definition and re-run. Without caching, the entire pipeline re-executes from scratch: concept set expansion, primary events, qualified events, inclusion rules, and final cohort construction – even for the parts that haven’t changed.
atlasCohortGenerator implements incremental DAG
caching: a system that persists intermediate computation tables
in the database and skips recomputation of unchanged nodes. This is the
same principle behind build systems like Make and Bazel –
content-addressable caching with automatic invalidation via Merkle-tree
hashing.
This vignette explains:
- How caching works (Merkle-tree hashing, the registry, and cache-aware execution)
- How to enable caching in your workflow
- What gets cached vs. what’s always recomputed
- Cache management (inspection, garbage collection, clearing)
- Correctness guarantees
How It Works
The Execution DAG
Each cohort definition is decomposed into a directed acyclic graph (DAG) of typed computation nodes:
concept_set (CS)
|
primary_events (PE)
|
qualified_events (QE)
|
inclusion_rule (IR) [0..N]
|
included_events (IE)
|
cohort_exit (CE)
|
final_cohort (FC)
Each node is identified by a content hash – a deterministic 16-character hex string derived from the node’s definition. Two nodes with the same definition produce the same hash, regardless of which cohort they came from.
Merkle-Tree Hashing
The critical property that makes caching safe is Merkle-tree hashing: each node’s hash incorporates the hashes of its dependencies, not just its own definition.
For example, a qualified_events node's hash is computed from:
- Its own parameters (qualified limit type, sort order)
- The hash of its parent primary_events node
- The hash of any additional criteria group
This means if you change a concept set, the change propagates up through the entire DAG:
CS (concept_id=123) --> hash: a1b2c3d4...
PE (uses CS hash) --> hash: e5f6a7b8...
QE (uses PE hash) --> hash: 1234abcd...
...
CS (concept_id=999) --> hash: ff00ee11... <-- changed
PE (uses CS hash) --> hash: 2233aabb... <-- also changed (different dep hash)
QE (uses PE hash) --> hash: ccdd4455... <-- also changed
...
No explicit invalidation logic is needed. A change anywhere automatically produces different hashes for all downstream nodes.
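The propagation property can be illustrated with a toy hash in base R. This is not the package's actual hash function, and the node definitions are made up – it only demonstrates why mixing parent hashes into each node's hash makes invalidation automatic:

```r
# Toy rolling hash -> 16-char hex string (illustrative only; the real
# implementation uses a proper content hash).
toy_hash <- function(x) {
  h <- 5381
  for (b in utf8ToInt(x)) h <- (h * 33 + b) %% 2^31
  sprintf("%016x", h)
}

# A node's hash mixes its own definition with its parents' hashes.
node_hash <- function(definition, parent_hashes = character()) {
  toy_hash(paste(c(definition, parent_hashes), collapse = "|"))
}

cs <- node_hash("concept_set: concept_id=123")
pe <- node_hash("primary_events: limit=First", parent_hashes = cs)
qe <- node_hash("qualified_events: limit=All", parent_hashes = pe)

# Change only the concept set; recompute downstream hashes:
cs2 <- node_hash("concept_set: concept_id=999")
pe2 <- node_hash("primary_events: limit=First", parent_hashes = cs2)
qe2 <- node_hash("qualified_events: limit=All", parent_hashes = pe2)

# The change propagates: each level's hash differs from its counterpart.
c(cs != cs2, pe != pe2, qe != qe2)
```

Note that pe and pe2 differ even though the primary_events definition itself is identical – only the parent hash changed.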
The Cache Registry
The cache registry is a database table
(dag_cache_registry) that maps node hashes to materialized
table names:
| Column | Description |
|---|---|
| node_hash | 16-char hex content hash (primary key) |
| node_type | Node type (primary_events, qualified_events, etc.) |
| table_name | Fully qualified table name of the materialized result |
| created_at | When the node was first materialized |
| last_used_at | Last time the node was accessed (for GC) |
| cohort_ids | Comma-separated list of cohort IDs using this node |
The registry is created automatically the first time you use
cache = TRUE.
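A minimal sketch of the registry's DDL, using the column names from the table above; the exact types and sizes the package uses are an assumption here:

```r
# Illustrative DDL for dag_cache_registry; column types are an assumption.
registry_ddl <- "
CREATE TABLE results.dag_cache_registry (
  node_hash    VARCHAR(16) PRIMARY KEY,
  node_type    VARCHAR(32),
  table_name   VARCHAR(128),
  created_at   TIMESTAMP,
  last_used_at TIMESTAMP,
  cohort_ids   VARCHAR(4000)
)"
# DBI::dbExecute(con, registry_ddl)  # would create it in your results schema
```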
Stable Table Naming
When caching is enabled, all node tables use a fixed
prefix (dagcache_) instead of the random
atlas_<uuid>_ prefix used for ephemeral runs. This
means the same computation always maps to the same table name:
- dagcache_pe_a1b2c3d4 – primary events node with hash starting a1b2c3d4
- dagcache_qe_e5f6a7b8 – qualified events node with hash starting e5f6a7b8
This determinism is what allows cache lookups to work across separate sessions.
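The naming scheme can be sketched as a pure function of node type and hash. The prefix map follows the examples above; this helper is illustrative, not part of the package API:

```r
# Map node type -> short prefix, then build the deterministic table name.
cache_table_name <- function(node_type, node_hash) {
  prefix <- c(
    primary_events   = "pe",
    qualified_events = "qe",
    inclusion_rule   = "ir",
    included_events  = "ie",
    cohort_exit      = "ce"
  )[[node_type]]
  sprintf("dagcache_%s_%s", prefix, node_hash)
}

cache_table_name("primary_events", "a1b2c3d4")
#> [1] "dagcache_pe_a1b2c3d4"
```

Because the name is a pure function of the node's content hash, a lookup in a later session lands on the same table without any shared in-memory state.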
Cache-Aware Execution
When you run with cache = TRUE, the execution proceeds
as follows:
- Build the DAG from cohort definitions (same as non-cached mode)
- Compute content hashes for all nodes (Merkle-tree, bottom-up)
- Query the registry for each node hash
- Validate that cached tables still physically exist in the database
- Skip nodes that are valid cache hits
- Execute SQL for cache misses only
- Register newly computed nodes in the registry
- Clean up ephemeral tables (staging, domain-filtered), but preserve cached node tables
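The hit/miss logic in steps 3–6 can be simulated in a few lines of base R. No database is involved here; the hashes and SQL strings are placeholders:

```r
# Nodes arrive in dependency order; the registry holds hashes whose
# tables were materialized by a previous run.
nodes <- list(
  list(hash = "a1b2c3d4", sql = "CREATE TABLE dagcache_pe_a1b2c3d4 AS ..."),
  list(hash = "e5f6a7b8", sql = "CREATE TABLE dagcache_qe_e5f6a7b8 AS ...")
)
registry <- "a1b2c3d4"  # only the PE node is already cached

hits <- character(0)
misses <- character(0)
for (node in nodes) {
  if (node$hash %in% registry) {
    hits <- c(hits, node$hash)            # cache hit: skip recomputation
  } else {
    # a real run would execute node$sql here, then register the table
    registry <- c(registry, node$hash)
    misses <- c(misses, node$hash)
  }
}
sprintf("DAG cache: %d hits, %d misses", length(hits), length(misses))
#> [1] "DAG cache: 1 hits, 1 misses"
```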
Usage
Basic Usage
# First run: all nodes computed from scratch
cdm <- generateCohortSet2(cdm, cohortSet, name = "my_cohorts", cache = TRUE)
#> DAG cache: 0 hits, 12 misses (12 nodes to compute)
# Modify one cohort's inclusion rule and re-run:
cdm <- generateCohortSet2(cdm, cohortSet_v2, name = "my_cohorts", cache = TRUE)
#> DAG cache: 8 hits, 4 misses (4 nodes to compute)
On the second run, only the nodes affected by the change are recomputed. Shared upstream nodes (concept sets, primary events) that haven’t changed are reused from the cache.
SQL-Only Usage
You can also use caching at the SQL generation level:
con <- DBI::dbConnect(...)
result <- atlas_json_to_sql_batch(
json_inputs = cohortSet,
cdm_schema = "cdm",
results_schema = "results",
target_dialect = "duckdb",
cache = TRUE,
con = con,
resolved_schema = "results"
)
# result$sql -- SQL string (only computes cache misses)
# result$cache_hits -- character vector of skipped node hashes
# result$cache_misses -- character vector of nodes to compute
# result$dag          -- the full DAG object
What Gets Cached vs. Not
Cached (persistent across runs)
These are the intermediate computation tables that are expensive to compute and whose results depend only on their content hash:
| Node Type | Table Pattern | Description |
|---|---|---|
| primary_events | dagcache_pe_&lt;hash&gt; | Events matching primary criteria |
| qualified_events | dagcache_qe_&lt;hash&gt; | Events after additional criteria + limit |
| inclusion_rule | dagcache_ir_&lt;hash&gt; | Per-rule matching person/event pairs |
| included_events | dagcache_ie_&lt;hash&gt; | Events surviving all inclusion rules |
| cohort_exit | dagcache_ce_&lt;hash&gt; | Cohort end dates |
Not Cached (rebuilt every run)
These are either cheap to rebuild, specific to a single run, or inherently transient:
| Table | Why Not Cached |
|---|---|
| dagcache_codesets | Fast to rebuild; shared across all nodes |
| dagcache_all_concepts | Derived from codesets |
| Domain-filtered tables | Depend on the full set of concepts in the current batch |
| Staging tables | Transient accumulation tables |
| dagcache_fc_&lt;hash&gt; | Final cohort inserts into staging; table itself is dropped |
| Auxiliary tables (_ie, _se, _cr) | Intermediate join tables within a node |
Why Concept Sets Aren’t Individually Cached
Concept sets are handled via a single global
dagcache_codesets table that contains all unique concept
set expressions assigned global IDs. This table is rebuilt each run
because:
- It’s cheap (just vocabulary lookups)
- It must contain exactly the concept sets needed by the current batch
- Its structure (single table with all sets) doesn’t fit the one-table-per-node caching model
However, concept set hashes are still used in the Merkle tree – they affect downstream node hashes, ensuring correctness.
Cache Management
Inspecting the Cache
# List all cached entries
dag_cache_list(con, schema = "results")
# Get summary statistics
dag_cache_stats(con, schema = "results")
#> $total_entries
#> [1] 15
#>
#> $by_type
#> primary_events qualified_events inclusion_rule included_events
#> 4 4 3 2
#> cohort_exit
#> 2
Garbage Collection
Over time, as cohort definitions evolve, old cached tables become orphaned – they’re no longer referenced by any current cohort definition. The garbage collector removes these:
# Remove entries not used in the last 30 days
dag_cache_gc(con, schema = "results", max_age_days = 30)
# Preview what would be removed (dry run)
dag_cache_gc(con, schema = "results", max_age_days = 7, dry_run = TRUE)
# Remove everything
dag_cache_gc(con, schema = "results", max_age_days = 0)
The GC also detects orphaned entries – registry rows whose backing table has been dropped externally – and cleans those up regardless of age.
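Orphan detection can be pictured as an anti-join between the registry and the tables that physically exist. This is a pure-R sketch; the real GC consults the database catalog:

```r
# Registry rows vs. tables actually present in the schema.
registry <- data.frame(
  node_hash  = c("a1b2c3d4", "e5f6a7b8"),
  table_name = c("dagcache_pe_a1b2c3d4", "dagcache_qe_e5f6a7b8"),
  stringsAsFactors = FALSE
)
existing_tables <- "dagcache_pe_a1b2c3d4"  # the QE table was dropped externally

orphans <- registry[!registry$table_name %in% existing_tables, ]
orphans$node_hash  # these registry rows are removed regardless of age
#> [1] "e5f6a7b8"
```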
Clearing the Cache
To remove all cached tables and start fresh:
dag_cache_clear(con, schema = "results")
#> Cleared 15 cache entries.
Correctness Guarantees
The caching system provides these guarantees:
- Content-addressed: Two computations with identical inputs always produce the same hash. There are no false cache hits from stale data.
- Merkle-tree propagation: Changing any upstream definition (concept set, criteria, observation window, etc.) produces a different hash for every downstream node. This is tested directly:

  # Changing concept_id from 123 to 999 changes ALL downstream hashes
  dag_a <- build_execution_dag(cohort_a, ...)
  dag_b <- build_execution_dag(cohort_b, ...)
  # PE, QE, IE, CE, FC hashes all differ between dag_a and dag_b

- Physical validation: Before declaring a cache hit, the system verifies that the backing table still exists in the database. If someone drops a cached table externally, it will be recomputed on the next run.
- Immutability: Cached tables are never modified after creation. The hash is a promise that the table contents match the definition.
- No cross-contamination: The final_cohort node includes cohort_id in its hash, so two cohorts with identical logic but different IDs produce separate final cohort nodes (which are not cached anyway – they insert into staging and are dropped).
When to Use Caching
Caching is most beneficial when:
- Iterating on cohort definitions: changing one inclusion rule in a set of 20 cohorts → only the affected nodes recompute
- Adding/removing cohorts from a batch: unchanged cohorts reuse all their cached intermediates
- Re-running after a failed partial execution: successfully computed nodes persist even after errors
- Working with large CDM databases: primary events and qualified events scans are expensive; caching avoids repeating them
Caching adds minimal overhead (registry lookups are fast) and can be
disabled at any time by omitting cache = TRUE.
Example: Incremental Update
# Day 1: Generate 3 cohorts
cohortSet_v1 <- data.frame(
cohort_definition_id = c(1, 2, 3),
cohort = c(json_diabetes, json_hypertension, json_ckd)
)
cdm <- generateCohortSet2(cdm, cohortSet_v1, "cohorts", cache = TRUE)
#> DAG cache: 0 hits, 18 misses (18 nodes to compute)
# Day 2: Replace the CKD definition, keep diabetes and hypertension
cohortSet_v2 <- data.frame(
cohort_definition_id = c(1, 2, 3),
cohort = c(json_diabetes, json_hypertension, json_ckd_v2)
)
cdm <- generateCohortSet2(cdm, cohortSet_v2, "cohorts", cache = TRUE)
#> DAG cache: 12 hits, 6 misses (6 nodes to compute)
# Diabetes and hypertension nodes reused; only CKD nodes recomputed
# Day 3: Add a 4th cohort
cohortSet_v3 <- data.frame(
cohort_definition_id = c(1, 2, 3, 4),
cohort = c(json_diabetes, json_hypertension, json_ckd_v2, json_stroke)
)
cdm <- generateCohortSet2(cdm, cohortSet_v3, "cohorts", cache = TRUE)
#> DAG cache: 18 hits, 6 misses (6 nodes to compute)
# All 3 existing cohorts reused; only stroke computed
# Inspect what's cached
dag_cache_stats(con, "results")
#> $total_entries
#> [1] 24
# Clean up old entries
dag_cache_gc(con, "results", max_age_days = 90)