hipscat_import.catalog#

All modules for importing new catalogs.

Package Contents#

Classes#

ImportArguments

Container class for holding partitioning arguments

Functions#

map_to_pixels(input_file, file_reader, resume_path, ...)

Map a file of input objects to their healpix pixels.

reduce_pixel_shards(cache_shard_path, resume_path, ...)

Reduce sharded source pixels into destination pixels.

split_pixels(input_file, file_reader, splitting_key, ...)

Map a file of input objects to their healpix pixels and split into shards.

run(args, client)

Run catalog creation pipeline.

class ImportArguments[source]#

Bases: hipscat_import.runtime_arguments.RuntimeArguments

Container class for holding partitioning arguments

epoch: str = 'J2000'#

astronomical epoch for the data. defaults to “J2000”

catalog_type: str = 'object'#

level of catalog data: object (things in the sky) or source (detections)

input_path: hipscat.io.FilePointer | None#

path to search for the input data

input_file_list: List[hipscat.io.FilePointer]#

can be used instead of input_path to import only specified files

input_paths: List[hipscat.io.FilePointer]#

resolved list of all files that will be used in the importer

input_storage_options: Dict[Any, Any] | None#

optional dictionary of abstract filesystem credentials for the INPUT.

ra_column: str = 'ra'#

column for right ascension

dec_column: str = 'dec'#

column for declination

use_hipscat_index: bool = False#

use an existing hipscat spatial index as the position, instead of ra/dec

sort_columns: str | None#

column for a survey identifier, or another sortable column. to sort by multiple columns, provide a comma-separated list. if add_hipscat_index=True, this sort order is used to break ties among rows within the same higher-order pixel

add_hipscat_index: bool = True#

add the hipscat spatial index field alongside the data

use_schema_file: str | None#

path to a parquet file with schema metadata. this will be used for column metadata when writing the files, if specified

constant_healpix_order: int#

healpix order to use when mapping. if this is a positive number, this will be the order of all final pixels and we will not combine pixels according to the threshold

lowest_healpix_order: int = 0#

the lowest possible healpix order that we will use for the final catalog partitioning. setting this higher than 0 will prevent creating partitions with a large area on the sky.

highest_healpix_order: int = 7#

healpix order to use when mapping. this will not necessarily be the order used in the final catalog, as we may combine pixels that don't meet the threshold

pixel_threshold: int = 1000000#

maximum number of rows for a single resulting pixel. we may combine hierarchically until we near the pixel_threshold

mapping_healpix_order: int#

healpix order to use when mapping. will be highest_healpix_order unless a positive value is provided for constant_healpix_order

debug_stats_only: bool = False#

do not perform the map-reduce and do not create a new catalog; only generate the partition info

file_reader: hipscat_import.catalog.file_readers.InputReader | str | None#

instance of input reader that specifies arguments necessary for reading from your input files

resume_plan: hipscat_import.catalog.resume_plan.ResumePlan | None#

container that handles read/write of log files for this pipeline

__post_init__()[source]#

_check_arguments()[source]#

Check existence and consistency of argument values

to_catalog_info(total_rows) hipscat.catalog.catalog.CatalogInfo[source]#

Catalog-type-specific dataset info.

additional_runtime_provenance_info() dict[source]#

Any additional runtime args to be included in provenance info from subclasses
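
The following is a minimal construction sketch. All values are illustrative; output_path and output_artifact_name are assumed names for fields inherited from RuntimeArguments, which are not documented on this page and may differ between versions:

    from hipscat_import.catalog import ImportArguments

    # A minimal sketch: import a directory of CSV files into a new catalog.
    # Paths are illustrative; the last two fields are inherited from
    # RuntimeArguments, and their exact names are an assumption here.
    args = ImportArguments(
        input_path="/data/my_survey/csvs",
        file_reader="csv",
        ra_column="ra",
        dec_column="dec",
        sort_columns="source_id",
        highest_healpix_order=7,
        pixel_threshold=1_000_000,
        output_path="/data/catalogs",
        output_artifact_name="my_survey",
    )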

map_to_pixels(input_file: hipscat.io.FilePointer, file_reader: hipscat_import.catalog.file_readers.InputReader, resume_path: hipscat.io.FilePointer, mapping_key, highest_order, ra_column, dec_column, use_hipscat_index=False)[source]#

Map a file of input objects to their healpix pixels.

Parameters:
  • input_file (FilePointer) – file to read for catalog data.

  • file_reader (hipscat_import.catalog.file_readers.InputReader) – instance of input reader that specifies arguments necessary for reading from the input file.

  • resume_path (FilePointer) – where to write resume partial results.

  • mapping_key (str) – unique counter for this input file, used when creating intermediate files

  • highest_order (int) – healpix order to use when mapping

  • ra_column (str) – where to find right ascension data in the dataframe

  • dec_column (str) – where to find declination data in the dataframe

Returns:

one-dimensional numpy array of long integers, where the value at each index is the number of objects found at that healpix pixel.

Raises:
  • ValueError – if the ra_column or dec_column cannot be found in the input file.

  • FileNotFoundError – if the file does not exist, or is a directory
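
The pipeline's run() normally schedules this function once per input file; the following is a hedged sketch of a direct call, with illustrative paths and an assumed CsvReader configuration:

    from hipscat_import.catalog import map_to_pixels
    from hipscat_import.catalog.file_readers import CsvReader

    # Illustrative direct call; run() normally schedules this per input file.
    histogram = map_to_pixels(
        input_file="/data/my_survey/csvs/part_000.csv",  # assumed path
        file_reader=CsvReader(chunksize=500_000),
        resume_path="/tmp/my_survey_resume",             # assumed path
        mapping_key="map_0",
        highest_order=7,
        ra_column="ra",
        dec_column="dec",
    )
    # histogram is a one-dimensional array of longs: one count per healpix
    # pixel at highest_order.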

reduce_pixel_shards(cache_shard_path, resume_path, reducing_key, destination_pixel_order, destination_pixel_number, destination_pixel_size, output_path, ra_column, dec_column, sort_columns: str = '', use_hipscat_index=False, add_hipscat_index=True, delete_input_files=True, use_schema_file='', storage_options: Dict[Any, Any] | None = None)[source]#

Reduce sharded source pixels into destination pixels.

In addition to combining multiple shards of data into a single parquet file, this method will add a few new columns:

  • Norder - the healpix order for the pixel

  • Dir - the directory part, corresponding to the pixel

  • Npix - the healpix pixel

  • _hipscat_index - optional - a spatially-correlated 64-bit index field.

Notes on _hipscat_index:

  • if we generate the field, we will promote any previously-named pandas index field(s) to a column with that name.

  • see hipscat.pixel_math.hipscat_id for more in-depth discussion of this field.

Parameters:
  • cache_shard_path (FilePointer) – where to read intermediate parquet files.

  • resume_path (FilePointer) – where to write resume files.

  • reducing_key (str) – unique string for this task, used for resume files.

  • destination_pixel_order (int) – order of the final catalog pixel

  • destination_pixel_number (int) – pixel number at the above order

  • destination_pixel_size (int) – expected number of rows to write for the catalog’s final pixel

  • output_path (FilePointer) – where to write the final catalog pixel data

  • sort_columns (str) – column for survey identifier, or other sortable column

  • add_hipscat_index (bool) – should we add a _hipscat_index column to the resulting parquet file?

  • delete_input_files (bool) – should we delete the intermediate files used as input for this method.

  • use_schema_file (str) – use the parquet schema from the indicated parquet file.

Raises:

ValueError – if the number of rows written doesn't equal the provided destination_pixel_size
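
A hedged sketch of a direct call for one destination pixel; the pipeline normally derives these values from the mapping stage, and the paths and shard-directory layout shown here are assumptions:

    from hipscat_import.catalog import reduce_pixel_shards

    # Illustrative direct call for a single destination pixel; all paths and
    # the shard-directory layout are assumptions.
    reduce_pixel_shards(
        cache_shard_path="/tmp/my_survey_shards/order_1/dir_0/pixel_44",
        resume_path="/tmp/my_survey_resume",
        reducing_key="1_44",
        destination_pixel_order=1,
        destination_pixel_number=44,
        destination_pixel_size=83_211,   # expected row count for this pixel
        output_path="/data/catalogs/my_survey",
        ra_column="ra",
        dec_column="dec",
        sort_columns="source_id",
        add_hipscat_index=True,
        delete_input_files=False,        # keep intermediate shards around
    )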

split_pixels(input_file: hipscat.io.FilePointer, file_reader: hipscat_import.catalog.file_readers.InputReader, splitting_key, highest_order, ra_column, dec_column, cache_shard_path: hipscat.io.FilePointer, resume_path: hipscat.io.FilePointer, alignment=None, use_hipscat_index=False)[source]#

Map a file of input objects to their healpix pixels and split into shards.

Parameters:
  • input_file (FilePointer) – file to read for catalog data.

  • file_reader (hipscat_import.catalog.file_readers.InputReader) – instance of input reader that specifies arguments necessary for reading from the input file.

  • splitting_key (str) – unique counter for this input file, used when creating intermediate files

  • highest_order (int) – healpix order to use when mapping

  • ra_column (str) – where to find right ascension data in the dataframe

  • dec_column (str) – where to find declination data in the dataframe

  • cache_shard_path (FilePointer) – where to write intermediate parquet files.

  • resume_path (FilePointer) – where to write resume files.

Raises:
  • ValueError – if the ra_column or dec_column cannot be found in the input file.

  • FileNotFoundError – if the file does not exist, or is a directory
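
A hedged sketch of a direct call; in the pipeline, alignment is computed from the mapping-stage histogram before splitting, and the paths here are illustrative:

    from hipscat_import.catalog import split_pixels
    from hipscat_import.catalog.file_readers import CsvReader

    # Illustrative direct call; run() normally schedules this per input file.
    alignment = None  # in the pipeline, derived from the mapping histogram
    split_pixels(
        input_file="/data/my_survey/csvs/part_000.csv",  # assumed path
        file_reader=CsvReader(chunksize=500_000),
        splitting_key="split_0",
        highest_order=7,
        ra_column="ra",
        dec_column="dec",
        cache_shard_path="/tmp/my_survey_shards",
        resume_path="/tmp/my_survey_resume",
        alignment=alignment,
    )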

run(args, client)[source]#

Run catalog creation pipeline.
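
A sketch of driving the full pipeline with a local Dask cluster. The ImportArguments values are illustrative (see the class example above), and output_path / output_artifact_name are assumed names for inherited fields:

    from dask.distributed import Client

    from hipscat_import.catalog import ImportArguments, run

    # Sketch: create a local Dask cluster and run the full import pipeline.
    args = ImportArguments(
        input_path="/data/my_survey/csvs",    # assumed path
        file_reader="csv",
        output_path="/data/catalogs",         # inherited field (assumed name)
        output_artifact_name="my_survey",     # inherited field (assumed name)
    )

    with Client(n_workers=4, threads_per_worker=1) as client:
        run(args, client)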