03 Pathway Completeness
Jupyter notebook from the Community Metabolic Ecology via NMDC × Pangenome Integration project.
NB03: Community Pathway Completeness Matrix¶
Project: Community Metabolic Ecology via NMDC × Pangenome Integration
Requires: BERDL JupyterHub (Spark — get_spark_session() injected into kernel)
Purpose¶
Compute community-weighted GapMind pathway completeness scores per NMDC sample.
Formula:
completeness(sample s, pathway p) = Σᵢ [ (aᵢ / Σⱼ aⱼ_mapped) × frac_complete(taxon_i, p) ]
where aᵢ is centrifuge abundance for taxon i, and the denominator normalizes over
mapped taxa only, so weights sum to 1 per sample per pathway.
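The formula above can be sketched on toy numbers (hypothetical abundances and scores, not from the data) to show how the mapped-only normalization works:

```python
# Toy sketch of completeness(s, p) = Σᵢ (aᵢ / Σⱼ aⱼ_mapped) × frac_complete(taxon_i, p)
# Abundances and frac_complete values here are made up for illustration.
abundances = {"taxon_A": 0.6, "taxon_B": 0.3}     # mapped centrifuge abundances a_i
frac_complete = {"taxon_A": 1.0, "taxon_B": 0.5}  # frac_complete(taxon_i, p) for one pathway

total_mapped = sum(abundances.values())           # denominator: mapped taxa only (0.9, not 1.0)
completeness = sum(
    (a / total_mapped) * frac_complete[t] for t, a in abundances.items()
)
print(round(completeness, 4))
```

Because the denominator covers mapped taxa only, the weights sum to 1 even when unmapped taxa carry part of the total abundance.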
Inputs (from NB01/NB02)¶
- data/taxon_bridge.tsv — centrifuge taxon → GTDB species clade_id + mapping tier
- data/nmdc_sample_inventory.csv — sample × clf_file_id × met_file_id
- data/bridge_quality.csv — per-file bridge coverage and QC flag
- nmdc_arkin.centrifuge_gold + nmdc_arkin.omics_files_table — species abundances
- kbase_ke_pangenome.gapmind_pathways — 305M rows (filtered to bridged clades)
- kbase_ke_pangenome.gtdb_species_clade — clade_id → GTDB_species bridge
Outputs¶
- data/species_pathway_completeness.csv — per-GTDB-clade × per-pathway completeness
- data/community_pathway_matrix.csv — per-sample × per-pathway community-weighted completeness
- figures/pathway_completeness_heatmap.png — mean completeness by pathway × ecosystem type
# On BERDL JupyterHub — get_spark_session() is injected into the kernel; no import needed
spark = get_spark_session()
spark
<pyspark.sql.connect.session.SparkSession at 0x7ccb646416a0>
import os
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
# Note: __file__ is undefined in notebooks; os.path.dirname('__file__') on the literal
# string is '', so this resolves '..' relative to the current working directory.
PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname('__file__'), '..'))
DATA_DIR = os.path.join(PROJECT_DIR, 'data')
FIGURES_DIR = os.path.join(PROJECT_DIR, 'figures')
print(f'DATA_DIR: {DATA_DIR}')
print(f'FIGURES_DIR: {FIGURES_DIR}')
DATA_DIR: /home/cjneely/repos/BERIL-research-observatory/projects/nmdc_community_metabolic_ecology/data
FIGURES_DIR: /home/cjneely/repos/BERIL-research-observatory/projects/nmdc_community_metabolic_ecology/figures
Part 1: Load NB02 Outputs¶
Load cached bridge and inventory files from NB02. These are loaded from disk (CSV/TSV),
so they are regular pandas DataFrames (no PyArrow ChunkedArray issue) and can be safely
passed to spark.createDataFrame() if needed.
# Load NB02 outputs from disk
bridge = pd.read_csv(os.path.join(DATA_DIR, 'taxon_bridge.tsv'), sep='\t')
inventory = pd.read_csv(os.path.join(DATA_DIR, 'nmdc_sample_inventory.csv'))
bridge_quality = pd.read_csv(os.path.join(DATA_DIR, 'bridge_quality.csv'))
print(f'Bridge rows: {len(bridge)} | mapping tiers:')
print(bridge['mapping_tier'].value_counts().to_string())
print(f'\nInventory rows: {len(inventory)}, unique samples: {inventory["sample_id"].nunique()}')
print(f'Bridge quality rows: {len(bridge_quality)}')
print(f'Samples passing QC (>=30%): {bridge_quality["passes_bridge_qc"].sum()}')
Bridge rows: 4894 | mapping tiers:
mapping_tier
unmapped                 2022
species_exact            1375
genus_proxy_ambiguous    1352
genus_proxy_unique        145

Inventory rows: 646, unique samples: 221
Bridge quality rows: 220
Samples passing QC (>=30%): 220
# Determine the set of overlap samples and their classifier files
# Use samples that appear in the inventory AND pass bridge QC
qc_pass_files = set(bridge_quality[bridge_quality['passes_bridge_qc']]['file_id'].tolist())
# One clf_file per sample: take the QC-passing file (if multiple, take first)
inventory_qc = inventory[inventory['clf_file_id'].isin(qc_pass_files)].copy()
# Drop duplicate samples, keeping first clf_file_id
sample_file_map = (
inventory_qc[['sample_id', 'clf_file_id']]
.drop_duplicates(subset='sample_id')
.reset_index(drop=True)
)
overlap_sample_ids = sample_file_map['sample_id'].tolist()
print(f'Overlap samples passing bridge QC: {len(overlap_sample_ids)}')
print('Sample sample_ids:', overlap_sample_ids[:5])
# Get mapped clade IDs from bridge (native Python list — safe to use in SQL IN clauses)
mapped_bridge = bridge[bridge['mapping_tier'] != 'unmapped'].dropna(subset=['gtdb_species_clade_id'])
mapped_clade_ids = mapped_bridge['gtdb_species_clade_id'].unique().tolist()
print(f'\nUnique mapped GTDB clade IDs: {len(mapped_clade_ids)}')
Overlap samples passing bridge QC: 220
Sample sample_ids: ['nmdc:bsm-11-ahfq0n74', 'nmdc:bsm-11-v0hqhp22', 'nmdc:bsm-11-57ejpn32', 'nmdc:bsm-11-nx7zwq61', 'nmdc:bsm-11-h3e54f51']

Unique mapped GTDB clade IDs: 1837
Part 2: GapMind Species-Level Pathway Completeness¶
Run a two-stage aggregation on gapmind_pathways (305M rows), filtered to the
~1,800 GTDB clades that appear in the taxon bridge.
Stay in Spark until the final ~(n_clades × n_pathways) aggregation.
Pitfall: GapMind has multiple rows per genome-pathway pair — always GROUP BY
and MAX score first (see docs/pitfalls.md [pangenome_pathway_geography]).
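The dedup-then-aggregate pattern behind this pitfall can be shown on toy pandas data (made-up genome IDs and scores; the notebook itself does this stage in Spark SQL):

```python
import pandas as pd

# Toy GapMind-like rows: the same (genome, pathway) pair appears more than once
# with different score encodings, mimicking the duplicate-row pitfall.
df = pd.DataFrame({
    "genome_id": ["g1", "g1", "g2"],
    "pathway":   ["arginine", "arginine", "arginine"],
    "score":     [1, 5, 5],   # numeric encoding of score_category
})

# Stage 1: MAX per (genome, pathway) collapses the duplicates first.
best = df.groupby(["genome_id", "pathway"], as_index=False)["score"].max()

# Stage 2: fraction of genomes with a complete pathway (score >= 5).
frac_complete = (best["score"] >= 5).mean()
print(frac_complete)
```

Skipping stage 1 and averaging raw rows would count g1's duplicate low-score row and underestimate frac_complete (2/3 instead of 1.0 here).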
# gapmind_pathways.clade_name = gtdb_species_clade_id format
# (e.g., 's__Rhizobium_phaseoli--RS_GCF_001234567.1')
# taxon_bridge.gtdb_species_clade_id is already in this format.
# No GTDB_species lookup needed — use mapped_clade_ids directly as clade_name filter.
mapped_clade_names = mapped_clade_ids # These match clade_name in gapmind_pathways
print(f'Mapped clade names for GapMind filter: {len(mapped_clade_names)}')
print('Sample:', mapped_clade_names[:5])
print()
print('NOTE: gapmind_pathways.clade_name = gtdb_species_clade_id format')
print(' (includes genome accession suffix, e.g., s__X--RS_GCF_...)')
Mapped clade names for GapMind filter: 1837
Sample: ['s__Burkholderia_cenocepacia--RS_GCF_900446215.1', 's__Streptomyces_albus--RS_GCF_000725885.1', 's__Stutzerimonas_stutzeri--RS_GCF_000219605.1', 's__Ralstonia_solanacearum--RS_GCF_002251695.1', 's__Nocardia_cyriacigeorgica--RS_GCF_000308555.1']
NOTE: gapmind_pathways.clade_name = gtdb_species_clade_id format
(includes genome accession suffix, e.g., s__X--RS_GCF_...)
# Register mapped clade names as a Spark temp view for efficient JOIN.
# pd.DataFrame from Python list (not from Spark.toPandas()) — safe to use with createDataFrame().
clade_names_df = pd.DataFrame({'clade_name': mapped_clade_names})
spark.createDataFrame(clade_names_df).createOrReplaceTempView('mapped_clade_names_tmp')
print(f'Registered {len(clade_names_df)} clade names as Spark temp view mapped_clade_names_tmp')
Registered 1837 clade names as Spark temp view mapped_clade_names_tmp
# Step 2a: Verify GapMind schema and confirm clade_name format matches mapped_clade_names.
# Note: Use LIMIT queries only — avoid full 305M-row scans that trigger Spark Connect
# session reconnects and silently destroy temp views.
print('=== gapmind_pathways schema ===')
spark.sql('DESCRIBE kbase_ke_pangenome.gapmind_pathways').show(20, truncate=False)
# Sample actual clade_name values to confirm format
print('\nSample clade_name values from gapmind_pathways (confirm format):')
spark.sql("""
SELECT DISTINCT clade_name
FROM kbase_ke_pangenome.gapmind_pathways
LIMIT 5
""").show(truncate=False)
# Re-register temp view here (defensive: session may have reconnected during prior cells)
spark.createDataFrame(
pd.DataFrame({'clade_name': mapped_clade_names})
).createOrReplaceTempView('mapped_clade_names_tmp')
print(f'Temp view re-registered: {len(mapped_clade_names)} clade names')
# Verify the JOIN actually returns rows before committing to the full aggregation
print('\nVerify temp view JOIN (5-row sample — should return results):')
spark.sql("""
SELECT g.clade_name, g.pathway, g.score_category, g.metabolic_category
FROM kbase_ke_pangenome.gapmind_pathways g
JOIN mapped_clade_names_tmp m ON g.clade_name = m.clade_name
LIMIT 5
""").show(truncate=False)
=== gapmind_pathways schema ===
+------------------+---------+-------+
|col_name          |data_type|comment|
+------------------+---------+-------+
|genome_id         |string   |NULL   |
|pathway           |string   |NULL   |
|clade_name        |string   |NULL   |
|metabolic_category|string   |NULL   |
|sequence_scope    |string   |NULL   |
|nHi               |int      |NULL   |
|nMed              |int      |NULL   |
|nLo               |int      |NULL   |
|score             |double   |NULL   |
|score_category    |string   |NULL   |
|score_simplified  |double   |NULL   |
+------------------+---------+-------+

Sample clade_name values from gapmind_pathways (confirm format):
+-------------------------------------------------+
|clade_name                                       |
+-------------------------------------------------+
|s__Snodgrassella_alvi_B--RS_GCF_002777425.1      |
|s__Massilioclostridium_coli--RS_GCF_900095865.1  |
|s__Marinisoma_sp000402655--GB_GCA_000402655.1    |
|s__UBA2943_sp900321295--GB_GCA_900321295.1       |
|s__Alloprevotella_sp022772475--GB_GCA_022772475.1|
+-------------------------------------------------+

Temp view re-registered: 1837 clade names

Verify temp view JOIN (5-row sample — should return results):
+-----------------------------------------------+----------+-----------------+------------------+
|clade_name                                     |pathway   |score_category   |metabolic_category|
+-----------------------------------------------+----------+-----------------+------------------+
|s__Streptococcus_pneumoniae--RS_GCF_001457635.1|arginine  |not_present      |carbon            |
|s__Streptococcus_pneumoniae--RS_GCF_001457635.1|arginine  |not_present      |carbon            |
|s__Streptococcus_pneumoniae--RS_GCF_001457635.1|arginine  |steps_missing_low|carbon            |
|s__Streptococcus_pneumoniae--RS_GCF_001457635.1|asparagine|complete         |carbon            |
|s__Streptococcus_pneumoniae--RS_GCF_001457635.1|asparagine|not_present      |carbon            |
+-----------------------------------------------+----------+-----------------+------------------+
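The notebook re-registers the temp view before every query because a Spark Connect reconnect silently drops it. That boilerplate could be factored into a small helper; `register_clade_view` below is a hypothetical convenience function (not part of the notebook or any BERDL API), sketched under the assumption that `spark.createDataFrame(...).createOrReplaceTempView(...)` is the registration path used throughout:

```python
import pandas as pd

def register_clade_view(spark, clade_names, view_name="mapped_clade_names_tmp"):
    """(Re-)create the temp view of mapped clade names.

    Hypothetical helper: cheap enough to call defensively before each query,
    since Spark Connect session reconnects silently destroy temp views.
    Returns the number of clade names registered.
    """
    df = pd.DataFrame({"clade_name": list(clade_names)})
    spark.createDataFrame(df).createOrReplaceTempView(view_name)
    return len(df)
```

Each Spark SQL cell would then start with `register_clade_view(spark, mapped_clade_names)` instead of repeating the three-line registration block.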
# Re-register temp view (defensive: Spark Connect session may reconnect between cells)
spark.createDataFrame(
pd.DataFrame({'clade_name': mapped_clade_names})
).createOrReplaceTempView('mapped_clade_names_tmp')
# Check how many GapMind rows are in the mapped clades subset
n_gapmind_mapped = spark.sql("""
SELECT COUNT(*) as n
FROM kbase_ke_pangenome.gapmind_pathways g
JOIN mapped_clade_names_tmp m ON g.clade_name = m.clade_name
""").collect()[0]['n']
print(f'GapMind rows for mapped clades: {n_gapmind_mapped:,} (vs 305M total)')
# Count distinct pathways in mapped subset
n_pathways = spark.sql("""
SELECT COUNT(DISTINCT pathway) as n
FROM kbase_ke_pangenome.gapmind_pathways g
JOIN mapped_clade_names_tmp m ON g.clade_name = m.clade_name
""").collect()[0]['n']
print(f'Distinct pathways in mapped clades: {n_pathways}')
GapMind rows for mapped clades: 160,351,853 (vs 305M total)
Distinct pathways in mapped clades: 80
# Re-register temp view immediately before the expensive aggregation (belt-and-suspenders)
spark.createDataFrame(
pd.DataFrame({'clade_name': mapped_clade_names})
).createOrReplaceTempView('mapped_clade_names_tmp')
# Two-stage GapMind aggregation (stay in Spark until final result):
# Stage 1: MAX score per (clade, genome, pathway) — eliminates duplicate rows per genome-pathway pair
# Stage 2: AVG across genomes per (clade, pathway) — species-level completeness
#
# Note: CAST AS DOUBLE required — Spark infers DECIMAL for AVG(INT) and for
# AVG(CASE WHEN ... THEN 1.0 ...) where 1.0 is a DECIMAL literal.
# Without the CAST, pandas receives decimal.Decimal objects that fail
# in downstream float arithmetic.
species_completeness_spark = spark.sql("""
WITH best_scores AS (
SELECT g.clade_name, g.genome_id, g.pathway, g.metabolic_category,
MAX(CASE g.score_category
WHEN 'complete' THEN 5
WHEN 'likely_complete' THEN 4
WHEN 'steps_missing_low' THEN 3
WHEN 'steps_missing_medium' THEN 2
WHEN 'not_present' THEN 1
ELSE 0 END) AS best_score
FROM kbase_ke_pangenome.gapmind_pathways g
JOIN mapped_clade_names_tmp m ON g.clade_name = m.clade_name
GROUP BY g.clade_name, g.genome_id, g.pathway, g.metabolic_category
)
SELECT
clade_name,
pathway,
metabolic_category,
CAST(AVG(best_score) AS DOUBLE) AS mean_best_score,
CAST(AVG(CASE WHEN best_score >= 5 THEN 1.0 ELSE 0.0 END) AS DOUBLE) AS frac_complete,
CAST(AVG(CASE WHEN best_score >= 4 THEN 1.0 ELSE 0.0 END) AS DOUBLE) AS frac_likely_complete,
COUNT(DISTINCT genome_id) AS n_genomes
FROM best_scores
GROUP BY clade_name, pathway, metabolic_category
""")
# Collect to pandas — this is the expensive Spark job (~10+ min for large clade sets)
print('Running GapMind aggregation (may take several minutes)...')
species_completeness = species_completeness_spark.toPandas()
print(f'Species pathway completeness rows: {len(species_completeness):,}')
print(f'Clades: {species_completeness["clade_name"].nunique()}')
print(f'Pathways: {species_completeness["pathway"].nunique()}')
print(f'Metabolic categories: {species_completeness["metabolic_category"].unique().tolist()}')
print(f'frac_complete dtype: {species_completeness["frac_complete"].dtype}')
print(species_completeness.head(5).to_string())
Running GapMind aggregation (may take several minutes)...
Species pathway completeness rows: 146,960
Clades: 1837
Pathways: 80
Metabolic categories: ['aa', 'carbon']
frac_complete dtype: float64
clade_name pathway metabolic_category mean_best_score frac_complete frac_likely_complete n_genomes
0 s__Citrobacter_werkmanii--GB_GCA_000759755.1 leu aa 5.000000 1.0 1.0 92
1 s__Methylocella_sp003162995--GB_GCA_003162995.1 gly aa 5.000000 1.0 1.0 2
2 s__Rickettsia_bellii--RS_GCF_000012385.1 xylitol carbon 1.000000 0.0 0.0 7
3 s__Carboxydothermus_hydrogenoformans--RS_GCF_000012865.1 met aa 5.000000 1.0 1.0 3
4 s__Lactiplantibacillus_plantarum--RS_GCF_014131735.1 tyrosine carbon 2.997268 0.0 0.0 732
# gapmind_pathways.clade_name = gtdb_species_clade_id — just rename for clarity.
# No merge needed; the values are already the same IDs used in taxon_bridge.
species_completeness = species_completeness.rename(
columns={'clade_name': 'gtdb_species_clade_id'}
)
print(f'Species pathway completeness rows: {len(species_completeness):,}')
print(f'Unique gtdb_species_clade_id: {species_completeness["gtdb_species_clade_id"].nunique()}')
print(f'Pathways: {species_completeness["pathway"].nunique()}')
print(species_completeness.head(3).to_string())
Species pathway completeness rows: 146,960
Unique gtdb_species_clade_id: 1837
Pathways: 80
gtdb_species_clade_id pathway metabolic_category mean_best_score frac_complete frac_likely_complete n_genomes
0 s__Citrobacter_werkmanii--GB_GCA_000759755.1 leu aa 5.0 1.0 1.0 92
1 s__Methylocella_sp003162995--GB_GCA_003162995.1 gly aa 5.0 1.0 1.0 2
2 s__Rickettsia_bellii--RS_GCF_000012385.1 xylitol carbon 1.0 0.0 0.0 7
# Save species-level pathway completeness
sp_path = os.path.join(DATA_DIR, 'species_pathway_completeness.csv')
species_completeness.to_csv(sp_path, index=False)
print(f'Saved: data/species_pathway_completeness.csv ({len(species_completeness):,} rows)')
print(f' Clades: {species_completeness["gtdb_species_clade_id"].nunique()}')
print(f' Pathways: {species_completeness["pathway"].nunique()}')
Saved: data/species_pathway_completeness.csv (146,960 rows)
  Clades: 1837
  Pathways: 80
Part 3: Community-Weighted Pathway Completeness Per Sample¶
For each overlap sample:
- Get species-rank centrifuge abundances
- Join taxon → gtdb_species_clade_id via taxon_bridge
- Join gtdb_species_clade_id → frac_complete per pathway via species_pathway_completeness
- Compute the community-weighted mean, normalized by mapped abundance
# Load centrifuge species-rank data for overlap samples via Spark.
# Use omics_files_table to restrict to overlap samples (no pandas→Spark roundtrip).
BRIDGE_TBL = 'nmdc_arkin.omics_files_table'
clf_data_spark = spark.sql(f"""
SELECT c.file_id, c.label AS taxon_name, CAST(c.abundance AS DOUBLE) AS abundance
FROM nmdc_arkin.centrifuge_gold c
JOIN {BRIDGE_TBL} b ON c.file_id = b.file_id
WHERE b.sample_id IN (
SELECT b2.sample_id FROM {BRIDGE_TBL} b2
JOIN (SELECT DISTINCT file_id FROM nmdc_arkin.centrifuge_gold) c2 ON b2.file_id = c2.file_id
INTERSECT
SELECT b3.sample_id FROM {BRIDGE_TBL} b3
JOIN (SELECT DISTINCT file_id FROM nmdc_arkin.metabolomics_gold) m3 ON b3.file_id = m3.file_id
)
AND LOWER(c.rank) = 'species'
AND c.label IS NOT NULL AND c.label != ''
AND c.abundance > 0
""")
clf_data = clf_data_spark.toPandas()
# Ensure abundance is float (Spark DECIMAL maps to decimal.Decimal in pandas)
clf_data['abundance'] = clf_data['abundance'].astype(float)
print(f'Species-rank rows for overlap samples: {len(clf_data):,}')
print(f'Unique classifier files: {clf_data["file_id"].nunique()}')
print(f'abundance dtype: {clf_data["abundance"].dtype}')
print(clf_data.head(3).to_string())
Species-rank rows for overlap samples: 126,905
Unique classifier files: 220
abundance dtype: float64
file_id taxon_name abundance
0 nmdc:dobj-11-kbd8zm70 Staphylococcus pettenkoferi 0.000037
1 nmdc:dobj-11-kbxytm81 Actinoalloteichus sp. AHMU CJ021 0.000028
2 nmdc:dobj-11-kbxytm81 Variovorax sp. PMC12 0.001012
# Map file_id → sample_id using sample_file_map (one clf_file per sample)
clf_data = clf_data.merge(sample_file_map.rename(columns={'clf_file_id': 'file_id'}),
on='file_id', how='inner')
print(f'Rows after merging sample_id: {len(clf_data):,}')
print(f'Unique samples: {clf_data["sample_id"].nunique()}')
Rows after merging sample_id: 126,905
Unique samples: 220
# Join centrifuge taxa with taxon_bridge to get gtdb_species_clade_id
# Use all mapped tiers (species_exact, genus_proxy_unique, genus_proxy_ambiguous)
bridge_mapped = bridge[bridge['mapping_tier'] != 'unmapped'][[
'taxon_name', 'gtdb_species_clade_id', 'mapping_tier'
]].dropna(subset=['gtdb_species_clade_id'])
# For genus_proxy_ambiguous taxa, a single Centrifuge taxon name matches multiple GTDB clades
# (one per genome in that genus). We take ONE representative clade per taxon_name using
# sort + drop_duplicates, which gives deterministic alphabetical tiebreaking on
# gtdb_species_clade_id. This is a conservative proxy; the sensitivity to this choice is
# bounded by the genus_proxy_ambiguous fraction of mapped abundance (~6.5% of the 93.5%
# mapped total, since most ambiguous matches are low-abundance taxa). An
# abundance-weighted mean across all matching clades is a suggested future improvement
# (see REPORT.md Limitations).
bridge_mapped = (
bridge_mapped
.sort_values('gtdb_species_clade_id') # deterministic alphabetical tiebreak
.drop_duplicates(subset='taxon_name')
)
clf_bridged = clf_data.merge(bridge_mapped, on='taxon_name', how='left')
n_mapped = clf_bridged['gtdb_species_clade_id'].notna().sum()
n_total = len(clf_bridged)
mapped_abund_frac = (
clf_bridged[clf_bridged['gtdb_species_clade_id'].notna()]['abundance'].sum()
/ clf_bridged['abundance'].sum()
)
print(f'Rows mapped to a GTDB clade: {n_mapped:,} / {n_total:,} ({n_mapped/n_total:.1%})')
print(f'Fraction of total abundance covered by mapped taxa: {mapped_abund_frac:.1%}')
print('\nMapping tier breakdown:')
print(clf_bridged['mapping_tier'].value_counts(dropna=False).to_string())
Rows mapped to a GTDB clade: 112,794 / 126,905 (88.9%)
Fraction of total abundance covered by mapped taxa: 93.5%

Mapping tier breakdown:
mapping_tier
genus_proxy_ambiguous    57355
species_exact            51026
NaN                      14111
genus_proxy_unique        4413
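The sort + drop_duplicates tiebreak above can be illustrated on toy data (the taxon names and clade IDs below are made up): one ambiguous taxon matches two clades, and sorting first makes the kept clade deterministic.

```python
import pandas as pd

# Toy bridge: "Variovorax sp." is genus_proxy_ambiguous — it matches two clades.
bridge = pd.DataFrame({
    "taxon_name": ["Variovorax sp.", "Variovorax sp.", "Bacillus sp."],
    "gtdb_species_clade_id": ["s__V_b--GB_2", "s__V_a--GB_1", "s__B_a--RS_1"],
})

# Sorting by clade ID before drop_duplicates keeps the alphabetically first
# clade per taxon_name, so reruns always pick the same representative.
one_per_taxon = (
    bridge.sort_values("gtdb_species_clade_id")
          .drop_duplicates(subset="taxon_name")
)
print(sorted(one_per_taxon["gtdb_species_clade_id"]))
```

Without the sort, drop_duplicates would keep whichever matching row happened to come first in the file, making the representative clade order-dependent.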
# Join with species pathway completeness
# completeness table is keyed by gtdb_species_clade_id + pathway
clf_with_completeness = clf_bridged[clf_bridged['gtdb_species_clade_id'].notna()].merge(
species_completeness[['gtdb_species_clade_id', 'pathway', 'metabolic_category',
'frac_complete', 'frac_likely_complete', 'mean_best_score']],
on='gtdb_species_clade_id',
how='inner'
)
# Defensive: cast Spark AVG/DECIMAL results to float.
# Spark treats decimal literals like 1.0 as DECIMAL (not DOUBLE), so AVG(CASE WHEN ... THEN 1.0 ...)
# returns decimal.Decimal in pandas, which fails in float arithmetic. CAST AS DOUBLE in cell-11
# is the primary fix; this is a safety net for cached runs.
for col in ['frac_complete', 'frac_likely_complete', 'mean_best_score']:
clf_with_completeness[col] = clf_with_completeness[col].astype(float)
print(f'Rows after joining with completeness: {len(clf_with_completeness):,}')
print(f'Unique samples: {clf_with_completeness["sample_id"].nunique()}')
print(f'Unique pathways: {clf_with_completeness["pathway"].nunique()}')
print(f'frac_complete dtype: {clf_with_completeness["frac_complete"].dtype}')
Rows after joining with completeness: 9,023,520
Unique samples: 220
Unique pathways: 80
frac_complete dtype: float64
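The decimal.Decimal pitfall the defensive cast guards against is easy to reproduce standalone: Python's Decimal refuses arithmetic with float operands, which is exactly what breaks downstream weighting if Spark hands back DECIMAL columns.

```python
from decimal import Decimal

# Spark maps DECIMAL columns to decimal.Decimal objects in pandas.
# Decimal supports arithmetic with int but not with float:
try:
    Decimal("1.0") * 0.5          # raises TypeError
    mixed_ok = True
except TypeError:
    mixed_ok = False

print(mixed_ok)                    # mixed Decimal * float is unsupported
print(float(Decimal("1.0")) * 0.5) # explicit cast to float works fine
```

This is why the SQL uses CAST(... AS DOUBLE) at the source and the notebook re-casts with .astype(float) as a safety net for cached runs.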
# Compute community-weighted pathway completeness per sample
#
# For each (sample, pathway):
# total_mapped_abund = Σ abundance_i (only over taxa with a completeness score for this pathway)
# community_completeness = Σ (abundance_i / total_mapped_abund) × frac_complete_i
#
# This gives a value in [0,1]: fraction of community (by mapped abundance) with a complete pathway.
# Step 1: total mapped abundance per sample per pathway (denominator)
sample_pathway_totals = (
clf_with_completeness
.groupby(['sample_id', 'pathway'])['abundance']
.sum()
.rename('total_mapped_abund')
.reset_index()
)
# Step 2: merge denominator back and compute weighted completeness
clf_w = clf_with_completeness.merge(sample_pathway_totals, on=['sample_id', 'pathway'])
clf_w['weight'] = clf_w['abundance'] / clf_w['total_mapped_abund']
clf_w['weighted_frac_complete'] = clf_w['weight'] * clf_w['frac_complete']
clf_w['weighted_frac_likely_complete'] = clf_w['weight'] * clf_w['frac_likely_complete']
# Step 3: aggregate by sample × pathway
community_matrix_long = (
clf_w.groupby(['sample_id', 'pathway', 'metabolic_category'])
.agg(
community_frac_complete=('weighted_frac_complete', 'sum'),
community_frac_likely_complete=('weighted_frac_likely_complete', 'sum'),
n_mapped_taxa=('taxon_name', 'nunique'),
total_mapped_abund=('total_mapped_abund', 'first')
)
.reset_index()
)
print(f'Community matrix (long): {len(community_matrix_long):,} rows')
print(f'Samples: {community_matrix_long["sample_id"].nunique()}')
print(f'Pathways: {community_matrix_long["pathway"].nunique()}')
print(community_matrix_long.describe().to_string())
Community matrix (long): 17,600 rows
Samples: 220
Pathways: 80
community_frac_complete community_frac_likely_complete n_mapped_taxa total_mapped_abund
count 17600.000000 17600.000000 17600.000000 17600.000000
mean 0.699073 0.766397 512.700000 0.935370
std 0.299451 0.274886 313.274676 0.076067
min 0.000000 0.000000 1.000000 0.314354
25% 0.486466 0.621078 200.750000 0.899178
50% 0.801259 0.894016 597.000000 0.960668
75% 0.963211 0.982537 764.000000 0.986040
max 1.000000 1.000000 1303.000000 1.000000
# Pivot to wide format: samples (rows) × pathways (columns)
community_matrix_wide = community_matrix_long.pivot_table(
index='sample_id',
columns='pathway',
values='community_frac_complete',
aggfunc='first'
).reset_index()
print(f'Community pathway matrix (wide): {community_matrix_wide.shape}')
print(f' Samples: {len(community_matrix_wide)}')
print(f' Pathways: {community_matrix_wide.shape[1] - 1}')
# Summary: completeness score distributions
pathway_cols = [c for c in community_matrix_wide.columns if c != 'sample_id']
print(f'\nMean community completeness per pathway (top 10 most complete):')
pathway_means = community_matrix_wide[pathway_cols].mean().sort_values(ascending=False)
print(pathway_means.head(10).to_string())
print(f'\nBottom 10 (least complete):')
print(pathway_means.tail(10).to_string())
Community pathway matrix (wide): (220, 81)
  Samples: 220
  Pathways: 80

Mean community completeness per pathway (top 10 most complete):
pathway
gln            0.993833
gly            0.987573
threonine      0.976052
chorismate     0.975708
asn            0.974520
met            0.973307
thr            0.961732
lys            0.961732
cys            0.943909
deoxyribose    0.932832

Bottom 10 (least complete):
pathway
glucosamine      0.430226
citrate          0.424053
NAG              0.418123
myoinositol      0.409428
ribose           0.371778
galacturonate    0.283137
mannitol         0.250467
D-serine         0.218830
xylitol          0.094513
glucose-6-P      0.011889
# Add ecosystem type from omics_files_table (has study_id) + study_table
# Use study_id from omics_files_table → join to study_table → get ecosystem_category, ecosystem_type
sample_study = spark.sql("""
SELECT DISTINCT b.sample_id, b.study_id
FROM nmdc_arkin.omics_files_table b
JOIN (SELECT DISTINCT file_id FROM nmdc_arkin.centrifuge_gold) c ON b.file_id = c.file_id
WHERE b.sample_id IS NOT NULL AND b.study_id IS NOT NULL
""").toPandas()
study_meta = spark.sql("""
SELECT study_id, ecosystem_category, ecosystem_type, ecosystem_subtype, specific_ecosystem
FROM nmdc_arkin.study_table
""").toPandas()
# Join sample_study → study_meta
sample_ecosystem = sample_study.merge(study_meta, on='study_id', how='left')
# Keep only one row per sample_id (take first study_id if multiple)
sample_ecosystem = (
sample_ecosystem
.sort_values('study_id')
.drop_duplicates(subset='sample_id')
.reset_index(drop=True)
)
print(f'Sample ecosystem metadata: {len(sample_ecosystem)} rows')
print('ecosystem_type distribution:')
print(sample_ecosystem['ecosystem_type'].value_counts(dropna=False).to_string())
Sample ecosystem metadata: 6361 rows
ecosystem_type distribution:
ecosystem_type
NaN             5659
Soil             348
Freshwater       325
Unclassified      29
# Merge ecosystem metadata into community_matrix_wide
community_matrix_wide = community_matrix_wide.merge(
sample_ecosystem[['sample_id', 'study_id', 'ecosystem_category',
'ecosystem_type', 'ecosystem_subtype', 'specific_ecosystem']],
on='sample_id', how='left'
)
print(f'Community matrix with ecosystem: {community_matrix_wide.shape}')
print('ecosystem_type counts for overlap samples:')
print(community_matrix_wide['ecosystem_type'].value_counts(dropna=False).to_string())
Community matrix with ecosystem: (220, 86)
ecosystem_type counts for overlap samples:
ecosystem_type
Soil          126
NaN            61
Freshwater     33
Part 4: Heatmap — Mean Completeness by Pathway × Ecosystem Type¶
# Compute mean community completeness per pathway × ecosystem_type
# Use long-format matrix (includes metabolic_category for ordering)
# Merge ecosystem into long format
community_long_eco = community_matrix_long.merge(
sample_ecosystem[['sample_id', 'ecosystem_type']],
on='sample_id', how='left'
)
community_long_eco['ecosystem_type'] = community_long_eco['ecosystem_type'].fillna('Unknown')
# Mean completeness per pathway × ecosystem_type
heatmap_data = (
community_long_eco
.groupby(['pathway', 'metabolic_category', 'ecosystem_type'])['community_frac_complete']
.mean()
.reset_index()
.rename(columns={'community_frac_complete': 'mean_completeness'})
)
# Pivot for heatmap
heatmap_pivot = heatmap_data.pivot_table(
index='pathway',
columns='ecosystem_type',
values='mean_completeness'
)
# Sort pathways by metabolic_category then by mean completeness
pathway_category = community_long_eco[['pathway', 'metabolic_category']].drop_duplicates()
heatmap_pivot = heatmap_pivot.join(
pathway_category.set_index('pathway')['metabolic_category']
).sort_values(['metabolic_category', 'pathway']).drop(columns='metabolic_category')
print(f'Heatmap: {heatmap_pivot.shape[0]} pathways × {heatmap_pivot.shape[1]} ecosystem types')
print(heatmap_pivot.head(5).to_string())
Heatmap: 80 pathways × 3 ecosystem types
Freshwater Soil Unknown
pathway
arg 0.704788 0.700501 0.869116
asn 0.915418 0.985185 0.984465
chorismate 0.971439 0.969769 0.990285
cys 0.892817 0.955455 0.947699
gln 0.992820 0.995030 0.991910
# Plot heatmap: pathways (rows) × ecosystem types (columns)
n_pathways_plot = heatmap_pivot.shape[0]
n_ecosystems = heatmap_pivot.shape[1]
fig_height = max(8, n_pathways_plot * 0.22)
fig, ax = plt.subplots(figsize=(max(6, n_ecosystems * 1.5), fig_height))
sns.heatmap(
heatmap_pivot,
ax=ax,
cmap='YlOrRd',
vmin=0, vmax=1,
linewidths=0.3,
linecolor='white',
annot=(n_pathways_plot <= 30), # only annotate if <= 30 pathways
fmt='.2f' if n_pathways_plot <= 30 else '',
cbar_kws={'label': 'Mean community pathway completeness'},
)
ax.set_title('Community Pathway Completeness by Ecosystem Type', fontsize=13, pad=12)
ax.set_xlabel('Ecosystem type', fontsize=11)
ax.set_ylabel('Pathway', fontsize=11)
ax.tick_params(axis='y', labelsize=8)
ax.tick_params(axis='x', labelsize=9, rotation=30)
plt.tight_layout()
fig_path = os.path.join(FIGURES_DIR, 'pathway_completeness_heatmap.png')
plt.savefig(fig_path, dpi=150, bbox_inches='tight')
plt.show()
print(f'Saved: figures/pathway_completeness_heatmap.png')
Saved: figures/pathway_completeness_heatmap.png
Part 5: Save Outputs¶
# Save community pathway matrix (wide format — used in NB05 statistical analysis)
matrix_path = os.path.join(DATA_DIR, 'community_pathway_matrix.csv')
community_matrix_wide.to_csv(matrix_path, index=False)
print(f'Saved: data/community_pathway_matrix.csv')
print(f' Shape: {community_matrix_wide.shape}')
print(f' Samples: {community_matrix_wide["sample_id"].nunique()}')
print(f' Pathway columns: {len(pathway_cols)}')
# Save long-format community matrix (useful for NB05 joins and groupby)
long_path = os.path.join(DATA_DIR, 'community_pathway_matrix_long.csv')
community_matrix_long.to_csv(long_path, index=False)
print(f'Saved: data/community_pathway_matrix_long.csv ({len(community_matrix_long):,} rows)')
Saved: data/community_pathway_matrix.csv
  Shape: (220, 86)
  Samples: 220
  Pathway columns: 80
Saved: data/community_pathway_matrix_long.csv (17,600 rows)
# Summary: key statistics for NB05
print('=== NB03 Summary ===')
print(f'Overlap samples with pathway completeness data: {community_matrix_wide["sample_id"].nunique()}')
print(f'Pathways computed: {len(pathway_cols)}')
print(f'Metabolic categories: {community_long_eco["metabolic_category"].unique().tolist()}')
# metabolic_category values in gapmind_pathways are 'aa' and 'carbon' (not 'amino_acid')
aa_mask = community_long_eco['metabolic_category'] == 'aa'
n_aa_pathways = community_long_eco.loc[aa_mask, 'pathway'].nunique() if aa_mask.sum() > 0 else 0
print(f'Amino acid (aa) pathways: {n_aa_pathways}')
print()
print('Mean community completeness across all samples (frac_complete):')
print(f' All pathways: {community_matrix_long["community_frac_complete"].mean():.3f}')
if aa_mask.sum() > 0:
print(f' Amino acid (aa) pathways: '
f'{community_long_eco.loc[aa_mask, "community_frac_complete"].mean():.3f}')
print()
print('Ecosystem breakdown for overlap samples:')
print(community_matrix_wide[['ecosystem_category', 'ecosystem_type']]
.value_counts(dropna=False).to_string())
=== NB03 Summary ===
Overlap samples with pathway completeness data: 220
Pathways computed: 80
Metabolic categories: ['carbon', 'aa']
Amino acid (aa) pathways: 18

Mean community completeness across all samples (frac_complete):
  All pathways: 0.699
  Amino acid (aa) pathways: 0.851

Ecosystem breakdown for overlap samples:
ecosystem_category  ecosystem_type
Terrestrial         Soil              126
NaN                 NaN                61
Aquatic             Freshwater         33
Summary and Decisions for NB04¶
| Question | Finding |
|---|---|
| Samples in community matrix | 220 |
| Pathways computed | 80 (18 aa, 62 carbon) |
| Amino acid pathways | 18 |
| Mean community completeness (all pathways) | 0.699 |
| Mean community completeness (AA pathways) | 0.851 |
| Ecosystem types represented | Soil: 126, Freshwater: 33, Unknown: 61 |
| Metabolic categories in GapMind | 'aa' and 'carbon' (not 'amino_acid') |
Decision for NB04:
Proceed to metabolomics processing using data/community_pathway_matrix.csv.
Join with metabolomics_gold on met_file_id from nmdc_sample_inventory.csv.
Target amino acid compounds from metabolomics_gold for the Black Queen test (H1).
Use frac_complete (GapMind score ≥ 5, i.e., only genomes scored "complete") as the
primary metric; frac_likely_complete (score ≥ 4) is also computed for sensitivity.