File Readers
===========================

In the catalog import pipeline, we use ``InputReader`` objects to iterate through
the input files. This allows us to take in a variety of file formats, without having
to re-write the entire processing pipeline for each kind of file format we encounter.

For the curious, see the API documentation for 
:py:class:`hats_import.catalog.file_readers.InputReader`.

Index Readers
~~~~~~~~~~~~~~~~~~~~~~~~~~

If you have many small files (think 400k+ CSV files with a few rows each), you
may benefit from "index" file readers. These allow you to explicitly create 
batches for tasks by providing a set of index files, where each file is a 
text file that contains only paths to data files.

Benefits:

1. If you have 400k+ input files, you don't want to create 400k+ dask tasks
   to process these files.
2. If the files are very small, batching them in this way allows the import 
   process to *combine* several small files into a single chunk for processing.
   This will result in fewer intermediate files during the ``splitting`` stage.
3. If you have parquet files over a slow networked file system, we support
   pyarrow's readahead protocol through index readers.

Warnings:

1. If you have 20 dask workers in your pool, you may be tempted to create 
   20 index files. This is not always an efficient use of resources! 
   You'd be better served by 200 index files, so that:

   a. dask can spread the load if some lists of files take longer to process
      than others
   b. if the pipeline dies after successfully processing 15 lists, when you 
      retry the pipeline, you'll only be processing 5 lists with those same 20 
      workers and many workers will be sitting idle.


The ``read_index_file`` method is provided as a convenience to read these 
index files, and returns a list of paths to be read by your method.

.. code-block:: python

    def read(self, input_file, read_columns=None):
        file_paths = self.read_index_file(
            input_file=input_file, upath_kwargs=self.upath_kwargs, **self.kwargs
        )

Pandas vs Pyarrow
~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``read`` method can either yield a ``pandas.DataFrame`` object or a ``pyarrow.Table``.
If yielding a pandas dataframe, note that further pipeline stages will convert 
into a ``pyarrow.Table`` anyway.

We provide alternative implementations of some common readers that will use pyarrow file
readers. This can be faster, as it avoids unneccessary conversion between table
formats, but you may encounter rougher edges. 

Iterate by Row Groups
~~~~~~~~~~~~~~~~~~~~~~~~~~

For Parquet files, both the ``ParquetPandasReader`` and ``ParquetPyarrowReader``
support reading by row groups, rather than by chunks of rows. This can be more
efficient if your Parquet files are already organized into row groups that are
suitable for your processing.

To enable this, set the ``iterate_by_row_groups`` parameter to ``True`` when
instantiating the reader. Then, when you call the ``read`` method, it will yield
dataframes or tables corresponding to each row group in the Parquet file.

.. code-block:: python

    args = ImportArguments(
        file_reader=ParquetPyarrowReader(iterate_by_row_groups=True),
        ...
    )

Built-in Classes and Functions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. currentmodule:: hats_import.catalog.file_readers

.. autosummary::
    :toctree: api/

    get_file_reader
    InputReader
    CsvReader
    CsvPyarrowReader
    IndexedCsvReader
    ParquetPandasReader
    ParquetPyarrowReader
    IndexedParquetReader 
    AstropyEcsvReader
    FitsReader

Defining your own class
~~~~~~~~~~~~~~~~~~~~~~~~~~

If you need to write your own input file reader, it should subclass ``InputReader``
and must provide a ``read`` method.

.. important::

   On the first pass through the data, we just need to read the RA and Dec columns, and so
   the ``read_columns`` value will be a list with just these two columns. You can use this 
   information to read less data, if possible given the file format.

You can either yield a ``pandas.DataFrame`` object or a ``pyarrow.Table``. If yielding
a pandas dataframe, note that further pipeline stages will convert into a ``pyarrow.Table`` 
anyway.

You must yield the result, and should utilize chunking to avoid memory issues.

Below is an ``InputReader`` implementation for the fictional Starr file format.
This showcases that the ``read`` method does all of the format-specific logic.

Here, we are even using some fictional shortcut to get the radec of rows in the table
when the ``read_columns`` are provided. This can potentially speed up early pipeline
execution by avoiding more expensive reads when they're not needed.

.. code-block:: python

    class StarrReader(InputReader):
        """Class for fictional Starr file format."""
        def __init__(self, chunksize=500_000, **kwargs):
            self.chunksize = chunksize
            self.kwargs = kwargs

        def read(self, input_file, read_columns=None):
            if read_columns:
               ## shortcut - just need positions
               starr_manifest = starr_io.read_manifest(input_file, **self.kwargs)
               ra_array, dec_array = starr_manifest.read_positions()
               yield pd.Dataframe({"ra" : ra_array, "dec" : dec_array})
            else:
               ## get all of the fields
               starr_file = starr_io.read_table(input_file, **self.kwargs)
               for smaller_table in starr_file.to_batches(max_chunksize=self.chunksize):
                  smaller_table = join_to_starr_data(smaller_table)
                  yield smaller_table.to_pandas()

And here we show how the custom reader would be instantiated and passed along
to the catalog import pipeline.

.. code-block:: python

    args = ImportArguments(
        ...
        ## Locates files like "/directory/to/files/**starr"
        input_path="/directory/to/files/",
        ## NB - you need the parens here!
        file_reader=StarrReader(),
    )

While this can provide self-service for importing new file formats, we're here
for you if this is all too much. :doc:`/guide/contact`.
