The README introduces the four layers on a minimal
example. This vignette walks through the pieces a user needs once they
start depending on genproc for real work: the shape of the
result, how errors are reported, how the optional layers compose, and
what the current edges are.
A small working example
This is the same synthetic file-conversion task as in the README: one row
per file, with convert() as the per-case function.
src_dir <- file.path(tempdir(), "genproc-vignette-src")
dst_dir <- file.path(tempdir(), "genproc-vignette-dst")
dir.create(src_dir, showWarnings = FALSE, recursive = TRUE)
dir.create(dst_dir, showWarnings = FALSE, recursive = TRUE)
write.csv(head(iris), file.path(src_dir, "a.csv"), row.names = FALSE)
write.csv(head(mtcars), file.path(src_dir, "b.csv"), row.names = FALSE)
write.csv(head(airquality), file.path(src_dir, "c.csv"), row.names = FALSE)
convert <- function(src_dir, src_file, dst_dir, dst_file) {
  df <- read.csv(file.path(src_dir, src_file))
  saveRDS(df, file.path(dst_dir, dst_file))
}
mask <- data.frame(
  src_dir = src_dir,
  src_file = c("a.csv", "b.csv", "c.csv"),
  dst_dir = dst_dir,
  dst_file = c("a.rds", "b.rds", "c.rds"),
  stringsAsFactors = FALSE
)
result <- genproc(convert, mask)
Anatomy of genproc_result
result is an S3 list with a stable, documented set of
fields:
class(result)
#> [1] "genproc_result"
names(result)
#> [1] "log" "reproducibility" "n_success"
#> [4] "n_error" "duration_total_secs" "status"
- log: one row per case. Columns, in order: case_id, the mask parameter values (in the order they appear in the mask), then success, error_message, traceback, duration_secs.
- reproducibility: the environment snapshot captured at run start (see below).
- n_success, n_error: summary counts.
- duration_total_secs: total wall-clock time.
- status: "done" for a synchronous run, "running" or "error" for a non-blocking run before await().
These fields are guaranteed stable across minor versions; new fields
may be added (e.g. worker_id for parallel runs), but
existing ones will never be removed or renamed.
The log
Column order is designed for a human scanning a run:
result$log
#> case_id src_dir src_file
#> 1 case_0001 /tmp/Rtmpcwkj3g/genproc-vignette-src a.csv
#> 2 case_0002 /tmp/Rtmpcwkj3g/genproc-vignette-src b.csv
#> 3 case_0003 /tmp/Rtmpcwkj3g/genproc-vignette-src c.csv
#> dst_dir dst_file success error_message traceback
#> 1 /tmp/Rtmpcwkj3g/genproc-vignette-dst a.rds TRUE <NA> <NA>
#> 2 /tmp/Rtmpcwkj3g/genproc-vignette-dst b.rds TRUE <NA> <NA>
#> 3 /tmp/Rtmpcwkj3g/genproc-vignette-dst c.rds TRUE <NA> <NA>
#> duration_secs
#> 1 0.001
#> 2 0.001
#> 3 0.001
case_id is stable and index-based
(case_0001, case_0002, …) for now. A
content-based variant is on the roadmap, for use cases where rows of the
mask can be reordered between runs.
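To see why reordering matters, the sketch below mimics index-based IDs in base R. The helper make_index_ids() is hypothetical and not part of genproc; its format string is only inferred from the IDs shown in the log above.

```r
# Hypothetical stand-in for index-based ID generation (not genproc's code).
make_index_ids <- function(mask) sprintf("case_%04d", seq_len(nrow(mask)))

mask_a <- data.frame(
  src_file = c("a.csv", "b.csv", "c.csv"),
  stringsAsFactors = FALSE
)
# Same rows, different order.
mask_b <- mask_a[c(3, 1, 2), , drop = FALSE]

# Map each file to the ID it would receive in each run.
ids_a <- setNames(make_index_ids(mask_a), mask_a$src_file)
ids_b <- setNames(make_index_ids(mask_b), mask_b$src_file)

ids_a[["c.csv"]]
#> [1] "case_0003"
ids_b[["c.csv"]]
#> [1] "case_0001"
```

The same file receives a different ID once the rows move, which is the situation a content-based ID would be meant to handle.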
The reproducibility snapshot
str(result$reproducibility, max.level = 1)
#> List of 10
#> $ timestamp : POSIXct[1:1], format: "2026-04-19 19:25:41"
#> $ r_version : chr "R version 4.5.3 (2026-03-11)"
#> $ platform : chr "x86_64-pc-linux-gnu"
#> $ os : chr "Linux 6.17.0-1010-azure"
#> $ locale : chr "LC_CTYPE=C.UTF-8;LC_NUMERIC=C;LC_TIME=C.UTF-8;LC_COLLATE=C.UTF-8;LC_MONETARY=C.UTF-8;LC_MESSAGES=C.UTF-8;LC_PAP"| __truncated__
#> $ timezone : chr "UTC"
#> $ packages : Named chr [1:33] "0.0.0.9000" "0.6.39" "1.4.3" "2.6.1" ...
#> ..- attr(*, "names")= chr [1:33] "genproc" "digest" "desc" "R6" ...
#> $ mask_snapshot:'data.frame': 3 obs. of 4 variables:
#> $ parallel : NULL
#> $ nonblocking : NULL
- timestamp: POSIXct, start time of the run.
- r_version, platform, os, locale, timezone: system info.
- packages: named character vector mapping package to version, for every package attached or loaded via namespace at run start.
- mask_snapshot: the exact mask used (the value itself, not a reference to it).
- parallel: NULL for a sequential run, or a plain list mirroring the parallel_spec() used. The class is dropped so the snapshot stays portable across serialization formats.
- nonblocking: same pattern, for nonblocking_spec().
The snapshot lives inside the result. You can compare two results by
comparing their $reproducibility slots directly.
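As a rough illustration of such a comparison, the sketch below diffs the packages element of two snapshots using base R only. The two lists are hand-built stand-ins for result$reproducibility with made-up version numbers, and diff_packages() is a hypothetical helper, not a genproc function.

```r
# Hand-built stand-ins for two result$reproducibility slots
# (only a few fields, illustrative values).
snap_1 <- list(
  timezone = "UTC",
  packages = c(genproc = "0.0.0.9000", digest = "0.6.39", R6 = "2.6.1")
)
snap_2 <- list(
  timezone = "UTC",
  packages = c(genproc = "0.0.0.9000", digest = "0.6.40", desc = "1.4.3")
)

# Hypothetical helper: list packages whose version differs between two
# runs, including packages present in only one of them.
diff_packages <- function(a, b) {
  pkgs <- union(names(a), names(b))
  out <- data.frame(
    package = pkgs,
    run_1 = unname(a[pkgs]),
    run_2 = unname(b[pkgs]),
    stringsAsFactors = FALSE
  )
  changed <- is.na(out$run_1) | is.na(out$run_2) | out$run_1 != out$run_2
  out[changed, , drop = FALSE]
}

# Scalar fields can be compared directly.
identical(snap_1$timezone, snap_2$timezone)

# Package drift between the two runs.
pkg_diff <- diff_packages(snap_1$packages, snap_2$packages)
pkg_diff
```

A package missing from one run shows up with NA in that run's column, so added and removed packages are reported alongside version changes.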
How errors are reported
A case that throws does not stop the run. Here we delete a source file between two runs:
file.remove(file.path(src_dir, "b.csv"))
#> [1] TRUE
result_broken <- genproc(convert, mask)
#> Warning in file(file, "rt"): cannot open file
#> '/tmp/Rtmpcwkj3g/genproc-vignette-src/b.csv': No such file or directory
result_broken$n_success
#> [1] 2
result_broken$n_error
#> [1] 1
The failing row carries the error message and a filtered traceback:
bad <- result_broken$log[!result_broken$log$success, ]
bad$error_message
#> [1] "cannot open the connection"
cat(bad$traceback[1], "\n")
#> 1. read.csv(file.path(src_dir, src_file))
#> 2. read.table(file = file, header = header, sep = sep, quote = quote, dec = dec, fill = fill, comment.char = comment.ch ...
#> 3. file(file, "rt")
The traceback is captured via withCallingHandlers() at
the moment the error is thrown — it is the real R call stack, not a
string pulled out of conditionMessage(). The internal
tryCatch() and withCallingHandlers() frames
are filtered out so the trace reads like a normal R error.
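The general technique can be sketched in base R as follows. This is a minimal illustration under stated assumptions, not genproc's implementation: run_case() is a hypothetical helper, and it does not filter the internal frames out of the captured stack.

```r
# Hypothetical helper, not part of genproc: run one case and capture
# the call stack at the moment an error is signalled.
run_case <- function(f, ...) {
  trace <- NULL
  tryCatch(
    withCallingHandlers(
      {
        f(...)
        list(success = TRUE, error_message = NA_character_,
             traceback = NA_character_)
      },
      error = function(e) {
        # Calling handlers run before the stack unwinds, so sys.calls()
        # still contains the frames that led to the error.
        calls <- as.list(sys.calls())
        trace <<- vapply(
          calls,
          function(cl) paste(deparse(cl), collapse = " "),
          character(1)
        )
      }
    ),
    error = function(e) {
      # By now the stack has unwound; only the saved copy remains.
      list(success = FALSE, error_message = conditionMessage(e),
           traceback = paste(trace, collapse = "\n"))
    }
  )
}

failing <- function(path) stop("cannot read ", path)
res <- run_case(failing, "b.csv")
res$success
#> [1] FALSE
res$error_message
#> [1] "cannot read b.csv"
```

The calling handler only records the stack and then returns, which lets the error continue to the exiting tryCatch() handler that builds the log entry.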
Restore the file for subsequent sections:
write.csv(head(mtcars), file.path(src_dir, "b.csv"), row.names = FALSE)
Parameter renaming with f_mapping
If the function you already have uses parameter names that don’t
match your mask’s column names, f_mapping renames them in
place without touching the source:
# `f` uses generic names; the mask uses domain names.
f <- function(input_dir, input_file, output_dir, output_file) {
  df <- read.csv(file.path(input_dir, input_file))
  saveRDS(df, file.path(output_dir, output_file))
}
genproc(
  f = f,
  mask = mask,
  f_mapping = c(
    "input_dir" = "src_dir",
    "input_file" = "src_file",
    "output_dir" = "dst_dir",
    "output_file" = "dst_file"
  )
)
Parallel execution in depth
parallel_spec() records intent only; the actual workers
are started lazily by future when the plan is resolved.
Power-user pattern: manage the plan yourself
Across many genproc() calls in the same session,
installing the plan once amortizes worker startup:
future::plan(future::multisession, workers = 6)
result_1 <- genproc(convert, mask, parallel = parallel_spec())
result_2 <- genproc(convert, mask, parallel = parallel_spec())
# reuses the same workers
One-off pattern: let genproc install the plan
When workers is passed without strategy,
genproc() auto-defaults to multisession for
that single call and restores the previous plan on exit. This avoids the
silent trap of workers = N being ignored because the
current plan is sequential:
genproc(convert, mask, parallel = parallel_spec(workers = 4))
Non-blocking execution in depth
job <- genproc(convert, mask, nonblocking = nonblocking_spec())
status(job) # "running" or "done"
job <- await(job) # blocks until resolution
job$log
What’s in the skeleton
Before await():
- log, n_success, n_error, duration_total_secs are NULL.
- reproducibility is already populated: the snapshot is captured synchronously, before the future is submitted, so it reflects t0.
- status is "running".
- The future itself is stored in attr(x, "future").
After await():
- All fields populated as in a synchronous run.
- attr(x, "future") is removed.
- status is "done", or "error" if the wrapper future itself crashed (a rare case: per-case errors are caught by the logging layer and don’t propagate to the wrapper).
Idempotence
await() is idempotent. Calling it on an object that has
already been materialized (or was synchronous to begin with) returns it
unchanged. This makes it safe to call liberally in user code without tracking
whether a particular result has already been collected.
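The idempotence contract can be illustrated with a small base R mock. Everything here is a stand-in: make_job() and await_sketch() are hypothetical names, and the pending work is represented by a plain function where a real job would hold a future object.

```r
# Stand-in for a pending job. The "future" attribute holds a function
# that returns the finished fields when called (an assumption made for
# illustration only).
make_job <- function(compute) {
  job <- list(log = NULL, n_success = NULL, n_error = NULL,
              status = "running")
  attr(job, "future") <- compute
  job
}

# Hypothetical sketch of an idempotent collector.
await_sketch <- function(x) {
  pending <- attr(x, "future")
  if (is.null(pending)) {
    return(x)  # already collected, or synchronous from the start
  }
  done <- pending()
  x[names(done)] <- done       # fill in the fields that were NULL
  x$status <- "done"
  attr(x, "future") <- NULL    # nothing left to wait for
  x
}

job <- make_job(function() {
  list(log = data.frame(case_id = "case_0001", success = TRUE),
       n_success = 1L, n_error = 0L)
})

once <- await_sketch(job)
twice <- await_sketch(once)
identical(once, twice)
#> [1] TRUE
```

The early return on a missing "future" attribute is what makes the second call a no-op.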
Default strategy: why "multisession"
Unlike parallel_spec(), nonblocking_spec()
defaults to strategy = "multisession", not
NULL. Rationale: a function named “non-blocking” must not
silently block because the current future::plan() is
sequential (the default in a fresh R session). Power-users
who manage their plan can pass strategy = NULL explicitly
to defer.
Plan lifetime
When nonblocking_spec() installs a plan, the previous
plan is not restored on genproc() exit —
it is restored by await() once the future has been
collected. Restoring earlier would call future::plan()
while the wrapper future is still running, which shuts down the
multisession workers and surfaces a “Future was canceled” error at
collection time. The trade-off is that if you never call
await(), the installed plan stays active for the rest of
the session. Power-users who pass strategy = NULL and
manage the plan themselves are not affected.
Composition: parallel × non-blocking
The two optional layers are orthogonal:
job <- genproc(
  convert, mask,
  parallel = parallel_spec(workers = 6),
  nonblocking = nonblocking_spec()
)
# get control back immediately
# ... do other work ...
job <- await(job)
The non-blocking layer launches one outer future. Inside it, the
parallel layer dispatches cases via future.apply. Note:
with both set to multisession, the future framework
(on which future.apply builds) runs futures created inside another
future sequentially by default, so the inner layer degrades to
sequential, which avoids worker explosion. For true
nested parallelism, install future::plan(list(...))
explicitly and pass strategy = NULL on both specs.
Current edges and roadmap
Not yet in the package, but explicitly planned:
- Input file hashing: the reproducibility layer records the mask, but does not yet hash file inputs referenced in the mask. When a file changes without its path changing, that drift is currently invisible; once hashing is in place, it will be flagged.
- Content-based case_id: today case IDs are index-based. A content-based variant will make replay stable even if mask rows are reordered.
- Error replay: replay(result, case_id) to rerun one failed case in isolation.
- Monitored progress: opt-in progress reporting that survives non-blocking and parallel modes.
- cancel() for non-blocking: backend-dependent, deferred.
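Until input hashing exists, the drift described in the first item can be detected by hand. The sketch below is a possible workaround using tools::md5sum() from base R; it is independent of genproc, and the directory and file names are made up for the example.

```r
# Scratch directory for the example (hypothetical name).
hash_dir <- file.path(tempdir(), "genproc-hash-sketch")
dir.create(hash_dir, showWarnings = FALSE, recursive = TRUE)
path <- file.path(hash_dir, "a.csv")

# First version of the input file.
write.csv(head(iris), path, row.names = FALSE)
hash_before <- unname(tools::md5sum(path))

# Same path, different content.
write.csv(head(iris, 3), path, row.names = FALSE)
hash_after <- unname(tools::md5sum(path))

# The path is unchanged, but the hashes differ.
identical(hash_before, hash_after)
#> [1] FALSE
```

Storing such hashes next to each result would make it possible to tell, after the fact, whether two runs read the same inputs.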
The architecture is designed so that adding these layers does not
require changes to existing user code — new layers are composed as extra
arguments to genproc(), and extra fields on
genproc_result accumulate without removing existing
ones.