The README introduces the five layers on a minimal
example. This vignette walks through the pieces a user needs once they
start depending on genproc for real work: the shape of the
result, how errors are reported, how the optional layers compose, and
what the current edges are.
A small working example
We reuse the synthetic file-conversion task from the README: one mask row
per file, with convert() as the per-case function.
src_dir <- file.path(tempdir(), "genproc-vignette-src")
dst_dir <- file.path(tempdir(), "genproc-vignette-dst")
dir.create(src_dir, showWarnings = FALSE, recursive = TRUE)
dir.create(dst_dir, showWarnings = FALSE, recursive = TRUE)
write.csv(head(iris), file.path(src_dir, "a.csv"), row.names = FALSE)
write.csv(head(mtcars), file.path(src_dir, "b.csv"), row.names = FALSE)
write.csv(head(airquality), file.path(src_dir, "c.csv"), row.names = FALSE)
convert <- function(src_dir, src_file, dst_dir, dst_file) {
df <- read.csv(file.path(src_dir, src_file))
saveRDS(df, file.path(dst_dir, dst_file))
}
mask <- data.frame(
src_dir = src_dir,
src_file = c("a.csv", "b.csv", "c.csv"),
dst_dir = dst_dir,
dst_file = c("a.rds", "b.rds", "c.rds"),
stringsAsFactors = FALSE
)
result <- genproc(convert, mask)
Anatomy of genproc_result
result is an S3 list with a stable, documented set of
fields:
class(result)
#> [1] "genproc_result"
names(result)
#> [1] "log" "reproducibility" "n_success"
#> [4] "n_error" "duration_total_secs" "status"
- log: one row per case. Columns, in order: case_id, the mask parameter values (in the order they appear in the mask), then success, error_message, traceback, duration_secs.
- reproducibility: the environment snapshot captured at run start (see below).
- n_success, n_error: summary counts.
- duration_total_secs: total wall-clock time.
- status: "done" for a completed synchronous run; for a non-blocking run, one of "running" (future not resolved yet), "done (not collected)" (future resolved but result not yet collected via await()), "done" (collected), or "error".
These fields are guaranteed stable across minor versions; new fields
may be added (e.g. worker_id for parallel runs), but
existing ones will never be removed or renamed.
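To make the log layout concrete, here is a minimal base-R sketch of a sequential runner that produces rows of the same shape. It is not genproc's implementation: `run_cases()` is a hypothetical name, and the traceback column is left out for brevity.

```r
# Hypothetical sequential runner illustrating the log layout; not genproc's code.
run_cases <- function(f, mask) {
  rows <- lapply(seq_len(nrow(mask)), function(i) {
    case_row <- mask[i, , drop = FALSE]
    started <- proc.time()[["elapsed"]]
    # A failing case is caught here, so the loop moves on to the next row.
    err <- tryCatch({
      do.call(f, as.list(case_row))  # one named argument per mask column
      NA_character_
    }, error = function(e) conditionMessage(e))
    data.frame(
      case_id = sprintf("case_%04d", i),
      case_row,                      # mask values, in mask column order
      success = is.na(err),
      error_message = err,
      duration_secs = proc.time()[["elapsed"]] - started,
      row.names = NULL,
      stringsAsFactors = FALSE
    )
  })
  do.call(rbind, rows)
}

f <- function(x, y) if (y == "bad") stop("no luck") else x
m <- data.frame(x = 1:3, y = c("ok", "bad", "ok"), stringsAsFactors = FALSE)
res_log <- run_cases(f, m)
```

The second row ends up with success = FALSE and error_message "no luck", while the other two rows complete normally.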
The log
Column order is designed for a human scanning a run:
result$log
#> case_id src_dir src_file
#> 1 case_0001 /tmp/RtmppDgPLX/genproc-vignette-src a.csv
#> 2 case_0002 /tmp/RtmppDgPLX/genproc-vignette-src b.csv
#> 3 case_0003 /tmp/RtmppDgPLX/genproc-vignette-src c.csv
#> dst_dir dst_file success error_message traceback
#> 1 /tmp/RtmppDgPLX/genproc-vignette-dst a.rds TRUE <NA> <NA>
#> 2 /tmp/RtmppDgPLX/genproc-vignette-dst b.rds TRUE <NA> <NA>
#> 3 /tmp/RtmppDgPLX/genproc-vignette-dst c.rds TRUE <NA> <NA>
#> duration_secs
#> 1 0.000
#> 2 0.001
#> 3 0.001
case_id is index-based (case_0001, case_0002, …) for now, so it is
stable only as long as the mask rows keep their order. A content-based
variant is on the roadmap, for use cases where rows of the mask can be
reordered between runs.
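The difference between the two kinds of identifier can be shown in a few lines of base R. The content key below is a hypothetical illustration (a plain concatenation of the row's values), not the design genproc has planned.

```r
# Index-based IDs, as in the log above: they depend only on row position.
index_ids <- function(mask) sprintf("case_%04d", seq_len(nrow(mask)))

# Hypothetical content-based key: derived from the row's values instead.
content_keys <- function(mask) {
  do.call(paste, c(unname(as.list(mask)), sep = "|"))
}

m1 <- data.frame(src_file = c("a.csv", "b.csv"), stringsAsFactors = FALSE)
m2 <- m1[c(2, 1), , drop = FALSE]  # same rows, reordered

index_ids(m2)     # "case_0001" "case_0002": case_0001 now points at b.csv
content_keys(m2)  # "b.csv" "a.csv": each key follows its row
```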
The reproducibility snapshot
str(result$reproducibility, max.level = 1)
#> List of 11
#> $ timestamp : POSIXct[1:1], format: "2026-05-21 17:22:04"
#> $ r_version : chr "R version 4.6.0 (2026-04-24)"
#> $ platform : chr "x86_64-pc-linux-gnu"
#> $ os : chr "Linux 6.17.0-1013-azure"
#> $ locale : chr "LC_CTYPE=C.UTF-8;LC_NUMERIC=C;LC_TIME=C.UTF-8;LC_COLLATE=C.UTF-8;LC_MONETARY=C.UTF-8;LC_MESSAGES=C;LC_PAPER=C.U"| __truncated__
#> $ timezone : chr "UTC"
#> $ packages : Named chr [1:33] "0.2.0" "0.6.39" "1.4.3" "2.6.1" ...
#> ..- attr(*, "names")= chr [1:33] "genproc" "digest" "desc" "R6" ...
#> $ mask_snapshot:'data.frame': 3 obs. of 4 variables:
#> $ parallel : NULL
#> $ nonblocking : NULL
#> $ inputs :List of 3
- timestamp: POSIXct, start time of the run.
- r_version, platform, os, locale, timezone: system info.
- packages: named character vector of package -> version, for every package attached or loaded via namespace at run start.
- mask_snapshot: the exact mask used (the value itself, not a reference to it).
- parallel: NULL for a sequential run, or a plain list mirroring the parallel_spec() used. The class is dropped to make the snapshot portable to serialization formats. The list also carries effective_strategy: the strategy actually applied by genproc(), which differs from strategy when the user passed workers without an explicit strategy and genproc() auto-defaulted to "multisession". Both fields are recorded so the snapshot is self-explanatory: strategy is what the user asked for, effective_strategy is what was applied.
- nonblocking: same pattern, for nonblocking_spec().
- inputs: a fingerprint of every file the mask refers to, or NULL if input tracking was disabled (track_inputs = FALSE). See the next section.
The snapshot lives inside the result. You can compare two results by
comparing their $reproducibility slots directly.
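Because timestamp always differs between two runs, identical() on the whole slot will report a difference even when nothing meaningful changed; comparing field by field is more informative. A minimal sketch for the packages field, with a hypothetical helper `compare_versions()` and illustrative version numbers:

```r
# Hypothetical helper: compare two named package -> version vectors,
# shaped like result$reproducibility$packages.
compare_versions <- function(old, new) {
  pkgs <- union(names(old), names(new))
  out <- data.frame(
    package = pkgs,
    old = unname(old[pkgs]),  # NA when the package is absent from `old`
    new = unname(new[pkgs]),
    stringsAsFactors = FALSE
  )
  # Keep rows where the version differs or the package is on one side only.
  differs <- is.na(out$old) | is.na(out$new) | out$old != out$new
  out[differs, , drop = FALSE]
}

a <- c(genproc = "0.2.0", digest = "0.6.39", R6 = "2.6.1")
b <- c(genproc = "0.2.1", digest = "0.6.39", cli = "1.0.0")
compare_versions(a, b)  # rows for genproc (bumped), R6 (gone), cli (new)
```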
Input fingerprinting
reproducibility$inputs is the layer that detects
silent drift of upstream files. It is captured at t0 of the run,
alongside the rest of the snapshot.
str(result$reproducibility$inputs, max.level = 1)
#> List of 3
#> $ method: chr "stat"
#> $ files :'data.frame': 0 obs. of 3 variables:
#> $ refs :'data.frame': 0 obs. of 3 variables:
- method: currently "stat". Reserved for future extensions (e.g. "md5" for content hashing).
- files: a deduplicated table of every file referenced by the mask, with its size in bytes and last-modified time. One row per unique path: a config file shared across 100 cases produces a single row.
- refs: the (case_id, column, path) triples saying who referenced what. Joins back to files by path.
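The split into files and refs can be sketched with file.info(), which reports size in bytes and mtime for each path. This is a hypothetical helper, not genproc's internals, and it assumes the input columns have already been chosen.

```r
# Hypothetical sketch of a stat-based fingerprint; not genproc's internals.
fingerprint_inputs <- function(mask, input_cols) {
  case_id <- sprintf("case_%04d", seq_len(nrow(mask)))
  # refs: one (case_id, column, path) triple per case and input column.
  refs <- do.call(rbind, lapply(input_cols, function(col) {
    data.frame(case_id = case_id, column = col,
               path = normalizePath(mask[[col]], mustWork = FALSE),
               stringsAsFactors = FALSE)
  }))
  # files: one row per unique path; size and mtime are NA for a missing path.
  paths <- unique(refs$path)
  info <- file.info(paths)
  files <- data.frame(path = paths, size = info$size, mtime = info$mtime,
                      stringsAsFactors = FALSE)
  list(method = "stat", files = files, refs = refs)
}

tmp <- tempfile("fp-demo")
dir.create(tmp)
for (nm in c("a.csv", "b.csv", "c.csv", "config.yml")) writeLines(nm, file.path(tmp, nm))
m <- data.frame(
  csv_in = file.path(tmp, c("a.csv", "b.csv", "c.csv")),
  config = file.path(tmp, "config.yml"),  # shared by every case
  stringsAsFactors = FALSE
)
fp <- fingerprint_inputs(m, c("csv_in", "config"))
nrow(fp$files)  # 4: three data files + one shared config
nrow(fp$refs)   # 6: three cases x two input columns
```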
Heuristic detection
By default, every character column of the mask whose non-NA values
are existing files (and contain a path separator) is treated as an input
column. The mask used in this vignette has its paths split
across src_dir and src_file, so the heuristic
finds nothing useful — src_dir is a directory (excluded),
src_file values are bare names (no separator). For a mask
that holds absolute paths directly:
mask_paths <- data.frame(
csv_in = file.path(src_dir, c("a.csv", "b.csv", "c.csv")),
stringsAsFactors = FALSE
)
do_one <- function(csv_in) nrow(read.csv(csv_in))
run0 <- genproc(do_one, mask_paths)
run0$reproducibility$inputs$files
#> path size mtime
#> 1 /tmp/RtmppDgPLX/genproc-vignette-src/a.csv 214 2026-05-21 17:22:04
#> 2 /tmp/RtmppDgPLX/genproc-vignette-src/b.csv 296 2026-05-21 17:22:04
#> 3 /tmp/RtmppDgPLX/genproc-vignette-src/c.csv 154 2026-05-21 17:22:04
Shared inputs are deduplicated
Many cases referencing the same upstream file
produce a single row in files but one row per case in
refs. This keeps the snapshot economical for masks where
every case shares a configuration, schema, or lookup table.
config_path <- file.path(src_dir, "config.yml")
writeLines("threshold: 10", config_path)
mask_with_config <- data.frame(
csv_in = file.path(src_dir, c("a.csv", "b.csv", "c.csv")),
config = config_path, # same value across rows
stringsAsFactors = FALSE
)
do_one_cfg <- function(csv_in, config) nrow(read.csv(csv_in))
run_shared <- genproc(do_one_cfg, mask_with_config)
# 4 rows: 3 unique csv_in + 1 config
nrow(run_shared$reproducibility$inputs$files)
#> [1] 4
# 6 rows: 3 cases x 2 input columns
nrow(run_shared$reproducibility$inputs$refs)
#> [1] 6
Overrides
- genproc(..., input_cols = c("col1", "col2")) bypasses the heuristic and tracks exactly the named columns. Paths that don't exist at capture time are recorded with NA size/mtime, and a warning is emitted.
- genproc(..., skip_input_cols = c("col")) keeps the heuristic but excludes a column (useful when a column that is not an input, such as a label, happens to hold values that resolve to existing files).
- genproc(..., track_inputs = FALSE) disables tracking entirely; result$reproducibility$inputs is then NULL.
input_cols and skip_input_cols are mutually
exclusive. Passing both raises an error: the two arguments express
contradictory intentions, so the call has to pick one.
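The selection rules above (heuristic by default, explicit list or exclusions as overrides, the two overrides mutually exclusive) can be sketched in base R. `select_input_cols()` is a hypothetical helper; treating directories as non-inputs follows the src_dir example earlier in this section.

```r
# Hypothetical sketch of the column-selection rules; not genproc's code.
select_input_cols <- function(mask, input_cols = NULL, skip_input_cols = NULL) {
  if (!is.null(input_cols) && !is.null(skip_input_cols)) {
    stop("`input_cols` and `skip_input_cols` are mutually exclusive")
  }
  if (!is.null(input_cols)) return(input_cols)  # explicit list: no heuristic
  looks_like_path_col <- function(x) {
    if (!is.character(x)) return(FALSE)
    x <- x[!is.na(x)]
    length(x) > 0 &&
      all(grepl("[/\\\\]", x)) &&              # contains a path separator
      all(file.exists(x) & !dir.exists(x))     # existing regular files only
  }
  hits <- names(mask)[vapply(mask, looks_like_path_col, logical(1))]
  setdiff(hits, skip_input_cols)
}

tmp <- tempfile("sel-demo")
dir.create(tmp)
writeLines("x", file.path(tmp, "a.csv"))
split_mask <- data.frame(src_dir = tmp, src_file = "a.csv", stringsAsFactors = FALSE)
path_mask <- data.frame(csv_in = file.path(tmp, "a.csv"), label = "a",
                        stringsAsFactors = FALSE)
select_input_cols(split_mask)  # character(0): a directory and a bare name
select_input_cols(path_mask)   # "csv_in"
```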
Comparing runs with diff_inputs()
# Rewrite a.csv with strictly more content (size changes)
write.csv(iris, file.path(src_dir, "a.csv"), row.names = FALSE)
run1 <- genproc(do_one, mask_paths)
diff_inputs(run0, run1)
#> genproc input diff (method: stat)
#> Changed: 1
#> Unchanged: 2
#> Removed: 0
#> Added: 0
#> Cases affected: 1
#>
#> Changed files:
#> /tmp/RtmppDgPLX/genproc-vignette-src/a.csv
#> size: 214 B -> 3.9 KB
#> mtime: 2026-05-21 17:22:04 -> 2026-05-21 17:22:05
#>
#> Cases affected (use rerun_affected() to re-run):
#> case_0001
diff_inputs() returns an S3 object
(genproc_input_diff) with a print method for human reading
and named list components for programmatic access
($changed, $unchanged, $removed,
$added). Files are matched by canonical absolute path;
cross-machine comparison would need a separate matcher and is out of
scope for this version.
diff_inputs() refuses to compare snapshots produced with
different methods (forward-compatible with a future hash
mode).
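The four categories can be sketched as a join of two files tables on path. This is a hypothetical helper working on plain data frames with path, size and mtime columns, not diff_inputs() itself; the sizes and timestamps below are made up for illustration.

```r
# Hypothetical stat-based diff of two `files` tables, matched by path.
diff_stat <- function(old, new) {
  both <- merge(old, new, by = "path", suffixes = c(".old", ".new"))
  same <- both$size.old == both$size.new & both$mtime.old == both$mtime.new
  same <- !is.na(same) & same  # a path missing (NA stat) on either side counts as changed
  list(
    changed   = both$path[!same],
    unchanged = both$path[same],
    removed   = setdiff(old$path, new$path),
    added     = setdiff(new$path, old$path)
  )
}

t0 <- as.POSIXct("2026-05-21 17:22:04", tz = "UTC")
old <- data.frame(path = c("a.csv", "b.csv", "c.csv"), size = c(214, 296, 154),
                  mtime = t0, stringsAsFactors = FALSE)
new <- data.frame(path = c("a.csv", "b.csv", "d.csv"), size = c(3900, 296, 10),
                  mtime = t0 + c(1, 0, 0), stringsAsFactors = FALSE)
str(diff_stat(old, new))
```

Joining the changed paths back to refs by path then yields the affected case_ids.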
How errors are reported
A case that throws does not stop the run. Here we delete a source file between two runs:
file.remove(file.path(src_dir, "b.csv"))
#> [1] TRUE
result_broken <- genproc(convert, mask)
#> Warning in file(file, "rt"): cannot open file
#> '/tmp/RtmppDgPLX/genproc-vignette-src/b.csv': No such file or directory
result_broken$n_success
#> [1] 2
result_broken$n_error
#> [1] 1
The failing row carries the error message and a filtered traceback.
The errors() helper subsets log on
success == FALSE:
bad <- errors(result_broken)
bad$error_message
#> [1] "cannot open the connection"
cat(bad$traceback[1], "\n")
#> 1. read.csv(file.path(src_dir, src_file))
#> 2. read.table(file = file, header = header, sep = sep, quote = quote, dec = dec, fill = fill, comment.char = comment.ch ...
#> 3. file(file, "rt")
The traceback is captured via withCallingHandlers() at
the moment the error is thrown — it is the real R call stack, not a
string pulled out of conditionMessage(). The internal
tryCatch() and withCallingHandlers() frames
are filtered out so the trace reads like a normal R error.
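The capture technique can be sketched in base R: a calling handler runs before the stack unwinds, so sys.calls() still contains the failing frames, and the outer tryCatch() then stops the error from propagating. `run_one_case()` is a hypothetical name, and its filtering (drop everything up to withCallingHandlers(), the handler frame and R's .handleSimpleError helper) is an assumption, not genproc's exact rule.

```r
# Hypothetical sketch: capture the live call stack at the moment of the error.
run_one_case <- function(case_fun) {
  trace <- character()
  value <- tryCatch(
    withCallingHandlers(
      case_fun(),
      error = function(e) {
        calls <- head(sys.calls(), -1L)  # drop this handler's own frame
        heads <- vapply(calls, function(cl) paste(deparse(cl[[1L]]), collapse = ""),
                        character(1))
        # Keep frames below withCallingHandlers(), minus R's dispatch helper.
        keep <- seq_along(calls) > max(which(heads == "withCallingHandlers"))
        keep <- keep & heads != ".handleSimpleError"
        trace <<- vapply(calls[keep],
                         function(cl) paste(deparse(cl, nlines = 1L), collapse = ""),
                         character(1))
      }
    ),
    error = function(e) e  # exiting handler: the run continues
  )
  if (inherits(value, "error")) {
    list(success = FALSE, error_message = conditionMessage(value), traceback = trace)
  } else {
    list(success = TRUE, value = value, traceback = NULL)
  }
}

inner <- function(x) stop("boom: ", x)
outer <- function() inner("case 2")
res <- run_one_case(outer)
res$traceback  # includes inner("case 2") and stop(...), no tryCatch frames
```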
Restore the file for subsequent sections:
write.csv(head(mtcars), file.path(src_dir, "b.csv"), row.names = FALSE)
Inspecting and re-running a result
Three helpers digest a genproc_result without touching
result$log directly.
errors(result) returns the subset of
failed cases as a data.frame:
errors(result_broken)
summary(result) dispatches to
summary.genproc_result() and returns a printable digest
with the run status, success rate, duration statistics (mean / max /
slowest case_id) and the top recurring error messages:
summary(result_broken)
#> genproc result summary
#> Status : done
#> Cases : 3 (2 ok, 1 error)
#> Success : 67%
#> Total time : 0.02s
#> Per case : mean 0.001s, max 0.002s (slowest: case_0002)
#>
#> Top errors:
#> 1x cannot open the connection
The summary object is a list; the printed view is rendered by
print.genproc_result_summary() for human reading; the
underlying fields (status, success_rate,
duration_stats, top_errors) are
programmatically accessible too.
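The digest fields can be computed from the log columns in a few lines. `summarise_log()` is a hypothetical helper, not summary.genproc_result(), and the small log below mirrors the broken run above.

```r
# Hypothetical digest of a log with columns case_id, success,
# error_message and duration_secs.
summarise_log <- function(lg, top_n = 3L) {
  errs <- lg$error_message[!lg$success]
  list(
    n_cases = nrow(lg),
    success_rate = mean(lg$success),
    duration_stats = list(
      mean = mean(lg$duration_secs),
      max = max(lg$duration_secs),
      slowest = lg$case_id[which.max(lg$duration_secs)]
    ),
    top_errors = head(sort(table(errs), decreasing = TRUE), top_n)
  )
}

lg <- data.frame(
  case_id = c("case_0001", "case_0002", "case_0003"),
  success = c(TRUE, FALSE, TRUE),
  error_message = c(NA, "cannot open the connection", NA),
  duration_secs = c(0.001, 0.002, 0.001),
  stringsAsFactors = FALSE
)
s <- summarise_log(lg)
s$duration_stats$slowest  # "case_0002"
```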
Two more helpers close the loop by re-running a targeted
subset. Their case_ids are local to the subset
(re-numbered starting at case_0001); the link back to the
original run is via the matching rows of
r0$reproducibility$mask_snapshot.
rerun_failed(r0, f) re-runs only the
cases that failed in r0 — useful after fixing the function
or the input:
# Re-run with a hardened f that handles the missing file gracefully.
result_fixed <- rerun_failed(
result_broken,
f = function(src_dir, src_file, dst_dir, dst_file) {
in_path <- file.path(src_dir, src_file)
if (!file.exists(in_path)) return(NA)
df <- read.csv(in_path)
saveRDS(df, file.path(dst_dir, dst_file))
}
)
rerun_affected(r0, diff, f) re-runs
only the cases referenced by a diff_inputs() result — the
natural action after detecting that some upstream input files have
drifted:
d <- diff_inputs(run0, run1)
refreshed <- rerun_affected(run0, d, f = do_one)
This closes the reproducibility loop: detect drift → re-run only the cases whose inputs changed, not the whole mask.
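The subsetting step behind this loop can be sketched with the refs table: changed paths map to case_ids, index-based case_ids map back to mask rows, and the subset gets fresh local IDs. `affected_subset()` is a hypothetical helper and relies on case IDs being index-based, as they are today.

```r
# Hypothetical sketch: select the mask rows touched by changed paths.
affected_subset <- function(mask, refs, changed_paths) {
  hit_ids <- unique(refs$case_id[refs$path %in% changed_paths])
  rows <- sort(match(hit_ids, sprintf("case_%04d", seq_len(nrow(mask)))))
  list(
    mask = mask[rows, , drop = FALSE],
    original_row = rows,                             # link back to the original mask
    case_id = sprintf("case_%04d", seq_along(rows))  # local IDs restart at case_0001
  )
}

m <- data.frame(csv_in = c("a.csv", "b.csv", "c.csv"), stringsAsFactors = FALSE)
refs <- data.frame(case_id = sprintf("case_%04d", 1:3), column = "csv_in",
                   path = m$csv_in, stringsAsFactors = FALSE)
sub <- affected_subset(m, refs, "c.csv")
sub$case_id       # "case_0001"
sub$original_row  # 3
```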
Building blocks: extracting f and the mask from a working example
The vignette so far assumed both f and mask
were already written by hand. In practice you often start from a
working script for one specific case and want to derive the
parameterized function and the mask template automatically. Three
exported helpers do this, in order.
1. from_example_to_function(): example expression to function
Take an example expression that works on one case. Every external value (string literals, environment symbols that are not functions) becomes a parameter of the resulting function, with its current value stored as the default. Locally bound symbols (assignment targets, function formals) are protected.
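As a rough illustration of the lifting idea before the package's own example, here is a base-R sketch that implements only the environment-symbol half of the rule: free, non-function symbols become param_1, param_2, … with their current values as defaults, and assignment targets stay local. String literals are deliberately not lifted, and `lift_free_symbols()` is a hypothetical name, not from_example_to_function().

```r
# Hypothetical sketch; not genproc's from_example_to_function().
lift_free_symbols <- function(expr, env = parent.frame()) {
  assigned <- character()
  collect_targets <- function(e) {
    if (is.call(e)) {
      if (identical(e[[1L]], as.name("<-")) || identical(e[[1L]], as.name("="))) {
        if (is.symbol(e[[2L]])) assigned <<- c(assigned, as.character(e[[2L]]))
      }
      for (i in seq_along(e)[-1L]) collect_targets(e[[i]])
    }
  }
  collect_targets(expr)
  # all.vars() skips function-call heads such as read.csv.
  free <- setdiff(all.vars(expr), assigned)
  free <- Filter(function(v) exists(v, envir = env) && !is.function(get(v, envir = env)), free)
  new_names <- sprintf("param_%d", seq_along(free))
  body <- do.call(substitute, list(expr, setNames(lapply(new_names, as.name), free)))
  defaults <- setNames(lapply(free, get, envir = env), new_names)
  as.function(c(defaults, list(body)), envir = env)
}

input_path <- "/tmp/example-in.csv"
output_path <- "/tmp/example-out.rds"
fn_sketch <- lift_free_symbols(quote({
  df <- read.csv(input_path)
  saveRDS(df, output_path)
}))
formals(fn_sketch)  # param_1 and param_2, defaulting to the two paths
```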
# An example that works for ONE specific case
input_path <- file.path(src_dir, "a.csv")
output_path <- file.path(dst_dir, "a-from-example.rds")
example <- expression({
df <- read.csv(input_path)
saveRDS(df, output_path)
})
fn <- from_example_to_function(example)
formals(fn)
#> $param_1
#> [1] "/tmp/RtmppDgPLX/genproc-vignette-src/a.csv"
#>
#> $param_2
#> [1] "/tmp/RtmppDgPLX/genproc-vignette-dst/a-from-example.rds"
2. from_function_to_mask(): function signature to mask template
Once you have the function, derive a one-row template
data.frame that mirrors its signature. You can then
rbind() extra rows to build a full mask.
mask_template <- from_function_to_mask(fn)
mask_template
#> param_1
#> 1 /tmp/RtmppDgPLX/genproc-vignette-src/a.csv
#> param_2
#> 1 /tmp/RtmppDgPLX/genproc-vignette-dst/a-from-example.rds
3. rename_function_params(): give the parameters domain names
The auto-generated names (param_1, param_2,
…) are stable but not informative. rename_function_params() renames
them: formals and body are updated together on the function object, and
the function's source text is not edited.
fn_named <- rename_function_params(
fn, c(param_1 = "input_path", param_2 = "output_path")
)
formals(fn_named)
#> $input_path
#> [1] "/tmp/RtmppDgPLX/genproc-vignette-src/a.csv"
#>
#> $output_path
#> [1] "/tmp/RtmppDgPLX/genproc-vignette-dst/a-from-example.rds"
Putting it together: a renamed function plus a manually built mask that follows the same column names.
mask_built <- data.frame(
input_path = file.path(src_dir, c("a.csv", "b.csv", "c.csv")),
output_path = file.path(dst_dir, c("a2.rds", "b2.rds", "c2.rds")),
stringsAsFactors = FALSE
)
genproc(fn_named, mask_built)$n_success
#> [1] 3
The f_mapping argument shown in the next section is the
inline equivalent of step 3 — convenient when you don’t need a renamed
function for any other purpose than this single genproc()
call.
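Renaming formals and body together can be sketched in base R by substituting symbols in the body and renaming the formals list. `rename_params()` is a hypothetical helper in the spirit of rename_function_params() and f_mapping (old name = new name), not the package's code; note that substitute() would also rename a local variable or a `$` field that happens to share an old name.

```r
# Hypothetical sketch of renaming parameters in formals and body together.
rename_params <- function(fn, mapping) {
  stopifnot(all(names(mapping) %in% names(formals(fn))))
  new_formals <- as.list(formals(fn))
  hit <- names(new_formals) %in% names(mapping)
  names(new_formals)[hit] <- mapping[names(new_formals)[hit]]
  new_body <- do.call(substitute, list(body(fn), lapply(as.list(mapping), as.name)))
  as.function(c(new_formals, list(new_body)), envir = environment(fn))
}

f_generic <- function(input_dir, input_file) file.path(input_dir, input_file)
g <- rename_params(f_generic, c(input_dir = "src_dir", input_file = "src_file"))
g(src_dir = "d", src_file = "x.csv")  # same result as f_generic("d", "x.csv")
```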
Parameter renaming with f_mapping
If the function you already have uses parameter names that don’t
match your mask’s column names, f_mapping renames them in
place without touching the source:
# `f` uses generic names; the mask uses domain names.
f <- function(input_dir, input_file, output_dir, output_file) {
df <- read.csv(file.path(input_dir, input_file))
saveRDS(df, file.path(output_dir, output_file))
}
genproc(
f = f,
mask = mask,
f_mapping = c(
"input_dir" = "src_dir",
"input_file" = "src_file",
"output_dir" = "dst_dir",
"output_file" = "dst_file"
)
)
Parallel execution in depth
parallel_spec() records intent only; the actual workers
are started lazily by future when the plan is resolved.
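"Intent only" means the spec is plain data: building it starts nothing and leaves the plan alone. The sketch below, with hypothetical names, shows such a spec together with the effective_strategy rule described in the reproducibility section and the class-dropping step used for the snapshot.

```r
# Hypothetical sketch; not genproc's parallel_spec().
make_parallel_spec <- function(workers = NULL, strategy = NULL) {
  # No side effects: no workers are started and the plan is untouched.
  structure(list(workers = workers, strategy = strategy),
            class = "sketch_parallel_spec")
}

effective_strategy <- function(spec) {
  # workers without an explicit strategy -> auto-default to "multisession".
  if (is.null(spec$strategy) && !is.null(spec$workers)) "multisession" else spec$strategy
}

snapshot_spec <- function(spec) {
  # Drop the class so the record is a plain, serializable list.
  c(unclass(spec), list(effective_strategy = effective_strategy(spec)))
}

str(snapshot_spec(make_parallel_spec(workers = 4)))
```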
Power-user pattern: manage the plan yourself
Across many genproc() calls in the same session,
installing the plan once amortizes worker startup:
future::plan(future::multisession, workers = 6)
result_1 <- genproc(convert, mask, parallel = parallel_spec())
result_2 <- genproc(convert, mask, parallel = parallel_spec())
# reuses the same workers
One-off pattern: let genproc install the plan
When workers is passed without strategy,
genproc() auto-defaults to multisession for
that single call and restores the previous plan on exit. This avoids the
silent trap of workers = N being ignored because the
current plan is sequential:
genproc(convert, mask, parallel = parallel_spec(workers = 4))
Non-blocking execution in depth
job <- genproc(convert, mask, nonblocking = nonblocking_spec())
status(job) # "running", "done (not collected)", "done", or "error"
job <- await(job) # blocks until resolution
job$log
What’s in the skeleton
Before await():
- log, n_success, n_error, duration_total_secs are NULL.
- reproducibility is already populated: the snapshot is captured synchronously, before the future is submitted, so it reflects t0.
- status is "running".
- The future itself is stored in attr(x, "future").
After await():
- All fields are populated as in a synchronous run.
- attr(x, "future") is removed.
- status is "done", or "error" if the wrapper future itself crashed (a rare case: per-case errors are caught by the logging layer and don't propagate to the wrapper).
Idempotence
await() is idempotent. Calling it on an object that has
already been materialized (or was synchronous to begin with) returns it
unchanged. This makes it safe to pepper in user code without tracking
whether a particular result has already been collected.
Default strategy: why "multisession"
Unlike parallel_spec(), nonblocking_spec()
defaults to strategy = "multisession", not
NULL. Rationale: a function named “non-blocking” must not
silently block because the current future::plan() is
sequential (the default in a fresh R session). Power-users
who manage their plan can pass strategy = NULL explicitly
to defer.
Plan lifetime
When nonblocking_spec() installs a plan, the previous
plan is not restored on genproc() exit —
it is restored by await() once the future has been
collected. Restoring earlier would call future::plan()
while the wrapper future is still running, which shuts down the
multisession workers and surfaces a “Future was canceled” error at
collection time. The trade-off is that if you never call
await(), the installed plan stays active for the rest of
the session. Power-users who pass strategy = NULL and
manage the plan themselves are not affected.
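The deferred-restore design, together with the idempotence of await(), can be sketched with the future package: future::plan() returns the previous plan, which is kept in the job and reinstalled only after future::value() has collected the result. `submit_job()` and `collect_job()` are hypothetical names, not genproc's nonblocking_spec()/await(); the usage runs with the sequential strategy so it finishes instantly, while real background work would use the multisession default.

```r
# Hypothetical sketch of the deferred-restore design; requires 'future'.
submit_job <- function(fun, strategy = future::multisession, ...) {
  old_plan <- future::plan(strategy, ...)
  # The plan is NOT restored here: the future below may still be running.
  fut <- future::future(fun())
  list(future = fut, old_plan = old_plan, value = NULL)
}

collect_job <- function(job) {
  if (is.null(job$future)) return(job)    # already collected: no-op
  job$value <- future::value(job$future)  # blocks until resolved
  future::plan(job$old_plan)              # restore only after collection
  job$future <- NULL
  job
}

if (requireNamespace("future", quietly = TRUE)) {
  job <- submit_job(function() 21 * 2, strategy = future::sequential)
  job <- collect_job(job)
  print(job$value)                        # 42
  print(identical(collect_job(job), job)) # TRUE: collecting twice changes nothing
}
```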
Composition: parallel × non-blocking
The two optional layers are orthogonal:
job <- genproc(
convert, mask,
parallel = parallel_spec(workers = 6),
nonblocking = nonblocking_spec()
)
# get control back immediately
# ... do other work ...
job <- await(job)
The non-blocking layer launches one outer future. Inside it, the
parallel layer dispatches cases via future.apply. Note:
with both set to multisession, future.apply
detects it is already inside a future and degrades the inner layer to
sequential by default, to avoid worker explosion. For true
nested parallelism, install future::plan(list(...))
explicitly and pass strategy = NULL on both specs.
Default mc.cores in the wrapper subprocess
On Windows and in some RStudio configurations, the wrapper subprocess
inherits getOption("mc.cores") set to 1 (the
legacy default for parallel::mclapply(), which is a no-op
on Windows). Without intervention, parallelly would refuse
to spawn the inner workers, because the ratio of requested workers to
available cores (workers / 1) exceeds its localhost hard limit, and the
composed call would fail with a confusing
"only 1 CPU cores available for this R process (per 'mc.cores')"
error.
genproc() handles this transparently. In the composed
case (parallel != NULL && nonblocking != NULL), it
makes two adjustments inside the wrapper subprocess:
- Sets R_PARALLELLY_AVAILABLECORES_METHODS = "system" so that parallelly ignores mc.cores and uses the true detected core count for its hard-limit check.
- Raises options(mc.cores) from 1 to the system core count, so that parallelly’s soft-limit warning (“only 1 CPU cores available… 200% load”) does not fire with a misleading message after the hard limit has been lifted.
Both adjustments only apply if the user has not set their own values, and only inside the wrapper subprocess. The calling session is never modified.
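The two adjustments amount to a small guarded function, meant to run inside the wrapper subprocess only. This is a hypothetical sketch, not genproc's code; in particular, treating an inherited mc.cores of exactly 1 as "not set by the user" is an assumption of the sketch.

```r
# Hypothetical sketch; run it inside a worker subprocess, never in the
# calling session, because it changes the environment and options().
relax_core_limits <- function() {
  # 1. Respect a user-set value; otherwise count cores from the system.
  if (!nzchar(Sys.getenv("R_PARALLELLY_AVAILABLECORES_METHODS"))) {
    Sys.setenv(R_PARALLELLY_AVAILABLECORES_METHODS = "system")
  }
  # 2. Raise an inherited mc.cores of 1 to the detected core count.
  n <- parallel::detectCores()
  if (identical(as.integer(getOption("mc.cores")), 1L) && !is.na(n) && n > 1L) {
    options(mc.cores = as.integer(n))
  }
  invisible(NULL)
}
```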
Progress monitoring
genproc() integrates with the progressr
framework. Wrap the call in progressr::with_progress(...)
to opt in:
library(progressr)
with_progress(
result <- genproc(my_fn, mask, parallel = parallel_spec(workers = 4))
)
Behind the scenes, genproc() emits one progression
condition per completed case. progressr lets the user pick
any handler: the default text bar in the console, an RStudio gadget,
audible beeps, custom log lines, or any handler the user wires up via
progressr::handlers().
Without with_progress(), the integration is a complete
no-op — zero overhead, zero visible change. progressr is a
soft dependency declared in Suggests; the integration is
skipped if the package is not installed.
In parallel mode, signals from worker subprocesses are propagated
back to the parent session via future.apply. Live
monitoring of non-blocking runs is not yet supported (signals would
arrive in a burst at await() time rather than live during
the run); this is on the roadmap.
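The "one progression condition per completed case" pattern looks like this with progressr's progressor(). `process_all()` is a hypothetical name, not genproc's internals; the calls are wrapped in requireNamespace() because progressr is optional.

```r
# Minimal sketch of one progression signal per completed case.
process_all <- function(xs, f) {
  p <- progressr::progressor(steps = length(xs))
  lapply(xs, function(x) {
    out <- f(x)
    p()  # signal one progression condition for this case
    out
  })
}

if (requireNamespace("progressr", quietly = TRUE)) {
  # Opt in: the active handler renders the signals.
  progressr::with_progress(res <- process_all(1:5, sqrt))
  # Without with_progress(), the signals go unhandled and nothing is shown.
  res_quiet <- process_all(1:5, sqrt)
}
```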
Current edges and roadmap
Not yet in the package, but explicitly planned:
- Content-hash input fingerprinting: the current inputs layer uses a stat-based fingerprint (size + mtime), which catches ordinary modifications but misses any change that preserves both values, whether accidental (a same-size rewrite within the filesystem's timestamp resolution) or deliberate. A method = "md5" (or "xxhash64") opt-in is reserved in the API and will land later.
- Content-based case_id: today case IDs are index-based. A content-based variant will make replay stable even if mask rows are reordered.
- Error replay: replay(result, case_id) to rerun one failed case in isolation.
- Live monitoring of non-blocking runs: today the progressr integration covers the sequential and parallel paths only; the non-blocking path needs a different design (collect progress in the background, query on demand) and is planned.
- cancel() for non-blocking: backend-dependent, deferred.
The architecture is designed so that adding these layers does not
require changes to existing user code — new layers are composed as extra
arguments to genproc(), and extra fields on
genproc_result accumulate without removing existing
ones.