hipscat_import.catalog.map_reduce

Import a set of non-hipscat files, using dask for parallelization.

Module Contents

Functions

_has_named_index(dataframe)

Heuristic to determine if a dataframe has some meaningful index.

_iterate_input_file(input_file, file_reader, ...[, ...])

Helper function to handle input file reading and healpix pixel calculation

map_to_pixels(input_file, file_reader, resume_path, ...)

Map a file of input objects to their healpix pixels.

split_pixels(input_file, file_reader, splitting_key, ...)

Map a file of input objects to their healpix pixels and split into shards.

reduce_pixel_shards(cache_shard_path, resume_path, ...)

Reduce sharded source pixels into destination pixels.

_has_named_index(dataframe)

Heuristic to determine if a dataframe has some meaningful index.

This will reject dataframes with no index name for a single index, or empty names for multi-index (e.g. [] or [None]).
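The heuristic described above can be sketched as follows. This is a minimal illustration assuming a pandas dataframe; the function name `has_named_index` and the sample frames are hypothetical and are not taken from the module's source.

```python
import pandas as pd


def has_named_index(dataframe: pd.DataFrame) -> bool:
    """Return True if the dataframe's index carries at least one name."""
    if dataframe.index.name is not None:
        # Single index with an explicit name.
        return True
    names = dataframe.index.names
    # Rejects an unnamed single index ([None]) and a multi-index
    # whose names are empty or all None.
    return len(names) > 0 and any(name is not None for name in names)


# Hypothetical sample frames for illustration.
unnamed = pd.DataFrame({"ra": [10.0, 20.0]})
named = pd.DataFrame({"ra": [10.0, 20.0]}, index=pd.Index([7, 8], name="object_id"))
multi = pd.DataFrame(
    {"ra": [10.0, 20.0]},
    index=pd.MultiIndex.from_tuples([(1, "a"), (2, "b")], names=["id", "band"]),
)
unnamed_multi = pd.DataFrame(
    {"ra": [10.0, 20.0]},
    index=pd.MultiIndex.from_tuples([(1, "a"), (2, "b")]),
)
```

Here only `named` and `multi` would count as having a meaningful index; the default `RangeIndex` of `unnamed` and the nameless levels of `unnamed_multi` would be rejected.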

_iterate_input_file(input_file: hipscat.io.FilePointer, file_reader: hipscat_import.catalog.file_readers.InputReader, highest_order, ra_column, dec_column, use_hipscat_index=False, read_columns=None)

Helper function to handle input file reading and healpix pixel calculation

map_to_pixels(input_file: hipscat.io.FilePointer, file_reader: hipscat_import.catalog.file_readers.InputReader, resume_path: hipscat.io.FilePointer, mapping_key, highest_order, ra_column, dec_column, use_hipscat_index=False)

Map a file of input objects to their healpix pixels.

Parameters:
  • input_file (FilePointer) – file to read for catalog data.

  • file_reader (hipscat_import.catalog.file_readers.InputReader) – instance of input reader that specifies arguments necessary for reading from the input file.

  • resume_path (FilePointer) – where to write partial results used for resuming.

  • mapping_key (str) – unique counter for this input file, used when creating intermediate files

  • highest_order (int) – healpix order to use when mapping

  • ra_column (str) – where to find right ascension data in the dataframe

  • dec_column (str) – where to find declination data in the dataframe

Returns:

A one-dimensional numpy array of long integers, where the value at each index is the number of objects found in the healpix pixel with that number.

Raises:
  • ValueError – if the ra_column or dec_column cannot be found in the input file.

  • FileNotFoundError – if the file does not exist, or is a directory
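The shape of the returned histogram can be illustrated with a small sketch. It assumes the pixel number of each row has already been computed (the values in `mapped_pixels` are hypothetical) and uses `numpy.bincount` as one way to count objects per pixel; it does not claim to reproduce how the function counts internally.

```python
import numpy as np

highest_order = 1
# HEALPix divides the sky into 12 * 4**order pixels at a given order.
num_pixels = 12 * 4**highest_order

# Hypothetical pixel numbers for five input rows at highest_order.
mapped_pixels = np.array([3, 3, 17, 44, 17])

# One counter per pixel; minlength pads pixels that received no objects.
histogram = np.bincount(mapped_pixels, minlength=num_pixels)
```

The result has one entry per pixel at `highest_order`, so its length grows by a factor of four with each additional order.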

split_pixels(input_file: hipscat.io.FilePointer, file_reader: hipscat_import.catalog.file_readers.InputReader, splitting_key, highest_order, ra_column, dec_column, cache_shard_path: hipscat.io.FilePointer, resume_path: hipscat.io.FilePointer, alignment=None, use_hipscat_index=False)

Map a file of input objects to their healpix pixels and split into shards.

Parameters:
  • input_file (FilePointer) – file to read for catalog data.

  • file_reader (hipscat_import.catalog.file_readers.InputReader) – instance of input reader that specifies arguments necessary for reading from the input file.

  • splitting_key (str) – unique counter for this input file, used when creating intermediate files

  • highest_order (int) – healpix order to use when mapping

  • ra_column (str) – where to find right ascension data in the dataframe

  • dec_column (str) – where to find declination data in the dataframe

  • cache_shard_path (FilePointer) – where to write intermediate parquet files.

  • resume_path (FilePointer) – where to write resume files.

Raises:
  • ValueError – if the ra_column or dec_column cannot be found in the input file.

  • FileNotFoundError – if the file does not exist, or is a directory
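The idea of splitting rows into shards can be sketched in plain Python. This is a simplified illustration under several assumptions: pixel numbers use the HEALPix nested scheme, every row is sent to a single fixed `destination_order` (the real function takes an `alignment` argument whose format is not documented here), and shards are kept in memory rather than written as parquet files. The helpers `parent_pixel` and `split_into_shards` are hypothetical names.

```python
from collections import defaultdict


def parent_pixel(pixel: int, order: int, parent_order: int) -> int:
    """Nested-scheme pixel at parent_order that contains `pixel` at `order`."""
    if parent_order > order:
        raise ValueError("parent_order must not exceed order")
    # Each increase in order splits a pixel into 4 children (2 bits).
    return pixel >> (2 * (order - parent_order))


def split_into_shards(rows, highest_order, destination_order):
    """Group rows by the destination pixel that contains their mapped pixel."""
    shards = defaultdict(list)
    for row in rows:
        destination = parent_pixel(row["pixel"], highest_order, destination_order)
        shards[destination].append(row)
    return dict(shards)


# Hypothetical rows whose "pixel" was already computed at order 1.
rows = [
    {"id": 1, "pixel": 0},
    {"id": 2, "pixel": 3},
    {"id": 3, "pixel": 4},
    {"id": 4, "pixel": 47},
]
shards = split_into_shards(rows, highest_order=1, destination_order=0)
```

With these inputs, rows 1 and 2 land in the shard for order-0 pixel 0, row 3 in pixel 1, and row 4 in pixel 11.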

reduce_pixel_shards(cache_shard_path, resume_path, reducing_key, destination_pixel_order, destination_pixel_number, destination_pixel_size, output_path, ra_column, dec_column, sort_columns: str = '', use_hipscat_index=False, add_hipscat_index=True, delete_input_files=True, use_schema_file='', storage_options: Dict[Any, Any] | None = None)

Reduce sharded source pixels into destination pixels.

In addition to combining multiple shards of data into a single parquet file, this method will add a few new columns:

  • Norder - the healpix order for the pixel

  • Dir - the directory part, corresponding to the pixel

  • Npix - the healpix pixel

  • _hipscat_index - optional - a spatially-correlated 64-bit index field.

Notes on _hipscat_index:

  • if the field is generated, any previously named pandas index field(s) are promoted to columns of the same name.

  • see hipscat.pixel_math.hipscat_id for more in-depth discussion of this field.
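The three partitioning columns can be illustrated with a short sketch. The grouping of pixels into directories of 10,000 is an assumption made for this example and is not stated in this page; `partition_columns` is a hypothetical helper, not part of the module.

```python
def partition_columns(order: int, pixel: int) -> dict:
    """Build the partitioning values for one destination pixel."""
    # Assumption: directories group pixel numbers in blocks of 10,000.
    directory = (pixel // 10_000) * 10_000
    return {"Norder": order, "Dir": directory, "Npix": pixel}


# Hypothetical destination pixels.
small = partition_columns(order=0, pixel=11)
large = partition_columns(order=6, pixel=12345)
```

Under that assumption, every row written for a given destination pixel would carry the same three values.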

Parameters:
  • cache_shard_path (FilePointer) – where to read intermediate parquet files.

  • resume_path (FilePointer) – where to write resume files.

  • reducing_key (str) – unique string for this task, used for resume files.

  • origin_pixel_numbers (list[int]) – high order pixels, with object data written to intermediate directories. This parameter does not appear in the signature above.

  • destination_pixel_order (int) – order of the final catalog pixel

  • destination_pixel_number (int) – pixel number at the above order

  • destination_pixel_size (int) – expected number of rows to write for the catalog’s final pixel

  • output_path (FilePointer) – where to write the final catalog pixel data

  • sort_columns (str) – column for survey identifier, or other sortable column

  • add_hipscat_index (bool) – whether to add a _hipscat_index column to the resulting parquet file.

  • delete_input_files (bool) – whether to delete the intermediate files used as input for this method.

  • use_schema_file (str) – use the parquet schema from the indicated parquet file.

Raises:

ValueError – if the number of rows written does not equal the provided destination_pixel_size
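The row-count check behind this error can be sketched as follows. This is a simplified in-memory illustration: shards are plain lists of rows rather than parquet files, and `combine_shards` is a hypothetical helper name.

```python
def combine_shards(shards, destination_pixel_size):
    """Concatenate shards and verify the expected number of rows."""
    combined = [row for shard in shards for row in shard]
    if len(combined) != destination_pixel_size:
        raise ValueError(
            f"Unexpected number of rows: expected {destination_pixel_size}, "
            f"got {len(combined)}"
        )
    return combined


# Hypothetical shards destined for the same pixel.
shard_a = [{"id": 1}, {"id": 2}]
shard_b = [{"id": 3}]
combined = combine_shards([shard_a, shard_b], destination_pixel_size=3)
```

Passing a `destination_pixel_size` that disagrees with the shard contents raises `ValueError`, which signals that rows were lost or duplicated between the mapping and reducing stages.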