Unequal schema problems#

There are a few ways in which parquet files written with slightly different schemas can cause issues in the import pipeline. The problem most commonly arises when some portions of the data contain only empty (null) values in a column, while other portions have non-null values in that column and so are interpreted as integer or string fields. When we later try to merge these partial files, the parquet engine refuses to cast between the two types and throws an error.

For example, at the reduce stage, we’re combining several intermediate parquet files for a single spatial tile into the final parquet file. It’s possible at this stage that some files will contain only empty (null) values in a column that we expect to be a string field.

e.g.

File1#

| int_field | string_field | float_field |
|-----------|--------------|-------------|
| 5         |              | 3.4         |
| 8         |              | 3.8         |

which will have a schema like:

optional int64 field_id=-1 int_field;
optional int32 field_id=-1 string_field **(Null)**;
optional double field_id=-1 float_field;

File2#

| int_field | string_field | float_field |
|-----------|--------------|-------------|
| 6         | hello        | 4.1         |
| 7         |              | 3.9         |

which will have a schema like:

optional int64 field_id=-1 int_field;
optional binary field_id=-1 string_field (String);
optional double field_id=-1 float_field;

When we try to merge these files together, we see an error like the following:

Key:       4_2666
Function:  reduce_pixel_shards
args:      ()
kwargs:    {...}
Exception: "ArrowNotImplementedError('Unsupported cast from string to null using function cast_null')"

Error Demonstration#

Here, we attempt an import with unequal schemas, and see that the attempt fails at the reducing stage, when we try to combine partial parquet files into a single file with common metadata.

[1]:
import tempfile
import os
from dask.distributed import Client

from hipscat_import.pipeline import pipeline_with_client
from hipscat_import.catalog.arguments import ImportArguments
from hipscat_import.catalog.file_readers import get_file_reader

mixed_schema_csv_dir = "../../tests/hipscat_import/data/mixed_schema"
tmp_path = tempfile.TemporaryDirectory()

args = ImportArguments(
    output_artifact_name="mixed_csv_bad",
    input_file_list=[
        os.path.join(mixed_schema_csv_dir, "input_01.csv"),
        os.path.join(mixed_schema_csv_dir, "input_02.csv"),
    ],
    file_reader="csv",
    output_path=tmp_path.name,
    highest_healpix_order=1,
)

with Client(n_workers=1, threads_per_worker=1) as client:
    try:
        pipeline_with_client(args, client)
    except:
        pass  # we know it's going to fail!!
tmp_path.cleanup()
2024-05-09 16:28:58,413 - distributed.worker - WARNING - Compute Failed
Key:       reduce_pixel_shards-eef1802ff14f2cd284cbedf895aaba8c
Function:  reduce_pixel_shards
args:      ()
kwargs:    {'cache_shard_path': '/tmp/tmpmvbp_ryr/mixed_csv_bad/intermediate', 'resume_path': '/tmp/tmpmvbp_ryr/mixed_csv_bad/intermediate', 'reducing_key': '0_11', 'destination_pixel_order': 0, 'destination_pixel_number': 11, 'destination_pixel_size': 8, 'output_path': '/tmp/tmpmvbp_ryr/mixed_csv_bad', 'ra_column': 'ra', 'dec_column': 'dec', 'sort_columns': None, 'add_hipscat_index': True, 'use_schema_file': None, 'use_hipscat_index': False, 'storage_options': None}
Exception: 'ArrowInvalid("Failed to parse string: \'comet\' as a scalar of type double")'

reducing task reduce_pixel_shards-eef1802ff14f2cd284cbedf895aaba8c failed with message:
  ArrowInvalid: Failed to parse string: 'comet' as a scalar of type double
  Traceback (most recent call last):
    File "/home/docs/checkouts/readthedocs.org/user_builds/hipscat-import/envs/latest/lib/python3.10/site-packages/distributed/worker.py", line 2994, in apply_function_simple
    File "/home/docs/checkouts/readthedocs.org/user_builds/hipscat-import/envs/latest/lib/python3.10/site-packages/hipscat_import/catalog/map_reduce.py", line 250, in reduce_pixel_shards
    File "/home/docs/checkouts/readthedocs.org/user_builds/hipscat-import/envs/latest/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 1811, in read_table
    File "/home/docs/checkouts/readthedocs.org/user_builds/hipscat-import/envs/latest/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 1454, in read
    File "pyarrow/_dataset.pyx", line 562, in pyarrow._dataset.Dataset.to_table
    File "pyarrow/_dataset.pyx", line 3804, in pyarrow._dataset.Scanner.to_table
    File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
    File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status

We can overcome many of these issues by using a parquet schema file. This is a special kind of parquet file that contains only information about the columns (their names, data types, and additional key-value metadata), with no data rows.

Let’s take a look inside the schema structure and see the field types it expects to see:

[2]:
import pyarrow.parquet as pq

mixed_schema_csv_parquet = "../../tests/hipscat_import/data/mixed_schema/schema.parquet"

parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)
print(parquet_file.schema)
<pyarrow._parquet.ParquetSchema object at 0x7f74cc0cdd80>
required group field_id=-1 schema {
  optional int64 field_id=-1 id;
  optional double field_id=-1 ra;
  optional double field_id=-1 dec;
  optional int64 field_id=-1 ra_error;
  optional int64 field_id=-1 dec_error;
  optional binary field_id=-1 comment (String);
  optional binary field_id=-1 code (String);
  optional int64 field_id=-1 __index_level_0__;
}

We already have a parquet metadata file for this data set, but we’ll show you how to create one of your own later in this notebook.

[3]:
tmp_path = tempfile.TemporaryDirectory()
args = ImportArguments(
    output_artifact_name="mixed_csv_good",
    input_file_list=[
        os.path.join(mixed_schema_csv_dir, "input_01.csv"),
        os.path.join(mixed_schema_csv_dir, "input_02.csv"),
    ],
    output_path=tmp_path.name,
    highest_healpix_order=1,
    file_reader=get_file_reader("csv", schema_file=mixed_schema_csv_parquet),
    use_schema_file=mixed_schema_csv_parquet,
)
with Client(n_workers=1, threads_per_worker=1) as client:
    pipeline_with_client(args, client)
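
As a quick sanity check, we can read back the schema that the pipeline wrote for the new catalog. This is only a sketch: it assumes the catalog root contains a _common_metadata file, as hipscat catalogs written by this pipeline typically do.

## Hypothetical check: inspect the schema of the freshly-imported catalog.
## Assumes "_common_metadata" sits at the catalog root (tmp_path/mixed_csv_good).
good_catalog_schema = os.path.join(tmp_path.name, "mixed_csv_good", "_common_metadata")
print(pq.read_schema(good_catalog_schema))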

Making a new parquet schema file#

There are a few different strategies we can use to create a schema file:

  • using string representations of pandas data types

  • using an explicit list of pyarrow data types

  • and many more!

We’ll stick to these two, since they exercise the most common code paths through schema generation.

Using pandas type strings#

A file like tic_types.csv lists the columns that the TIC data will contain, in a table like:

name,type
ID,Int64
version,str
HIP,Int32
TYC,str
etc...

Such files are a common way to send type information when the data files have no header.

In this method, we will use pandas’ type parsing to convert these strings into understood data types, and create the relevant parquet metadata.

[4]:
import pandas as pd

## Fetch the name/type information from a file.
type_list_frame = pd.read_csv("../static/tic_types.csv")

## For each row, add to a dictionary with name and a pandas series with the parsed data type.
## "str" is not understood as "string", so add a special case.
type_map = {
    row["name"]: pd.Series(dtype=("string" if row["type"] == "str" else row["type"]))
    for _, row in type_list_frame.iterrows()
}
dtype_frame = pd.DataFrame(type_map)

## Now write our empty data frame to a parquet file.
schema_file = os.path.join(tmp_path.name, "schema_from_csv_list.parquet")
dtype_frame.to_parquet(schema_file)

Let’s look at the parquet file’s metadata and see if it matches what we’d expect.

You’ll notice that there are A LOT of fields, and this is why you might not want to resolve column-by-column type discrepancies by hand.

[5]:
parquet_file = pq.ParquetFile(schema_file)
print(parquet_file.schema)
<pyarrow._parquet.ParquetSchema object at 0x7f747ad710c0>
required group field_id=-1 schema {
  optional int64 field_id=-1 ID;
  optional binary field_id=-1 version (String);
  optional int32 field_id=-1 HIP;
  optional binary field_id=-1 TYC (String);
  optional binary field_id=-1 UCAC (String);
  optional binary field_id=-1 TWOMASS (String);
  optional int64 field_id=-1 SDSS;
  optional binary field_id=-1 ALLWISE (String);
  optional binary field_id=-1 GAIA (String);
  optional binary field_id=-1 APASS (String);
  optional int32 field_id=-1 KIC;
  optional binary field_id=-1 objType (String);
  optional binary field_id=-1 typeSrc (String);
  optional double field_id=-1 ra;
  optional double field_id=-1 dec;
  optional binary field_id=-1 POSflag (String);
  optional double field_id=-1 pmRA;
  optional double field_id=-1 e_pmRA;
  optional double field_id=-1 pmDEC;
  optional double field_id=-1 e_pmDEC;
  optional binary field_id=-1 PMflag (String);
  optional double field_id=-1 plx;
  optional double field_id=-1 e_plx;
  optional binary field_id=-1 PARflag (String);
  optional double field_id=-1 gallong;
  optional double field_id=-1 gallat;
  optional double field_id=-1 eclong;
  optional double field_id=-1 eclat;
  optional double field_id=-1 Bmag;
  optional double field_id=-1 e_Bmag;
  optional double field_id=-1 Vmag;
  optional double field_id=-1 e_Vmag;
  optional double field_id=-1 umag;
  optional double field_id=-1 e_umag;
  optional double field_id=-1 gmag;
  optional double field_id=-1 e_gmag;
  optional double field_id=-1 rmag;
  optional double field_id=-1 e_rmag;
  optional double field_id=-1 imag;
  optional double field_id=-1 e_imag;
  optional double field_id=-1 zmag;
  optional double field_id=-1 e_zmag;
  optional double field_id=-1 Jmag;
  optional double field_id=-1 e_Jmag;
  optional double field_id=-1 Hmag;
  optional double field_id=-1 e_Hmag;
  optional double field_id=-1 Kmag;
  optional double field_id=-1 e_Kmag;
  optional binary field_id=-1 TWOMflag (String);
  optional double field_id=-1 prox;
  optional double field_id=-1 w1mag;
  optional double field_id=-1 e_w1mag;
  optional double field_id=-1 w2mag;
  optional double field_id=-1 e_w2mag;
  optional double field_id=-1 w3mag;
  optional double field_id=-1 e_w3mag;
  optional double field_id=-1 w4mag;
  optional double field_id=-1 e_w4mag;
  optional double field_id=-1 GAIAmag;
  optional double field_id=-1 e_GAIAmag;
  optional double field_id=-1 Tmag;
  optional double field_id=-1 e_Tmag;
  optional binary field_id=-1 TESSflag (String);
  optional binary field_id=-1 SPFlag (String);
  optional double field_id=-1 Teff;
  optional double field_id=-1 e_Teff;
  optional double field_id=-1 logg;
  optional double field_id=-1 e_logg;
  optional double field_id=-1 MH;
  optional double field_id=-1 e_MH;
  optional double field_id=-1 rad;
  optional double field_id=-1 e_rad;
  optional double field_id=-1 mass;
  optional double field_id=-1 e_mass;
  optional double field_id=-1 rho;
  optional double field_id=-1 e_rho;
  optional binary field_id=-1 lumclass (String);
  optional double field_id=-1 lum;
  optional double field_id=-1 e_lum;
  optional double field_id=-1 d;
  optional double field_id=-1 e_d;
  optional double field_id=-1 ebv;
  optional double field_id=-1 e_ebv;
  optional int32 field_id=-1 numcont;
  optional double field_id=-1 contratio;
  optional binary field_id=-1 disposition (String);
  optional int64 field_id=-1 duplicate_id;
  optional double field_id=-1 priority;
  optional double field_id=-1 eneg_EBV;
  optional double field_id=-1 epos_EBV;
  optional binary field_id=-1 EBVflag (String);
  optional double field_id=-1 eneg_Mass;
  optional double field_id=-1 epos_Mass;
  optional double field_id=-1 eneg_Rad;
  optional double field_id=-1 epos_Rad;
  optional double field_id=-1 eneg_rho;
  optional double field_id=-1 epos_rho;
  optional double field_id=-1 eneg_logg;
  optional double field_id=-1 epos_logg;
  optional double field_id=-1 eneg_lum;
  optional double field_id=-1 epos_lum;
  optional double field_id=-1 eneg_dist;
  optional double field_id=-1 epos_dist;
  optional binary field_id=-1 distflag (String);
  optional double field_id=-1 eneg_Teff;
  optional double field_id=-1 epos_Teff;
  optional binary field_id=-1 TeffFlag (String);
  optional double field_id=-1 gaiabp;
  optional double field_id=-1 e_gaiabp;
  optional double field_id=-1 gaiarp;
  optional double field_id=-1 e_gaiarp;
  optional int32 field_id=-1 gaiaqflag;
  optional binary field_id=-1 starchareFlag (String);
  optional binary field_id=-1 VmagFlag (String);
  optional binary field_id=-1 BmagFlag (String);
  optional binary field_id=-1 splists (String);
  optional double field_id=-1 e_RA;
  optional double field_id=-1 e_Dec;
  optional double field_id=-1 RA_orig;
  optional double field_id=-1 Dec_orig;
  optional double field_id=-1 e_RA_orig;
  optional double field_id=-1 e_Dec_orig;
  optional int32 field_id=-1 raddflag;
  optional int32 field_id=-1 wdflag;
  optional int64 field_id=-1 objID;
}

Explicit list of pyarrow data types#

Here, we know which pyarrow types we want to use for each column. This is helpful if you know you want nullable types, or know that you DON’T want them, but it requires some deeper knowledge of pyarrow data types.

[6]:
import pyarrow as pa

## List all of our columns as pyarrow fields.
schema_from_pyarrow = pa.schema(
    [
        pa.field("id", pa.int64()),
        pa.field("ra", pa.float64()),
        pa.field("dec", pa.float64()),
        pa.field("ra_error", pa.float64()),
        pa.field("dec_error", pa.float64()),
        pa.field("comment", pa.string()),
        pa.field("code", pa.string()),
    ]
)
schema_file = os.path.join(tmp_path.name, "schema_from_pyarrow.parquet")
pq.write_metadata(schema_from_pyarrow, schema_file)

Again, we’ll check that the generated parquet metadata is what we expect:

[7]:
parquet_file = pq.ParquetFile(schema_file)
print(parquet_file.schema)
<pyarrow._parquet.ParquetSchema object at 0x7f747c894b40>
required group field_id=-1 schema {
  optional int64 field_id=-1 id;
  optional double field_id=-1 ra;
  optional double field_id=-1 dec;
  optional double field_id=-1 ra_error;
  optional double field_id=-1 dec_error;
  optional binary field_id=-1 comment (String);
  optional binary field_id=-1 code (String);
}
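
Whichever strategy you use, the generated schema file is passed to the importer just like the pre-built one in the mixed_csv_good example above. A sketch, reusing the variables defined earlier (the output_artifact_name here is only an illustrative choice):

## Reuse the generated schema file ("schema_from_pyarrow.parquet") in the
## import arguments, mirroring the earlier mixed_csv_good run; follow up
## with pipeline_with_client(args, client) as before.
args = ImportArguments(
    output_artifact_name="mixed_csv_generated_schema",
    input_file_list=[
        os.path.join(mixed_schema_csv_dir, "input_01.csv"),
        os.path.join(mixed_schema_csv_dir, "input_02.csv"),
    ],
    output_path=tmp_path.name,
    highest_healpix_order=1,
    file_reader=get_file_reader("csv", schema_file=schema_file),
    use_schema_file=schema_file,
)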

Finally, let’s clean up.

[8]:
tmp_path.cleanup()