hipscat_import.pipeline_resume_plan#

Utility to hold a pipeline’s execution and resume plan.

Module Contents#

Classes#

PipelineResumePlan

Container class for holding the state of a pipeline plan.

Functions#

get_pixel_cache_directory(cache_path, pixel)

Create a path for intermediate pixel data.

class PipelineResumePlan[source]#

Container class for holding the state of a pipeline plan.

tmp_path: hipscat.io.FilePointer[source]#

path for any intermediate files

resume: bool = True[source]#

if intermediate resume files already exist, read them and continue running the pipeline from where it left off

progress_bar: bool = True[source]#

if true, display a tqdm progress bar to give the user feedback on planning progress

ORIGINAL_INPUT_PATHS = 'input_paths.txt'[source]#

safe_to_resume()[source]#

Check that we are ok to resume an in-progress pipeline, if one exists.

Raises:

ValueError – if the tmp_path already exists and contains some files.
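The check can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: it takes `tmp_path` and `resume` as plain arguments, treats any directory entry as "some files", and assumes the error is only raised when `resume` is false.

```python
import tempfile
from pathlib import Path


def safe_to_resume(tmp_path: Path, resume: bool) -> None:
    """Raise if leftover intermediate files exist and resuming is disabled."""
    # Assumption: "contains some files" means any directory entry at all.
    has_contents = tmp_path.is_dir() and any(tmp_path.iterdir())
    # Assumption: leftovers are only an error when the caller opted out of resuming.
    if has_contents and not resume:
        raise ValueError(f"tmp_path ({tmp_path}) contains intermediate files")
    # Make sure the directory exists for the stages that follow.
    tmp_path.mkdir(parents=True, exist_ok=True)


with tempfile.TemporaryDirectory() as root:
    tmp = Path(root) / "intermediate"
    safe_to_resume(tmp, resume=False)   # fresh run: directory is created
    (tmp / "mapping_done").touch()      # simulate a leftover file
    safe_to_resume(tmp, resume=True)    # resuming is allowed
    try:
        safe_to_resume(tmp, resume=False)
    except ValueError as err:
        print(err)
```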

done_file_exists(stage_name)[source]#

Check whether the done file for a stage exists. For a done file, the existence of the file is the only signal needed to indicate a pipeline segment is complete.

Parameters:

stage_name (str) – name of the stage (e.g. mapping, reducing)

Returns:

boolean, True if the done file exists at tmp_path. False otherwise.

touch_stage_done_file(stage_name)[source]#

Touch (create) a done file for a whole pipeline stage. For a done file, the existence of the file is the only signal needed to indicate a pipeline segment is complete.

Parameters:

stage_name (str) – name of the stage (e.g. mapping, reducing)
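The done-file convention used by done_file_exists and touch_stage_done_file might look roughly like the sketch below. The `<stage_name>_done` file name and the free-function form (taking `tmp_path` explicitly) are assumptions made for illustration.

```python
import tempfile
from pathlib import Path


def stage_done_path(tmp_path: Path, stage_name: str) -> Path:
    # Hypothetical naming scheme: "<stage_name>_done" directly under tmp_path.
    return tmp_path / f"{stage_name}_done"


def touch_stage_done_file(tmp_path: Path, stage_name: str) -> None:
    """Create an empty marker file; its existence is the only signal."""
    stage_done_path(tmp_path, stage_name).touch()


def done_file_exists(tmp_path: Path, stage_name: str) -> bool:
    """Return True if the marker file for this stage is present."""
    return stage_done_path(tmp_path, stage_name).exists()


with tempfile.TemporaryDirectory() as root:
    tmp = Path(root)
    before = done_file_exists(tmp, "mapping")
    touch_stage_done_file(tmp, "mapping")
    after = done_file_exists(tmp, "mapping")
    print(before, after)
```

Because the marker carries no content, a stage can be re-checked cheaply on every resume without parsing anything.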

classmethod touch_key_done_file(tmp_path, stage_name, key)[source]#

Touch (create) a done file for a single key, within a pipeline stage.

Parameters:
  • tmp_path – path for any intermediate files

  • stage_name (str) – name of the stage (e.g. mapping, reducing)

  • key (str) – unique string for each task (e.g. “map_57”)

read_done_keys(stage_name)[source]#

Inspect the stage’s directory of done files, fetching the keys from done file names.

Parameters:

stage_name (str) – name of the stage (e.g. mapping, reducing)

Returns:

List[str] - all keys found in done directory

static get_keys_from_file_names(directory, extension)[source]#

Gather keys for successful tasks from result file names.

Parameters:
  • directory – where to look for result files. This is NOT a recursive lookup.

  • extension (str) – file suffix to look for and to remove from all file names. If you expect a file like “map_01.csv”, extension should be “.csv”.

Returns:

list of keys taken from files like /resume/path/{key}{extension}
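As a rough sketch of the behavior described above, the following lists only the direct children of a directory and strips the extension to recover each key. Returning the keys sorted is an assumption made here so the output is deterministic; the documented method does not promise an order.

```python
import tempfile
from pathlib import Path
from typing import List


def get_keys_from_file_names(directory: Path, extension: str) -> List[str]:
    """Return the key part of every file named {key}{extension} in directory."""
    keys = []
    # iterdir() yields only direct children, so nested files are ignored.
    for path in directory.iterdir():
        if path.is_file() and path.name.endswith(extension):
            keys.append(path.name[: len(path.name) - len(extension)])
    return sorted(keys)  # sorting is an illustrative choice


with tempfile.TemporaryDirectory() as root:
    results = Path(root)
    (results / "map_01.csv").touch()
    (results / "map_02.csv").touch()
    (results / "notes.txt").touch()            # wrong extension: skipped
    (results / "nested").mkdir()
    (results / "nested" / "map_03.csv").touch()  # nested: skipped
    found = get_keys_from_file_names(results, ".csv")
    print(found)
```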

clean_resume_files()[source]#

Remove all intermediate files created in execution.

wait_for_futures(futures, stage_name)[source]#

Wait for collected futures to complete.

As each future completes, check the returned status.

Parameters:
  • futures (List[future]) – collected futures

  • stage_name (str) – name of the stage (e.g. mapping, reducing)

Raises:

RuntimeError – if any future returns an error status.

static get_formatted_stage_name(stage_name) → str[source]#

Create a stage name of consistent minimum length. Ensures that the tqdm progress bars can line up nicely when multiple stages must run.

Parameters:

stage_name (str) – name of the stage (e.g. mapping, reducing)
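One simple way to get a consistent minimum length is to pad the name on the right. The width of 10 characters below is an assumed value for illustration, not one confirmed by this page.

```python
def get_formatted_stage_name(stage_name: str, min_width: int = 10) -> str:
    """Pad stage_name with spaces so it is at least min_width characters."""
    # min_width=10 is an illustrative assumption; longer names are left as-is.
    return stage_name.ljust(min_width)


for name in ("mapping", "reducing", "finishing"):
    # Brackets make the trailing padding visible.
    print(f"[{get_formatted_stage_name(name)}]")
```

With equal-width labels, progress bars for consecutive stages start at the same column.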

check_original_input_paths(input_paths)[source]#

Validate that we’re operating on the same file set as the original pipeline, or save the inputs as the originals if not found.

Parameters:

input_paths (list[str]) – input paths that will be processed by a pipeline.

Raises:

ValueError – if the retrieved file set differs from input_paths.
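The save-or-compare logic could be sketched as follows. The one-path-per-line file layout, the order-insensitive comparison, and passing `tmp_path` as an argument are assumptions; only the file name comes from the ORIGINAL_INPUT_PATHS attribute.

```python
import tempfile
from pathlib import Path
from typing import List

ORIGINAL_INPUT_PATHS = "input_paths.txt"  # file name from the class attribute


def check_original_input_paths(tmp_path: Path, input_paths: List[str]) -> None:
    """Save input_paths on the first run; on later runs, require the same set."""
    record = tmp_path / ORIGINAL_INPUT_PATHS
    unique_paths = sorted(set(input_paths))  # assumption: order does not matter
    if not record.exists():
        # No originals found: store the current inputs as the originals.
        record.write_text("\n".join(unique_paths), encoding="utf-8")
        return
    original = record.read_text(encoding="utf-8").splitlines()
    if original != unique_paths:
        raise ValueError("input paths differ from the original pipeline run")


with tempfile.TemporaryDirectory() as root:
    tmp = Path(root)
    check_original_input_paths(tmp, ["b.csv", "a.csv"])   # saved as originals
    check_original_input_paths(tmp, ["a.csv", "b.csv"])   # same set: passes
    try:
        check_original_input_paths(tmp, ["a.csv", "c.csv"])
        mismatch_detected = False
    except ValueError as err:
        mismatch_detected = True
        print(err)
```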

get_pixel_cache_directory(cache_path, pixel: hipscat.pixel_math.healpix_pixel.HealpixPixel)[source]#

Create a path for intermediate pixel data.

You can use this instead of the paths.get_pixel_directory method, as it includes the pixel number in the path. The result also looks different from a real hipscat path, which makes it clearer that it is a temporary directory:

{cache_path}/order_{order}/dir_{dir}/pixel_{pixel}/

Parameters:
  • cache_path (str) – root path to cache

  • pixel (HealpixPixel) – pixel partition data
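To make the path template concrete, here is a small sketch that fills it in. The `HealpixPixel` class below is a simplified stand-in, not hipscat's class, and the rule that `dir` is the pixel number rounded down to a multiple of 10,000 is an assumption of this example.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class HealpixPixel:
    """Simplified, hypothetical stand-in for hipscat's HealpixPixel."""

    order: int
    pixel: int

    @property
    def dir(self) -> int:
        # Assumption: pixels are grouped into directories of 10,000.
        return (self.pixel // 10_000) * 10_000


def get_pixel_cache_directory(cache_path: str, pixel: HealpixPixel) -> str:
    """Build {cache_path}/order_{order}/dir_{dir}/pixel_{pixel}."""
    return str(
        PurePosixPath(cache_path)
        / f"order_{pixel.order}"
        / f"dir_{pixel.dir}"
        / f"pixel_{pixel.pixel}"
    )


print(get_pixel_cache_directory("/tmp/cache", HealpixPixel(order=5, pixel=12345)))
```

Note that this sketch returns the path without the trailing slash shown in the template.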