01 Extract Enigma
Jupyter notebook from the "Lab Fitness Predicts Field Ecology" project at Oak Ridge.
NB 01: Extract ENIGMA CORAL Data¶
Extract geochemistry, community abundance, ASV taxonomy, and sample metadata from the ENIGMA CORAL database via Spark Connect. Filter to the 108 samples with both geochemistry and community data.
Requires Spark — get_spark_session() from berdl_notebook_utils.
Outputs: 4 TSV files in data/.
In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
from berdl_notebook_utils.setup_spark_session import get_spark_session
# Spark Connect session used for all ENIGMA CORAL queries in this notebook.
spark = get_spark_session()
print(f"Spark version: {spark.version}")
# All extracted TSVs are written under ../data (created here if missing).
DATA_DIR = Path('../data')
DATA_DIR.mkdir(exist_ok=True)
Spark version: 4.0.1
1. Extract Geochemistry¶
In [2]:
# Extract geochemistry: sample x molecule concentrations
# Pull the long-format geochemistry table: one row per (sample, molecule) pair.
geochem_raw = spark.sql("""
SELECT sdt_sample_name, molecule_from_list_sys_oterm_name, concentration_micromolar
FROM enigma_coral.ddt_brick0000010
""").toPandas()
print(f"Raw geochemistry: {len(geochem_raw):,} rows")
print(f" Samples: {geochem_raw['sdt_sample_name'].nunique()}")
print(f" Molecules: {geochem_raw['molecule_from_list_sys_oterm_name'].nunique()}")

# Reshape to a wide sample x molecule matrix; replicate measurements for the
# same (sample, molecule) pair are averaged.
geochem = geochem_raw.pivot_table(
    index='sdt_sample_name',
    columns='molecule_from_list_sys_oterm_name',
    values='concentration_micromolar',
    aggfunc='mean',
)


def _tidy_label(label: str) -> str:
    """Shorten a molecule label: drop the ' atom' suffix, snake_case the rest."""
    return label.replace(' atom', '').replace(' ', '_')


geochem.columns = [_tidy_label(c) for c in geochem.columns]
print(f"\nGeochemistry matrix: {geochem.shape[0]} samples x {geochem.shape[1]} molecules")

# Contaminants of interest at the field site; keep only those actually measured.
key_metals = ['uranium', 'chromium', 'nickel', 'zinc', 'iron', 'copper', 'cadmium', 'lead', 'arsenic']
available_metals = [m for m in key_metals if m in geochem.columns]
print(f"Key metals available: {available_metals}")
print()
geochem[available_metals].describe().round(3)
Raw geochemistry: 52,884 rows Samples: 117 Molecules: 48 Geochemistry matrix: 117 samples x 48 molecules Key metals available: ['uranium', 'chromium', 'nickel', 'zinc', 'iron', 'copper', 'cadmium', 'lead', 'arsenic']
Out[2]:
| uranium | chromium | nickel | zinc | iron | copper | cadmium | lead | arsenic | |
|---|---|---|---|---|---|---|---|---|---|
| count | 117.000 | 117.000 | 117.000 | 117.000 | 117.000 | 117.000 | 117.000 | 117.000 | 117.000 |
| mean | 10.442 | 0.225 | 4.756 | 0.730 | 13.682 | 0.368 | 0.107 | 0.015 | 0.003 |
| std | 43.556 | 0.886 | 16.328 | 1.658 | 39.705 | 1.423 | 0.391 | 0.042 | 0.006 |
| min | 0.000 | 0.000 | 0.001 | 0.076 | 0.017 | 0.001 | 0.000 | 0.000 | 0.000 |
| 25% | 0.002 | 0.003 | 0.011 | 0.241 | 0.282 | 0.007 | 0.001 | 0.001 | 0.000 |
| 50% | 0.005 | 0.009 | 0.038 | 0.426 | 1.392 | 0.027 | 0.002 | 0.004 | 0.001 |
| 75% | 0.075 | 0.035 | 0.333 | 0.658 | 8.070 | 0.081 | 0.003 | 0.011 | 0.003 |
| max | 377.126 | 6.176 | 104.567 | 16.938 | 245.499 | 9.238 | 3.039 | 0.311 | 0.061 |
2. Extract Community Abundance¶
In [3]:
# Extract ASV counts from brick476 (80M rows) -- filter in Spark first.
# brick476 is the table with 587 environmental communities and 108 geochemistry overlap;
# brick459 had no environmental community matches.
#
# Step 1: find environmental communities whose sample also has geochemistry.
# The geochemistry sample names are registered as a temp view and JOINed in
# Spark rather than interpolated into an IN ('...') literal -- this is robust
# to quotes or other odd characters in sample names and avoids building a
# very large SQL string.
spark.createDataFrame(
    pd.DataFrame({'sdt_sample_name': geochem.index})
).createOrReplaceTempView("geochem_samples")
overlap_communities = spark.sql("""
SELECT c.sdt_community_name, c.sdt_sample_name
FROM enigma_coral.sdt_community c
JOIN geochem_samples g ON c.sdt_sample_name = g.sdt_sample_name
WHERE c.community_type_sys_oterm_name = 'Environmental Community'
""").toPandas()
print(f"Overlap communities: {len(overlap_communities)}")
print(f" Samples: {overlap_communities['sdt_sample_name'].nunique()}")
# Step 2: register the overlap communities as a temp view for the Spark join below.
spark.createDataFrame(overlap_communities).createOrReplaceTempView("overlap_comms")
# Step 3: extract non-zero ASV counts only for overlap communities
# (pushes the filter down to Spark instead of materializing 80M rows in pandas).
asv_counts_raw = spark.sql("""
SELECT b.sdt_asv_name, b.sdt_community_name,
CAST(b.count_count_unit AS LONG) as read_count,
oc.sdt_sample_name
FROM enigma_coral.ddt_brick0000476 b
JOIN overlap_comms oc ON b.sdt_community_name = oc.sdt_community_name
WHERE CAST(b.count_count_unit AS LONG) > 0
""").toPandas()
print(f"\nASV counts (filtered to overlap): {len(asv_counts_raw):,} rows")
print(f" Communities: {asv_counts_raw['sdt_community_name'].nunique()}")
print(f" Samples: {asv_counts_raw['sdt_sample_name'].nunique()}")
print(f" ASVs: {asv_counts_raw['sdt_asv_name'].nunique()}")
# Aggregate replicates: brick476 carries a replicate column, so sum read
# counts per (ASV, community, sample).
asv_counts = asv_counts_raw.groupby(
    ['sdt_asv_name', 'sdt_community_name', 'sdt_sample_name']
)['read_count'].sum().reset_index()
print(f"\nAfter aggregating replicates: {len(asv_counts):,} rows")
Overlap communities: 220 Samples: 108
ASV counts (filtered to overlap): 135,504 rows Communities: 212 Samples: 108 ASVs: 47528 After aggregating replicates: 132,356 rows
3. Extract ASV Taxonomy¶
In [4]:
# Genus-level taxonomy: ASV -> genus mapping from brick454.
asv_taxonomy = spark.sql("""
SELECT sdt_asv_name, taxonomic_level_sys_oterm_name as tax_level, sdt_taxon_name as taxon
FROM enigma_coral.ddt_brick0000454
WHERE taxonomic_level_sys_oterm_name = 'Genus'
""").toPandas()
print(f"ASV genus taxonomy: {len(asv_taxonomy):,} rows")
print(f" Unique ASVs: {asv_taxonomy['sdt_asv_name'].nunique()}")
print(f" Unique genera: {asv_taxonomy['taxon'].nunique()}")

# Phylum-level taxonomy for broader context.
asv_phylum = spark.sql("""
SELECT sdt_asv_name, sdt_taxon_name as phylum
FROM enigma_coral.ddt_brick0000454
WHERE taxonomic_level_sys_oterm_name = 'Phylum'
""").toPandas()
print(f"ASV phylum taxonomy: {len(asv_phylum):,} rows")

# Combine: one row per ASV with its genus, left-joined to phylum.
asv_tax_full = (
    asv_taxonomy[['sdt_asv_name', 'taxon']]
    .rename(columns={'taxon': 'genus'})
    .merge(asv_phylum, on='sdt_asv_name', how='left')
)
print(f"\nCombined taxonomy: {len(asv_tax_full):,} ASVs with genus + phylum")
ASV genus taxonomy: 96,822 rows Unique ASVs: 96822 Unique genera: 1830
ASV phylum taxonomy: 107,888 rows Combined taxonomy: 96,822 ASVs with genus + phylum
4. Build Sample Metadata¶
In [5]:
# Sample metadata: sample -> location -> date
# LEFT JOIN keeps samples even when their location row is missing from
# sdt_location (those rows get null lat/lon/region).
sample_meta = spark.sql("""
SELECT s.sdt_sample_name, s.sdt_location_name, s.date, s.depth_meter,
l.latitude_degree, l.longitude_degree, l.region
FROM enigma_coral.sdt_sample s
LEFT JOIN enigma_coral.sdt_location l
ON s.sdt_location_name = l.sdt_location_name
""").toPandas()
print(f"Sample metadata: {len(sample_meta):,} samples")
print(f" Locations: {sample_meta['sdt_location_name'].nunique()}")
Sample metadata: 4,346 samples Locations: 202
5. Find Overlapping Samples¶
In [6]:
# The overlap set was already enforced Spark-side during extraction; collect
# the sample names that survived.
overlap_samples = set(asv_counts['sdt_sample_name'].unique())
print(f"Geochemistry samples: {len(geochem)}")
print(f"Community samples (from brick476 overlap): {len(overlap_samples)}")

# Restrict each table to the overlap samples. .copy() detaches the slices
# from their parents so later edits can't trigger SettingWithCopyWarning.
in_overlap = geochem.index.isin(overlap_samples)
geochem_filtered = geochem[in_overlap].copy()
asv_counts_filtered = asv_counts.copy()  # already filtered in Spark
meta_mask = sample_meta['sdt_sample_name'].isin(overlap_samples)
sample_meta_filtered = sample_meta[meta_mask].copy()
print(f"\nFiltered geochemistry: {len(geochem_filtered)} samples")
print(f"Filtered ASV counts: {len(asv_counts_filtered):,} rows")
print(f"Filtered sample metadata: {len(sample_meta_filtered)} samples")
Geochemistry samples: 117 Community samples (from brick476 overlap): 108 Filtered geochemistry: 108 samples Filtered ASV counts: 132,356 rows Filtered sample metadata: 108 samples
6. Save Extracted Data¶
In [7]:
# Write the four extracted tables as TSVs under DATA_DIR.

# Geochemistry matrix (index = sample name, so keep the index).
geochem_path = DATA_DIR / 'site_geochemistry.tsv'
geochem_filtered.to_csv(geochem_path, sep='\t')
print(f"Saved: site_geochemistry.tsv ({len(geochem_filtered)} samples x {geochem_filtered.shape[1]} molecules)")

# ASV counts: long format, non-zero counts only, overlap samples only.
count_cols = ['sdt_asv_name', 'sdt_community_name', 'sdt_sample_name', 'read_count']
asv_counts_filtered[count_cols].to_csv(
    DATA_DIR / 'asv_counts.tsv', sep='\t', index=False
)
print(f"Saved: asv_counts.tsv ({len(asv_counts_filtered):,} non-zero counts)")

# ASV -> genus/phylum taxonomy.
asv_tax_full.to_csv(DATA_DIR / 'asv_taxonomy.tsv', sep='\t', index=False)
print(f"Saved: asv_taxonomy.tsv ({len(asv_tax_full):,} ASVs)")

# Sample metadata (location, date, depth, lat/lon).
sample_meta_filtered.to_csv(DATA_DIR / 'sample_metadata.tsv', sep='\t', index=False)
print(f"Saved: sample_metadata.tsv ({len(sample_meta_filtered)} samples)")
Saved: site_geochemistry.tsv (108 samples x 48 molecules)
Saved: asv_counts.tsv (132,356 non-zero counts)
Saved: asv_taxonomy.tsv (96,822 ASVs) Saved: sample_metadata.tsv (108 samples)
In [8]:
# Final summary banner for this extraction notebook.
banner = "=" * 60
print(banner)
print("NB01 SUMMARY: ENIGMA Data Extraction")
print(banner)
print(f"Overlap samples: {len(overlap_samples)}")
print(f"Geochemistry: {geochem_filtered.shape[1]} molecules measured")
# Fetch the uranium column once; fall back to a zero series if it is absent.
uranium = geochem_filtered.get('uranium', pd.Series([0]))
print(f" Uranium range: {uranium.min():.3f} - {uranium.max():.1f} µM")
print(f"Community: {asv_counts_filtered['sdt_asv_name'].nunique():,} ASVs in {asv_counts_filtered['sdt_community_name'].nunique()} communities")
print(f"Taxonomy: {asv_tax_full['genus'].nunique():,} genera")
print(f"Locations: {sample_meta_filtered['sdt_location_name'].nunique()}")
print(banner)
============================================================ NB01 SUMMARY: ENIGMA Data Extraction ============================================================ Overlap samples: 108 Geochemistry: 48 molecules measured Uranium range: 0.000 - 188.2 µM Community: 47,528 ASVs in 212 communities Taxonomy: 1,830 genera Locations: 97 ============================================================