Understanding the Pipeline Stages in HATS Import#
This notebook walks through each stage of the HATS import pipeline using toy data to demonstrate what happens at each step:
Mapping - Count objects in high-resolution HEALPix pixels
Binning - Create alignment from source to destination pixels
Splitting - Distribute objects to their destination pixel shards
Reducing - Combine shards into final catalog pixels
Finishing - Write metadata and validate catalog
To read more about temporary files, how they’re used, and where to store them, check out our Temporary Files and Disk Usage notebook.
Setup: Create Toy Dataset#
[1]:
import tempfile
from pathlib import Path
import numpy as np
import pandas as pd
# Create toy dataset - split into 3 files to demonstrate reducing stage
# When we process multiple input files, objects going to the same destination
# pixel will create multiple shards that need to be combined during reducing.
toy_data_1 = pd.DataFrame({"id": [0, 1, 2], "ra": [10, 10, 20], "dec": [10, 60, 80]})
toy_data_2 = pd.DataFrame({"id": [3, 4], "ra": [-10, 100], "dec": [80, -80]})
toy_data_3 = pd.DataFrame({"id": [5, 6, 7], "ra": [200, 200, 15], "dec": [0, 60, 70]})
# Create a temporary directory for our demo
demo_dir = tempfile.TemporaryDirectory(prefix="hats_demo_")
demo_path = Path(demo_dir.name)
# Write to separate CSV files
input_csv_1 = demo_path / "toy_catalog_1.csv"
input_csv_2 = demo_path / "toy_catalog_2.csv"
input_csv_3 = demo_path / "toy_catalog_3.csv"
toy_data_1.to_csv(input_csv_1, index=False)
toy_data_2.to_csv(input_csv_2, index=False)
toy_data_3.to_csv(input_csv_3, index=False)
# Combine for display
toy_data = pd.concat([toy_data_1, toy_data_2, toy_data_3], ignore_index=True)
input_files = [input_csv_1, input_csv_2, input_csv_3]
print("Toy Dataset (split across 3 files):")
print(toy_data)
print(f"\nFile 1: {input_csv_1.name} ({len(toy_data_1)} rows)")
print(f"File 2: {input_csv_2.name} ({len(toy_data_2)} rows)")
print(f"File 3: {input_csv_3.name} ({len(toy_data_3)} rows)")
Toy Dataset (split across 3 files):
id ra dec
0 0 10 10
1 1 10 60
2 2 20 80
3 3 -10 80
4 4 100 -80
5 5 200 0
6 6 200 60
7 7 15 70
File 1: toy_catalog_1.csv (3 rows)
File 2: toy_catalog_2.csv (2 rows)
File 3: toy_catalog_3.csv (3 rows)
Helper Function#
[2]:
def show_directory_tree(base_path, stage_name, max_depth=4):
"""Display the directory structure after a pipeline stage.
Args:
base_path: Root directory to display
stage_name: Name of the stage (for display)
max_depth: Maximum depth to traverse
"""
print(f"\n{'='*60}")
print(f"Directory structure after {stage_name}:")
print(f"{'='*60}")
def print_tree(path, prefix="", depth=0):
if depth > max_depth:
return
if not path.exists():
return
contents = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name))
for i, item in enumerate(contents):
is_last = i == len(contents) - 1
current_prefix = "└── " if is_last else "├── "
next_prefix = " " if is_last else "│ "
# Add file size for files
if item.is_file():
size = item.stat().st_size
if size < 1024:
size_str = f"{size} B"
elif size < 1024**2:
size_str = f"{size/1024:.1f} KB"
else:
size_str = f"{size/1024**2:.1f} MB"
print(f"{prefix}{current_prefix}{item.name} ({size_str})")
else:
# Count items in directory
try:
num_items = len(list(item.iterdir()))
print(f"{prefix}{current_prefix}{item.name}/ ({num_items} items)")
except PermissionError:
print(f"{prefix}{current_prefix}{item.name}/")
print_tree(item, prefix + next_prefix, depth + 1)
print(f"{base_path.name}/")
print_tree(base_path)
print()
[3]:
# Show initial directory structure
show_directory_tree(demo_path, "SETUP")
============================================================
Directory structure after SETUP:
============================================================
hats_demo_tujdc9qf/
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)
Stage 1: Mapping#
In the mapping stage, we:
Read each input file
Map each object to a HEALPix pixel at
mapping_healpix_orderCreate a histogram counting objects per pixel
The mapping_healpix_order is the highest resolution we use for initial binning. For our 7 objects, we’ll use order 3 (768 pixels) to see meaningful groupings.
Since we have multiple input files, we’ll process each one and aggregate the results.
[4]:
import hats.pixel_math.healpix_shim as hp
from hats.pixel_math.sparse_histogram import HistogramAggregator, supplemental_count_histogram
# Mapping parameters
mapping_healpix_order = 3
ra_column = "ra"
dec_column = "dec"
print(f"=== MAPPING STAGE ===")
print(f"Mapping to HEALPix order {mapping_healpix_order}")
print(f"Total pixels at this order: {hp.order2npix(mapping_healpix_order)}\n")
# Aggregate histogram across all input files
row_count_histo = HistogramAggregator(mapping_healpix_order)
# Process each input file
for file_idx, input_file in enumerate(input_files):
print(f"Processing file {file_idx + 1}/{len(input_files)}: {input_file.name}")
# Read the data
data = pd.read_csv(input_file)
# Map each object to its HEALPix pixel
mapped_pixels = hp.radec2pix(
mapping_healpix_order,
data[ra_column].to_numpy(dtype=float),
data[dec_column].to_numpy(dtype=float),
)
print(" Object to Pixel Mapping:")
for idx, row in data.iterrows():
print(
f" ID {row['id']}: (RA={row['ra']:6.1f}, Dec={row['dec']:6.1f}) → Pixel {mapped_pixels[idx]}"
)
# Create histogram for this file
row_count_partial, _ = supplemental_count_histogram(
mapped_pixels, None, highest_order=mapping_healpix_order
)
row_count_histo.add(row_count_partial)
print()
# Get the combined histogram
raw_histogram = row_count_histo.full_histogram
non_empty_pixels = np.where(raw_histogram > 0)[0]
print(f"Combined Histogram Summary:")
print(f" Total pixels at order {mapping_healpix_order}: {len(raw_histogram)}")
print(f" Non-empty pixels: {len(non_empty_pixels)}")
print(f"\nNon-empty pixel counts:")
for pixel_num in non_empty_pixels:
print(f" Pixel {pixel_num}: {raw_histogram[pixel_num]} object(s)")
=== MAPPING STAGE ===
Mapping to HEALPix order 3
Total pixels at this order: 768
Processing file 1/3: toy_catalog_1.csv
Object to Pixel Mapping:
ID 0: (RA= 10.0, Dec= 10.0) → Pixel 305
ID 1: (RA= 10.0, Dec= 60.0) → Pixel 47
ID 2: (RA= 20.0, Dec= 80.0) → Pixel 62
Processing file 2/3: toy_catalog_2.csv
Object to Pixel Mapping:
ID 3: (RA= -10.0, Dec= 80.0) → Pixel 253
ID 4: (RA= 100.0, Dec= -80.0) → Pixel 578
Processing file 3/3: toy_catalog_3.csv
Object to Pixel Mapping:
ID 5: (RA= 200.0, Dec= 0.0) → Pixel 409
ID 6: (RA= 200.0, Dec= 60.0) → Pixel 184
ID 7: (RA= 15.0, Dec= 70.0) → Pixel 59
Combined Histogram Summary:
Total pixels at order 3: 768
Non-empty pixels: 8
Non-empty pixel counts:
Pixel 47: 1 object(s)
Pixel 59: 1 object(s)
Pixel 62: 1 object(s)
Pixel 184: 1 object(s)
Pixel 253: 1 object(s)
Pixel 305: 1 object(s)
Pixel 409: 1 object(s)
Pixel 578: 1 object(s)
[5]:
show_directory_tree(demo_path, "MAPPING")
============================================================
Directory structure after MAPPING:
============================================================
hats_demo_tujdc9qf/
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)
Stage 2: Binning (Alignment Generation)#
In the binning stage, we:
Use the histogram from mapping
Generate an “alignment” - a mapping from source pixels to destination pixels
Group high-resolution pixels into coarser pixels based on
pixel_threshold
The alignment uses adaptive partitioning, creating different-sized pixels based on object density. Areas with many objects get higher-order (smaller) pixels, while sparse areas get lower-order (larger) pixels.
The pixel_threshold controls the maximum objects per pixel (we’ll use 2).
[6]:
from hats import pixel_math
from hats.pixel_math.healpix_pixel import HealpixPixel
# Binning parameters
pixel_threshold = 3
highest_healpix_order = 3
lowest_healpix_order = 0
drop_empty_siblings = True
print(f"=== BINNING STAGE ===")
print(f"Pixel threshold: {pixel_threshold} objects/pixel")
print(f"Order range: {lowest_healpix_order} to {highest_healpix_order}")
print(f"Drop empty siblings: {drop_empty_siblings}\n")
# Generate alignment
alignment = pixel_math.generate_alignment(
raw_histogram,
highest_order=highest_healpix_order,
lowest_order=lowest_healpix_order,
threshold=pixel_threshold,
drop_empty_siblings=drop_empty_siblings,
)
# Convert alignment to structured format
alignment_array = np.array([x if x is not None else [-1, -1, 0] for x in alignment], dtype=np.int64)
print("Alignment Array (non-empty source pixels only):")
print("Format: [destination_order, destination_pixel, row_count]\n")
for i in non_empty_pixels:
entry = alignment_array[i]
print(f" Source pixel {i} → Order {entry[0]}, Pixel {entry[1]} ({entry[2]} objects)")
# Get unique destination pixels
pixel_list = np.unique(alignment_array, axis=0)
destination_pixel_map = {
HealpixPixel(int(order), int(pix)): int(row_count)
for (order, pix, row_count) in pixel_list
if int(row_count) > 0
}
print(f"\nDestination Pixels ({len(destination_pixel_map)} total):")
for hp_pixel, count in sorted(destination_pixel_map.items(), key=lambda x: (x[0].order, x[0].pixel)):
print(f" Order {hp_pixel.order}, Pixel {hp_pixel.pixel}: {count} objects")
# Show mapping from source to destination for our specific objects
print("\nObject Routing (Source → Destination):")
for input_file in input_files:
file_data = pd.read_csv(input_file)
file_mapped_pixels = hp.radec2pix(
mapping_healpix_order,
file_data[ra_column].to_numpy(dtype=float),
file_data[dec_column].to_numpy(dtype=float),
)
print(f" From {input_file.name}:")
for idx, row in file_data.iterrows():
source_pix = file_mapped_pixels[idx]
dest_info = alignment_array[source_pix]
print(f" ID {row['id']}: Source pixel {source_pix} → Order {dest_info[0]}, Pixel {dest_info[1]}")
=== BINNING STAGE ===
Pixel threshold: 3 objects/pixel
Order range: 0 to 3
Drop empty siblings: True
Alignment Array (non-empty source pixels only):
Format: [destination_order, destination_pixel, row_count]
Source pixel 47 → Order 0, Pixel 0 (3 objects)
Source pixel 59 → Order 0, Pixel 0 (3 objects)
Source pixel 62 → Order 0, Pixel 0 (3 objects)
Source pixel 184 → Order 3, Pixel 184 (1 objects)
Source pixel 253 → Order 3, Pixel 253 (1 objects)
Source pixel 305 → Order 3, Pixel 305 (1 objects)
Source pixel 409 → Order 3, Pixel 409 (1 objects)
Source pixel 578 → Order 3, Pixel 578 (1 objects)
Destination Pixels (6 total):
Order 0, Pixel 0: 3 objects
Order 3, Pixel 184: 1 objects
Order 3, Pixel 253: 1 objects
Order 3, Pixel 305: 1 objects
Order 3, Pixel 409: 1 objects
Order 3, Pixel 578: 1 objects
Object Routing (Source → Destination):
From toy_catalog_1.csv:
ID 0: Source pixel 305 → Order 3, Pixel 305
ID 1: Source pixel 47 → Order 0, Pixel 0
ID 2: Source pixel 62 → Order 0, Pixel 0
From toy_catalog_2.csv:
ID 3: Source pixel 253 → Order 3, Pixel 253
ID 4: Source pixel 578 → Order 3, Pixel 578
From toy_catalog_3.csv:
ID 5: Source pixel 409 → Order 3, Pixel 409
ID 6: Source pixel 184 → Order 3, Pixel 184
ID 7: Source pixel 59 → Order 0, Pixel 0
[7]:
show_directory_tree(demo_path, "BINNING")
============================================================
Directory structure after BINNING:
============================================================
hats_demo_tujdc9qf/
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)
Stage 3: Splitting#
In the splitting stage, we:
Read the input files again (the original CSVs - we need the full object data, not just counts)
Map each object to its source pixel (same as mapping stage)
Use the alignment to determine destination pixel
Write objects to intermediate “shard” files organized by destination pixel
This stage creates many small parquet files in subdirectories named by their destination pixel. Since we have 3 input files, objects going to the same destination pixel will create multiple shards (e.g., shard_split_0_0.parquet, shard_split_1_0.parquet, shard_split_2_0.parquet).
Why do we read the CSV files again from disk?
Mapping stage: Reads CSVs → counts objects per pixel → writes histogram
Binning stage: Reads histogram → generates alignment
Splitting stage: Reads CSVs again → uses alignment to route each object → writes shards
The splitting stage needs the full object data (with all the columns), not just row (or bytewise) counts, so it re-reads the original input files. For speed, the intermediate histogram is much smaller than the full data.
[8]:
import pyarrow as pa
import pyarrow.parquet as pq
import nested_pandas as npd
from hats.io import file_io
from hats_import.pipeline_resume_plan import get_pixel_cache_directory
# Create cache directory for shards
cache_shard_path = demo_path / "cache_shards"
cache_shard_path.mkdir(exist_ok=True)
print(f"=== SPLITTING STAGE ===")
print(f"Creating intermediate shard files...\n")
shard_files_created = []
# Process each input file
for file_idx, input_file in enumerate(input_files):
print(f"Processing file {file_idx + 1}/{len(input_files)}: {input_file.name}")
splitting_key = f"split_{file_idx}"
# Read data and map to pixels
data = pd.read_csv(input_file)
mapped_pixels = hp.radec2pix(
mapping_healpix_order, data[ra_column].to_numpy(dtype=float), data[dec_column].to_numpy(dtype=float)
)
# Apply alignment to get destination pixels
aligned_pixels = alignment_array[mapped_pixels]
print(" Object Distribution:")
for idx, row in data.iterrows():
dest_info = aligned_pixels[idx]
print(f" ID {row['id']}: → Order {dest_info[0]}, Pixel {dest_info[1]}")
# Find unique destination pixels in this chunk
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True, axis=0)
print(f" Writing {len(unique_pixels)} shard(s) for this file...")
# For each unique destination pixel, write a shard file
for unique_index, pixel_alignment_count in enumerate(unique_pixels):
order = pixel_alignment_count[0]
pixel = pixel_alignment_count[1]
# Create directory for this pixel
healpix_pixel = HealpixPixel(order, pixel)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
pixel_dir.mkdir(parents=True, exist_ok=True)
# Filter data for this destination pixel
filtered_data = data.iloc[unique_inverse == unique_index]
# Write to parquet shard - note the splitting_key in the filename!
output_file = pixel_dir / f"shard_{splitting_key}_0.parquet"
table = pa.Table.from_pandas(filtered_data, preserve_index=False).replace_schema_metadata()
pq.write_table(table, output_file)
shard_files_created.append((healpix_pixel, output_file, len(filtered_data)))
print(
f" Order {order}, Pixel {pixel}: {len(filtered_data)} object(s) → {output_file.relative_to(demo_path)}"
)
print(f" Objects: {filtered_data['id'].tolist()}")
print()
print(f"Shard Summary:")
print(f" Total shards created: {len(shard_files_created)}")
print(f"\nGrouping shards by destination pixel:")
from collections import defaultdict
shards_by_pixel = defaultdict(list)
for hp_pixel, shard_file, count in shard_files_created:
shards_by_pixel[(hp_pixel.order, hp_pixel.pixel)].append((shard_file.name, count))
for (order, pixel), shards in sorted(shards_by_pixel.items()):
print(f" Order {order}, Pixel {pixel}: {len(shards)} shard(s)")
for shard_name, count in shards:
print(f" - {shard_name} ({count} rows)")
=== SPLITTING STAGE ===
Creating intermediate shard files...
Processing file 1/3: toy_catalog_1.csv
Object Distribution:
ID 0: → Order 3, Pixel 305
ID 1: → Order 0, Pixel 0
ID 2: → Order 0, Pixel 0
Writing 2 shard(s) for this file...
Order 0, Pixel 0: 2 object(s) → cache_shards/order_0/dir_0/pixel_0/shard_split_0_0.parquet
Objects: [1, 2]
Order 3, Pixel 305: 1 object(s) → cache_shards/order_3/dir_0/pixel_305/shard_split_0_0.parquet
Objects: [0]
Processing file 2/3: toy_catalog_2.csv
Object Distribution:
ID 3: → Order 3, Pixel 253
ID 4: → Order 3, Pixel 578
Writing 2 shard(s) for this file...
Order 3, Pixel 253: 1 object(s) → cache_shards/order_3/dir_0/pixel_253/shard_split_1_0.parquet
Objects: [3]
Order 3, Pixel 578: 1 object(s) → cache_shards/order_3/dir_0/pixel_578/shard_split_1_0.parquet
Objects: [4]
Processing file 3/3: toy_catalog_3.csv
Object Distribution:
ID 5: → Order 3, Pixel 409
ID 6: → Order 3, Pixel 184
ID 7: → Order 0, Pixel 0
Writing 3 shard(s) for this file...
Order 0, Pixel 0: 1 object(s) → cache_shards/order_0/dir_0/pixel_0/shard_split_2_0.parquet
Objects: [7]
Order 3, Pixel 184: 1 object(s) → cache_shards/order_3/dir_0/pixel_184/shard_split_2_0.parquet
Objects: [6]
Order 3, Pixel 409: 1 object(s) → cache_shards/order_3/dir_0/pixel_409/shard_split_2_0.parquet
Objects: [5]
Shard Summary:
Total shards created: 7
Grouping shards by destination pixel:
Order 0, Pixel 0: 2 shard(s)
- shard_split_0_0.parquet (2 rows)
- shard_split_2_0.parquet (1 rows)
Order 3, Pixel 184: 1 shard(s)
- shard_split_2_0.parquet (1 rows)
Order 3, Pixel 253: 1 shard(s)
- shard_split_1_0.parquet (1 rows)
Order 3, Pixel 305: 1 shard(s)
- shard_split_0_0.parquet (1 rows)
Order 3, Pixel 409: 1 shard(s)
- shard_split_2_0.parquet (1 rows)
Order 3, Pixel 578: 1 shard(s)
- shard_split_1_0.parquet (1 rows)
[9]:
show_directory_tree(demo_path, "SPLITTING")
============================================================
Directory structure after SPLITTING:
============================================================
hats_demo_tujdc9qf/
├── cache_shards/ (2 items)
│ ├── order_0/ (1 items)
│ │ └── dir_0/ (1 items)
│ │ └── pixel_0/ (2 items)
│ │ ├── shard_split_0_0.parquet (1.0 KB)
│ │ └── shard_split_2_0.parquet (1.0 KB)
│ └── order_3/ (1 items)
│ └── dir_0/ (5 items)
│ ├── pixel_184/ (1 items)
│ │ └── shard_split_2_0.parquet (1.0 KB)
│ ├── pixel_253/ (1 items)
│ │ └── shard_split_1_0.parquet (1.0 KB)
│ ├── pixel_305/ (1 items)
│ │ └── shard_split_0_0.parquet (1.0 KB)
│ ├── pixel_409/ (1 items)
│ │ └── shard_split_2_0.parquet (1.0 KB)
│ └── pixel_578/ (1 items)
│ └── shard_split_1_0.parquet (1.0 KB)
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)
Stage 4: Reducing#
In the reducing stage, we:
For each destination pixel, read all its shard files (from multiple input files)
Combine them into a single table
Add the
_healpix_29spatial index columnSort by spatial index (and optionally other columns)
Write the final pixel parquet file
This is where the map-reduce magic happens! When multiple input files contribute objects to the same destination pixel, we’ll see multiple shards being combined.
The _healpix_29 column is a 64-bit spatial index computed at order 29 that enables efficient spatial queries.
[10]:
from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN
from hats.io import paths
# Create output directory for final catalog
output_path = demo_path / "toy_catalog"
output_path.mkdir(exist_ok=True)
print(f"=== REDUCING STAGE ===")
print(f"Combining shards into final catalog pixels...\n")
reduced_files = []
for hp_pixel, expected_count in destination_pixel_map.items():
print(f"Processing Order {hp_pixel.order}, Pixel {hp_pixel.pixel}:")
# Get the shard directory
pixel_dir = get_pixel_cache_directory(cache_shard_path, hp_pixel)
# List all shard files for this pixel
shard_files = list(pixel_dir.glob("*.parquet"))
print(f" Found {len(shard_files)} shard file(s):")
for shard_file in sorted(shard_files):
shard_rows = pq.read_metadata(shard_file).num_rows
print(f" - {shard_file.name} ({shard_rows} rows)")
# Read all shards for this pixel
merged_table = pq.read_table(pixel_dir)
print(f" Combined {len(merged_table)} total rows")
print(f" Objects: {merged_table['id'].to_pylist()}")
# Verify row count
if len(merged_table) != expected_count:
raise ValueError(f"Expected {expected_count} rows, got {len(merged_table)}")
# Add _healpix_29 spatial index column
merged_table = merged_table.add_column(
0,
SPATIAL_INDEX_COLUMN,
[
pixel_math.compute_spatial_index(
merged_table[ra_column].to_numpy().astype(np.float64),
merged_table[dec_column].to_numpy().astype(np.float64),
)
],
)
print(f" Added {SPATIAL_INDEX_COLUMN} column")
# Sort by spatial index
ordering = [(SPATIAL_INDEX_COLUMN, "ascending")]
merged_table = merged_table.sort_by(ordering)
print(f" Sorted by {SPATIAL_INDEX_COLUMN}")
# Write final pixel file (note: passing HealpixPixel object directly)
destination_file = paths.pixel_catalog_file(output_path, hp_pixel)
destination_file.parent.mkdir(parents=True, exist_ok=True)
pq.write_table(
merged_table,
destination_file,
sorting_columns=pq.SortingColumn.from_ordering(merged_table.schema, ordering),
)
reduced_files.append((hp_pixel, destination_file, len(merged_table)))
print(f" Wrote {destination_file.relative_to(demo_path)}\n")
# Show a sample of the data with spatial index
df_sample = merged_table.to_pandas()
print(f" Sample data:")
print(df_sample.to_string(index=False))
print()
=== REDUCING STAGE ===
Combining shards into final catalog pixels...
Processing Order 0, Pixel 0:
Found 2 shard file(s):
- shard_split_0_0.parquet (2 rows)
- shard_split_2_0.parquet (1 rows)
Combined 3 total rows
Objects: [1, 2, 7]
Added _healpix_29 column
Sorted by _healpix_29
Wrote toy_catalog/dataset/Norder=0/Dir=0/Npix=0.parquet
Sample data:
_healpix_29 id ra dec
212759614188637996 1 10 60
266502695753865174 7 15 70
282722782596018419 2 20 80
Processing Order 3, Pixel 184:
Found 1 shard file(s):
- shard_split_2_0.parquet (1 rows)
Combined 1 total rows
Objects: [6]
Added _healpix_29 column
Sorted by _healpix_29
Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=184.parquet
Sample data:
_healpix_29 id ra dec
831529732826217957 6 200 60
Processing Order 3, Pixel 253:
Found 1 shard file(s):
- shard_split_1_0.parquet (1 rows)
Combined 1 total rows
Objects: [3]
Added _healpix_29 column
Sorted by _healpix_29
Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=253.parquet
Sample data:
_healpix_29 id ra dec
1141572291578781236 3 -10 80
Processing Order 3, Pixel 305:
Found 1 shard file(s):
- shard_split_0_0.parquet (1 rows)
Combined 1 total rows
Objects: [0]
Added _healpix_29 column
Sorted by _healpix_29
Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=305.parquet
Sample data:
_healpix_29 id ra dec
1375225032727417701 0 10 10
Processing Order 3, Pixel 409:
Found 1 shard file(s):
- shard_split_2_0.parquet (1 rows)
Combined 1 total rows
Objects: [5]
Added _healpix_29 column
Sorted by _healpix_29
Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=409.parquet
Sample data:
_healpix_29 id ra dec
1843565829001140885 5 200 0
Processing Order 3, Pixel 578:
Found 1 shard file(s):
- shard_split_1_0.parquet (1 rows)
Combined 1 total rows
Objects: [4]
Added _healpix_29 column
Sorted by _healpix_29
Wrote toy_catalog/dataset/Norder=3/Dir=0/Npix=578.parquet
Sample data:
_healpix_29 id ra dec
2605422598393471435 4 100 -80
[11]:
# Clean up intermediate cache shards
# In the real pipeline, this happens automatically after each pixel is successfully reduced.
print("\n=== CLEANING UP INTERMEDIATE FILES ===")
print(f"Removing cache_shards directory: {cache_shard_path}")
file_io.remove_directory(cache_shard_path, ignore_errors=False)
print("Cache shards cleaned up successfully.")
=== CLEANING UP INTERMEDIATE FILES ===
Removing cache_shards directory: /tmp/hats_demo_tujdc9qf/cache_shards
Cache shards cleaned up successfully.
[12]:
show_directory_tree(demo_path, "REDUCING (after cleanup)")
============================================================
Directory structure after REDUCING (after cleanup):
============================================================
hats_demo_tujdc9qf/
├── toy_catalog/ (1 items)
│ └── dataset/ (2 items)
│ ├── Norder=0/ (1 items)
│ │ └── Dir=0/ (1 items)
│ │ └── Npix=0.parquet (1.4 KB)
│ └── Norder=3/ (1 items)
│ └── Dir=0/ (5 items)
│ ├── Npix=184.parquet (1.3 KB)
│ ├── Npix=253.parquet (1.3 KB)
│ ├── Npix=305.parquet (1.3 KB)
│ ├── Npix=409.parquet (1.3 KB)
│ └── Npix=578.parquet (1.3 KB)
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)
Stage 5: Finishing#
In the finishing stage, we:
Write
partition_info.csv- lists all pixels and their row countsWrite
_metadataand_common_metadata- Parquet metadata filesWrite
point_map.fits- FITS skymap of object distributionWrite
properties- catalog metadata (schema, total rows, etc.)Validate the catalog
After this stage, we have a complete HATS catalog ready for use with LSDB!
[13]:
from hats.catalog import PartitionInfo, TableProperties
from hats.io.parquet_metadata import write_parquet_metadata
from hats.io import file_io as io
from hats.io.validation import is_valid_catalog
print(f"=== FINISHING STAGE ===")
print(f"Writing catalog metadata and validating...\n")
# 1. Write partition_info.csv
print("1. Writing partition_info.csv...")
partition_info = PartitionInfo.from_healpix(list(destination_pixel_map.keys()))
partition_info_file = paths.get_partition_info_pointer(output_path)
partition_info.write_to_file(partition_info_file)
print(f" → {partition_info_file.relative_to(demo_path)}")
print(f" Content:")
print(partition_info.as_dataframe().to_string(index=False))
print()
# 2. Write Parquet metadata
print("2. Writing Parquet metadata files...")
parquet_rows = write_parquet_metadata(output_path)
print(f" → _metadata")
print(f" → _common_metadata")
print(f" Validated {parquet_rows} total rows")
# Read schema
nested_schema = npd.read_parquet(paths.get_common_metadata_pointer(output_path))
column_names = list(nested_schema.columns) + nested_schema.get_subcolumns()
print(f" Columns: {column_names}")
print()
# 3. Write point_map.fits
print("3. Writing point_map.fits...")
io.write_fits_image(raw_histogram, paths.get_point_map_file_pointer(output_path))
print(f" → {paths.get_point_map_file_pointer(output_path).relative_to(demo_path)}")
print(f" (FITS skymap of {len(non_empty_pixels)} non-empty pixels at order {mapping_healpix_order})")
print()
# 4. Write catalog properties
print("4. Writing catalog properties...")
total_rows = sum(destination_pixel_map.values())
catalog_info = TableProperties(
catalog_name="toy_catalog",
catalog_type="object",
total_rows=total_rows,
ra_column=ra_column,
dec_column=dec_column,
)
catalog_info.to_properties_file(output_path)
# The properties file is simply 'properties' in the catalog base directory
properties_file = output_path / "properties"
print(f" → {properties_file.relative_to(demo_path)}")
print(f" Total rows: {total_rows}")
print(f" Highest order: {partition_info.get_highest_order()}")
print()
# 5. Validate catalog
print("5. Validating catalog...")
is_valid = is_valid_catalog(output_path)
print(f" ✓ Catalog is valid: {is_valid}")
print()
print("=== IMPORT COMPLETE ===")
print(f"\nCatalog location: {output_path}")
=== FINISHING STAGE ===
Writing catalog metadata and validating...
1. Writing partition_info.csv...
→ toy_catalog/partition_info.csv
Content:
Norder Npix
0 0
3 184
3 253
3 305
3 409
3 578
2. Writing Parquet metadata files...
→ _metadata
→ _common_metadata
Validated 8 total rows
Columns: ['_healpix_29', 'id', 'ra', 'dec']
3. Writing point_map.fits...
→ toy_catalog/point_map.fits
(FITS skymap of 8 non-empty pixels at order 3)
4. Writing catalog properties...
→ toy_catalog/properties
Total rows: 8
Highest order: 3
5. Validating catalog...
✓ Catalog is valid: True
=== IMPORT COMPLETE ===
Catalog location: /tmp/hats_demo_tujdc9qf/toy_catalog
[14]:
show_directory_tree(demo_path, "FINISHING")
============================================================
Directory structure after FINISHING:
============================================================
hats_demo_tujdc9qf/
├── toy_catalog/ (5 items)
│ ├── dataset/ (4 items)
│ │ ├── Norder=0/ (1 items)
│ │ │ └── Dir=0/ (1 items)
│ │ │ └── Npix=0.parquet (1.4 KB)
│ │ ├── Norder=3/ (1 items)
│ │ │ └── Dir=0/ (5 items)
│ │ │ ├── Npix=184.parquet (1.3 KB)
│ │ │ ├── Npix=253.parquet (1.3 KB)
│ │ │ ├── Npix=305.parquet (1.3 KB)
│ │ │ ├── Npix=409.parquet (1.3 KB)
│ │ │ └── Npix=578.parquet (1.3 KB)
│ │ ├── _common_metadata (524 B)
│ │ └── _metadata (3.9 KB)
│ ├── hats.properties (136 B)
│ ├── partition_info.csv (46 B)
│ ├── point_map.fits (8.4 KB)
│ └── properties (136 B)
├── toy_catalog_1.csv (34 B)
├── toy_catalog_2.csv (29 B)
└── toy_catalog_3.csv (35 B)
Exploring the Final Catalog#
Let’s examine the final HATS catalog structure and files:
[15]:
print("=== CATALOG FILES EXPLAINED ===")
print()
print("1. partition_info.csv")
print(" - Lists all HEALPix pixels in the catalog")
print(" - Shows the directory structure (Norder=/Npix=)")
print(" Content:")
partition_df = pd.read_csv(partition_info_file)
print(partition_df.to_string(index=False))
print()
print("2. properties")
print(" - Catalog metadata (name, type, row count, schema)")
print(" Content (excerpt):")
with open(properties_file, "r") as f:
for i, line in enumerate(f):
if i < 10:
print(f" {line.rstrip()}")
print(" ...")
print()
print("3. Pixel parquet files (Norder=/Npix=/Npix=.parquet)")
print(" - Actual catalog data, one file per HEALPix pixel")
print(" - Organized in directories by order and pixel")
print(" - Sorted by _healpix_29 spatial index")
print()
for hp_pixel, file_path, count in reduced_files:
print(f" {file_path.relative_to(output_path)} ({count} rows):")
df = pd.read_parquet(file_path)
print(df.to_string(index=False))
print()
print("4. _metadata & _common_metadata")
print(" - Parquet metadata for the entire catalog")
print(" - _common_metadata: schema shared by all files")
print(" - _metadata: references to all parquet files")
print()
print("5. point_map.fits")
print(" - FITS skymap showing object distribution")
print(f" - Resolution: HEALPix order {mapping_healpix_order}")
print(f" - Non-empty pixels: {len(non_empty_pixels)}")
=== CATALOG FILES EXPLAINED ===
1. partition_info.csv
- Lists all HEALPix pixels in the catalog
- Shows the directory structure (Norder=/Npix=)
Content:
Norder Npix
0 0
3 184
3 253
3 305
3 409
3 578
2. properties
- Catalog metadata (name, type, row count, schema)
Content (excerpt):
#HATS catalog
obs_collection=toy_catalog
dataproduct_type=object
hats_nrows=8
hats_col_ra=ra
hats_col_dec=dec
hats_npix_suffix=.parquet
...
3. Pixel parquet files (Norder=/Npix=/Npix=.parquet)
- Actual catalog data, one file per HEALPix pixel
- Organized in directories by order and pixel
- Sorted by _healpix_29 spatial index
dataset/Norder=0/Dir=0/Npix=0.parquet (3 rows):
_healpix_29 id ra dec
212759614188637996 1 10 60
266502695753865174 7 15 70
282722782596018419 2 20 80
dataset/Norder=3/Dir=0/Npix=184.parquet (1 rows):
_healpix_29 id ra dec
831529732826217957 6 200 60
dataset/Norder=3/Dir=0/Npix=253.parquet (1 rows):
_healpix_29 id ra dec
1141572291578781236 3 -10 80
dataset/Norder=3/Dir=0/Npix=305.parquet (1 rows):
_healpix_29 id ra dec
1375225032727417701 0 10 10
dataset/Norder=3/Dir=0/Npix=409.parquet (1 rows):
_healpix_29 id ra dec
1843565829001140885 5 200 0
dataset/Norder=3/Dir=0/Npix=578.parquet (1 rows):
_healpix_29 id ra dec
2605422598393471435 4 100 -80
4. _metadata & _common_metadata
- Parquet metadata for the entire catalog
- _common_metadata: schema shared by all files
- _metadata: references to all parquet files
5. point_map.fits
- FITS skymap showing object distribution
- Resolution: HEALPix order 3
- Non-empty pixels: 8
Loading with LSDB#
Now that we have a complete HATS catalog, we can load it with LSDB:
[16]:
import lsdb
print("=== LOADING WITH LSDB ===")
print()
# Load the catalog
catalog = lsdb.read_hats(output_path)
computed_df = catalog.compute(progress_bar=False)
print(f"Catalog: {catalog}")
print("")
print(f"Partitions: {catalog.get_healpix_pixels()}")
print("")
print("Data:")
computed_df
=== LOADING WITH LSDB ===
Catalog: Dask NestedFrame Structure:
id ra dec
npartitions=6
0 int64[pyarrow] int64[pyarrow] int64[pyarrow]
828662331436171264 ... ... ...
... ... ... ...
2603080584620146688 ... ... ...
2607584184247517184 ... ... ...
Dask Name: nestedframe, 3 expressions
Expr=MapPartitions(NestedFrame)
Partitions: [Order: 0, Pixel: 0, Order: 3, Pixel: 184, Order: 3, Pixel: 253, Order: 3, Pixel: 305, Order: 3, Pixel: 409, Order: 3, Pixel: 578]
Data:
[16]:
| id | ra | dec | |
|---|---|---|---|
| _healpix_29 | |||
| 212759614188637996 | 1 | 10 | 60 |
| 266502695753865174 | 7 | 15 | 70 |
| 282722782596018419 | 2 | 20 | 80 |
| 831529732826217957 | 6 | 200 | 60 |
| 1141572291578781236 | 3 | -10 | 80 |
| 1375225032727417701 | 0 | 10 | 10 |
| 1843565829001140885 | 5 | 200 | 0 |
| 2605422598393471435 | 4 | 100 | -80 |
8 rows × 3 columns
Summary#
We’ve walked through all five stages of the HATS import pipeline:
Mapping: Counted objects in high-resolution HEALPix pixels (order 3, 768 pixels) across 3 input files
Binning: Created alignment using adaptive partitioning (threshold=2 objects/pixel)
Splitting: Distributed objects to intermediate shard files by destination pixel - creating multiple shards per pixel from different input files
Reducing: Combined multiple shards for each pixel, added spatial index, sorted, wrote final pixel files
Finishing: Wrote metadata, validated catalog, cleaned up intermediate files
The result is a HATS catalog with:
Adaptive HEALPix partitioning (different pixel orders based on density)
Spatial indexing via
_healpix_29column for efficient queriesComplete metadata for LSDB compatibility
Key Concepts:#
mapping_healpix_order: Highest resolution for initial binning (we used 3)
pixel_threshold: Max objects per pixel; controls final partition size (we used 2)
lowest_healpix_order: Prevents creation of overly large pixels (we used 0)
Alignment: Mapping from source (high-res) to destination (adaptive) pixels
Adaptive partitioning: Dense regions get smaller pixels, sparse regions get larger pixels
Spatial index:
_healpix_29enables efficient spatial operationsMap-reduce pattern: Multiple input files → multiple shards per pixel → combined in reducing stage
Intermediate file cleanup: Cache shards are automatically deleted after successful reduction
[17]:
# Cleanup
demo_dir.cleanup()
print(f"\nCleaned up demo directory at: {demo_dir.name}")
Cleaned up demo directory at: /tmp/hats_demo_tujdc9qf