Understanding the Pipeline Stages in HATS Import#

This notebook walks through each stage of the HATS import pipeline using toy data to demonstrate what happens at each step:

  1. Mapping - Count objects in high-resolution HEALPix pixels

  2. Binning - Create alignment from source to destination pixels

  3. Splitting - Distribute objects to their destination pixel shards

  4. Reducing - Combine shards into final catalog pixels

  5. Finishing - Write metadata and validate catalog

To read more about temporary files, how they’re used, and where to store them, check out our Temporary Files and Disk Usage notebook.

Setup: Create Toy Dataset#

[1]:
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd

# Create toy dataset - split into 3 files to demonstrate reducing stage
# When we process multiple input files, objects going to the same destination
# pixel will create multiple shards that need to be combined during reducing.
toy_data_1 = pd.DataFrame({"id": [0, 1, 2], "ra": [10, 10, 20], "dec": [10, 60, 80]})

toy_data_2 = pd.DataFrame({"id": [3, 4], "ra": [-10, 100], "dec": [80, -80]})

toy_data_3 = pd.DataFrame({"id": [5, 6, 7], "ra": [200, 200, 15], "dec": [0, 60, 70]})

# Create a temporary directory for our demo
demo_dir = tempfile.TemporaryDirectory(prefix="hats_demo_")
demo_path = Path(demo_dir.name)

# Write to separate CSV files
input_csv_1 = demo_path / "toy_catalog_1.csv"
input_csv_2 = demo_path / "toy_catalog_2.csv"
input_csv_3 = demo_path / "toy_catalog_3.csv"

toy_data_1.to_csv(input_csv_1, index=False)
toy_data_2.to_csv(input_csv_2, index=False)
toy_data_3.to_csv(input_csv_3, index=False)

# Combine for display
toy_data = pd.concat([toy_data_1, toy_data_2, toy_data_3], ignore_index=True)
input_files = [input_csv_1, input_csv_2, input_csv_3]

print("Toy Dataset (split across 3 files):")
print(toy_data)
print(f"\nFile 1: {input_csv_1.name} ({len(toy_data_1)} rows)")
print(f"File 2: {input_csv_2.name} ({len(toy_data_2)} rows)")
print(f"File 3: {input_csv_3.name} ({len(toy_data_3)} rows)")
Toy Dataset (split across 3 files):
   id   ra  dec
0   0   10   10
1   1   10   60
2   2   20   80
3   3  -10   80
4   4  100  -80
5   5  200    0
6   6  200   60
7   7   15   70

File 1: toy_catalog_1.csv (3 rows)
File 2: toy_catalog_2.csv (2 rows)
File 3: toy_catalog_3.csv (3 rows)

Helper Function#

[2]:
def show_directory_tree(base_path, stage_name, max_depth=4):
    """Display the directory structure after a pipeline stage.

    Args:
        base_path: Root directory to display
        stage_name: Name of the stage (for display)
        max_depth: Maximum depth to traverse
    """
    print(f"\n{'='*60}")
    print(f"Directory structure after {stage_name}:")
    print(f"{'='*60}")

    def print_tree(path, prefix="", depth=0):
        if depth > max_depth:
            return

        if not path.exists():
            return

        contents = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name))

        for i, item in enumerate(contents):
            is_last = i == len(contents) - 1
            current_prefix = "└── " if is_last else "├── "
            next_prefix = "    " if is_last else "│   "

            # Add file size for files
            if item.is_file():
                size = item.stat().st_size
                if size < 1024:
                    size_str = f"{size} B"
                elif size < 1024**2:
                    size_str = f"{size/1024:.1f} KB"
                else:
                    size_str = f"{size/1024**2:.1f} MB"
                print(f"{prefix}{current_prefix}{item.name} ({size_str})")
            else:
                # Count items in directory
                try:
                    num_items = len(list(item.iterdir()))
                    print(f"{prefix}{current_prefix}{item.name}/ ({num_items} items)")
                except PermissionError:
                    print(f"{prefix}{current_prefix}{item.name}/")
                print_tree(item, prefix + next_prefix, depth + 1)

    print(f"{base_path.name}/")
    print_tree(base_path)
    print()
[3]:
# Show initial directory structure
show_directory_tree(demo_path, "SETUP")

============================================================
Directory structure after SETUP:
============================================================
hats_demo_tujdc9qf/
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)

Stage 1: Mapping#

In the mapping stage, we:

  1. Read each input file

  2. Map each object to a HEALPix pixel at mapping_healpix_order

  3. Create a histogram counting objects per pixel

The mapping_healpix_order is the highest resolution we use for initial binning. For our 7 objects, we’ll use order 3 (768 pixels) to see meaningful groupings.

Since we have multiple input files, we’ll process each one and aggregate the results.

[4]:
import hats.pixel_math.healpix_shim as hp
from hats.pixel_math.sparse_histogram import HistogramAggregator, supplemental_count_histogram

# Mapping parameters
mapping_healpix_order = 3
ra_column = "ra"
dec_column = "dec"

print(f"=== MAPPING STAGE ===")
print(f"Mapping to HEALPix order {mapping_healpix_order}")
print(f"Total pixels at this order: {hp.order2npix(mapping_healpix_order)}\n")

# Aggregate histogram across all input files
row_count_histo = HistogramAggregator(mapping_healpix_order)

# Process each input file
for file_idx, input_file in enumerate(input_files):
    print(f"Processing file {file_idx + 1}/{len(input_files)}: {input_file.name}")

    # Read the data
    data = pd.read_csv(input_file)

    # Map each object to its HEALPix pixel
    mapped_pixels = hp.radec2pix(
        mapping_healpix_order,
        data[ra_column].to_numpy(dtype=float),
        data[dec_column].to_numpy(dtype=float),
    )

    print("  Object to Pixel Mapping:")
    for idx, row in data.iterrows():
        print(
            f"    ID {row['id']}: (RA={row['ra']:6.1f}, Dec={row['dec']:6.1f}) → Pixel {mapped_pixels[idx]}"
        )

    # Create histogram for this file
    row_count_partial, _ = supplemental_count_histogram(
        mapped_pixels, None, highest_order=mapping_healpix_order
    )
    row_count_histo.add(row_count_partial)
    print()

# Get the combined histogram
raw_histogram = row_count_histo.full_histogram
non_empty_pixels = np.where(raw_histogram > 0)[0]

print(f"Combined Histogram Summary:")
print(f"  Total pixels at order {mapping_healpix_order}: {len(raw_histogram)}")
print(f"  Non-empty pixels: {len(non_empty_pixels)}")
print(f"\nNon-empty pixel counts:")
for pixel_num in non_empty_pixels:
    print(f"  Pixel {pixel_num}: {raw_histogram[pixel_num]} object(s)")
=== MAPPING STAGE ===
Mapping to HEALPix order 3
Total pixels at this order: 768

Processing file 1/3: toy_catalog_1.csv
  Object to Pixel Mapping:
    ID 0: (RA=  10.0, Dec=  10.0) → Pixel 305
    ID 1: (RA=  10.0, Dec=  60.0) → Pixel 47
    ID 2: (RA=  20.0, Dec=  80.0) → Pixel 62

Processing file 2/3: toy_catalog_2.csv
  Object to Pixel Mapping:
    ID 3: (RA= -10.0, Dec=  80.0) → Pixel 253
    ID 4: (RA= 100.0, Dec= -80.0) → Pixel 578

Processing file 3/3: toy_catalog_3.csv
  Object to Pixel Mapping:
    ID 5: (RA= 200.0, Dec=   0.0) → Pixel 409
    ID 6: (RA= 200.0, Dec=  60.0) → Pixel 184
    ID 7: (RA=  15.0, Dec=  70.0) → Pixel 59

Combined Histogram Summary:
  Total pixels at order 3: 768
  Non-empty pixels: 8

Non-empty pixel counts:
  Pixel 47: 1 object(s)
  Pixel 59: 1 object(s)
  Pixel 62: 1 object(s)
  Pixel 184: 1 object(s)
  Pixel 253: 1 object(s)
  Pixel 305: 1 object(s)
  Pixel 409: 1 object(s)
  Pixel 578: 1 object(s)
[5]:
show_directory_tree(demo_path, "MAPPING")

============================================================
Directory structure after MAPPING:
============================================================
hats_demo_tujdc9qf/
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)

Stage 2: Binning (Alignment Generation)#

In the binning stage, we:

  1. Use the histogram from mapping

  2. Generate an “alignment” - a mapping from source pixels to destination pixels

  3. Group high-resolution pixels into coarser pixels based on pixel_threshold

The alignment uses adaptive partitioning, creating different-sized pixels based on object density. Areas with many objects get higher-order (smaller) pixels, while sparse areas get lower-order (larger) pixels.

The pixel_threshold controls the maximum objects per pixel (we’ll use 2).

[6]:
from hats import pixel_math
from hats.pixel_math.healpix_pixel import HealpixPixel

# Binning parameters
pixel_threshold = 3
highest_healpix_order = 3
lowest_healpix_order = 0
drop_empty_siblings = True

print(f"=== BINNING STAGE ===")
print(f"Pixel threshold: {pixel_threshold} objects/pixel")
print(f"Order range: {lowest_healpix_order} to {highest_healpix_order}")
print(f"Drop empty siblings: {drop_empty_siblings}\n")

# Generate alignment
alignment = pixel_math.generate_alignment(
    raw_histogram,
    highest_order=highest_healpix_order,
    lowest_order=lowest_healpix_order,
    threshold=pixel_threshold,
    drop_empty_siblings=drop_empty_siblings,
)

# Convert alignment to structured format
alignment_array = np.array([x if x is not None else [-1, -1, 0] for x in alignment], dtype=np.int64)

print("Alignment Array (non-empty source pixels only):")
print("Format: [destination_order, destination_pixel, row_count]\n")
for i in non_empty_pixels:
    entry = alignment_array[i]
    print(f"  Source pixel {i} → Order {entry[0]}, Pixel {entry[1]} ({entry[2]} objects)")

# Get unique destination pixels
pixel_list = np.unique(alignment_array, axis=0)
destination_pixel_map = {
    HealpixPixel(int(order), int(pix)): int(row_count)
    for (order, pix, row_count) in pixel_list
    if int(row_count) > 0
}

print(f"\nDestination Pixels ({len(destination_pixel_map)} total):")
for hp_pixel, count in sorted(destination_pixel_map.items(), key=lambda x: (x[0].order, x[0].pixel)):
    print(f"  Order {hp_pixel.order}, Pixel {hp_pixel.pixel}: {count} objects")

# Show mapping from source to destination for our specific objects
print("\nObject Routing (Source → Destination):")
for input_file in input_files:
    file_data = pd.read_csv(input_file)
    file_mapped_pixels = hp.radec2pix(
        mapping_healpix_order,
        file_data[ra_column].to_numpy(dtype=float),
        file_data[dec_column].to_numpy(dtype=float),
    )
    print(f"  From {input_file.name}:")
    for idx, row in file_data.iterrows():
        source_pix = file_mapped_pixels[idx]
        dest_info = alignment_array[source_pix]
        print(f"    ID {row['id']}: Source pixel {source_pix} → Order {dest_info[0]}, Pixel {dest_info[1]}")
=== BINNING STAGE ===
Pixel threshold: 3 objects/pixel
Order range: 0 to 3
Drop empty siblings: True

Alignment Array (non-empty source pixels only):
Format: [destination_order, destination_pixel, row_count]

  Source pixel 47 → Order 0, Pixel 0 (3 objects)
  Source pixel 59 → Order 0, Pixel 0 (3 objects)
  Source pixel 62 → Order 0, Pixel 0 (3 objects)
  Source pixel 184 → Order 3, Pixel 184 (1 objects)
  Source pixel 253 → Order 3, Pixel 253 (1 objects)
  Source pixel 305 → Order 3, Pixel 305 (1 objects)
  Source pixel 409 → Order 3, Pixel 409 (1 objects)
  Source pixel 578 → Order 3, Pixel 578 (1 objects)

Destination Pixels (6 total):
  Order 0, Pixel 0: 3 objects
  Order 3, Pixel 184: 1 objects
  Order 3, Pixel 253: 1 objects
  Order 3, Pixel 305: 1 objects
  Order 3, Pixel 409: 1 objects
  Order 3, Pixel 578: 1 objects

Object Routing (Source → Destination):
  From toy_catalog_1.csv:
    ID 0: Source pixel 305 → Order 3, Pixel 305
    ID 1: Source pixel 47 → Order 0, Pixel 0
    ID 2: Source pixel 62 → Order 0, Pixel 0
  From toy_catalog_2.csv:
    ID 3: Source pixel 253 → Order 3, Pixel 253
    ID 4: Source pixel 578 → Order 3, Pixel 578
  From toy_catalog_3.csv:
    ID 5: Source pixel 409 → Order 3, Pixel 409
    ID 6: Source pixel 184 → Order 3, Pixel 184
    ID 7: Source pixel 59 → Order 0, Pixel 0
[7]:
show_directory_tree(demo_path, "BINNING")

============================================================
Directory structure after BINNING:
============================================================
hats_demo_tujdc9qf/
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)

Stage 3: Splitting#

In the splitting stage, we:

  1. Read the input files again (the original CSVs - we need the full object data, not just counts)

  2. Map each object to its source pixel (same as mapping stage)

  3. Use the alignment to determine destination pixel

  4. Write objects to intermediate “shard” files organized by destination pixel

This stage creates many small parquet files in subdirectories named by their destination pixel. Since we have 3 input files, objects going to the same destination pixel will create multiple shards (e.g., shard_split_0_0.parquet, shard_split_1_0.parquet, shard_split_2_0.parquet).

Why do we read the CSV files again from disk?

  1. Mapping stage: Reads CSVs → counts objects per pixel → writes histogram

  2. Binning stage: Reads histogram → generates alignment

  3. Splitting stage: Reads CSVs again → uses alignment to route each object → writes shards

The splitting stage needs the full object data (with all the columns), not just row (or bytewise) counts, so it re-reads the original input files. For speed, the intermediate histogram is much smaller than the full data.

[8]:
import pyarrow as pa
import pyarrow.parquet as pq
import nested_pandas as npd
from hats.io import file_io
from hats_import.pipeline_resume_plan import get_pixel_cache_directory

# Create cache directory for shards
cache_shard_path = demo_path / "cache_shards"
cache_shard_path.mkdir(exist_ok=True)

print(f"=== SPLITTING STAGE ===")
print(f"Creating intermediate shard files...\n")

shard_files_created = []

# Process each input file
for file_idx, input_file in enumerate(input_files):
    print(f"Processing file {file_idx + 1}/{len(input_files)}: {input_file.name}")
    splitting_key = f"split_{file_idx}"

    # Read data and map to pixels
    data = pd.read_csv(input_file)
    mapped_pixels = hp.radec2pix(
        mapping_healpix_order, data[ra_column].to_numpy(dtype=float), data[dec_column].to_numpy(dtype=float)
    )

    # Apply alignment to get destination pixels
    aligned_pixels = alignment_array[mapped_pixels]

    print("  Object Distribution:")
    for idx, row in data.iterrows():
        dest_info = aligned_pixels[idx]
        print(f"    ID {row['id']}: → Order {dest_info[0]}, Pixel {dest_info[1]}")

    # Find unique destination pixels in this chunk
    unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True, axis=0)

    print(f"  Writing {len(unique_pixels)} shard(s) for this file...")

    # For each unique destination pixel, write a shard file
    for unique_index, pixel_alignment_count in enumerate(unique_pixels):
        order = pixel_alignment_count[0]
        pixel = pixel_alignment_count[1]

        # Create directory for this pixel
        healpix_pixel = HealpixPixel(order, pixel)
        pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
        pixel_dir.mkdir(parents=True, exist_ok=True)

        # Filter data for this destination pixel
        filtered_data = data.iloc[unique_inverse == unique_index]

        # Write to parquet shard - note the splitting_key in the filename!
        output_file = pixel_dir / f"shard_{splitting_key}_0.parquet"
        table = pa.Table.from_pandas(filtered_data, preserve_index=False).replace_schema_metadata()
        pq.write_table(table, output_file)

        shard_files_created.append((healpix_pixel, output_file, len(filtered_data)))

        print(
            f"    Order {order}, Pixel {pixel}: {len(filtered_data)} object(s) → {output_file.relative_to(demo_path)}"
        )
        print(f"      Objects: {filtered_data['id'].tolist()}")
    print()

print(f"Shard Summary:")
print(f"  Total shards created: {len(shard_files_created)}")
print(f"\nGrouping shards by destination pixel:")
from collections import defaultdict

shards_by_pixel = defaultdict(list)
for hp_pixel, shard_file, count in shard_files_created:
    shards_by_pixel[(hp_pixel.order, hp_pixel.pixel)].append((shard_file.name, count))

for (order, pixel), shards in sorted(shards_by_pixel.items()):
    print(f"  Order {order}, Pixel {pixel}: {len(shards)} shard(s)")
    for shard_name, count in shards:
        print(f"    - {shard_name} ({count} rows)")
=== SPLITTING STAGE ===
Creating intermediate shard files...

Processing file 1/3: toy_catalog_1.csv
  Object Distribution:
    ID 0: → Order 3, Pixel 305
    ID 1: → Order 0, Pixel 0
    ID 2: → Order 0, Pixel 0
  Writing 2 shard(s) for this file...
    Order 0, Pixel 0: 2 object(s) → cache_shards/order_0/dir_0/pixel_0/shard_split_0_0.parquet
      Objects: [1, 2]
    Order 3, Pixel 305: 1 object(s) → cache_shards/order_3/dir_0/pixel_305/shard_split_0_0.parquet
      Objects: [0]

Processing file 2/3: toy_catalog_2.csv
  Object Distribution:
    ID 3: → Order 3, Pixel 253
    ID 4: → Order 3, Pixel 578
  Writing 2 shard(s) for this file...
    Order 3, Pixel 253: 1 object(s) → cache_shards/order_3/dir_0/pixel_253/shard_split_1_0.parquet
      Objects: [3]
    Order 3, Pixel 578: 1 object(s) → cache_shards/order_3/dir_0/pixel_578/shard_split_1_0.parquet
      Objects: [4]

Processing file 3/3: toy_catalog_3.csv
  Object Distribution:
    ID 5: → Order 3, Pixel 409
    ID 6: → Order 3, Pixel 184
    ID 7: → Order 0, Pixel 0
  Writing 3 shard(s) for this file...
    Order 0, Pixel 0: 1 object(s) → cache_shards/order_0/dir_0/pixel_0/shard_split_2_0.parquet
      Objects: [7]
    Order 3, Pixel 184: 1 object(s) → cache_shards/order_3/dir_0/pixel_184/shard_split_2_0.parquet
      Objects: [6]
    Order 3, Pixel 409: 1 object(s) → cache_shards/order_3/dir_0/pixel_409/shard_split_2_0.parquet
      Objects: [5]

Shard Summary:
  Total shards created: 7

Grouping shards by destination pixel:
  Order 0, Pixel 0: 2 shard(s)
    - shard_split_0_0.parquet (2 rows)
    - shard_split_2_0.parquet (1 rows)
  Order 3, Pixel 184: 1 shard(s)
    - shard_split_2_0.parquet (1 rows)
  Order 3, Pixel 253: 1 shard(s)
    - shard_split_1_0.parquet (1 rows)
  Order 3, Pixel 305: 1 shard(s)
    - shard_split_0_0.parquet (1 rows)
  Order 3, Pixel 409: 1 shard(s)
    - shard_split_2_0.parquet (1 rows)
  Order 3, Pixel 578: 1 shard(s)
    - shard_split_1_0.parquet (1 rows)
[9]:
show_directory_tree(demo_path, "SPLITTING")

============================================================
Directory structure after SPLITTING:
============================================================
hats_demo_tujdc9qf/
├── cache_shards/ (2 items)
│   ├── order_0/ (1 items)
│   │   └── dir_0/ (1 items)
│   │       └── pixel_0/ (2 items)
│   │           ├── shard_split_0_0.parquet (1.0 KB)
│   │           └── shard_split_2_0.parquet (1.0 KB)
│   └── order_3/ (1 items)
│       └── dir_0/ (5 items)
│           ├── pixel_184/ (1 items)
│           │   └── shard_split_2_0.parquet (1.0 KB)
│           ├── pixel_253/ (1 items)
│           │   └── shard_split_1_0.parquet (1.0 KB)
│           ├── pixel_305/ (1 items)
│           │   └── shard_split_0_0.parquet (1.0 KB)
│           ├── pixel_409/ (1 items)
│           │   └── shard_split_2_0.parquet (1.0 KB)
│           └── pixel_578/ (1 items)
│               └── shard_split_1_0.parquet (1.0 KB)
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)

Stage 4: Reducing#

In the reducing stage, we:

  1. For each destination pixel, read all its shard files (from multiple input files)

  2. Combine them into a single table

  3. Add the _healpix_29 spatial index column

  4. Sort by spatial index (and optionally other columns)

  5. Write the final pixel parquet file

This is where the map-reduce magic happens! When multiple input files contribute objects to the same destination pixel, we’ll see multiple shards being combined.

The _healpix_29 column is a 64-bit spatial index computed at order 29 that enables efficient spatial queries.

[10]:
from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN
from hats.io import paths

# Create output directory for final catalog
output_path = demo_path / "toy_catalog"
output_path.mkdir(exist_ok=True)

print(f"=== REDUCING STAGE ===")
print(f"Combining shards into final catalog pixels...\n")

reduced_files = []

for hp_pixel, expected_count in destination_pixel_map.items():
    print(f"Processing Order {hp_pixel.order}, Pixel {hp_pixel.pixel}:")

    # Get the shard directory
    pixel_dir = get_pixel_cache_directory(cache_shard_path, hp_pixel)

    # List all shard files for this pixel
    shard_files = list(pixel_dir.glob("*.parquet"))
    print(f"  Found {len(shard_files)} shard file(s):")
    for shard_file in sorted(shard_files):
        shard_rows = pq.read_metadata(shard_file).num_rows
        print(f"    - {shard_file.name} ({shard_rows} rows)")

    # Read all shards for this pixel
    merged_table = pq.read_table(pixel_dir)

    print(f"  Combined {len(merged_table)} total rows")
    print(f"  Objects: {merged_table['id'].to_pylist()}")

    # Verify row count
    if len(merged_table) != expected_count:
        raise ValueError(f"Expected {expected_count} rows, got {len(merged_table)}")

    # Add _healpix_29 spatial index column
    merged_table = merged_table.add_column(
        0,
        SPATIAL_INDEX_COLUMN,
        [
            pixel_math.compute_spatial_index(
                merged_table[ra_column].to_numpy().astype(np.float64),
                merged_table[dec_column].to_numpy().astype(np.float64),
            )
        ],
    )

    print(f"  Added {SPATIAL_INDEX_COLUMN} column")

    # Sort by spatial index
    ordering = [(SPATIAL_INDEX_COLUMN, "ascending")]
    merged_table = merged_table.sort_by(ordering)

    print(f"  Sorted by {SPATIAL_INDEX_COLUMN}")

    # Write final pixel file (note: passing HealpixPixel object directly)
    destination_file = paths.pixel_catalog_file(output_path, hp_pixel)
    destination_file.parent.mkdir(parents=True, exist_ok=True)

    pq.write_table(
        merged_table,
        destination_file,
        sorting_columns=pq.SortingColumn.from_ordering(merged_table.schema, ordering),
    )

    reduced_files.append((hp_pixel, destination_file, len(merged_table)))

    print(f"  Wrote {destination_file.relative_to(demo_path)}\n")

    # Show a sample of the data with spatial index
    df_sample = merged_table.to_pandas()
    print(f"  Sample data:")
    print(df_sample.to_string(index=False))
    print()
=== REDUCING STAGE ===
Combining shards into final catalog pixels...

Processing Order 0, Pixel 0:
  Found 2 shard file(s):
    - shard_split_0_0.parquet (2 rows)
    - shard_split_2_0.parquet (1 rows)
  Combined 3 total rows
  Objects: [1, 2, 7]
  Added _healpix_29 column
  Sorted by _healpix_29
  Wrote toy_catalog/dataset/Norder=0/Dir=0/Npix=0.parquet

  Sample data:
       _healpix_29  id  ra  dec
212759614188637996   1  10   60
266502695753865174   7  15   70
282722782596018419   2  20   80

Processing Order 3, Pixel 184:
  Found 1 shard file(s):
    - shard_split_2_0.parquet (1 rows)
  Combined 1 total rows
  Objects: [6]
  Added _healpix_29 column
  Sorted by _healpix_29
  Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=184.parquet

  Sample data:
       _healpix_29  id  ra  dec
831529732826217957   6 200   60

Processing Order 3, Pixel 253:
  Found 1 shard file(s):
    - shard_split_1_0.parquet (1 rows)
  Combined 1 total rows
  Objects: [3]
  Added _healpix_29 column
  Sorted by _healpix_29
  Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=253.parquet

  Sample data:
        _healpix_29  id  ra  dec
1141572291578781236   3 -10   80

Processing Order 3, Pixel 305:
  Found 1 shard file(s):
    - shard_split_0_0.parquet (1 rows)
  Combined 1 total rows
  Objects: [0]
  Added _healpix_29 column
  Sorted by _healpix_29
  Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=305.parquet

  Sample data:
        _healpix_29  id  ra  dec
1375225032727417701   0  10   10

Processing Order 3, Pixel 409:
  Found 1 shard file(s):
    - shard_split_2_0.parquet (1 rows)
  Combined 1 total rows
  Objects: [5]
  Added _healpix_29 column
  Sorted by _healpix_29
  Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=409.parquet

  Sample data:
        _healpix_29  id  ra  dec
1843565829001140885   5 200    0

Processing Order 3, Pixel 578:
  Found 1 shard file(s):
    - shard_split_1_0.parquet (1 rows)
  Combined 1 total rows
  Objects: [4]
  Added _healpix_29 column
  Sorted by _healpix_29
  Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=578.parquet

  Sample data:
        _healpix_29  id  ra  dec
2605422598393471435   4 100  -80

[11]:
# Clean up intermediate cache shards
# In the real pipeline, this happens automatically after each pixel is successfully reduced.
print("\n=== CLEANING UP INTERMEDIATE FILES ===")
print(f"Removing cache_shards directory: {cache_shard_path}")
file_io.remove_directory(cache_shard_path, ignore_errors=False)
print("Cache shards cleaned up successfully.")

=== CLEANING UP INTERMEDIATE FILES ===
Removing cache_shards directory: /tmp/hats_demo_tujdc9qf/cache_shards
Cache shards cleaned up successfully.
[12]:
show_directory_tree(demo_path, "REDUCING (after cleanup)")

============================================================
Directory structure after REDUCING (after cleanup):
============================================================
hats_demo_tujdc9qf/
├── toy_catalog/ (1 items)
│   └── dataset/ (2 items)
│       ├── Norder=0/ (1 items)
│       │   └── Dir=0/ (1 items)
│       │       └── Npix=0.parquet (1.4 KB)
│       └── Norder=3/ (1 items)
│           └── Dir=0/ (5 items)
│               ├── Npix=184.parquet (1.3 KB)
│               ├── Npix=253.parquet (1.3 KB)
│               ├── Npix=305.parquet (1.3 KB)
│               ├── Npix=409.parquet (1.3 KB)
│               └── Npix=578.parquet (1.3 KB)
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)

Stage 5: Finishing#

In the finishing stage, we:

  1. Write partition_info.csv - lists all pixels and their row counts

  2. Write _metadata and _common_metadata - Parquet metadata files

  3. Write point_map.fits - FITS skymap of object distribution

  4. Write properties - catalog metadata (schema, total rows, etc.)

  5. Validate the catalog

After this stage, we have a complete HATS catalog ready for use with LSDB!

[13]:
from hats.catalog import PartitionInfo, TableProperties
from hats.io.parquet_metadata import write_parquet_metadata
from hats.io import file_io as io
from hats.io.validation import is_valid_catalog

print(f"=== FINISHING STAGE ===")
print(f"Writing catalog metadata and validating...\n")

# 1. Write partition_info.csv
print("1. Writing partition_info.csv...")
partition_info = PartitionInfo.from_healpix(list(destination_pixel_map.keys()))
partition_info_file = paths.get_partition_info_pointer(output_path)
partition_info.write_to_file(partition_info_file)
print(f"   → {partition_info_file.relative_to(demo_path)}")
print(f"   Content:")
print(partition_info.as_dataframe().to_string(index=False))
print()

# 2. Write Parquet metadata
print("2. Writing Parquet metadata files...")
parquet_rows = write_parquet_metadata(output_path)
print(f"   → _metadata")
print(f"   → _common_metadata")
print(f"   Validated {parquet_rows} total rows")

# Read schema
nested_schema = npd.read_parquet(paths.get_common_metadata_pointer(output_path))
column_names = list(nested_schema.columns) + nested_schema.get_subcolumns()
print(f"   Columns: {column_names}")
print()

# 3. Write point_map.fits
print("3. Writing point_map.fits...")
io.write_fits_image(raw_histogram, paths.get_point_map_file_pointer(output_path))
print(f"   → {paths.get_point_map_file_pointer(output_path).relative_to(demo_path)}")
print(f"   (FITS skymap of {len(non_empty_pixels)} non-empty pixels at order {mapping_healpix_order})")
print()

# 4. Write catalog properties
print("4. Writing catalog properties...")
total_rows = sum(destination_pixel_map.values())
catalog_info = TableProperties(
    catalog_name="toy_catalog",
    catalog_type="object",
    total_rows=total_rows,
    ra_column=ra_column,
    dec_column=dec_column,
)
catalog_info.to_properties_file(output_path)
# The properties file is simply 'properties' in the catalog base directory
properties_file = output_path / "properties"
print(f"   → {properties_file.relative_to(demo_path)}")
print(f"   Total rows: {total_rows}")
print(f"   Highest order: {partition_info.get_highest_order()}")
print()

# 5. Validate catalog
print("5. Validating catalog...")
is_valid = is_valid_catalog(output_path)
print(f"   ✓ Catalog is valid: {is_valid}")
print()

print("=== IMPORT COMPLETE ===")
print(f"\nCatalog location: {output_path}")
=== FINISHING STAGE ===
Writing catalog metadata and validating...

1. Writing partition_info.csv...
   → toy_catalog/partition_info.csv
   Content:
 Norder  Npix
      0     0
      3   184
      3   253
      3   305
      3   409
      3   578

2. Writing Parquet metadata files...
   → _metadata
   → _common_metadata
   Validated 8 total rows
   Columns: ['_healpix_29', 'id', 'ra', 'dec']

3. Writing point_map.fits...
   → toy_catalog/point_map.fits
   (FITS skymap of 8 non-empty pixels at order 3)

4. Writing catalog properties...
   → toy_catalog/properties
   Total rows: 8
   Highest order: 3

5. Validating catalog...
   ✓ Catalog is valid: True

=== IMPORT COMPLETE ===

Catalog location: /tmp/hats_demo_tujdc9qf/toy_catalog
[14]:
show_directory_tree(demo_path, "FINISHING")

============================================================
Directory structure after FINISHING:
============================================================
hats_demo_tujdc9qf/
├── toy_catalog/ (5 items)
│   ├── dataset/ (4 items)
│   │   ├── Norder=0/ (1 items)
│   │   │   └── Dir=0/ (1 items)
│   │   │       └── Npix=0.parquet (1.4 KB)
│   │   ├── Norder=3/ (1 items)
│   │   │   └── Dir=0/ (5 items)
│   │   │       ├── Npix=184.parquet (1.3 KB)
│   │   │       ├── Npix=253.parquet (1.3 KB)
│   │   │       ├── Npix=305.parquet (1.3 KB)
│   │   │       ├── Npix=409.parquet (1.3 KB)
│   │   │       └── Npix=578.parquet (1.3 KB)
│   │   ├── _common_metadata (524 B)
│   │   └── _metadata (3.9 KB)
│   ├── hats.properties (136 B)
│   ├── partition_info.csv (46 B)
│   ├── point_map.fits (8.4 KB)
│   └── properties (136 B)
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)

Exploring the Final Catalog#

Let’s examine the final HATS catalog structure and files:

[15]:
print("=== CATALOG FILES EXPLAINED ===")
print()

print("1. partition_info.csv")
print("   - Lists all HEALPix pixels in the catalog")
print("   - Shows the directory structure (Norder=/Npix=)")
print("   Content:")
partition_df = pd.read_csv(partition_info_file)
print(partition_df.to_string(index=False))
print()

print("2. properties")
print("   - Catalog metadata (name, type, row count, schema)")
print("   Content (excerpt):")
with open(properties_file, "r") as f:
    for i, line in enumerate(f):
        if i < 10:
            print(f"   {line.rstrip()}")
print("   ...")
print()

print("3. Pixel parquet files (Norder=/Npix=/Npix=.parquet)")
print("   - Actual catalog data, one file per HEALPix pixel")
print("   - Organized in directories by order and pixel")
print("   - Sorted by _healpix_29 spatial index")
print()
for hp_pixel, file_path, count in reduced_files:
    print(f"   {file_path.relative_to(output_path)} ({count} rows):")
    df = pd.read_parquet(file_path)
    print(df.to_string(index=False))
    print()

print("4. _metadata & _common_metadata")
print("   - Parquet metadata for the entire catalog")
print("   - _common_metadata: schema shared by all files")
print("   - _metadata: references to all parquet files")
print()

print("5. point_map.fits")
print("   - FITS skymap showing object distribution")
print(f"   - Resolution: HEALPix order {mapping_healpix_order}")
print(f"   - Non-empty pixels: {len(non_empty_pixels)}")
=== CATALOG FILES EXPLAINED ===

1. partition_info.csv
   - Lists all HEALPix pixels in the catalog
   - Shows the directory structure (Norder=/Npix=)
   Content:
 Norder  Npix
      0     0
      3   184
      3   253
      3   305
      3   409
      3   578

2. properties
   - Catalog metadata (name, type, row count, schema)
   Content (excerpt):
   #HATS catalog
   obs_collection=toy_catalog
   dataproduct_type=object
   hats_nrows=8
   hats_col_ra=ra
   hats_col_dec=dec
   hats_npix_suffix=.parquet
   ...

3. Pixel parquet files (Norder=/Npix=/Npix=.parquet)
   - Actual catalog data, one file per HEALPix pixel
   - Organized in directories by order and pixel
   - Sorted by _healpix_29 spatial index

   dataset/Norder=0/Dir=0/Npix=0.parquet (3 rows):
       _healpix_29  id  ra  dec
212759614188637996   1  10   60
266502695753865174   7  15   70
282722782596018419   2  20   80

   dataset/Norder=3/Dir=0/Npix=184.parquet (1 rows):
       _healpix_29  id  ra  dec
831529732826217957   6 200   60

   dataset/Norder=3/Dir=0/Npix=253.parquet (1 rows):
        _healpix_29  id  ra  dec
1141572291578781236   3 -10   80

   dataset/Norder=3/Dir=0/Npix=305.parquet (1 rows):
        _healpix_29  id  ra  dec
1375225032727417701   0  10   10

   dataset/Norder=3/Dir=0/Npix=409.parquet (1 rows):
        _healpix_29  id  ra  dec
1843565829001140885   5 200    0

   dataset/Norder=3/Dir=0/Npix=578.parquet (1 rows):
        _healpix_29  id  ra  dec
2605422598393471435   4 100  -80

4. _metadata & _common_metadata
   - Parquet metadata for the entire catalog
   - _common_metadata: schema shared by all files
   - _metadata: references to all parquet files

5. point_map.fits
   - FITS skymap showing object distribution
   - Resolution: HEALPix order 3
   - Non-empty pixels: 8

Loading with LSDB#

Now that we have a complete HATS catalog, we can load it with LSDB:

[16]:
import lsdb

print("=== LOADING WITH LSDB ===")
print()

# Load the catalog
catalog = lsdb.read_hats(output_path)
computed_df = catalog.compute(progress_bar=False)

print(f"Catalog: {catalog}")
print("")
print(f"Partitions: {catalog.get_healpix_pixels()}")
print("")
print("Data:")
computed_df
=== LOADING WITH LSDB ===

Catalog: Dask NestedFrame Structure:
                                 id              ra             dec
npartitions=6
0                    int64[pyarrow]  int64[pyarrow]  int64[pyarrow]
828662331436171264              ...             ...             ...
...                             ...             ...             ...
2603080584620146688             ...             ...             ...
2607584184247517184             ...             ...             ...
Dask Name: nestedframe, 3 expressions
Expr=MapPartitions(NestedFrame)

Partitions: [Order: 0, Pixel: 0, Order: 3, Pixel: 184, Order: 3, Pixel: 253, Order: 3, Pixel: 305, Order: 3, Pixel: 409, Order: 3, Pixel: 578]

Data:
[16]:
id ra dec
_healpix_29
212759614188637996 1 10 60
266502695753865174 7 15 70
282722782596018419 2 20 80
831529732826217957 6 200 60
1141572291578781236 3 -10 80
1375225032727417701 0 10 10
1843565829001140885 5 200 0
2605422598393471435 4 100 -80

8 rows × 3 columns

Summary#

We’ve walked through all five stages of the HATS import pipeline:

  1. Mapping: Counted objects in high-resolution HEALPix pixels (order 3, 768 pixels) across 3 input files

  2. Binning: Created alignment using adaptive partitioning (threshold=2 objects/pixel)

  3. Splitting: Distributed objects to intermediate shard files by destination pixel - creating multiple shards per pixel from different input files

  4. Reducing: Combined multiple shards for each pixel, added spatial index, sorted, wrote final pixel files

  5. Finishing: Wrote metadata, validated catalog, cleaned up intermediate files

The result is a HATS catalog with:

  • Adaptive HEALPix partitioning (different pixel orders based on density)

  • Spatial indexing via _healpix_29 column for efficient queries

  • Complete metadata for LSDB compatibility

Key Concepts:#

  • mapping_healpix_order: Highest resolution for initial binning (we used 3)

  • pixel_threshold: Max objects per pixel; controls final partition size (we used 2)

  • lowest_healpix_order: Prevents creation of overly large pixels (we used 0)

  • Alignment: Mapping from source (high-res) to destination (adaptive) pixels

  • Adaptive partitioning: Dense regions get smaller pixels, sparse regions get larger pixels

  • Spatial index: _healpix_29 enables efficient spatial operations

  • Map-reduce pattern: Multiple input files → multiple shards per pixel → combined in reducing stage

  • Intermediate file cleanup: Cache shards are automatically deleted after successful reduction

[17]:
# Cleanup

demo_dir.cleanup()
print(f"\nCleaned up demo directory at: {demo_dir.name}")

Cleaned up demo directory at: /tmp/hats_demo_tujdc9qf