Run a function over a mask with mandatory logging and reproducibility
Source: R/genproc.R
This is the central function of the genproc package. It takes a function and an iteration mask (a data.frame), calls the function once per row of the mask, and returns a structured result with:
- a log data.frame (one row per case, with success/error/traceback/timing)
- reproducibility information (R version, packages, environment, parallel spec)
- the exact mask used
- stable case IDs linking log rows to mask rows
Usage
genproc(
  f,
  mask,
  f_mapping = NULL,
  parallel = NULL,
  nonblocking = NULL,
  track_inputs = TRUE,
  input_cols = NULL,
  skip_input_cols = NULL
)
Arguments
- f
A function to apply to each row of the mask. Each formal of f should correspond to a column in mask (or have a default value). Can be produced by from_example_to_function() or written by hand.
- mask
A data.frame where each row is an iteration case and each column is a parameter value. Can be produced by from_function_to_mask() and expanded by the user.
- f_mapping
Optional named character vector to rename f's parameters before execution. Passed to rename_function_params(). Names are current parameter names, values are new names matching mask columns.
- parallel
NULL (default, sequential execution) or a genproc_parallel_spec object produced by parallel_spec(). When supplied, cases are dispatched to workers via future.apply::future_lapply().
- nonblocking
NULL (default, synchronous call) or a genproc_nonblocking_spec object produced by nonblocking_spec(). When supplied, genproc() returns immediately with a genproc_result of status "running", and the run continues in a background future. Use status() to poll the state and await() to block until resolution. Can be combined with parallel: the non-blocking wrapper envelops the parallel dispatch.
- track_inputs
Logical. When TRUE (default), genproc detects columns of mask that reference input files and records their size and mtime in result$reproducibility$inputs. Use diff_inputs() to compare two runs and detect silent input drift. Set to FALSE to skip input tracking entirely.
- input_cols
NULL (default) or a character vector of mask column names. When supplied, the heuristic detection is bypassed and exactly these columns are tracked. Paths that do not exist at capture time are recorded with NA size/mtime and a warning is emitted. Mutually exclusive with skip_input_cols.
- skip_input_cols
NULL (default) or a character vector of mask column names to exclude from heuristic detection. Useful when a label column happens to match an existing file. Mutually exclusive with input_cols.
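The stat-based tracking described for track_inputs and input_cols can be pictured with base R alone. The sketch below is not genproc's implementation: the helper name fingerprint_inputs and the layout of its return value are hypothetical. It assumes only what the text states, namely that size and mtime are recorded per path and that missing paths yield NA values plus a warning.

```r
# Hypothetical helper, not part of genproc: stat-based fingerprint of the
# files referenced by explicitly chosen mask columns.
fingerprint_inputs <- function(mask, input_cols) {
  paths <- unique(unlist(lapply(mask[input_cols], as.character),
                         use.names = FALSE))
  info <- file.info(paths)  # size and mtime are NA for paths that do not exist
  not_found <- paths[is.na(info$size)]
  if (length(not_found) > 0) {
    warning("Input path(s) not found: ", paste(not_found, collapse = ", "))
  }
  data.frame(path = paths, size = info$size, mtime = info$mtime,
             stringsAsFactors = FALSE)
}

# Usage: one existing file and one missing path.
tmp <- tempfile(fileext = ".csv")
writeLines("a,b", tmp)
mask <- data.frame(
  input = c(tmp, file.path(tempdir(), "does_not_exist.csv")),
  label = c("ok", "missing"),
  stringsAsFactors = FALSE
)
fp <- suppressWarnings(fingerprint_inputs(mask, "input"))
fp[, c("size", "mtime")]
```

Comparing two such tables by path is enough to notice that a file changed size or modification time between runs, which is the kind of drift diff_inputs() is described as detecting.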
Value
An object of class genproc_result (a named list) with
components:
- log
A data.frame with one row per case. Contains all parameter values, plus case_id, success, error_message, traceback, and duration_secs.
- reproducibility
A list of environment information captured at run start (R version, packages, OS, locale, timezone, mask snapshot, parallel and non-blocking specs if any, and inputs, a stat-based fingerprint of every input file referenced by the mask). See capture_reproducibility().
- n_success
Integer, number of successful cases.
- n_error
Integer, number of failed cases.
- duration_total_secs
Numeric, total wall-clock time for the entire run.
- status
Character. "done" for a synchronous run that has completed; "running" for a non-blocking run whose future has not resolved yet; "done (not collected)" for a non-blocking run whose future has resolved but whose result has not been collected via await() yet; "error" when the background run errored out.
The genproc_result class is designed for forward compatibility.
Existing fields (log, reproducibility, n_success, n_error,
duration_total_secs) are guaranteed stable. Future versions may
add new fields (e.g. worker_id in the log for parallel runs,
or collect()/poll() methods for non-blocking execution) but
will never remove or rename existing ones.
Details
The logging and reproducibility layers are always active and
cannot be disabled. Three optional layers compose on top: parallel
(pass a parallel_spec() to parallel), non-blocking (pass a
nonblocking_spec() to nonblocking), and progress monitoring
(wrap the call in progressr::with_progress()).
Execution model
Cases are executed sequentially in row order by default. Supply
parallel = parallel_spec(...) to dispatch them in parallel via
the future ecosystem. The logging and reproducibility layers
remain active in both modes; the parallel layer is strictly
additive.
Parallel execution preserves the mask row order in the resulting
log data.frame, regardless of the order in which workers return.
Parallel execution requires genproc to be installed (not only
loaded via devtools::load_all()) on each worker, because the
logging layer serializes closures whose environments reference
genproc internals. The only exception is
parallel_spec(strategy = "sequential"), which runs in the
current process and needs nothing extra — this is the recommended
mode for deterministic testing.
Error handling
Errors in individual cases do not stop the run. Each case is
wrapped with add_trycatch_logrow(), which captures the error
message and the real traceback (via withCallingHandlers). The
run continues with the next case. This holds identically in
sequential and parallel mode.
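The combination of tryCatch and withCallingHandlers mentioned above can be sketched in base R. This is not the code of add_trycatch_logrow(), whose internals are not shown here; run_case is a hypothetical name and the column layout simply mirrors the log fields documented under Value.

```r
# Hypothetical sketch, not genproc's add_trycatch_logrow(): run one case,
# never let an error escape, and keep the call stack from the error site.
run_case <- function(f, args) {
  tb <- NA_character_
  start <- Sys.time()
  outcome <- tryCatch(
    withCallingHandlers(
      {
        do.call(f, args)
        NULL  # NULL marks success
      },
      error = function(e) {
        # The calling handler runs before the stack unwinds, so sys.calls()
        # still contains the frames that led to the error.
        calls <- vapply(sys.calls(), function(cl) deparse(cl)[1], character(1))
        tb <<- paste(calls, collapse = "\n")
      }
    ),
    # The exiting handler turns the error into a plain message.
    error = function(e) conditionMessage(e)
  )
  data.frame(
    success = is.null(outcome),
    error_message = if (is.null(outcome)) NA_character_ else outcome,
    traceback = tb,
    duration_secs = as.numeric(difftime(Sys.time(), start, units = "secs")),
    stringsAsFactors = FALSE
  )
}

# Usage: the second case fails, the run still reaches the third one.
f <- function(x) if (x < 0) stop("negative input") else sqrt(x)
run_log <- do.call(rbind, lapply(c(4, -1, 9), function(x) run_case(f, list(x = x))))
run_log[, c("success", "error_message")]
```

A plain tryCatch would only see the stack after unwinding, which is why the traceback is captured in the calling handler and the message in the exiting one.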
Progress monitoring
genproc() emits one progressr signal per completed case in
sequential and parallel modes. The signals are no-op unless the
calling code is wrapped in progressr::with_progress(...), in
which case the user sees a progress bar (or any other handler
chosen via progressr::handlers()):
library(progressr)
with_progress(
  result <- genproc(my_fn, mask, parallel = parallel_spec(workers = 4))
)
Without with_progress(), there is zero overhead and zero visible
change: the integration is a hook, not a default behaviour.
progressr is declared in Suggests; the integration is
conditional on its installation. The non-blocking path does not
yet emit signals — live monitoring of background runs is on the
roadmap.
Composing parallel and non-blocking
When both parallel and nonblocking are supplied, the
non-blocking wrapper envelops the parallel dispatch (one outer
future submits the run, inner workers process the cases). On
platforms where the wrapper R subprocess inherits a restrictive
default for getOption("mc.cores") (typically 1 on Windows and in
some RStudio configurations), parallelly would otherwise refuse
to spawn the inner workers. genproc() works around this with
two surgical adjustments inside the wrapper subprocess, applied
only in the composed case and only when the user has not set
their own values:
- Set R_PARALLELLY_AVAILABLECORES_METHODS = "system" so that availableCores() ignores the legacy mc.cores option and reports the true detected core count (lifts the hard-limit refusal).
- Raise options(mc.cores) from 1 to the system core count, so that parallelly's soft-limit warning no longer fires with a misleading "only 1 CPU cores available" message.
The calling session is never modified by either adjustment.
Case IDs
Each row of the mask receives a case_id (currently index-based:
case_0001, case_0002, ...). This ID appears in the log and
can be used for replay, monitoring, and cross-referencing.
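As an illustration of the index-based scheme, the IDs can be reproduced and used for cross-referencing with base R. The four-digit zero padding is inferred from the example IDs above; how genproc formats IDs beyond 9999 cases is not stated here, and the small log data.frame below is made up for the example.

```r
# Sketch of index-based IDs in the case_0001 style described above.
mask <- data.frame(x = c(1, 2, 3), y = c(10, 20, 30))
case_id <- sprintf("case_%04d", seq_len(nrow(mask)))

# Hypothetical log with one failed case, used to recover the failing mask rows.
example_log <- data.frame(case_id = case_id, success = c(TRUE, FALSE, TRUE))
failed_ids <- example_log$case_id[!example_log$success]
failed_rows <- mask[match(failed_ids, case_id), , drop = FALSE]
failed_rows
```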
Parameter matching
The mask does not need to contain a column for every parameter of
f. Parameters not present in the mask will use their default
values. However, parameters without defaults that are also missing
from the mask will cause an error before execution starts.
Extra columns in the mask (not matching any parameter) are silently ignored.
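These matching rules can be approximated with formals(). The sketch below is an assumption about how such a pre-flight check might look, not genproc's own code; check_mask is a hypothetical name.

```r
# Hypothetical pre-flight check, not genproc's own code.
check_mask <- function(f, mask) {
  fm <- formals(f)
  # A formal without a default is stored as the empty symbol.
  no_default <- vapply(
    fm,
    function(a) is.symbol(a) && !nzchar(as.character(a)),
    logical(1)
  )
  required <- setdiff(names(fm)[no_default], "...")
  missing_params <- setdiff(required, names(mask))
  if (length(missing_params) > 0) {
    stop("No mask column and no default for: ",
         paste(missing_params, collapse = ", "))
  }
  # Only these columns are passed to f; extra mask columns are ignored.
  intersect(names(fm), names(mask))
}

f <- function(x, y = 10) x + y
mask <- data.frame(x = 1:3, note = c("a", "b", "c"))
cols <- check_mask(f, mask)  # `note` is dropped, `y` falls back to its default
results <- vapply(
  seq_len(nrow(mask)),
  function(i) do.call(f, as.list(mask[i, cols, drop = FALSE])),
  numeric(1)
)
results
```

Running the check once before the loop is what lets a missing required parameter fail fast, instead of producing the same error in every logged case.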
See also
Optional execution layers: parallel_spec(), nonblocking_spec(),
status(), await().
Inspecting a result: errors(), summary.genproc_result(),
rerun_failed().
Reproducibility tooling: diff_inputs(), rerun_affected().
Examples
# Sequential, synchronous run (the default). Returns once all cases have finished.
result <- genproc(
  f = function(x, y) x + y,
  mask = data.frame(x = c(1, 2, 3), y = c(10, 20, 30))
)
result$log
#>     case_id x  y success error_message traceback duration_secs
#> 1 case_0001 1 10    TRUE          <NA>      <NA>             0
#> 2 case_0002 2 20    TRUE          <NA>      <NA>             0
#> 3 case_0003 3 30    TRUE          <NA>      <NA>             0
# One-off parallel call: genproc installs a temporary multisession
# plan and restores the previous one on exit. Capped at 2 workers
# to comply with the CRAN policy on parallelism in examples.
# \donttest{
result <- genproc(
  f = function(x) x * 2,
  mask = data.frame(x = 1:4),
  parallel = parallel_spec(workers = 2)
)
result$log
#>     case_id x success error_message traceback duration_secs
#> 1 case_0001 1    TRUE          <NA>      <NA>             0
#> 2 case_0002 2    TRUE          <NA>      <NA>             0
#> 3 case_0003 3    TRUE          <NA>      <NA>             0
#> 4 case_0004 4    TRUE          <NA>      <NA>             0
# }
# Non-blocking + parallel composed: launch in the background,
# keep the console, collect later with await().
# \donttest{
job <- genproc(
  f = function(x) x * 2,
  mask = data.frame(x = 1:4),
  parallel = parallel_spec(workers = 2),
  nonblocking = nonblocking_spec()
)
status(job)        # "running" until the future resolves
#> [1] "running"
job <- await(job)  # blocks; idempotent on already-resolved jobs
job$log
#>     case_id x success error_message traceback duration_secs
#> 1 case_0001 1    TRUE          <NA>      <NA>             0
#> 2 case_0002 2    TRUE          <NA>      <NA>             0
#> 3 case_0003 3    TRUE          <NA>      <NA>             0
#> 4 case_0004 4    TRUE          <NA>      <NA>             0
# }