Zubercal
Getting the data
See docs at Caltech.
Challenges with this data set
- The `__index_level_0__` pandas index should be ignored when reading:
    - it is identical to the `objectid` column, and is just bloat
    - it is non-unique, and that makes it tricky when splitting the file up
- The files are written out by band, and the band is included in the file name (but not as a field in the resulting data product). We use a regular expression to parse out the band and insert it as a column in the dataframe (see the sketch after this list).
- The parquet files are all a fine size for input files, so we don't mess with another chunk size.
- There are over 500 thousand data files, so we pass each directory to the map job workers, and loop over all files in the directory ourselves.
- You may want to remove the input file `F0065/ztf_0065_1990_g.parquet`, as this file seems to have corrupted snappy compression. All other files are readable as downloaded from Caltech.
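As a quick check of the band-parsing regular expression used in the reader below, it can be run by hand against one of the file names (using the name of the corrupted file above purely as an example of the naming scheme):

import re

## the same pattern the reader uses to pull the band out of a file name
match = re.match(r".*ztf_[\d]+_[\d]+_([gir]).parquet", "F0065/ztf_0065_1990_g.parquet")
print(match.group(1))  ## prints "g"

With that in place, the full reader and import pipeline look like this: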
import hipscat_import.pipeline as runner
from hipscat_import.catalog.arguments import ImportArguments
from hipscat_import.catalog.file_readers import ParquetReader
import pyarrow.parquet as pq
import pyarrow as pa
import re
import glob


class ZubercalParquetReader(ParquetReader):
    def read(self, input_file):
        """Reader for the specifics of zubercal parquet files."""
        columns = [
            "mjd",
            "mag",
            "objdec",
            "objra",
            "magerr",
            "objectid",
            "info",
            "flag",
            "rcidin",
            "fieldid",
        ]

        ## Find all the parquet files in the directory
        files = glob.glob(f"{input_file}/**.parquet")
        files.sort()
        for file in files:
            ## Parse the band from the file name, and hold onto it for later.
            match = re.match(r".*ztf_[\d]+_[\d]+_([gir]).parquet", str(file))
            band = match.group(1)
            parquet_file = pq.read_table(file, columns=columns, **self.kwargs)
            for smaller_table in parquet_file.to_batches(max_chunksize=self.chunksize):
                frame = pa.Table.from_batches([smaller_table]).to_pandas()
                frame["band"] = band
                yield frame
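Before committing to the full run, it may be worth smoke-testing the reader against a single downloaded directory; a minimal sketch, where F0001 is a hypothetical stand-in for one of the field directories:

## hypothetical path to a single downloaded field directory
reader = ZubercalParquetReader()
first_frame = next(reader.read("/path/to/downloads/F0001"))
print(first_frame[["objectid", "band", "mjd", "mag"]].head())

The directory list and pipeline arguments then tie everything together: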
files = glob.glob("/path/to/downloads/F**/")
files.sort()

args = ImportArguments(
    output_artifact_name="zubercal",
    input_file_list=files,
    ## NB - you need the parens here!
    file_reader=ZubercalParquetReader(),
    catalog_type="source",
    ra_column="objra",
    dec_column="objdec",
    sort_columns="objectid",
    highest_healpix_order=10,
    pixel_threshold=20_000_000,
    output_path="/path/to/catalogs/",
)
runner.pipeline(args)
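Once the pipeline finishes, one quick sanity check is to confirm that the parsed band column made it into the output; a minimal sketch, assuming the output paths used above:

import glob
import pyarrow.parquet as pq

## find the leaf parquet files anywhere under the catalog directory
leaf_files = sorted(glob.glob("/path/to/catalogs/zubercal/**/*.parquet", recursive=True))
## the schema should include the "band" column added by the reader
## (alongside whatever bookkeeping columns the importer adds)
print(pq.read_schema(leaf_files[0]))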
Our performance

Running on dedicated hardware, with only 5 workers, this import took around two weeks:

- Mapping stage: around 15 hours
- Splitting stage: around 243 hours
- Reducing stage: around 70 hours