File Readers
In the catalog import pipeline, we use InputReader objects to iterate through
the input files. This allows us to accept a variety of file formats without having
to rewrite the entire processing pipeline for each new format we encounter.
For the curious, see the API documentation for
hats_import.catalog.file_readers.InputReader.
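The value of this indirection is that downstream stages only ever see chunks coming out of a read generator, never the file format itself. Here is a minimal, stdlib-only sketch of that idea. MiniReader, CsvRowsReader and count_rows are hypothetical names, not hats_import classes, and chunks are plain lists of dicts rather than DataFrames or tables.

```python
import csv
import io
from typing import Iterator, List, Optional


class MiniReader:
    """Hypothetical stand-in for InputReader: subclasses implement read()."""

    def read(self, input_file, read_columns: Optional[List[str]] = None) -> Iterator[list]:
        raise NotImplementedError


class CsvRowsReader(MiniReader):
    """Yields chunks of CSV rows (as dicts) from a file-like object."""

    def __init__(self, chunksize: int = 2):
        self.chunksize = chunksize

    def read(self, input_file, read_columns=None):
        chunk = []
        for row in csv.DictReader(input_file):
            chunk.append(row)
            if len(chunk) == self.chunksize:
                yield chunk
                chunk = []
        if chunk:
            yield chunk  # final, possibly smaller, chunk


def count_rows(reader: MiniReader, input_file) -> int:
    """Format-agnostic pipeline step: it only consumes chunks from read()."""
    return sum(len(chunk) for chunk in reader.read(input_file))


data = io.StringIO("ra,dec\n1.0,2.0\n3.0,4.0\n5.0,6.0\n")
print(count_rows(CsvRowsReader(), data))  # 3
```

Supporting a new format then means writing one new reader class; count_rows does not change.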
Index Readers
If you have many small files (think 400k+ CSV files with a few rows each), you may benefit from “index” file readers. These allow you to explicitly create batches for tasks by providing a set of index files, where each file is a text file that contains only paths to data files.
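If you don't already have index files, a small script can produce them. The sketch below is a hypothetical helper (write_index_files and the index_NNNN.txt naming are illustrative choices, not part of hats_import), and it assumes each index file lists one data file path per line.

```python
import tempfile
from pathlib import Path
from typing import List


def write_index_files(data_paths: List[str], out_dir: str, num_index_files: int) -> List[Path]:
    """Split data_paths into up to num_index_files text files, one path per line.

    Paths are dealt out in contiguous slices whose sizes differ by at most one,
    so each index file (one batch, one task) covers a similar number of files.
    """
    if num_index_files < 1:
        raise ValueError("num_index_files must be at least 1")
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    base, extra = divmod(len(data_paths), num_index_files)
    index_files = []
    start = 0
    for i in range(num_index_files):
        size = base + (1 if i < extra else 0)
        batch = data_paths[start:start + size]
        start += size
        if not batch:
            continue  # more index files requested than there are data files
        index_path = out / f"index_{i:04d}.txt"
        index_path.write_text("\n".join(batch) + "\n")
        index_files.append(index_path)
    return index_files


# Usage: 10 data files split into 3 index files of 4, 3 and 3 paths.
with tempfile.TemporaryDirectory() as tmp:
    paths = [f"/data/part_{n}.csv" for n in range(10)]
    for index_file in write_index_files(paths, tmp, 3):
        print(index_file.name, len(index_file.read_text().splitlines()))
```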
Benefits:
If you have 400k+ input files, you don’t want to create 400k+ dask tasks to process these files.
If the files are very small, batching them in this way allows the import process to combine several small files into a single chunk for processing. This results in fewer intermediate files during the splitting stage.
If you have Parquet files on a slow networked file system, we support pyarrow's readahead protocol through index readers.
Warnings:
If you have 20 dask workers in your pool, you may be tempted to create 20 index files. This is not always an efficient use of resources! You’d be better served by 200 index files, so that:
dask can spread the load if some lists of files take longer to process than others
if the pipeline dies after successfully processing 15 of 20 lists, a retry will only have 5 lists left for those same 20 workers, so 15 workers will sit idle.
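The retry scenario above can be made concrete with a little arithmetic. This hypothetical helper assumes each index file is one task of roughly equal cost, and that a retry skips the lists that already finished.

```python
def idle_workers_on_retry(num_workers: int, num_lists: int, fraction_done: float) -> int:
    """Number of workers with nothing to do at the start of a retry.

    Assumes one task per index file and that finished lists are skipped on retry.
    """
    remaining = num_lists - round(num_lists * fraction_done)
    return max(num_workers - remaining, 0)


print(idle_workers_on_retry(20, 20, 0.75))   # 15 idle: only 5 lists remain
print(idle_workers_on_retry(20, 200, 0.75))  # 0 idle: 50 lists remain for 20 workers
```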
The read_index_file method is provided as a convenience to read these
index files. It returns the list of data file paths named in the index file, which your read method can then open.
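def read(self, input_file, read_columns=None):
    # Expand the index file into the list of data files it names.
    file_paths = self.read_index_file(
        input_file=input_file, upath_kwargs=self.upath_kwargs, **self.kwargs
    )
    # ... then open each path in file_paths and yield chunks of data.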
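To show how an index reader can merge many tiny files into fewer, larger chunks, here is a stdlib-only sketch. It is not the hats_import implementation: read_index_paths is a hypothetical stand-in for read_index_file (assuming one path per non-blank line), IndexedCsvRowsReader is an illustrative name, and chunks are lists of dicts.

```python
import csv
import tempfile
from pathlib import Path
from typing import Iterator, List, Optional


def read_index_paths(index_file: str) -> List[str]:
    """Hypothetical stand-in for read_index_file: one data path per non-blank line."""
    lines = Path(index_file).read_text().splitlines()
    return [line.strip() for line in lines if line.strip()]


class IndexedCsvRowsReader:
    """Sketch of an index reader that merges many small CSV files into larger chunks."""

    def __init__(self, chunksize: int = 1000):
        self.chunksize = chunksize

    def read(self, input_file: str, read_columns: Optional[List[str]] = None) -> Iterator[List[dict]]:
        chunk: List[dict] = []
        for data_path in read_index_paths(input_file):
            with open(data_path, newline="") as handle:
                for row in csv.DictReader(handle):
                    if read_columns:
                        row = {name: row[name] for name in read_columns}
                    chunk.append(row)
                    if len(chunk) >= self.chunksize:
                        yield chunk
                        chunk = []
        if chunk:
            yield chunk  # rows from several small files can share one chunk


# Usage: three tiny CSV files listed in one index file become one task.
with tempfile.TemporaryDirectory() as tmp:
    data_paths = []
    for i in range(3):
        path = Path(tmp) / f"part_{i}.csv"
        path.write_text("ra,dec\n10.0,-5.0\n11.0,-6.0\n")
        data_paths.append(str(path))
    index_file = Path(tmp) / "index_0000.txt"
    index_file.write_text("\n".join(data_paths) + "\n")
    reader = IndexedCsvRowsReader(chunksize=4)
    print([len(chunk) for chunk in reader.read(str(index_file))])  # [4, 2]
```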
Pandas vs Pyarrow
The read method can either yield a pandas.DataFrame object or a pyarrow.Table.
If yielding a pandas dataframe, note that further pipeline stages will convert
into a pyarrow.Table anyway.
We provide alternative implementations of some common readers that use pyarrow file readers. These can be faster, as they avoid unnecessary conversion between table formats, but you may encounter rougher edges.
Iterate by Row Groups
For Parquet files, both the ParquetPandasReader and ParquetPyarrowReader
support reading by row groups, rather than by chunks of rows. This can be more
efficient if your Parquet files are already organized into row groups that are
suitable for your processing.
To enable this, set the iterate_by_row_groups parameter to True when
instantiating the reader. Then, when you call the read method, it will yield
dataframes or tables corresponding to each row group in the Parquet file.
args = ImportArguments(
    file_reader=ParquetPyarrowReader(iterate_by_row_groups=True),
    ...
)
Built-in Classes and Functions
Get a generator file reader for common file types.
Base class for chunking file readers.
CSV reader for the most common CSV reading arguments.
CSV reader that uses the pyarrow library for reading.
Reads an index file containing paths to CSV files to be read and batched.
Parquet reader for the most common Parquet reading arguments.
Parquet reader that uses the pyarrow library for reading.
Reads an index file containing paths to Parquet files to be read and batched.
Reads astropy ascii .ecsv files.
Chunked FITS file reader.
Defining your own class
If you need to write your own input file reader, it should subclass InputReader
and must provide a read method.
Important
On the first pass through the data, we only need to read the RA and Dec columns, so
the read_columns value will be a list with just these two columns. You can use this
information to read less data, if the file format allows it.
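How much you can save depends on the format. As a hypothetical stdlib sketch for CSV (read_csv_columns is not a hats_import function), every line must still be parsed, but unneeded fields are dropped right away instead of being carried through the pipeline. Columnar formats can do better: for example, pyarrow.parquet.read_table accepts a columns argument and does not read the other columns at all.

```python
import csv
import io
from typing import Iterator, List, Optional


def read_csv_columns(handle, read_columns: Optional[List[str]] = None) -> Iterator[dict]:
    """Yield CSV rows, keeping only read_columns when it is given."""
    reader = csv.reader(handle)
    header = next(reader, None)
    if header is None:
        return  # empty file: nothing to yield
    if read_columns:
        missing = set(read_columns) - set(header)
        if missing:
            raise KeyError(f"columns not in file: {sorted(missing)}")
        keep = [header.index(name) for name in read_columns]
    else:
        keep = list(range(len(header)))
    for fields in reader:
        yield {header[i]: fields[i] for i in keep}


data = "ra,dec,mag,flux\n10.5,-3.2,17.1,0.4\n"
print(list(read_csv_columns(io.StringIO(data), ["ra", "dec"])))
# [{'ra': '10.5', 'dec': '-3.2'}]
```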
You can either yield a pandas.DataFrame object or a pyarrow.Table. If yielding
a pandas dataframe, note that further pipeline stages will convert into a pyarrow.Table
anyway.
You must yield the results (the read method is a generator), and should read the data in chunks to avoid memory issues.
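A minimal sketch of the yield-and-chunk pattern, assuming CSV input and the standard library only (iter_chunks and ChunkedCsvReader are hypothetical names). Because read contains yield, calling it returns a generator, and nothing is read until the pipeline asks for the next chunk. On Python 3.12+, itertools.batched offers similar batching (it yields tuples rather than lists).

```python
import csv
from itertools import islice
from typing import Iterable, Iterator, List


def iter_chunks(rows: Iterable, chunksize: int) -> Iterator[List]:
    """Yield lists of at most chunksize items without materializing the input."""
    if chunksize < 1:
        raise ValueError("chunksize must be positive")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunksize))
        if not chunk:
            return
        yield chunk


class ChunkedCsvReader:
    """Hypothetical reader: yields CSV rows (as dicts) in chunks of chunksize."""

    def __init__(self, chunksize: int = 500_000):
        self.chunksize = chunksize

    def read(self, input_file, read_columns=None):
        # read_columns is ignored here for brevity.
        # The file stays open only while the pipeline is consuming chunks.
        with open(input_file, newline="") as handle:
            yield from iter_chunks(csv.DictReader(handle), self.chunksize)


print([len(chunk) for chunk in iter_chunks(range(7), 3)])  # [3, 3, 1]
```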
Below is an InputReader implementation for the fictional Starr file format.
This showcases that the read method does all of the format-specific logic.
Here, we even use a fictional shortcut to get the RA/Dec positions of the rows in the table
when read_columns is provided. This can speed up early pipeline
execution by avoiding more expensive reads when they aren't needed.
import pandas as pd
from hats_import.catalog.file_readers import InputReader

import starr_io  # fictional library for the fictional Starr format


class StarrReader(InputReader):
    """Class for fictional Starr file format."""

    def __init__(self, chunksize=500_000, **kwargs):
        self.chunksize = chunksize
        self.kwargs = kwargs

    def read(self, input_file, read_columns=None):
        if read_columns:
            ## shortcut - just need positions
            starr_manifest = starr_io.read_manifest(input_file, **self.kwargs)
            ra_array, dec_array = starr_manifest.read_positions()
            yield pd.DataFrame({"ra": ra_array, "dec": dec_array})
        else:
            ## get all of the fields, one chunk at a time
            starr_file = starr_io.read_table(input_file, **self.kwargs)
            for smaller_table in starr_file.to_batches(max_chunksize=self.chunksize):
                ## join_to_starr_data is another fictional helper
                smaller_table = join_to_starr_data(smaller_table)
                yield smaller_table.to_pandas()
And here we show how the custom reader would be instantiated and passed along to the catalog import pipeline.
args = ImportArguments(
    ...
    ## Locates files like "/directory/to/files/**starr"
    input_path="/directory/to/files/",
    ## NB - you need the parens here!
    file_reader=StarrReader(),
)
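The parens matter because the pipeline needs a reader instance whose read method it can call on each input file; passing the class itself would make the input file land in the self slot. The sketch below is a hypothetical check (check_file_reader and StarrReaderStub are illustrative only, and this is not a claim about how hats_import validates its arguments) showing how the two cases differ.

```python
class StarrReaderStub:
    """Hypothetical stand-in for StarrReader, used only to illustrate the check."""

    def read(self, input_file, read_columns=None):
        yield from ()


def check_file_reader(file_reader):
    """Reject a reader class passed without instantiating it."""
    if isinstance(file_reader, type):
        raise TypeError(
            f"file_reader should be an instance, e.g. {file_reader.__name__}(), "
            "not the class itself"
        )
    if not callable(getattr(file_reader, "read", None)):
        raise TypeError("file_reader must provide a read method")
    return file_reader


check_file_reader(StarrReaderStub())  # OK: an instance
try:
    check_file_reader(StarrReaderStub)  # forgot the parens
except TypeError as err:
    print(err)
```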
While this can provide self-service for importing new file formats, we’re here for you if this is all too much. Contact us.