hipscat_import.pipeline_resume_plan
Utility to hold a pipeline’s execution and resume plan.
Module Contents
Classes
PipelineResumePlan | Container class for holding the state of a pipeline plan.
Functions
get_pixel_cache_directory(cache_path, pixel) | Create a path for intermediate pixel data.
- class PipelineResumePlan
Container class for holding the state of a pipeline plan.
- resume: bool = True
If True and intermediate resume files already exist, read them and continue running the pipeline from where it left off.
- progress_bar: bool = True
If True, display a tqdm progress bar to give the user feedback on planning progress.
- safe_to_resume()
Check that we are ok to resume an in-progress pipeline, if one exists.
- Raises:
ValueError – if the tmp_path already exists and contains some files.
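A minimal standalone sketch of this check follows. It assumes, without confirmation from this page, that existing files only count as an error when the `resume` flag is disabled, and it uses a plain string `tmp_path` instead of the plan's own attribute. The function is hypothetical and is not the library's implementation.

```python
from pathlib import Path


def safe_to_resume(tmp_path: str, resume: bool = True) -> None:
    """Hypothetical standalone version of PipelineResumePlan.safe_to_resume.

    Assumption: the check is gated on the ``resume`` flag, so existing files
    are only treated as an error when the caller has not asked to resume.
    """
    path = Path(tmp_path)
    # The directory "has contents" if iterdir() yields at least one entry.
    has_contents = path.is_dir() and any(path.iterdir())
    if has_contents and not resume:
        raise ValueError(
            f"tmp_path ({tmp_path}) contains intermediate files; "
            "choose a different directory or enable resume"
        )
    # Ensure the directory exists so later stages can write done files into it.
    path.mkdir(parents=True, exist_ok=True)
```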
- done_file_exists(stage_name)
Check whether the done file for a pipeline stage exists. For a done file, the existence of the file is the only signal needed to indicate that a pipeline segment is complete.
- Parameters:
stage_name (str) – name of the stage (e.g. mapping, reducing)
- Returns:
bool: True if the stage's done file exists in tmp_path, False otherwise.
- touch_stage_done_file(stage_name)
Touch (create) a done file for a whole pipeline stage. For a done file, the existence of the file is the only signal needed to indicate a pipeline segment is complete.
- Parameters:
stage_name (str) – name of the stage (e.g. mapping, reducing)
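The done-file protocol used by `done_file_exists` and `touch_stage_done_file` can be sketched with `pathlib`. The marker name `<stage_name>_done` directly under `tmp_path` is an assumed convention for illustration, and the functions take `tmp_path` explicitly only because they are written as hypothetical standalone helpers rather than methods.

```python
from pathlib import Path


def stage_done_path(tmp_path: str, stage_name: str) -> Path:
    # Assumed naming convention: one empty marker file per stage.
    return Path(tmp_path) / f"{stage_name}_done"


def touch_stage_done_file(tmp_path: str, stage_name: str) -> None:
    # The file's contents are irrelevant; only its existence matters.
    stage_done_path(tmp_path, stage_name).touch()


def done_file_exists(tmp_path: str, stage_name: str) -> bool:
    return stage_done_path(tmp_path, stage_name).exists()
```

On a resumed run, a caller would check `done_file_exists(tmp_path, "mapping")` and skip the whole mapping stage if it returns True, touching the file only after every task in the stage has succeeded.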
- classmethod touch_key_done_file(tmp_path, stage_name, key)
Touch (create) a done file for a single key, within a pipeline stage.
- Parameters:
tmp_path – path to the pipeline's temporary directory, where done files are written
stage_name (str) – name of the stage (e.g. mapping, reducing)
key (str) – unique string for each task (e.g. “map_57”)
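Because this is a classmethod that receives `tmp_path` explicitly, it can be called on the class itself, without a plan instance (for example, from inside a worker task). The sketch below assumes a layout of `<tmp_path>/<stage_name>/<key>_done`; that layout and the helper are hypothetical.

```python
from pathlib import Path


def touch_key_done_file(tmp_path: str, stage_name: str, key: str) -> Path:
    """Hypothetical sketch: create <tmp_path>/<stage_name>/<key>_done."""
    stage_dir = Path(tmp_path) / stage_name
    # A worker may finish before anything else has created the stage directory.
    stage_dir.mkdir(parents=True, exist_ok=True)
    done_file = stage_dir / f"{key}_done"
    done_file.touch()
    return done_file
```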
- read_done_keys(stage_name)
Inspect the stage’s directory of done files, fetching the keys from done file names.
- Parameters:
stage_name (str) – name of the stage (e.g. mapping, reducing)
- Returns:
List[str]: all keys found in the stage's done directory
- static get_keys_from_file_names(directory, extension)
Gather keys for successful tasks from result file names.
- Parameters:
directory – where to look for result files. This is NOT a recursive lookup.
extension (str) – file suffix to look for and to remove from all file names. If you expect a file like “map_01.csv”, extension should be “.csv”.
- Returns:
list of keys taken from files like /resume/path/{key}{extension}
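The key extraction and its use for reading done keys can be sketched as below. Both functions are hypothetical standalone versions. The `_done` suffix and the `<tmp_path>/<stage_name>` directory are assumed conventions, and the results are sorted only to make the output deterministic.

```python
from pathlib import Path
from typing import List


def get_keys_from_file_names(directory: str, extension: str) -> List[str]:
    """Return {key} for every file named {key}{extension} directly in directory."""
    keys = []
    # iterdir() lists only immediate children, so the lookup is not recursive.
    for entry in Path(directory).iterdir():
        if entry.is_file() and entry.name.endswith(extension):
            # Slice by length so an empty extension keeps the whole name.
            keys.append(entry.name[: len(entry.name) - len(extension)])
    return sorted(keys)


def read_done_keys(tmp_path: str, stage_name: str) -> List[str]:
    # Assumption: per-key done files are "<key>_done" inside the stage directory.
    stage_dir = Path(tmp_path) / stage_name
    if not stage_dir.is_dir():
        return []  # no task in this stage has finished yet
    return get_keys_from_file_names(str(stage_dir), "_done")
```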
- wait_for_futures(futures, stage_name)
Wait for collected futures to complete.
As each future completes, check the returned status.
- Parameters:
futures (List[future]) – collected futures
stage_name (str) – name of the stage (e.g. mapping, reducing)
- Raises:
RuntimeError – if any future returns an error status.
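The wait-then-check pattern can be sketched with the standard library's `concurrent.futures`. This substitutes `concurrent.futures` for whatever executor the pipeline actually uses, omits the progress bar, and treats "error status" as "the future raised an exception". It is an illustration, not the library's code.

```python
from concurrent.futures import Future, as_completed
from typing import List


def wait_for_futures(futures: List[Future], stage_name: str) -> None:
    """Block until every future finishes; raise if any of them failed."""
    failed = []
    # as_completed yields each future as soon as it finishes, in any order.
    for future in as_completed(futures):
        # exception() returns None for a future that completed successfully.
        if future.exception() is not None:
            failed.append(future)
    if failed:
        raise RuntimeError(f"{stage_name}: {len(failed)} task(s) failed")
```

Collecting all failures before raising lets every task run to completion, so a single error report covers the whole stage.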
- static get_formatted_stage_name(stage_name) → str
Create a stage name of consistent minimum length. Ensures that the tqdm progress bars can line up nicely when multiple stages must run.
- Parameters:
stage_name (str) – name of the stage (e.g. mapping, reducing)
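Padding to a minimum width takes a single format specification. The sketch below assumes a width of 10 characters. The actual width, and any other normalization the method applies, are not documented here.

```python
def get_formatted_stage_name(stage_name: str, width: int = 10) -> str:
    """Hypothetical sketch: left-align a stage name to a minimum width."""
    # Pad with spaces up to `width`; longer names are returned unchanged.
    return f"{stage_name or '':<{width}}"
```

Passing the result as tqdm's `desc` argument makes the bars for "mapping", "splitting", and "reducing" start in the same column.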
- check_original_input_paths(input_paths)
Validate that we’re operating on the same file set as the original pipeline, or save the inputs as the originals if not found.
- Parameters:
input_paths (list[str]) – input paths that will be processed by a pipeline.
- Raises:
ValueError – if the retrieved file set differs from input_paths.
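A minimal sketch of the save-or-compare logic follows. The file name `input_paths.txt` and the one-path-per-line format are hypothetical. Comparing sets means that ordering and duplicates in `input_paths` do not count as a different file set.

```python
from pathlib import Path
from typing import List


def check_original_input_paths(tmp_path: str, input_paths: List[str]) -> List[str]:
    """Hypothetical sketch: save the input set on the first run, verify it on resume."""
    log_file = Path(tmp_path) / "input_paths.txt"  # hypothetical location
    unique_inputs = sorted(set(input_paths))
    if log_file.exists():
        # Skip blank lines so an empty saved list round-trips correctly.
        original = sorted({line for line in log_file.read_text().splitlines() if line})
        if original != unique_inputs:
            raise ValueError("input paths differ from the original pipeline run")
    else:
        Path(tmp_path).mkdir(parents=True, exist_ok=True)
        log_file.write_text("\n".join(unique_inputs))
    return unique_inputs
```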
- get_pixel_cache_directory(cache_path, pixel: hipscat.pixel_math.healpix_pixel.HealpixPixel)
Create a path for intermediate pixel data.
Use this instead of the paths.get_pixel_directory method: it includes the pixel number in the path, and its layout differs from a real hipscat path, which makes it clear that this is a temporary directory:
{cache_path}/order_{order}/dir_{dir}/pixel_{pixel}/
- Parameters:
cache_path (str) – root path to cache
pixel (HealpixPixel) – pixel partition data
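The template `{cache_path}/order_{order}/dir_{dir}/pixel_{pixel}/` can be built as shown below. The `HealpixPixel` class here is a stand-in that only mirrors the `order` and `pixel` fields. Computing `dir` by grouping pixel numbers into buckets of 10,000 is an assumption modeled on hipscat's directory convention and is not stated on this page. `os.path.join` does not add the trailing slash.

```python
import os
from typing import NamedTuple


class HealpixPixel(NamedTuple):
    """Stand-in for hipscat.pixel_math.healpix_pixel.HealpixPixel (order, pixel only)."""

    order: int
    pixel: int


def get_pixel_cache_directory(cache_path: str, pixel: HealpixPixel) -> str:
    # Assumption: pixels are grouped into directories of 10,000 pixel numbers.
    dir_number = (pixel.pixel // 10_000) * 10_000
    return os.path.join(
        cache_path,
        f"order_{pixel.order}",
        f"dir_{dir_number}",
        f"pixel_{pixel.pixel}",
    )
```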