File Readers

In the catalog import pipeline, we use InputReader objects to iterate through the input files. This allows us to take in a variety of file formats, without having to re-write the entire processing pipeline for each kind of file format we encounter.

For the curious, see the API documentation for hats_import.catalog.file_readers.InputReader.

Index Readers

If you have many small files (think 400k+ CSV files with a few rows each), you may benefit from “index” file readers. These allow you to explicitly create batches for tasks by providing a set of index files, where each file is a text file that contains only paths to data files.

Benefits:

  1. If you have 400k+ input files, you don’t want to create 400k+ dask tasks to process these files.

  2. If the files are very small, batching them in this way allows the import process to combine several small files into a single chunk for processing. This will result in fewer intermediate files during the splitting stage.

  3. If you have parquet files over a slow networked file system, we support pyarrow’s readahead protocol through index readers.

Warnings:

  1. If you have 20 dask workers in your pool, you may be tempted to create 20 index files. This is not always an efficient use of resources! You’d be better served by 200 index files, so that:

    1. dask can spread the load if some lists of files take longer to process than others

    2. with only 20 lists, if the pipeline dies after successfully processing 15 of them, a retry leaves just 5 lists for those same 20 workers, and most of the workers will sit idle.
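
The batching advice above can be sketched with the standard library alone. This is a minimal sketch and not part of hats_import: the helper name write_index_files, the batch_NNNN.txt file naming, and the round-robin assignment are all assumptions made for illustration. It writes one path per line, following the description of an index file as a text file that contains only paths to data files.

```python
from pathlib import Path


def write_index_files(data_paths, index_dir, num_index_files):
    """Spread data file paths across a fixed number of index files.

    Hypothetical helper (not part of hats_import). Paths are assigned
    round-robin, so index files differ in length by at most one entry.
    """
    if num_index_files < 1:
        raise ValueError("num_index_files must be at least 1")
    if not data_paths:
        return []
    index_dir = Path(index_dir)
    index_dir.mkdir(parents=True, exist_ok=True)

    # Never create more index files than there are data files.
    num_index_files = min(num_index_files, len(data_paths))
    batches = [[] for _ in range(num_index_files)]
    for position, data_path in enumerate(data_paths):
        batches[position % num_index_files].append(str(data_path))

    index_paths = []
    for batch_number, batch in enumerate(batches):
        index_path = index_dir / f"batch_{batch_number:04d}.txt"
        # One data file path per line, and nothing else.
        index_path.write_text("\n".join(batch) + "\n", encoding="utf-8")
        index_paths.append(index_path)
    return index_paths
```

With 20 workers, asking this helper for 200 index files rather than 20 would give dask roughly ten batches per worker to balance, in line with the warning above.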

The read_index_file method is provided as a convenience to read these index files, and returns a list of paths to be read by your method.

def read(self, input_file, read_columns=None):
    file_paths = self.read_index_file(
        input_file=input_file, upath_kwargs=self.upath_kwargs, **self.kwargs
    )
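
To make the index file format concrete, here is a rough stand-in for what reading one involves. It is not the library's read_index_file implementation, which also accepts upath_kwargs and so presumably handles non-local paths. The function name read_index_paths and the choice to skip blank lines are assumptions for illustration.

```python
from pathlib import Path


def read_index_paths(index_file):
    """Return the data file paths listed in a local index file.

    Hypothetical stand-in for illustration only. Assumes one path per
    line; surrounding whitespace and blank lines are ignored.
    """
    text = Path(index_file).read_text(encoding="utf-8")
    return [line.strip() for line in text.splitlines() if line.strip()]
```

A reader's read method would then loop over the returned paths, load each file, and yield the combined rows in chunks.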

Pandas vs Pyarrow

The read method can yield either a pandas.DataFrame object or a pyarrow.Table. If you yield a pandas dataframe, note that later pipeline stages will convert it into a pyarrow.Table anyway.

We provide alternative implementations of some common readers that use pyarrow file readers. This can be faster, as it avoids unnecessary conversion between table formats, but you may encounter rougher edges.

Iterate by Row Groups

For Parquet files, both the ParquetPandasReader and ParquetPyarrowReader support reading by row groups, rather than by chunks of rows. This can be more efficient if your Parquet files are already organized into row groups that are suitable for your processing.

To enable this, set the iterate_by_row_groups parameter to True when instantiating the reader. Then, when you call the read method, it will yield dataframes or tables corresponding to each row group in the Parquet file.

args = ImportArguments(
    file_reader=ParquetPyarrowReader(iterate_by_row_groups=True),
    ...
)

Built-in Classes and Functions

get_file_reader(file_format[, chunksize, ...])
    Get a generator file reader for common file types.

InputReader
    Base class for chunking file readers.

CsvReader
    CSV reader for the most common CSV reading arguments.

CsvPyarrowReader
    CSV reader that uses the pyarrow library for reading.

IndexedCsvReader
    Reads an index file, containing paths to CSV files to be read and batched.

ParquetPandasReader
    Parquet reader for the most common Parquet reading arguments.

ParquetPyarrowReader
    Parquet reader that uses the pyarrow library for reading.

IndexedParquetReader
    Reads an index file, containing paths to Parquet files to be read and batched.

AstropyEcsvReader
    Reads astropy ascii .ecsv files.

FitsReader
    Chunked FITS file reader.

Defining your own class

If you need to write your own input file reader, it should subclass InputReader and must provide a read method.

Important

On the first pass through the data, we just need to read the RA and Dec columns, and so the read_columns value will be a list with just these two columns. You can use this information to read less data, if possible given the file format.

You can yield either a pandas.DataFrame object or a pyarrow.Table. If you yield a pandas dataframe, note that later pipeline stages will convert it into a pyarrow.Table anyway.

You must yield the result, and should utilize chunking to avoid memory issues.
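
As a concrete, non-fictional illustration of these two requirements, the generator below chunks a CSV file with pandas and honours read_columns. It is a sketch rather than one of the built-in readers: the function name read_csv_in_chunks is made up, and in a real reader this logic would live in the read method of an InputReader subclass. It relies only on the usecols and chunksize arguments of pandas.read_csv.

```python
import pandas as pd


def read_csv_in_chunks(input_file, read_columns=None, chunksize=500_000):
    """Yield DataFrames of at most chunksize rows from a CSV file.

    Hypothetical helper. When read_columns is given (for example, just
    the RA and Dec columns), only those columns are parsed.
    """
    with pd.read_csv(
        input_file, usecols=read_columns, chunksize=chunksize
    ) as reader:
        for chunk in reader:
            yield chunk
```

Because the function yields each chunk as it is parsed, only one chunk needs to be held in memory at a time, however large the input file is.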

Below is an InputReader implementation for the fictional Starr file format. This showcases that the read method does all of the format-specific logic.

Here, we even use a fictional shortcut to get only the RA and Dec of the rows in the table when read_columns is provided. This can speed up early pipeline execution by avoiding more expensive reads when they’re not needed.

import pandas as pd

from hats_import.catalog.file_readers import InputReader

# starr_io and join_to_starr_data are fictional, like the Starr format itself.


class StarrReader(InputReader):
    """Class for fictional Starr file format."""

    def __init__(self, chunksize=500_000, **kwargs):
        self.chunksize = chunksize
        self.kwargs = kwargs

    def read(self, input_file, read_columns=None):
        if read_columns:
            # Shortcut: only the positions are needed on the first pass.
            starr_manifest = starr_io.read_manifest(input_file, **self.kwargs)
            ra_array, dec_array = starr_manifest.read_positions()
            yield pd.DataFrame({"ra": ra_array, "dec": dec_array})
        else:
            # Read all of the fields, one chunk at a time.
            starr_file = starr_io.read_table(input_file, **self.kwargs)
            for smaller_table in starr_file.to_batches(max_chunksize=self.chunksize):
                smaller_table = join_to_starr_data(smaller_table)
                yield smaller_table.to_pandas()

And here we show how the custom reader would be instantiated and passed along to the catalog import pipeline.

args = ImportArguments(
    ...
    ## Locates files like "/directory/to/files/**starr"
    input_path="/directory/to/files/",
    ## NB - you need the parens here!
    file_reader=StarrReader(),
)

While this can provide self-service for importing new file formats, we’re here for you if this is all too much. Contact us.