coco_pipe.dim_reduction.pipeline

Checkpointed fit/eval pipeline for dimensionality-reduction runs.

This module contains the core execution primitives that any project using coco-pipe’s dim-reduction stack can share. Each function is intentionally free of side effects apart from writing artifact files; the caller controls all paths and inventory updates.

Public API

run_fit

Fit one reducer variant on one analysis unit, writing a checkpointed artifact directory.

run_eval

Run one post-hoc evaluation of a saved embedding, writing a checkpointed eval artifact directory.

build_auto_pooled_eval_spec

Build the automatic condition_separation eval spec used when pooling is active.

valid_n_components_for_container

Check whether n_components is feasible for a container’s matrix shape.

valid_component_sweep

Filter a list of component counts to feasible values.

prepare_eval_inputs

Align a DataContainer to saved fit ids and resolve eval labels/groups.

build_fit_request / build_eval_request

Construct request dictionaries that can be passed to run_fit and run_eval.

Functions

supports_nested_components(method)

Whether method can synthesise its whole sweep from one max-n fit.

run_fit(fit_payload, container, out_path, output_root, ...)

Fit one reducer variant and return a fit-runs inventory record.

run_fit_group(requests, *[, errors])

Fit a group of requests sharing one analysis unit and reducer.

run_eval(fit_artifact, container, eval_spec, out_path, ...)

Run one post-hoc evaluation and return an eval-runs inventory record.

prepare_eval_inputs(container, fit_ids, eval_spec)

Align container observations to fit_ids and apply eval filters.

valid_n_components_for_container(container, n_components)

Return True if n_components is feasible for container's matrix.

valid_component_sweep(container, requested)

Filter requested to the component counts feasible for container.

build_auto_pooled_eval_spec(conditions, run_pooled)

Return a condition_separation eval spec, or None.

build_fit_request(*, container, scope, condition, ...)

Build a request dictionary suitable for passing to run_fit().

build_eval_request(*, fit_record, eval_spec, ...[, ...])

Build a request dictionary suitable for passing to run_eval().

Module Contents

coco_pipe.dim_reduction.pipeline.supports_nested_components(method)

Whether method can synthesise its whole sweep from one max-n fit.

Nested reducers (PCA family, SVD) decompose once at the largest n_components and slice the smaller sweep values out of that single fit; everything else (UMAP, t-SNE, PHATE, Isomap, ICA, …) must fit independently per dimension. Callers use this both to fit efficiently (run_fit_group()) and to decide the parallel grain: a non-nested reducer’s sweep is a set of independent fits that can run as separate tasks rather than one serial group. The result is cached, because instantiating DimReduction solely to read its capabilities would be wasteful to repeat for every request.

Parameters:

method (str)

Return type:

bool
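
As a rough illustration of the caching and dispatch described above, the sketch below memoises a capability lookup with functools.lru_cache and uses it to decide how many fits a sweep needs. The table _NESTED_METHODS and the name supports_nested_sketch are hypothetical stand-ins; the real function reads capabilities from DimReduction, which is not reproduced here.

```python
from functools import lru_cache

# Hypothetical capability table, assumed for illustration only.
_NESTED_METHODS = frozenset({"pca", "svd"})


@lru_cache(maxsize=None)
def supports_nested_sketch(method: str) -> bool:
    """Return True if `method` is assumed to allow slicing from one max-n fit."""
    return method.lower() in _NESTED_METHODS


sweep = [2, 5, 10]
plans = {}
for method in ("pca", "umap"):
    if supports_nested_sketch(method):
        # One fit at the largest n; smaller sweep values are sliced from it.
        plans[method] = [max(sweep)]
    else:
        # Every sweep value is an independent fit (and an independent task).
        plans[method] = list(sweep)
    print(method, plans[method])
```

Repeated lookups for the same method name are served from the cache, which is the point of memoising a check that would otherwise build a reducer object each time.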

coco_pipe.dim_reduction.pipeline.run_fit(fit_payload, container, out_path, output_root, overwrite, *, errors='raise')

Fit one reducer variant and return a fit-runs inventory record.

If _SUCCESS already exists in out_path and overwrite is False, the existing artifact is loaded and its inventory record is returned immediately (checkpoint resume).

Parameters:
  • fit_payload (dict[str, Any]) – Provenance/config dict describing this fit (reducer, n_components, scope, condition, unit info, input signature, …).

  • container (coco_pipe.io.DataContainer) – Data container for this analysis unit. Must have ids.

  • out_path (pathlib.Path) – Artifact directory to write (or resume from).

  • output_root (pathlib.Path) – Root of the entire run output. Used for relative-path computation in the returned inventory record.

  • overwrite (bool) – When True, an existing out_path directory is deleted before fitting.

  • errors (coco_pipe.dim_reduction._constants.ErrorMode) – "raise" (default) propagates exceptions; "record" catches them, logs, and returns a failed inventory record of the same shape.

Returns:

A flat inventory record suitable for passing to update_runs().

Return type:

dict
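
The checkpoint-resume behaviour can be pictured with a minimal, stdlib-only sketch. It assumes a hypothetical record.json file inside the artifact directory and a caller-supplied compute callable; only the _SUCCESS marker and the overwrite semantics come from the description above, and the real artifact layout may differ.

```python
import json
import shutil
import tempfile
from pathlib import Path


def fit_with_checkpoint(out_path: Path, overwrite: bool, compute) -> dict:
    """Reuse out_path if a _SUCCESS marker exists, otherwise compute and write."""
    marker = out_path / "_SUCCESS"
    record_file = out_path / "record.json"  # hypothetical file name
    if marker.exists() and not overwrite:
        # Checkpoint resume: load the stored record and skip the computation.
        return json.loads(record_file.read_text())
    if overwrite and out_path.exists():
        shutil.rmtree(out_path)  # overwrite=True deletes the old directory
    out_path.mkdir(parents=True, exist_ok=True)
    record = compute()
    record_file.write_text(json.dumps(record))
    marker.touch()  # written last, so a crash never leaves a false success
    return record


calls = []


def compute():
    calls.append(1)
    return {"status": "ok", "n_components": 3}


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "fit_a"
    first = fit_with_checkpoint(out, overwrite=False, compute=compute)
    second = fit_with_checkpoint(out, overwrite=False, compute=compute)  # resumed
    third = fit_with_checkpoint(out, overwrite=True, compute=compute)  # refitted
```

Writing the marker as the final step is a design choice of this sketch: a directory without _SUCCESS is treated as incomplete and is recomputed.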

coco_pipe.dim_reduction.pipeline.run_fit_group(requests, *, errors='raise')

Fit a group of requests sharing one analysis unit and reducer.

All requests must describe the same container and reducer, differing only in n_components (as produced by build_fit_request() for one unit’s sweep). When the reducer is hierarchically nested, the largest n_components is fitted once and the smaller sweep values are synthesised by slicing the embedding, components, and explained-variance arrays — avoiding a redundant decomposition per sweep value. Non-nested reducers (or singleton groups) fall back to an independent run_fit() per request, so behaviour is unchanged for UMAP/t-SNE/ICA/etc.

Returns one inventory record per request, ordered by how each request was resolved: resumed records first, then synthesised ones.

Parameters:
  • requests (list[dict[str, Any]])

  • errors (coco_pipe.dim_reduction._constants.ErrorMode)

Return type:

list[dict[str, Any]]
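
To make the slicing step concrete, here is a small sketch under stated assumptions: the fit result is modelled as a plain dict with embedding, components, and explained_variance entries, which is a hypothetical layout rather than the library's artifact format. Plain slicing like this is only valid for reducers whose leading components do not depend on the requested n_components, as is the case for the PCA and SVD style decompositions mentioned above.

```python
def slice_nested_fit(max_fit: dict, n: int) -> dict:
    """Derive an n-component result from a fit made at a larger n_components."""
    if not 1 <= n <= max_fit["n_components"]:
        raise ValueError("n must lie between 1 and the fitted n_components")
    return {
        "n_components": n,
        # Keep the first n coordinates of every embedded observation.
        "embedding": [row[:n] for row in max_fit["embedding"]],
        # Keep the first n component vectors (one row per component).
        "components": max_fit["components"][:n],
        "explained_variance": max_fit["explained_variance"][:n],
    }


# Toy max-n result: 2 observations, 3 components, 4 input features.
max_fit = {
    "n_components": 3,
    "embedding": [[0.9, 0.1, 0.0], [-0.9, -0.1, 0.0]],
    "components": [
        [0.5, 0.5, 0.5, 0.5],
        [0.5, -0.5, 0.5, -0.5],
        [0.5, 0.5, -0.5, -0.5],
    ],
    "explained_variance": [1.62, 0.02, 0.0],
}

sweep = [1, 2, 3]
results = {n: slice_nested_fit(max_fit, n) for n in sweep}
```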

coco_pipe.dim_reduction.pipeline.run_eval(fit_artifact, container, eval_spec, out_path, output_root, overwrite, *, errors='raise')

Run one post-hoc evaluation and return an eval-runs inventory record.

The fit provenance is read from fit_artifact["fit"]. If _SUCCESS already exists in out_path and overwrite is False, the existing eval artifact is loaded and its inventory record is returned immediately (checkpoint resume).

Parameters:
  • fit_artifact (dict[str, Any]) – Full fit artifact dict as returned by load_fit_artifact().

  • container (coco_pipe.io.DataContainer) – Data container for the analysis unit (must contain the columns referenced by eval_spec).

  • eval_spec (dict[str, Any]) – Eval specification dict with keys name, target_col, group_col, filters, label_map.

  • out_path (pathlib.Path) – Artifact directory to write (or resume from).

  • output_root (pathlib.Path) – Root of the entire run output.

  • overwrite (bool) – When True, an existing out_path directory is deleted before evaluating.

  • errors (coco_pipe.dim_reduction._constants.ErrorMode) – "raise" (default) propagates exceptions; "record" catches them, logs, and returns a failed inventory record.

Returns:

A flat eval inventory record suitable for passing to update_runs().

Return type:

dict
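
The difference between the two errors modes can be sketched as a small wrapper. The names run_guarded and ErrorModeSketch, and the status and error fields of the returned record, are assumptions made for this example; they are not taken from coco-pipe.

```python
import logging
from typing import Any, Callable, Literal

ErrorModeSketch = Literal["raise", "record"]  # hypothetical stand-in type
logger = logging.getLogger("eval_sketch")


def run_guarded(
    task: Callable[[], dict],
    base: dict,
    errors: ErrorModeSketch = "raise",
) -> dict:
    """Run task(); on failure either re-raise or return a failed record."""
    try:
        return {**base, "status": "success", **task()}
    except Exception as exc:
        if errors == "raise":
            raise  # propagate unchanged to the caller
        logger.warning("task failed: %r", exc)
        # Same base fields as a successful record, plus the failure details.
        return {**base, "status": "failed", "error": repr(exc)}


def failing_task() -> dict:
    raise ValueError("no valid samples")


record = run_guarded(failing_task, {"eval_name": "demo"}, errors="record")
print(record["status"])
```

Recording failures instead of raising lets a batch of evaluations run to completion, with the failed entries still visible in the inventory.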

coco_pipe.dim_reduction.pipeline.prepare_eval_inputs(container, fit_ids, eval_spec)

Align container observations to fit_ids and apply eval filters.

The saved fit may cover a different (or differently ordered) subset of observations than the current container, so this function aligns by observation id (with occurrence-count disambiguation for duplicate ids), applies column filters from eval_spec, resolves labels and groups, and masks out missing values.

Parameters:
  • container (coco_pipe.io.DataContainer) – DataContainer that holds the metadata columns referenced by eval_spec.

  • fit_ids (numpy.ndarray) – Observation ids in the order they appear in the saved embedding.

  • eval_spec (dict[str, Any]) – Eval specification dict (name, target_col, group_col, filters, label_map).

Returns:

selected_index is the pandas Index into the aligned frame (suitable for slicing the embedding array). The remaining three are numpy arrays of strings.

Return type:

tuple of (selected_index, selected_ids, labels, groups)

Raises:
  • ValueError – On missing columns or structural issues.

  • RuntimeError – When no valid samples remain after alignment and filtering.
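
One way to picture alignment by id with occurrence-count disambiguation is to pair every id with a running count, so the second "s2" in the fit matches the second "s2" in the container. The sketch below works on plain lists and uses hypothetical helper names; the real function operates on a DataContainer and also applies filters, labels, and groups, none of which is shown here.

```python
from collections import Counter


def occurrence_keys(ids):
    """Pair each id with its running occurrence index, e.g. a, a, b -> (a, 0), (a, 1), (b, 0)."""
    seen = Counter()
    keys = []
    for obs_id in ids:
        keys.append((obs_id, seen[obs_id]))
        seen[obs_id] += 1
    return keys


def align_to_fit(container_ids, fit_ids):
    """For each fit id, return the matching row position in container_ids, or None."""
    position = {key: i for i, key in enumerate(occurrence_keys(container_ids))}
    return [position.get(key) for key in occurrence_keys(fit_ids)]


container_ids = ["s1", "s2", "s2", "s3"]
fit_ids = ["s2", "s3", "s2", "s9"]  # different order, one id missing
rows = align_to_fit(container_ids, fit_ids)
print(rows)
```

Positions that come back as None correspond to fit observations with no counterpart in the container; a caller would drop these before slicing the embedding.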

coco_pipe.dim_reduction.pipeline.valid_n_components_for_container(container, n_components)

Return True if n_components is feasible for container’s matrix.

Parameters:
  • container

  • n_components

Return type:

bool

coco_pipe.dim_reduction.pipeline.valid_component_sweep(container, requested)

Filter requested to the component counts feasible for container.

Logs a message if any values are skipped.

Parameters:
  • container

  • requested

Return type:

list[int]
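
The feasibility rule itself is not spelled out here. A common convention for decomposition-based reducers, assumed in the sketch below, is that n_components must be at least 1 and no larger than min(n_samples, n_features). The helper names and the bare shape tuple are hypothetical; the real functions take a DataContainer.

```python
import logging

logger = logging.getLogger("sweep_sketch")


def feasible(shape, n_components):
    """Assumed rule: 1 <= n_components <= min(n_samples, n_features)."""
    n_samples, n_features = shape
    return 1 <= n_components <= min(n_samples, n_features)


def feasible_sweep(shape, requested):
    """Keep the feasible component counts, preserving the requested order."""
    kept = [n for n in requested if feasible(shape, n)]
    skipped = [n for n in requested if not feasible(shape, n)]
    if skipped:
        logger.info("skipping infeasible n_components: %s", skipped)
    return kept


# 50 observations with 8 features: anything above 8 components is dropped.
kept = feasible_sweep((50, 8), [2, 5, 10, 20])
print(kept)
```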

coco_pipe.dim_reduction.pipeline.build_auto_pooled_eval_spec(conditions, run_pooled)

Return a condition_separation eval spec, or None.

The spec is only produced when run_pooled is True and at least two conditions are present — otherwise condition separation is not meaningful.

Parameters:
  • conditions (list[str]) – List of condition names that will be included in the pooled container.

  • run_pooled (bool) – Whether the caller intends to run a pooled analysis.

Return type:

dict[str, Any] | None
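
The guard described above can be sketched in a few lines. The spec keys match those listed for eval_spec elsewhere on this page, but every value other than the condition_separation name is an assumption made for illustration, including the "condition" column name.

```python
from typing import Any, Optional


def auto_pooled_spec_sketch(
    conditions: list, run_pooled: bool
) -> Optional[dict]:
    """Return a condition_separation spec only when it is meaningful."""
    if not run_pooled or len(conditions) < 2:
        # A single condition (or no pooling) leaves nothing to separate.
        return None
    spec: dict = {
        "name": "condition_separation",
        "target_col": "condition",  # assumed column name
        "group_col": None,  # assumed default
        "filters": {},  # assumed default
        "label_map": None,  # assumed default
    }
    return spec


print(auto_pooled_spec_sketch(["rest", "task"], run_pooled=True))
print(auto_pooled_spec_sketch(["rest"], run_pooled=True))
```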

coco_pipe.dim_reduction.pipeline.build_fit_request(*, container, scope, condition, unit_spec, reducer, n_components, input_signature, output_root, overwrite=False, subject_col='subject', extra_payload=None, artifact_path=None, artifact_path_factory=None)

Build a request dictionary suitable for passing to run_fit().

The caller owns project-specific input provenance via input_signature and optional extra_payload. coco-pipe owns the deterministic fit id, standard fit payload fields, and default flat artifact path.

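Parameters:
  • container

  • scope

  • condition

  • unit_spec

  • reducer

  • n_components

  • input_signature

  • output_root

  • overwrite (default False)

  • subject_col (default 'subject')

  • extra_payload (default None)

  • artifact_path (default None)

  • artifact_path_factory (default None)
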
Return type:

dict[str, Any]
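
How the deterministic fit id is derived is not documented on this page. One common approach, shown here purely as an assumption, is to hash a canonical JSON serialisation of the payload so that the same configuration always maps to the same id regardless of key order. The function name and the 12-character length are hypothetical.

```python
import hashlib
import json


def deterministic_id(payload: dict, length: int = 12) -> str:
    """Hash a canonical JSON form of payload into a short, stable id."""
    # sort_keys and fixed separators make the serialisation order-independent.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:length]


a = deterministic_id({"reducer": "pca", "n_components": 3, "scope": "pooled"})
b = deterministic_id({"scope": "pooled", "n_components": 3, "reducer": "pca"})
c = deterministic_id({"reducer": "pca", "n_components": 4, "scope": "pooled"})
print(a == b, a == c)
```

An id built this way lets a rerun locate an existing artifact directory without consulting any external state, which fits the checkpoint-resume behaviour of run_fit().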

coco_pipe.dim_reduction.pipeline.build_eval_request(*, fit_record, eval_spec, container, output_root, overwrite=False, fit_artifact=None, artifact_path=None, artifact_path_factory=None)

Build a request dictionary suitable for passing to run_eval().

By default, the fit artifact is loaded from fit_record['artifact_path'] relative to output_root and the eval artifact is placed under the flat artifacts/evals directory.

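Parameters:
  • fit_record

  • eval_spec

  • container

  • output_root

  • overwrite (default False)

  • fit_artifact (default None)

  • artifact_path (default None)

  • artifact_path_factory (default None)
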
Return type:

dict[str, Any]
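
The default path resolution can be illustrated with pathlib. Only two things below come from the description above: the fit artifact is located via fit_record['artifact_path'] relative to output_root, and evals live under artifacts/evals. The fit_id key and the directory naming scheme are hypothetical.

```python
from pathlib import Path


def resolve_eval_paths(fit_record: dict, eval_name: str, output_root: Path):
    """Return (fit artifact dir, eval artifact dir) under output_root."""
    # The inventory stores the fit path relative to the run's output root.
    fit_dir = output_root / fit_record["artifact_path"]
    # Hypothetical naming scheme: one flat directory per (fit, eval) pair.
    eval_dir = (
        output_root / "artifacts" / "evals" / f"{fit_record['fit_id']}__{eval_name}"
    )
    return fit_dir, eval_dir


fit_record = {"fit_id": "abc123", "artifact_path": "artifacts/fits/abc123"}
fit_dir, eval_dir = resolve_eval_paths(
    fit_record, "condition_separation", Path("runs/demo")
)
print(fit_dir)
print(eval_dir)
```

Storing relative paths in the inventory keeps a run directory relocatable: moving output_root does not invalidate any record.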