legenddataflowscripts.workflow package

Submodules

legenddataflowscripts.workflow.execenv module

legenddataflowscripts.workflow.execenv._execenv2str(cmd_expr, cmd_env)
Return type:

str

legenddataflowscripts.workflow.execenv.apptainer_env_vars(cmdenv)
Return type:

list[str]

legenddataflowscripts.workflow.execenv.cmdexec(args)

Load the data production environment and execute a given command.

Prepends the container prefix (if any) to args.command, adds the virtualenv bin directory to PATH, and runs the resulting command with subprocess.run().

Parameters:

args (argparse.Namespace) – Parsed command-line arguments (provided by dataflow()). Must include config_file, system, and command.
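The steps described above can be sketched roughly as follows. This is a minimal illustration, not the package's implementation: `run_in_env`, `prefix`, and `venv_bin` are hypothetical names, and the container prefix is assumed to be a list of arguments (empty when no container is configured).

```python
import os
import subprocess
import sys


def run_in_env(prefix, command, venv_bin, env=None):
    """Hypothetical helper: run `command` behind an optional container prefix."""
    # Copy the environment so the calling process is left untouched.
    run_env = dict(os.environ if env is None else env)
    # Put the virtualenv bin directory first on PATH.
    run_env["PATH"] = venv_bin + os.pathsep + run_env.get("PATH", "")
    # An empty prefix means "no container": the command runs directly.
    full_cmd = list(prefix) + list(command)
    return subprocess.run(
        full_cmd, env=run_env, check=False, capture_output=True, text=True
    )


# Example: print the PATH seen by the child process (no container prefix).
result = run_in_env(
    [],
    [sys.executable, "-c", "import os; print(os.environ['PATH'])"],
    "/opt/sw/bin",
)
```

Here the child process sees `/opt/sw/bin` as the first PATH entry, which is the effect the description above refers to.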

legenddataflowscripts.workflow.execenv.dataflow()

Command-line interface (dataflow) for installing and loading the software in the data production environment.

$ dataflow --help
$ dataflow install --help  # help section for a specific sub-command

legenddataflowscripts.workflow.execenv.execenv_prefix(config, as_string=True)

Return the software environment command prefix.

Builds the command-line prefix (e.g. apptainer run image.sif) and the associated environment variable mapping from the execenv section of config. Supported container runtimes:

  • Apptainer / Singularity - environment variables are passed via --env=KEY=VAL flags and the XDG runtime directory is bind-mounted if present.

  • OCI engines (Docker, Podman, podman-hpc, Shifter) - environment variables are passed via --env=KEY=VAL flags; the XDG runtime directory is volume-mounted for all engines except Shifter.

Parameters:
  • config (dbetto.AttrsDict) – Workflow configuration containing an optional execenv key with sub-keys cmd (container command), arg (container image/args), and env (extra environment variables).

  • as_string (bool) – When True (default) a single space-separated string with a trailing space is returned. When False a (cmdline, cmdenv) tuple is returned for programmatic use.

Returns:

str or (list, dict) – The command prefix as a string (with trailing space) or as a (cmdline_list, env_dict) tuple.

Return type:

str | tuple[list, dict]

Note

If as_string is True, a space is appended to the returned string.
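To make the two return shapes concrete, here is a minimal sketch of how such a prefix could be assembled from a plain dict. `build_prefix` is a hypothetical stand-in, not the package's code: the placement of the `--env=KEY=VAL` flags between the command and the image is an assumption, and the XDG runtime directory mount is left out.

```python
import shlex


def build_prefix(execenv, as_string=True):
    """Hypothetical sketch: assemble a container prefix from an execenv mapping."""
    cmdline = []
    cmdenv = dict(execenv.get("env", {}))
    if "cmd" in execenv:
        # Container command, e.g. "apptainer run".
        cmdline += shlex.split(execenv["cmd"])
        # One --env=KEY=VAL flag per extra environment variable (assumed placement).
        cmdline += [f"--env={key}={val}" for key, val in cmdenv.items()]
        # Container image and any further arguments.
        arg = execenv.get("arg", [])
        cmdline += shlex.split(arg) if isinstance(arg, str) else list(arg)
    if as_string:
        # The trailing space lets callers append a command directly.
        return " ".join(cmdline) + " "
    return cmdline, cmdenv


demo_execenv = {"cmd": "apptainer run", "arg": "image.sif", "env": {"FOO": "bar"}}
prefix_str = build_prefix(demo_execenv)
prefix_list, prefix_env = build_prefix(demo_execenv, as_string=False)
```

The string form is convenient for shell command templates, while the tuple form avoids re-parsing when the command is handed to `subprocess`.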

legenddataflowscripts.workflow.execenv.execenv_pyexe(config, exename, as_string=True)

Return the full command to invoke a virtualenv executable inside the container.

Extends the container prefix from execenv_prefix() with the absolute path {config.paths.install}/bin/{exename}. Example result: apptainer run image.sif /opt/sw/bin/par-geds-dsp-pz

Parameters:
  • config (dbetto.AttrsDict) – Workflow configuration. Must have a paths.install key pointing to the root of the Python virtual environment.

  • exename (str) – Name of the executable inside the virtualenv bin directory (e.g. "par-geds-dsp-pz").

  • as_string (bool) – When True (default) a single space-separated string with a trailing space is returned. When False a (cmdline, cmdenv) tuple is returned.

Returns:

str or (list, dict) – The full command as a string (with trailing space) or as a (cmdline_list, env_dict) tuple.

Return type:

str | tuple[list, dict]

Note

If as_string is True, a space is appended to the returned string.
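As an illustration of the path construction, the sketch below appends `{install}/bin/{exename}` to an already-built prefix. `pyexe_command` and its arguments are hypothetical; the real function takes the workflow configuration rather than a ready-made prefix list.

```python
def pyexe_command(prefix, install_dir, exename, as_string=True):
    """Hypothetical sketch: extend a container prefix with a virtualenv executable."""
    # Absolute path of the executable inside the virtualenv bin directory.
    exe_path = f"{install_dir}/bin/{exename}"
    cmdline = list(prefix) + [exe_path]
    if as_string:
        # Trailing space, mirroring the string form described above.
        return " ".join(cmdline) + " "
    return cmdline


full_cmd = pyexe_command(
    ["apptainer", "run", "image.sif"], "/opt/sw", "par-geds-dsp-pz"
)
```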

legenddataflowscripts.workflow.execenv.install(args)

Install user software in the data production environment.

Creates a Python virtual environment at config.paths.install (inside the container if one is configured), upgrades pip, installs uv, and then uses uv pip install to install the workflow root directory (the directory containing the config file) as the package source.

$ dataflow install config.yaml
$ dataflow install --editable config.yaml   # editable install
$ dataflow install --remove  config.yaml    # wipe venv before installing

Parameters:

args (argparse.Namespace) – Parsed command-line arguments (provided by dataflow()).
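The installation sequence described above might look like the following list of commands. This sketch only builds the commands and does not run them. `plan_install` is a hypothetical helper, and the exact invocations (`python3 -m venv`, `python -m uv pip install --python ...`) are assumptions about one reasonable way to perform each step, not a record of what the package executes. Removing an existing environment for `--remove` is not shown.

```python
from pathlib import Path


def plan_install(install_dir, workflow_root, editable=False, prefix=()):
    """Hypothetical sketch: list the commands for a venv + uv based install."""
    install_dir = Path(install_dir)
    venv_python = (install_dir / "bin" / "python").as_posix()
    # Editable installs point uv at the source tree instead of copying it.
    target = ["--editable", str(workflow_root)] if editable else [str(workflow_root)]
    steps = [
        # 1. Create the virtual environment.
        ["python3", "-m", "venv", install_dir.as_posix()],
        # 2. Upgrade pip inside it.
        [venv_python, "-m", "pip", "install", "--upgrade", "pip"],
        # 3. Install uv.
        [venv_python, "-m", "pip", "install", "uv"],
        # 4. Install the workflow root directory with uv.
        [venv_python, "-m", "uv", "pip", "install", "--python", venv_python, *target],
    ]
    # Every step runs behind the container prefix, if one is configured.
    return [list(prefix) + step for step in steps]


steps = plan_install(
    "/opt/sw", "/work/prod", editable=True, prefix=["apptainer", "run", "image.sif"]
)
```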

legenddataflowscripts.workflow.execenv.oci_engine_env_vars(cmdenv)
Return type:

list[str]

legenddataflowscripts.workflow.filedb module

legenddataflowscripts.workflow.pre_compile_catalog module

legenddataflowscripts.workflow.pre_compile_catalog.pre_compile_catalog(validity_path)

Pre-compile a dbetto validity catalog for fast repeated access.

Reads the validity.yaml catalog from validity_path and, for each system and each entry in the catalog, eagerly loads the corresponding dbetto.TextDB state (instead of loading it lazily on first access). The resulting dbetto.catalog.Catalog can be serialised and reused across many Snakemake jobs without re-parsing YAML on every invocation.

Parameters:

validity_path (str or pathlib.Path) – Directory containing the validity.yaml file and all referenced database files.

Returns:

dbetto.catalog.Catalog – Pre-compiled catalog with all entries eagerly resolved.

legenddataflowscripts.workflow.utils module

Utility helpers for the LEGEND dataflow Snakemake workflow.

This module provides functions for variable substitution in configuration objects, dynamic Snakemake rule renaming, and read-only filesystem path translation. Most of these utilities are re-exported from the top-level legenddataflowscripts package for convenience.

legenddataflowscripts.workflow.utils.as_ro(config, path)

Translate a path (or list of paths) to its read-only filesystem equivalent.

Some HPC sites expose the same data under both a read-write and a read-only mount point. When config["read_only_fs_sub_pattern"] is set to a two-element list [pattern, replacement] this function applies re.sub() to convert path to the read-only mount. If the key is absent or None the original path is returned unchanged.

Parameters:
  • config (dict) – Workflow configuration dict. Inspected for the optional key "read_only_fs_sub_pattern".

  • path (str, pathlib.Path, or list) – The path or collection of paths to translate.

Returns:

str, pathlib.Path, or list – Translated path(s). The return type mirrors the input type.
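A minimal sketch of this translation, assuming a plain dict configuration; `to_read_only` is a hypothetical stand-in and the mount points are made up for the example.

```python
import re
from pathlib import Path


def to_read_only(config, path):
    """Hypothetical sketch: map read-write paths to their read-only mount."""
    sub = config.get("read_only_fs_sub_pattern")
    if sub is None:
        # No pattern configured: return the input unchanged.
        return path
    pattern, replacement = sub
    if isinstance(path, list):
        # Translate each element, keeping the list structure.
        return [to_read_only(config, p) for p in path]
    if isinstance(path, Path):
        # Mirror the input type: Path in, Path out.
        return Path(re.sub(pattern, replacement, path.as_posix()))
    return re.sub(pattern, replacement, path)


ro_config = {"read_only_fs_sub_pattern": ["^/data/rw", "/data/ro"]}
single = to_read_only(ro_config, "/data/rw/raw/file.lh5")
several = to_read_only(ro_config, ["/data/rw/a", "/scratch/b"])
```

Paths that do not match the pattern (such as `/scratch/b` here) pass through unchanged, because `re.sub()` returns its input when nothing matches.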

legenddataflowscripts.workflow.utils.set_last_rule_name(workflow, new_name)

Set the name of the most recently created rule to new_name. Useful when rules are created dynamically and are therefore unnamed.

Warning

This could mess up the workflow. Use at your own risk.

legenddataflowscripts.workflow.utils.subst_vars(props, var_values=None, use_env=False, ignore_missing=False)

Substitute $VAR placeholders in a configuration object.

Thin wrapper around subst_vars_impl() that optionally merges the current process environment into the substitution table before expansion. Environment variables take lower priority than explicit entries in var_values.

Parameters:
  • props (str, dict, list, or other) – Configuration object to expand in place.

  • var_values (dict, optional) – Explicit variable-name → value mapping. Takes precedence over environment variables.

  • use_env (bool) – When True the current environment (os.environ) is merged into the substitution table. Defaults to False.

  • ignore_missing (bool) – Passed through to subst_vars_impl(). Defaults to False.

Returns:

str, dict, list, or other – props with all recognisable $VAR placeholders expanded.
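The precedence rule can be illustrated with a small sketch. `build_table` is a hypothetical helper, and `DEMO_ROOT` is a variable set only for this example; the point is that explicit values are applied after the environment, so they win on conflict.

```python
import os
from string import Template


def build_table(var_values=None, use_env=False):
    """Hypothetical sketch: build the substitution table with explicit values on top."""
    # Start from the environment only when requested.
    table = dict(os.environ) if use_env else {}
    # Explicit entries are applied last, so they override the environment.
    table.update(var_values or {})
    return table


os.environ["DEMO_ROOT"] = "/from/env"
table = build_table({"DEMO_ROOT": "/explicit"}, use_env=True)
expanded = Template("$DEMO_ROOT/cfg").substitute(table)
env_only = Template("$DEMO_ROOT/cfg").substitute(build_table(use_env=True))
```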

legenddataflowscripts.workflow.utils.subst_vars_impl(x, var_values, ignore_missing=False)

Recursively substitute $VAR placeholders in a nested structure.

Traverses x depth-first. Any string value containing $ is treated as a string.Template and expanded using var_values.

Parameters:
  • x (str, dict, list, or other) – The value to process. Strings are returned with substitutions applied; dicts and lists are traversed and modified in place. All other types are returned unchanged.

  • var_values (dict) – Mapping of variable names to substitution values.

  • ignore_missing (bool) – When True unknown placeholders are left as-is (uses string.Template.safe_substitute()). When False (default) an unknown placeholder raises KeyError.

Returns:

str, dict, list, or other – x with all $VAR placeholders replaced.
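The traversal described above can be sketched with `string.Template` from the standard library. `expand` is a hypothetical reimplementation written for illustration; it follows the documented behaviour but is not the package's code.

```python
from string import Template


def expand(x, var_values, ignore_missing=False):
    """Hypothetical sketch: recursively expand $VAR placeholders."""
    if isinstance(x, str):
        if "$" not in x:
            return x
        tmpl = Template(x)
        if ignore_missing:
            # Unknown placeholders are left as-is.
            return tmpl.safe_substitute(var_values)
        # Unknown placeholders raise KeyError.
        return tmpl.substitute(var_values)
    if isinstance(x, dict):
        # Values are replaced in place; keys are left untouched.
        for key in x:
            x[key] = expand(x[key], var_values, ignore_missing)
        return x
    if isinstance(x, list):
        for i, item in enumerate(x):
            x[i] = expand(item, var_values, ignore_missing)
        return x
    # Numbers, None, and other types pass through unchanged.
    return x


cfg = {"paths": {"raw": "$ROOT/raw", "tiers": ["$ROOT/dsp", 3]}, "n": 1}
expand(cfg, {"ROOT": "/data"})
kept = expand("$MISSING/x", {}, ignore_missing=True)
```

Because strings are immutable, a top-level string argument is only available through the return value, whereas dicts and lists are also updated in place.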

legenddataflowscripts.workflow.utils.subst_vars_in_snakemake_config(workflow, config)

Expand $VAR placeholders in a Snakemake workflow configuration dict.

Reads the path of the first Snakemake config file, sets $_ to its parent directory, and calls subst_vars() on config with environment variable expansion enabled. Afterwards the execenv key is resolved to the entry matching config["system"] (falling back to "bare").

This function is typically called at the top of a Snakefile:

from legenddataflowscripts.workflow import subst_vars_in_snakemake_config

subst_vars_in_snakemake_config(workflow, config)

Parameters:
  • workflow (snakemake.workflow.Workflow) – Active Snakemake workflow object (provides access to config file paths).

  • config (dict) – Snakemake configuration dictionary to expand in place.

Raises:

RuntimeError – If no config file has been passed to Snakemake (workflow.overwrite_configfiles is empty).
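The behaviour described above can be approximated without Snakemake, using a stand-in object for the workflow. Everything named here (`expand_snakemake_config`, `_expand`, `fake_workflow`, the paths) is hypothetical. The sketch also assumes that unknown placeholders are left untouched and that "bare" is used when config has no "system" key; the real function may differ on both points.

```python
import os
from pathlib import Path
from string import Template
from types import SimpleNamespace


def _expand(x, table):
    """Recursively expand placeholders, leaving unknown ones as-is (assumption)."""
    if isinstance(x, str):
        return Template(x).safe_substitute(table) if "$" in x else x
    if isinstance(x, dict):
        for key in x:
            x[key] = _expand(x[key], table)
    elif isinstance(x, list):
        for i, item in enumerate(x):
            x[i] = _expand(item, table)
    return x


def expand_snakemake_config(workflow, config):
    """Hypothetical sketch of the expansion and execenv resolution steps."""
    if not workflow.overwrite_configfiles:
        raise RuntimeError("no config file was passed to Snakemake")
    # $_ expands to the directory containing the first config file.
    config_dir = str(Path(workflow.overwrite_configfiles[0]).parent)
    table = {**os.environ, "_": config_dir}
    _expand(config, table)
    if "execenv" in config:
        # Keep only the entry for the selected system (assumed fallback: "bare").
        system = config.get("system", "bare")
        config["execenv"] = config["execenv"][system]
    return config


fake_workflow = SimpleNamespace(overwrite_configfiles=["/work/prod/config.yaml"])
snake_cfg = {
    "system": "cluster",
    "paths": {"raw": "$_/data/raw"},
    "execenv": {"bare": {}, "cluster": {"cmd": "apptainer run", "arg": "image.sif"}},
}
expand_snakemake_config(fake_workflow, snake_cfg)
```

After the call, `snake_cfg["paths"]["raw"]` points below the config file's directory and `snake_cfg["execenv"]` holds only the "cluster" entry.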