Catalog Import Arguments#

This page discusses a few topics around setting up a catalog pipeline.

Once your catalog has been imported, you can verify and inspect the metadata, either with the Verification Pipeline, or methods described in the Manual catalog verification notebook

At a minimum, you need arguments that include where to find the input files, the column names for RA, and DEC, and where to put the output files. A minimal arguments block will look something like:

from hats_import.catalog.arguments import ImportArguments

args = ImportArguments(
    sort_columns="ObjectID",
    ra_column="ObjectRA",
    dec_column="ObjectDec",
    input_path="./my_data",
    file_reader="csv",
    output_artifact_name="test_cat",
    output_path="./output",
)

More details on each of these parameters is provided in sections below.

For the curious, see the API documentation for hats_import.catalog.arguments.ImportArguments.

Pipeline setup#

Dask#

We will either use a user-provided dask Client, or create a new one with arguments:

dask_tmp - str - directory for dask worker space. this should be local to the execution of the pipeline, for speed of reads and writes. For much more information, see Temporary files and disk usage.

dask_n_workers - int - number of workers for the dask client. Defaults to 1.

dask_threads_per_worker - int - number of threads per dask worker. Defaults to 1.

If you find that you need additional parameters for your dask client (e.g are creating a SLURM worker pool), you can instead create your own dask client and pass along to the pipeline, ignoring the above arguments. This would look like:

from dask.distributed import Client
from hats_import.pipeline import pipeline_with_client

args = ...  # ImportArguments()
with Client('scheduler:port') as client:
    pipeline_with_client(args, client)

If you’re running within a .py file, we recommend you use a main guard to potentially avoid some python threading issues with dask:

from hats_import.pipeline import pipeline

def import_pipeline():
    args = ...
    pipeline(args)

if __name__ == '__main__':
    import_pipeline()

Resuming#

The import pipeline has the potential to be a very long-running process, if you’re importing large amounts of data, or performing complex transformations on the data before writing.

While the pipeline runs, we take notes of our progress so that the pipeline can be resumed at a later time, if the job is pre-empted or canceled for any reason.

When instantiating a pipeline, you can use the resume flag to indicate that we can resume from an earlier execution of the pipeline. By default, if any resume files are found, we will restore the pipeline’s previous progress.

If you want to start the pipeline from scratch you can simply set resume=False. Alternatively, go to the temp directory you’ve specified and remove any intermediate files created by the previous runs of the hats-import pipeline. You should also remove the output directory if it has any content. The resume argument performs these cleaning operations automatically for you.

For more information about the kinds of files we write to enable this feature, see Temporary files and disk usage.

By default, these notes files, and the intermediate parquet leaf files are removed once they’re no longer needed by the pipeline. If, for whatever reason, you would like to retain these files, you can use flags:

  • delete_resume_log_files=False will keep resume notes/logs files

  • delete_intermediate_parquet_files=False will retain ALL of the intermediate parquet leaf files.

Reading input files#

Catalog import reads through a list of files and converts them into a hats-sharded catalog.

If you already know the size of your catalog, and would like us to verify that the same number of rows exists at each checkpoint in the pipeline, pass expected_total_rows=<int>.

Which files?#

There are a few ways to specify the files to read:

  • input_path:

    will include all files in the indicated directory.

  • input_file_list:

    a list of fully-specified paths you want to read.

    • this strategy can be useful to first run the import on a single input file and validate the input, then run again on the full input set, or to debug a single input file with odd behavior.

    • if you have a mix of files in your target directory, you can use a glob statement like the following to gather input files:

in_file_paths = glob.glob("/data/object_and_source/object**.csv")
in_file_paths.sort()

Important

We will create one Dask task per element inside the input_file_list.

If you have only one large file, we can only read the file with one Dask worker. If you have lots of files, you might want to consider the Indexed batching strategy described on the page for File Readers.

How to read them?#

Specify an instance of InputReader for the file_reader parameter.

We use the InputReader class to read files in chunks and pass the chunks along to the map/reduce stages. We’ve provided reference implementations for reading CSV, FITS, and Parquet input files, but you can subclass the reader type to suit whatever input files you’ve got.

You only need to provide an object file_reader argument if you are using a custom file reader or passing parameters to the file reader. For example you might use file_reader=CsvReader(sep="\s+") to parse a whitespace separated file. Otherwise, you can use a short string to specify an existing file reader type e.g. file_reader="csv".

Note

For more information on file readers, including custom subclasses, tuning, and optimizations for small files see the page on File Readers.

If you’re reading from cloud storage, or otherwise have some filesystem credential dict, initialize input_file using universal_pathlib’s utilities.

Which fields?#

Specify the ra_column and dec_column for the dataset.

There are two fields that we require in order to make a valid hats-sharded catalog, the right ascension and declination. At this time, this is the only supported system for celestial coordinates.

If you’re importing data that has previously been hats-sharded, you may use use_healpix_29 = True. This will use that previously computed hats spatial index as the position, instead of ra/dec.

Healpix order and thresholds#

When creating a new catalog through the hats-import process, we try to create partitions with approximately the same number of rows per partition. This isn’t perfect, because the sky is uneven, but we still try to create smaller-area pixels in more dense areas, and larger-area pixels in less dense areas.

We use the argument pixel_threshold and will split a partition into smaller healpix pixels until the number of rows is smaller than pixel_threshold. We will only split by healpix pixels up to the highest_healpix_order. If we would need to split further, we’ll throw an error at the “Binning” stage, and you should adjust your parameters.

For more discussion of the pixel_threshold argument and a strategy for setting this parameter, see notebook Estimate pixel threshold

For more discussion of the “Binning” and all other stages, see Temporary files and disk usage.

Sparse Datasets#

For sparse datasets you might want to force your catalog partitioning to avoid partitions with very large area on they sky.

Why? If you have sparse data that you know you will want to cross-match or join to a catalog that is much denser, you may find yourself trying to match a large (in terms of area on the sky) pixel to thousands of smaller pixels in the denser catalog that occupy the same large region in the sky. Using more pixels of higher order will have some inefficiencies in terms of on-disk storage, but will be easier to compute joins and cross-matches to large datasets.

There are a few strategies for tweaking the partitioning:

  • order range - use the lowest_healpix_order argument, in addition to the highest_healpix_order.

  • constant order - use the constant_healpix_order argument. This will ignore the pixel_threshold, highest_healpix_order, and lowest_healpix_order arguments and the catalog will be partitioned by healpix pixels at the constant_healpix_order.

  • drop empty siblings - by default, the drop_empty_siblings flag is set to True, and will try to use smaller area pixels in sparse regions of the sky. This can make the catalog footprint closer to the actual survey footprint, but can be disabled if desired.

Progress Reporting#

By default, we will display some progress bars during pipeline execution. To disable these (e.g. when you expect no output to standard out), you can set progress_bar=False.

There are several stages to the pipeline execution, and you can expect progress reporting to look like the following:

Mapping  : 100%|██████████| 72/72 [58:55:18<00:00, 2946.09s/it]
Binning  : 100%|██████████| 1/1 [01:15<00:00, 75.16s/it]
Splitting: 100%|██████████| 72/72 [72:50:03<00:00, 3641.71s/it]
Reducing : 100%|██████████| 10895/10895 [7:46:07<00:00,  2.57s/it]
Finishing: 100%|██████████| 6/6 [08:03<00:00, 80.65s/it]

We use tqdm to render these beautiful progress bars. tqdm will try to make a guess about the type of output to provide: plain text as for a command line, or a pretty ipywidget. If it tries to use a pretty widget but your execution environment can’t support the widget, you can force the pipeline to use a simple progress bar with the simple_progress_bar argument. If you want to configure the progress bar any further, pass values to the tqdm_kwargs parameter. You can find more details on available arguments at the tqdm documentation.

For very long-running pipelines (e.g. multi-TB inputs), you can get an email notification when the pipeline completes using the completion_email_address argument. This will send a brief email, for either pipeline success or failure.

Output#

Where?#

You must specify a name for the catalog, using output_artifact_name.

You must specify where you want your catalog data to be written, using output_path. This path should be the base directory for your catalogs, as the full path for the catalog will take the form of output_path/output_artifact_name.

If you’re writing to cloud storage, or otherwise have some filesystem credential dict, initialize output_path using universal_pathlib’s utilities.

In addition, you can specify directories to use for various intermediate files:

  • dask worker space (dask_tmp)

  • sharded parquet files (tmp_dir)

  • intermediate resume files (resume_tmp)

Most users are going to be ok with simply setting the tmp_dir for all intermediate file use. For more information on these parameters, when you would use each, and demonstrations of temporary file use see Temporary files and disk usage

How to write the parquet files?#

You may want to tweak parameters of the final catalog output, and we have helper arguments for a few of those.

write_table_kwargs - any additional arguments to customize the pyarrow.parquet.ParquetWriter. This can be used for alternative compression strategies. By default we use ZSTD compression at level 15.

Parquet compression is also effected by the row groups. We can attempt to split the data partition into row group chunks according to a few strategies:

  • Number of rows - Chunk according to integer number of rows in num_rows arg.

  • HEALPix subtiles - Chunk according to finer spatial divisions within the HEALPix pixel of the data partition. This is controlled via the subtile_order_delta. Reasonable values here are 1, 2, or 3.

Hint

This behavior may be confusing, so let’s talk about it a little bit more.

Say we’re writing a data partition for a HEALPix order 6 tile.

  • With a 1, there are 4 maximum row groups, for each of the 4 subtiles of HEALPix order 7.

  • With a 2, there are 16 maximum row groups, for each of the 16 subtiles of HEALPix order 8.

  • With a 3, there are 64 maximum row groups, for each of the 64 subtiles of HEALPix order 9.

  • With a value any higher, there will be so many row groups in each file that you will likely stop seeing the benefits of these divisions.

Within the same pipeline, we would also write some partitions at HEALPix order 4, and they would have a similar number of row group divisions:

  • With a 1, there are 4 subtiles of HEALPix order 5.

  • With a 2, there are 16 subtiles of HEALPix order 6.

  • With a 3, there are 64 subtiles of HEALPix order 7.

This is why we describe the subdivisions as a “delta”, instead of an absolute HEALPix order.

If you’re interested in other row group strategies, please reach out and we can consider adding them.

add_healpix_29 - bool - whether or not to add the hats spatial index as a column in the resulting catalog. The _healpix_29 field is designed to make many dask operations more performant, but if you do not intend to publish your dataset and do not intend to use dask, then you can suppress generation of this column to save a little space in your final disk usage.

The _healpix_29 uses a high healpix order to create values that can order all points in the sky, according to a nested healpix scheme.

sort_columns - str - column for survey identifier, or other sortable column. If sorting by multiple columns, they should be comma-separated. If add_healpix_29=True, _healpix_29 will be the primary sort key, but the provided sorting will be used for any rows within the same higher-order pixel space.

use_schema_file - str - path to a parquet file with schema metadata. This will be used for column metadata when writing the files, if specified. For more information on why you would want this file and how to generate it, check out our notebook Unequal schema problems.

debug_stats_only - bool - If True, we will not create the leaf parquet files with the catalog data, and will only generate root-level metadata files representing the full statistics of the final catalog. This can be useful when probing the import process for effectiveness on processing a target dataset.

How to write the metadata files?#

catalog_type - "object" or "source". Indicates the level of catalog data, using the LSST nomenclature:

  • object - things in the sky (e.g. stars, galaxies)

  • source - detections of things in the sky at some point in time.

  • map - non-point-source catalogs (e.g. dust maps)

Some data providers split detection-level data into a separate catalog, to make object catalogs smaller, and reflects a relational data model.

create_thumbnail - optionally create a dataset/data_thumbnail.parquet file, that will contain the first row of each data partition. This can be useful as a representative sample of the catalog data.

should_write_skymap - defaults to True, but you can disable writing a skymap.fits file with this argument. Further, to create several down-sampled skymaps, pass a list of HEALPix orders to skymap_alt_orders (e.g. skymap_alt_orders=[2,4,6]).

The HATS format allows for many additional key-values in the high-level hats.properties file. Many of these values are automatically set by the import process itself, but catalog providers may want to set additional fields for data provenance.

This full set of properties is outlined on a separate page (HATS properties), but you can pass these key-value sets to the import process with the addl_hats_properties argument, and they will appear in the final hats.properties file:

addl_hats_properties={"hats_cols_default": "id, mjd", "obs_regime": "Optical"},

Incremental catalogs#

An “incremental catalog” is one that can be appended to by adding more files into the leaf data partition directory.

We need to start with a catalog that has leaf directories instead of leaf parquet files. You can import the starting data with additional arguments:

  • npix_suffix="/" - this tells the pipeline to create a directory per-leaf.

  • npix_parquet_name - this is the file name given to the first file in the directory. By default, this will be Npix=M.parquet.

To append to an incremental catalog, the pipeline should be run with the same partitioning, potentially adding new pixels to fill in coverage gaps. To make sure the same set of pixels are used, provide existing_pixels as a list of HEALPix (order, pixel) tuples.