05 NMDC Environmental Analysis
Jupyter notebook from the Prophage Gene Modules and Terminase-Defined Lineages Across Bacterial Phylogeny and Environmental Gradients project.
NB05: NMDC Environmental Gradient Analysis¶
Project: Prophage Ecology Across Bacterial Phylogeny and Environmental Gradients
Goal: Test H1d — infer per-sample prophage burden from NMDC metagenomic taxonomic profiles, weighted by pangenome-derived prophage prevalence per genus. Correlate with abiotic environmental features.
Dependencies: NB01 outputs (data/species_module_summary.tsv)
Environment: Requires BERDL JupyterHub (Spark SQL) for NMDC data access
Outputs:
- data/nmdc_prophage_prevalence.tsv — per-sample prophage inference scores
- data/nmdc_module_by_environment.tsv — per-module abiotic correlations
- figures/nmdc_prophage_vs_abiotic.png
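The inference score used throughout is an abundance-weighted sum over taxa. A minimal sketch with illustrative names only (the actual implementation is the loop in Section 4):
# Sketch: score(sample) = sum over matched taxa of
#   abundance(sample, taxon) * prophage_burden(genus(taxon)),
# where prophage_burden is the pangenome-derived mean prophage cluster
# count per species of that genus (built in Section 2).
def prophage_score(abundances, taxon_genus, genus_burden):
    """abundances: {taxon_id: relative abundance} for one sample;
    taxon_genus: {taxon_id: genus}; genus_burden: {genus: burden}."""
    return sum(a * genus_burden.get(taxon_genus.get(t), 0.0)
               for t, a in abundances.items() if a > 0)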
import sys
import os
import pandas as pd
import numpy as np
from scipy import stats

# get_spark_session() is provided by the BERDL JupyterHub environment (see header)
spark = get_spark_session()

sys.path.insert(0, '../src')
from prophage_utils import MODULES

os.makedirs('../data', exist_ok=True)
os.makedirs('../figures', exist_ok=True)
# Load NB01 species module summary
species_summary = pd.read_csv('../data/species_module_summary.tsv', sep='\t')
print(f'Species with prophage data: {len(species_summary):,}')
Species with prophage data: 27,702
1. NMDC Data Overview¶
NMDC has no per-sample gene-level functional annotations — we use taxonomy-based inference (same approach as PHB NB04).
# Load NMDC taxonomy features and study metadata
tax_features = spark.sql("SELECT * FROM nmdc_arkin.taxonomy_features").toPandas()
studies = spark.sql("SELECT * FROM nmdc_arkin.study_table").toPandas()
abiotic_all = spark.sql("SELECT * FROM nmdc_arkin.abiotic_features").toPandas()
tax_dim = spark.sql("SELECT * FROM nmdc_arkin.taxonomy_dim").toPandas()
print(f'NMDC samples with taxonomy: {len(tax_features):,}')
print(f'NMDC samples with abiotic data: {len(abiotic_all):,}')
print(f'NMDC studies: {len(studies)}')
print(f'Taxonomy dimension table: {len(tax_dim):,} taxa')
taxon_cols = [c for c in tax_features.columns if c != 'sample_id']
print(f'Taxon columns: {len(taxon_cols)}')
NMDC samples with taxonomy: 6,365
NMDC samples with abiotic data: 13,847
NMDC studies: 48
Taxonomy dimension table: 2,594,787 taxa
Taxon columns: 3492
# Study ecosystem overview
study_eco = studies[['study_id', 'name', 'ecosystem', 'ecosystem_category',
'ecosystem_type', 'ecosystem_subtype']].copy()
print('NMDC studies by ecosystem:')
for _, row in study_eco.iterrows():
eco = f"{row['ecosystem_category'] or '?'}/{row['ecosystem_type'] or '?'}"
print(f" {row['study_id']}: {eco} — {str(row['name'])[:60]}")
NMDC studies by ecosystem:
  nmdc:sty-11-8fb6t785: Terrestrial/Deep subsurface — Deep subsurface shale carbon reservoir microbial communities
  nmdc:sty-11-33fbta56: Aquatic/Freshwater — Peatland microbial communities from Minnesota, USA, analyzin
  nmdc:sty-11-aygzgv51: Aquatic/Freshwater — Riverbed sediment microbial communities from the Columbia Ri
  nmdc:sty-11-34xj1150: nan/nan — National Ecological Observatory Network: soil metagenomes (D
  nmdc:sty-11-076c9980: Terrestrial/Soil — Lab enrichment of tropical soil microbial communities from L
  nmdc:sty-11-t91cwb40: nan/nan — Determining the genomic basis for interactions between gut f
  nmdc:sty-11-5bgrvr62: nan/nan — Freshwater microbial communities from Lake Mendota, Crystal
  nmdc:sty-11-5tgfr349: Aquatic/Freshwater — Freshwater microbial communities from rivers from various lo
  nmdc:sty-11-dcqce727: Terrestrial/Soil — Bulk soil microbial communities from the East River watershe
  nmdc:sty-11-1t150432: Plants/Unclassified — Populus root and rhizosphere microbial communities from Tenn
  nmdc:sty-11-zs2syx06: nan/nan — Meadow Soil Metagenomes from the Angelo Coast Range Reserve
  nmdc:sty-11-r2h77870: Terrestrial/Soil — Roots, rhizosphere and bulk soil microbial communities from
  nmdc:sty-11-28tm5d36: nan/nan — 1000 Soils Research Campaign
  nmdc:sty-11-547rwq94: nan/nan — nan
  nmdc:sty-11-hht5sb92: nan/nan — National Ecological Observatory Network: surface water metag
  nmdc:sty-11-pzmd0x14: nan/nan — National Ecological Observatory Network: benthic metagenomes
  nmdc:sty-11-db67n062: nan/nan — Geochemical controls on soil resiliency to carbon loss follo
  nmdc:sty-11-8xdqsn54: nan/nan — Coupling spectral techniques; Molecular characterization of
  nmdc:sty-11-hdd4bf83: nan/nan — Colonization resistance against Candida
  nmdc:sty-11-2zhqs261: nan/nan — nan
  nmdc:sty-11-xcbexm97: nan/nan — nan
  nmdc:sty-11-x4aawf73: nan/nan — nan
  nmdc:sty-11-f1he1955: nan/nan — nan
  nmdc:sty-11-cytnjc39: nan/nan — nan
  nmdc:sty-11-msexsy29: nan/nan — nan
  nmdc:sty-11-nxrz9m96: nan/nan — nan
  nmdc:sty-11-e4yb9z58: nan/nan — Seasonal activities of the phyllosphere microbiome of perenn
  nmdc:sty-11-abkzcd11: nan/nan — nan
  nmdc:sty-11-ev70y104: nan/nan — EcoFAB 2.0 Root Microbiome Ring Trial
  nmdc:sty-11-8ws97026: nan/nan — Molecular mechanisms underlying changes in the temperature s
  nmdc:sty-11-fkbnah04: nan/nan — Freshwater microbial communities from oligotrophic, dystroph
  nmdc:sty-11-prtb4s28: nan/nan — Arabidopsis, maize, boechera and miscanthus rhizosphere micr
  nmdc:sty-11-dwsv7q78: nan/nan — Microbial regulation of soil water repellency to control soi
  nmdc:sty-11-y1kdd163: nan/nan — Great Lakes Bioenergy Research Center (GLBRC)
  nmdc:sty-11-3cmn1g53: nan/nan — Center for Advanced Bioenergy and Bioproducts Innovation (CA
  nmdc:sty-11-cssvjy19: nan/nan — Center for Bioenergy Innovation (CBI)
  nmdc:sty-11-ggfd7z76: nan/nan — Joint BioEnergy Institute (JBEI)
  nmdc:sty-11-srtxhh77: nan/nan — MONet
  nmdc:sty-11-kjs8av65: nan/nan — COMPASS - Field, Measurements, and Experiments
  nmdc:sty-11-wbc14h22: nan/nan — Switchgrass cropping systems affect soil carbon and nitrogen
  nmdc:sty-11-vh2hty57: nan/nan — The Impact of Stand Age and Fertilization on the Soil Microb
  nmdc:sty-11-kfvmk798: nan/nan — Chronic drought differentially alters the belowground microb
  nmdc:sty-11-n7mtj961: nan/nan — Impact of modulating bioenergy traits on the sorghum microbi
  nmdc:sty-11-46aje659: nan/nan — Panicgrass rhizosphere soil microbial communities from growt
  nmdc:sty-11-h1m9nj62: nan/nan — Seawater and ice microbial communities from Arctic Ocean
  nmdc:sty-11-rh9tya90: nan/nan — Effects of warming and drought on the microbial communities
  nmdc:sty-11-b0ykqz91: nan/nan — 2014 Lake Erie Harmful Algal Bloom
  nmdc:sty-11-j05pc998: nan/nan — Soil microbial communities from watershed of Upper East Rive
2. Build Genus-Level Prophage Burden Scores¶
From the pangenome, compute the mean prophage module count per genus. This is our reference for weighting NMDC taxonomic profiles.
# Get genus-level taxonomy from GTDB
# Note: gtdb_taxonomy_r214v1 JOIN intermittently returns 0 rows;
# use gtdb_metadata.gtdb_taxonomy string instead (reliable)
taxonomy = spark.sql("""
SELECT DISTINCT g.gtdb_species_clade_id,
REGEXP_EXTRACT(m.gtdb_taxonomy, 'g__([^;]+)', 1) AS gtdb_genus_name
FROM kbase_ke_pangenome.genome g
JOIN kbase_ke_pangenome.gtdb_metadata m ON g.genome_id = m.accession
WHERE m.gtdb_taxonomy IS NOT NULL
AND REGEXP_EXTRACT(m.gtdb_taxonomy, 'g__([^;]+)', 1) != ''
""").toPandas()
# Merge with species module summary
species_with_genus = species_summary.merge(taxonomy, on='gtdb_species_clade_id', how='left')
# Build genus-level prophage burden scores
module_ids = sorted(MODULES.keys())
# Per-genus: mean prophage prevalence and per-module prevalence
genus_scores = species_with_genus.groupby('gtdb_genus_name').agg(
n_species=('gtdb_species_clade_id', 'count'),
mean_prophage_clusters=('n_prophage_clusters', 'mean'),
mean_modules=('n_modules_present', 'mean'),
**{f'pct_{m}': (f'has_{m}', 'mean') for m in module_ids},
).reset_index()
# Overall prophage burden score = mean prophage cluster count per species
genus_scores['prophage_burden'] = genus_scores['mean_prophage_clusters']
print(f'Genera with prophage burden scores: {len(genus_scores):,}')
print(f'\nTop 10 genera by prophage burden:')
print(genus_scores.nlargest(10, 'prophage_burden')[['gtdb_genus_name', 'n_species',
'prophage_burden', 'mean_modules']].to_string(index=False))
Genera with prophage burden scores: 8,419
Top 10 genera by prophage burden:
gtdb_genus_name n_species prophage_burden mean_modules
Salmonella 5 2044.00000 7.0
Klebsiella 16 1976.25000 7.0
Citrobacter 10 1403.30000 7.0
Dorea 1 1373.00000 7.0
Citrobacter_B 1 1195.00000 7.0
Enterobacter 32 979.78125 7.0
Escherichia 8 970.00000 7.0
Serratia 10 881.40000 7.0
Enterococcus_D 4 809.75000 7.0
Hungatella 2 802.50000 7.0
3. Two-Tier Taxonomy Mapping¶
Map NMDC taxon IDs to pangenome genera via:
- Tier 1: gtdb_metadata NCBI taxid → GTDB genus (handles renames)
- Tier 2: Direct genus name matching via taxonomy_dim
# Tier 1: gtdb_metadata bridge (NCBI taxid → GTDB genus)
print('=== Tier 1: Mapping via gtdb_metadata ===')
ncbi_to_gtdb = spark.sql("""
SELECT DISTINCT
CAST(m.ncbi_species_taxid AS INT) as ncbi_taxid,
REGEXP_EXTRACT(m.gtdb_taxonomy, 'g__([^;]+)', 1) AS gtdb_genus
FROM kbase_ke_pangenome.gtdb_metadata m
WHERE m.gtdb_taxonomy IS NOT NULL
AND REGEXP_EXTRACT(m.gtdb_taxonomy, 'g__([^;]+)', 1) != ''
UNION
SELECT DISTINCT
CAST(m.ncbi_taxid AS INT) as ncbi_taxid,
REGEXP_EXTRACT(m.gtdb_taxonomy, 'g__([^;]+)', 1) AS gtdb_genus
FROM kbase_ke_pangenome.gtdb_metadata m
WHERE m.gtdb_taxonomy IS NOT NULL
AND REGEXP_EXTRACT(m.gtdb_taxonomy, 'g__([^;]+)', 1) != ''
""").toPandas()
print(f'gtdb_metadata mappings: {len(ncbi_to_gtdb):,}')
# For taxids mapping to multiple genera, take the most common
ncbi_genus_map = ncbi_to_gtdb.groupby('ncbi_taxid')['gtdb_genus'].agg(
lambda x: x.value_counts().index[0]
).to_dict()
# Build Tier 1 mapping
taxid_to_genus = {}
tier1_hits = 0
for col_id in taxon_cols:
try:
tid = int(col_id)
except (ValueError, TypeError):
continue
if tid in ncbi_genus_map:
taxid_to_genus[col_id] = ncbi_genus_map[tid]
tier1_hits += 1
print(f'Tier 1 matches: {tier1_hits}/{len(taxon_cols)}')
# Tier 2: Direct genus name matching via taxonomy_dim
print('\n=== Tier 2: Fallback via taxonomy_dim ===')
gtdb_genus_set = set(genus_scores['gtdb_genus_name'].dropna().str.strip().str.lower())
# Build a taxid -> genus lookup once (first row per taxid) instead of
# rescanning the 2.6M-row tax_dim for every unmapped column
tax_dim_genus = tax_dim.drop_duplicates('taxid', keep='first').set_index('taxid')['genus']
tier2_hits = 0
for col_id in taxon_cols:
    if col_id in taxid_to_genus:
        continue
    try:
        tid = int(col_id)
    except (ValueError, TypeError):
        continue
    if tid in tax_dim_genus.index:
        genus = str(tax_dim_genus.loc[tid]).strip()
        if genus and genus.lower() not in ('unclassified', 'nan', ''):
            if genus.lower() in gtdb_genus_set:
                taxid_to_genus[col_id] = genus
                tier2_hits += 1
print(f'Tier 2 matches: {tier2_hits} additional')
print(f'\nTotal mapped: {len(taxid_to_genus)}/{len(taxon_cols)} taxon columns')
print(f'\nTotal mapped: {len(taxid_to_genus)}/{len(taxon_cols)} taxon columns')
=== Tier 1: Mapping via gtdb_metadata ===
gtdb_metadata mappings: 72,819
Tier 1 matches: 2336/3492

=== Tier 2: Fallback via taxonomy_dim ===
Tier 2 matches: 678 additional

Total mapped: 3014/3492 taxon columns
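A quick diagnostic on the 478 unmapped taxon columns shows whether a third mapping tier would help, i.e. whether those taxids are missing from taxonomy_dim entirely or present but with genera absent from the pangenome. A sketch using only tables already loaded (nothing below is required downstream):
# Inspect a few unmapped taxon columns
unmapped = [c for c in taxon_cols if c not in taxid_to_genus]
print(f'Unmapped taxon columns: {len(unmapped)}')
for col_id in unmapped[:10]:
    try:
        tid = int(col_id)
    except (ValueError, TypeError):
        print(f'  {col_id}: non-numeric column id')
        continue
    hit = tax_dim[tax_dim['taxid'] == tid]
    genus = hit.iloc[0]['genus'] if len(hit) else '<not in tax_dim>'
    print(f'  {col_id}: genus={genus}')  # a named genus here means it is absent from the pangenome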
# Build genus → prophage score lookups
genus_burden_lookup = dict(zip(
genus_scores['gtdb_genus_name'].str.lower(),
genus_scores['prophage_burden']
))
# Per-module burden lookups
genus_module_lookups = {}
for module_id in module_ids:
genus_module_lookups[module_id] = dict(zip(
genus_scores['gtdb_genus_name'].str.lower(),
genus_scores[f'pct_{module_id}']
))
# Build matched taxon list
matched = []
for col_id in taxon_cols:
genus = taxid_to_genus.get(col_id, None)
if genus and genus.lower() in genus_burden_lookup:
matched.append((col_id, genus.lower()))
print(f'Taxon IDs matched to pangenome genera with prophage scores: {len(matched)}/{len(taxon_cols)}')
Taxon IDs matched to pangenome genera with prophage scores: 3014/3492
4. Compute Per-Sample Prophage Inference Scores¶
# Compute per-sample prophage inference scores
# Overall score + per-module scores
sample_scores = []
for _, row in tax_features.iterrows():
sample_id = row['sample_id']
overall_score = 0.0
module_scores = {m: 0.0 for m in module_ids}
matched_abundance = 0.0
total_abundance = 0.0
for col_id, genus in matched:
abundance = pd.to_numeric(row.get(col_id, 0), errors='coerce')
if pd.notna(abundance) and abundance > 0:
overall_score += abundance * genus_burden_lookup.get(genus, 0)
matched_abundance += abundance
for module_id in module_ids:
module_scores[module_id] += abundance * genus_module_lookups[module_id].get(genus, 0)
for col in taxon_cols:
val = pd.to_numeric(row.get(col, 0), errors='coerce')
if pd.notna(val) and val > 0:
total_abundance += val
result = {
'sample_id': sample_id,
'prophage_score': overall_score,
'matched_abundance': matched_abundance,
'total_abundance': total_abundance,
'pct_matched': matched_abundance / total_abundance * 100 if total_abundance > 0 else 0,
}
for module_id in module_ids:
result[f'score_{module_id}'] = module_scores[module_id]
sample_scores.append(result)
sample_prophage = pd.DataFrame(sample_scores)
print(f'Computed prophage scores for {len(sample_prophage):,} samples')
print(f'\nOverall prophage score distribution:')
print(sample_prophage['prophage_score'].describe())
print(f'\nTaxon matching coverage:')
print(f' Median % abundance matched: {sample_prophage["pct_matched"].median():.1f}%')
Computed prophage scores for 6,365 samples

Overall prophage score distribution:
count    6.365000e+03
mean     8.503221e+04
std      8.695364e+04
min      0.000000e+00
25%      3.755687e+04
50%      6.141617e+04
75%      1.028432e+05
max      2.435216e+06
Name: prophage_score, dtype: float64

Taxon matching coverage:
  Median % abundance matched: 87.2%
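The row-wise loop above is easy to audit but slow at scale; the overall score is equivalently a single matrix product over the matched columns. A vectorized sketch, assuming abundances are non-negative (so the `> 0` guard in the loop drops nothing):
# Abundance matrix (samples x matched taxa) dotted with the burden vector
matched_cols = [col_id for col_id, _ in matched]
burden_vec = np.array([genus_burden_lookup[g] for _, g in matched])
abund = tax_features[matched_cols].apply(pd.to_numeric, errors='coerce').fillna(0.0)
vec_scores = abund.to_numpy() @ burden_vec
# Should agree with the loop up to float round-off
assert np.allclose(vec_scores, sample_prophage['prophage_score'].to_numpy())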
# Per-module score distributions
print('Per-module score summary:')
for module_id in module_ids:
col = f'score_{module_id}'
print(f' {module_id}: median={sample_prophage[col].median():.3f}, '
f'mean={sample_prophage[col].mean():.3f}, '
f'max={sample_prophage[col].max():.3f}')
Per-module score summary:
  A_packaging: median=288.820, mean=393.206, max=12283.852
  B_head_morphogenesis: median=185.908, mean=266.546, max=7748.568
  C_tail: median=211.695, mean=295.847, max=9260.947
  D_lysis: median=288.820, mean=393.221, max=12283.852
  E_integration: median=288.428, mean=392.756, max=12161.048
  F_lysogenic_regulation: median=288.820, mean=393.221, max=12283.852
  G_anti_defense: median=248.646, mean=336.430, max=11289.522
5. Correlate with Abiotic Features¶
# Merge prophage scores with abiotic features
prophage_abiotic = sample_prophage.merge(abiotic_all, on='sample_id', how='inner')
print(f'Samples with both prophage scores and abiotic data: {len(prophage_abiotic):,}')
# Identify abiotic columns
abiotic_cols = [c for c in abiotic_all.columns if c != 'sample_id']
# Correlate overall prophage score with abiotic variables
overall_corr = []
for col in abiotic_cols:
vals = pd.to_numeric(prophage_abiotic[col], errors='coerce')
valid = vals.notna() & (vals != 0) & prophage_abiotic['prophage_score'].notna()
if valid.sum() >= 30:
rho, p = stats.spearmanr(prophage_abiotic.loc[valid, 'prophage_score'], vals[valid])
overall_corr.append({
'abiotic_variable': col,
'score_type': 'overall',
'n': valid.sum(),
'spearman_rho': rho,
'p_value': p,
})
overall_corr_df = pd.DataFrame(overall_corr).sort_values('p_value')
print('\nOverall prophage score vs abiotic variables:')
print(overall_corr_df.to_string(index=False))
Samples with both prophage scores and abiotic data: 6,365
Overall prophage score vs abiotic variables:
abiotic_variable score_type n spearman_rho p_value
annotations_ph overall 4366 0.474357 6.985993e-244
annotations_temp_has_numeric_value overall 4587 0.399498 2.264122e-175
annotations_depth_has_maximum_numeric_value overall 4973 0.352461 2.078767e-145
annotations_tot_nitro_content_has_numeric_value overall 1231 0.314422 1.190946e-29
annotations_carb_nitro_ratio_has_numeric_value overall 910 -0.215925 4.626414e-11
annotations_depth_has_minimum_numeric_value overall 349 0.281673 8.700601e-08
annotations_depth_has_numeric_value overall 517 -0.170558 9.727513e-05
annotations_ammonium_nitrogen_has_numeric_value overall 1230 0.100235 4.304370e-04
annotations_samp_size_has_numeric_value overall 148 -0.218260 7.700167e-03
annotations_diss_oxygen_has_numeric_value overall 272 0.113607 6.133263e-02
annotations_soluble_react_phosp_has_numeric_value overall 32 0.199615 2.733682e-01
annotations_conduc_has_numeric_value overall 70 0.080360 5.084146e-01
annotations_tot_phosp_has_numeric_value overall 32 0.112609 5.394632e-01
annotations_ammonium_has_numeric_value overall 33 0.109324 5.447657e-01
annotations_chlorophyll_has_numeric_value overall 34 -0.055165 7.566605e-01
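These are raw p-values; the Benjamini-Hochberg correction applied to the per-module tests below can be applied here as well. A sketch with statsmodels (with p-values this small, the ranking is unchanged):
from statsmodels.stats.multitest import multipletests
# BH-FDR over the 15 overall tests; corrected values are the ones to report
reject, p_fdr, _, _ = multipletests(overall_corr_df['p_value'], method='fdr_bh')
overall_corr_df['p_fdr'] = p_fdr
overall_corr_df['significant_fdr'] = reject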
# Per-module correlations with abiotic variables
module_corr_results = []
for module_id in module_ids:
score_col = f'score_{module_id}'
for abiotic_col in abiotic_cols:
vals = pd.to_numeric(prophage_abiotic[abiotic_col], errors='coerce')
valid = vals.notna() & (vals != 0) & prophage_abiotic[score_col].notna()
if valid.sum() >= 30:
rho, p = stats.spearmanr(prophage_abiotic.loc[valid, score_col], vals[valid])
module_corr_results.append({
'module': module_id,
'module_name': MODULES[module_id]['full_name'],
'abiotic_variable': abiotic_col,
'n': valid.sum(),
'spearman_rho': rho,
'p_value': p,
})
module_corr_df = pd.DataFrame(module_corr_results)
# Multiple testing correction (FDR)
from statsmodels.stats.multitest import multipletests
if len(module_corr_df) > 0:
reject, pvals_corrected, _, _ = multipletests(module_corr_df['p_value'], method='fdr_bh')
module_corr_df['p_fdr'] = pvals_corrected
module_corr_df['significant_fdr'] = reject
# Show significant per-module correlations
sig_module = module_corr_df[module_corr_df['significant_fdr'] == True].sort_values('p_fdr')
print(f'\nSignificant module-abiotic correlations (FDR < 0.05): {len(sig_module)}')
if len(sig_module) > 0:
print(sig_module[['module_name', 'abiotic_variable', 'spearman_rho', 'p_fdr']].head(30).to_string(index=False))
Significant module-abiotic correlations (FDR < 0.05): 57
module_name abiotic_variable spearman_rho p_fdr
Packaging Module annotations_ph 0.518946 2.869697e-298
Lysis Module annotations_ph 0.518946 2.869697e-298
Lysogenic Regulation Module annotations_ph 0.518946 2.869697e-298
Integration Module annotations_ph 0.518959 2.869697e-298
Tail Module annotations_ph 0.497890 5.588501e-271
Anti-Defense Module annotations_ph 0.497110 4.427506e-270
Tail Module annotations_depth_has_maximum_numeric_value 0.382007 2.368981e-171
Head Morphogenesis Module annotations_ph 0.400735 5.129427e-167
Lysogenic Regulation Module annotations_depth_has_maximum_numeric_value 0.360718 9.576379e-152
Lysis Module annotations_depth_has_maximum_numeric_value 0.360718 9.576379e-152
Packaging Module annotations_depth_has_maximum_numeric_value 0.360720 9.576379e-152
Integration Module annotations_depth_has_maximum_numeric_value 0.360702 9.576379e-152
Anti-Defense Module annotations_depth_has_maximum_numeric_value 0.356371 6.288947e-148
Anti-Defense Module annotations_temp_has_numeric_value 0.351715 8.453988e-133
Tail Module annotations_temp_has_numeric_value 0.351417 1.367282e-132
Head Morphogenesis Module annotations_depth_has_maximum_numeric_value 0.336709 2.836186e-131
Integration Module annotations_temp_has_numeric_value 0.331672 2.043343e-117
Packaging Module annotations_temp_has_numeric_value 0.331531 2.210564e-117
Lysis Module annotations_temp_has_numeric_value 0.331531 2.210564e-117
Lysogenic Regulation Module annotations_temp_has_numeric_value 0.331531 2.210564e-117
Head Morphogenesis Module annotations_temp_has_numeric_value 0.311518 4.388531e-103
Lysis Module annotations_tot_nitro_content_has_numeric_value 0.333463 9.892882e-33
Lysogenic Regulation Module annotations_tot_nitro_content_has_numeric_value 0.333463 9.892882e-33
Integration Module annotations_tot_nitro_content_has_numeric_value 0.333463 9.892882e-33
Packaging Module annotations_tot_nitro_content_has_numeric_value 0.333463 9.892882e-33
Tail Module annotations_tot_nitro_content_has_numeric_value 0.324806 4.771820e-31
Anti-Defense Module annotations_tot_nitro_content_has_numeric_value 0.324838 4.771820e-31
Head Morphogenesis Module annotations_tot_nitro_content_has_numeric_value 0.270635 1.565975e-21
Tail Module annotations_carb_nitro_ratio_has_numeric_value -0.278034 4.664831e-17
Packaging Module annotations_carb_nitro_ratio_has_numeric_value -0.249175 7.648686e-14
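The near-identical rho values across modules reflect that the per-module scores are highly collinear at genus resolution (the top-burden genera all carry mean_modules = 7.0, so module weights track overall burden). A quick check on the score columns already computed:
# Spearman correlation among the seven per-module scores; values near 1.0
# mean the modules carry little independent signal at this resolution
score_cols = [f'score_{m}' for m in module_ids]
print(sample_prophage[score_cols].corr(method='spearman').round(3).to_string())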
6. Study-Level Analysis¶
Compare prophage burden across NMDC study ecosystem types.
# Try to link samples to studies
# The biosample_set or omics_processing tables should have study links
try:
sample_study = spark.sql("""
SELECT DISTINCT sample_id, study_id
FROM nmdc_arkin.biosample_set
""").toPandas()
print(f'Sample-study links from biosample_set: {len(sample_study):,}')
except Exception as e:
print(f'biosample_set query failed: {e}')
# Try alternative approach
try:
sample_study = spark.sql("""
SELECT DISTINCT sample_id, study_id
FROM nmdc_arkin.omics_processing_set
""").toPandas()
print(f'Sample-study links from omics_processing_set: {len(sample_study):,}')
except Exception as e2:
print(f'omics_processing_set also failed: {e2}')
sample_study = None
if sample_study is not None and len(sample_study) > 0:
# Merge with prophage scores and study metadata
study_prophage = sample_prophage.merge(sample_study, on='sample_id', how='inner')
study_prophage = study_prophage.merge(
studies[['study_id', 'name', 'ecosystem_category', 'ecosystem_type']],
on='study_id', how='left'
)
print(f'\nSamples linked to studies: {len(study_prophage):,}')
# Prophage score by ecosystem type
eco_stats = study_prophage.groupby('ecosystem_type').agg(
n_samples=('sample_id', 'count'),
mean_prophage_score=('prophage_score', 'mean'),
median_prophage_score=('prophage_score', 'median'),
).sort_values('median_prophage_score', ascending=False)
print('\nProphage score by ecosystem type:')
print(eco_stats.to_string())
else:
print('Could not link samples to studies — skipping study-level analysis')
study_prophage = None
{"ts": "2026-02-21 00:26:02.693", "level": "ERROR", "logger": "SQLQueryContextLogger", "msg": "[TABLE_OR_VIEW_NOT_FOUND] The table or view `nmdc_arkin`.`biosample_set` cannot be found. Verify the spelling and correctness of the schema and catalog.\nIf you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.\nTo tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 3 pos 13;\n'Distinct\n+- 'Project ['sample_id, 'study_id]\n +- 'UnresolvedRelation [nmdc_arkin, biosample_set], [], false\n\n\nJVM stacktrace:\norg.apache.spark.sql.catalyst.ExtendedAnalysisException\n\tat org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.tableNotFound(package.scala:91)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:306)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:284)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:252)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251)\n\tat scala.collection.immutable.Vector.foreach(Vector.scala:2125)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251)\n\tat scala.collection.immutable.Vector.foreach(Vector.scala:2125)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:284)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:255)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:299)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:244)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:231)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:299)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$resolveInFixedPoint$1(HybridAnalyzer.scala:192)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:192)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)\n\tat 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)\n\tat org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)\n\tat scala.util.Try$.apply(Try.scala:217)\n\tat org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)\n\tat org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)\n\tat org.apache.spark.util.LazyTry.get(LazyTry.scala:58)\n\tat org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121)\n\tat org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80)\n\tat org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$5(Dataset.scala:139)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)\n\tat org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:136)\n\tat org.apache.spark.sql.classic.SparkSession.$anonfun$sql$4(SparkSession.scala:499)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)\n\tat org.apache.spark.sql.classic.SparkSession.sql(SparkSession.scala:490)\n\tat org.apache.spark.sql.connect.planner.SparkConnectPlanner.executeSQL(SparkConnectPlanner.scala:2764)\n\tat org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2608)\n\tat org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2499)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:322)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)\n\tat org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)\n\tat org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)\n\tat org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)\n\tat org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)\n\tat org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)\n\tat org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)\n\tat org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)\n\tat org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)", "context": {"errorClass": "TABLE_OR_VIEW_NOT_FOUND"}, 
"exception": {"class": "_MultiThreadedRendezvous", "msg": "<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.INTERNAL\n\tdetails = \"[TABLE_OR_VIEW_NOT_FOUND] The table or view `nmdc_arkin`.`biosample_set` cannot be found. Verify the spelling and correctness of the schema and catalog.\nIf you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.\nTo tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 3 pos 13;\n'Distinct\n+- 'Project ['sample_id, 'study_id]\n +- 'UnresolvedRelation [nmdc_arkin, biosample_set], [], false\n\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {grpc_message:\"[TABLE_OR_VIEW_NOT_FOUND] The table or view `nmdc_arkin`.`biosample_set` cannot be found. Verify the spelling and correctness of the schema and catalog.\\nIf you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.\\nTo tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 3 pos 13;\\n\\'Distinct\\n+- \\'Project [\\'sample_id, \\'study_id]\\n +- \\'UnresolvedRelation [nmdc_arkin, biosample_set], [], false\\n\", grpc_status:13}\"\n>", "stacktrace": [{"class": null, "method": "_execute_and_fetch_as_iterator", "file": "/usr/local/spark/python/pyspark/sql/connect/client/core.py", "line": "1523"}, {"class": null, "method": "__next__", "file": "<frozen _collections_abc>", "line": "360"}, {"class": null, "method": "send", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "138"}, {"class": null, "method": "_has_next", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "190"}, {"class": null, "method": "_has_next", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "162"}, {"class": null, "method": "_call_iter", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "281"}, {"class": null, "method": "_call_iter", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "261"}, {"class": null, "method": "<lambda>", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "163"}, {"class": null, "method": "__next__", "file": "/opt/conda/lib/python3.13/site-packages/grpc/_channel.py", "line": "538"}, {"class": null, "method": "_next", "file": "/opt/conda/lib/python3.13/site-packages/grpc/_channel.py", "line": "956"}]}}
biosample_set query failed: [TABLE_OR_VIEW_NOT_FOUND] The table or view `nmdc_arkin`.`biosample_set` cannot be found. Verify the spelling and correctness of the schema and catalog. If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog. To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 3 pos 13; 'Distinct +- 'Project ['sample_id, 'study_id] +- 'UnresolvedRelation [nmdc_arkin, biosample_set], [], false JVM stacktrace: org.apache.spark.sql.catalyst.ExtendedAnalysisException at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.tableNotFound(package.scala:91) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:306) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:284) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:252) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251) at scala.collection.immutable.Vector.foreach(Vector.scala:2125) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251) at scala.collection.immutable.Vector.foreach(Vector.scala:2125) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:284) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:255) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:299) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:244) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:231) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:299) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$resolveInFixedPoint$1(HybridAnalyzer.scala:192) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:192) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330) at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278) at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277) at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110) at scala.util.Try$.apply(Try.scala:217) at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378) at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439) at org.apache.spark.util.LazyTry.get(LazyTry.scala:58) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80) at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$5(Dataset.scala:139) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:136) at org.apache.spark.sql.classic.SparkSession.$anonfun$sql$4(SparkSession.scala:499) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.classic.SparkSession.sql(SparkSession.scala:490) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.executeSQL(SparkConnectPlanner.scala:2764) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2608) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2499) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:322) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186) at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102) at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111) at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)
{"ts": "2026-02-21 00:26:03.945", "level": "ERROR", "logger": "SQLQueryContextLogger", "msg": "[TABLE_OR_VIEW_NOT_FOUND] The table or view `nmdc_arkin`.`omics_processing_set` cannot be found. Verify the spelling and correctness of the schema and catalog.\nIf you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.\nTo tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 3 pos 17;\n'Distinct\n+- 'Project ['sample_id, 'study_id]\n +- 'UnresolvedRelation [nmdc_arkin, omics_processing_set], [], false\n\n\nJVM stacktrace:\norg.apache.spark.sql.catalyst.ExtendedAnalysisException\n\tat org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.tableNotFound(package.scala:91)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:306)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:284)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:252)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251)\n\tat scala.collection.immutable.Vector.foreach(Vector.scala:2125)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251)\n\tat scala.collection.immutable.Vector.foreach(Vector.scala:2125)\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:284)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:255)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:299)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:244)\n\tat org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:231)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:299)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$resolveInFixedPoint$1(HybridAnalyzer.scala:192)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:192)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111)\n\tat org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110)\n\tat 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)\n\tat org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110)\n\tat scala.util.Try$.apply(Try.scala:217)\n\tat org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)\n\tat org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)\n\tat org.apache.spark.util.LazyTry.get(LazyTry.scala:58)\n\tat org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121)\n\tat org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80)\n\tat org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$5(Dataset.scala:139)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)\n\tat org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:136)\n\tat org.apache.spark.sql.classic.SparkSession.$anonfun$sql$4(SparkSession.scala:499)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)\n\tat org.apache.spark.sql.classic.SparkSession.sql(SparkSession.scala:490)\n\tat org.apache.spark.sql.connect.planner.SparkConnectPlanner.executeSQL(SparkConnectPlanner.scala:2764)\n\tat org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2608)\n\tat org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2499)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:322)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)\n\tat org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)\n\tat org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)\n\tat org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)\n\tat org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)\n\tat org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)\n\tat org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)\n\tat org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)\n\tat org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)\n\tat org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)", "context": {"errorClass": "TABLE_OR_VIEW_NOT_FOUND"}, 
"exception": {"class": "_MultiThreadedRendezvous", "msg": "<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.INTERNAL\n\tdetails = \"[TABLE_OR_VIEW_NOT_FOUND] The table or view `nmdc_arkin`.`omics_processing_set` cannot be found. Verify the spelling and correctness of the schema and catalog.\nIf you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.\nTo tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 3 pos 17;\n'Distinct\n+- 'Project ['sample_id, 'study_id]\n +- 'UnresolvedRelation [nmdc_arkin, omics_processing_set], [], false\n\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer {grpc_message:\"[TABLE_OR_VIEW_NOT_FOUND] The table or view `nmdc_arkin`.`omics_processing_set` cannot be found. Verify the spelling and correctness of the schema and catalog.\\nIf you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.\\nTo tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 3 pos 17;\\n\\'Distinct\\n+- \\'Project [\\'sample_id, \\'study_id]\\n +- \\'UnresolvedRelation [nmdc_arkin, omics_processing_set], [], false\\n\", grpc_status:13}\"\n>", "stacktrace": [{"class": null, "method": "_execute_and_fetch_as_iterator", "file": "/usr/local/spark/python/pyspark/sql/connect/client/core.py", "line": "1523"}, {"class": null, "method": "__next__", "file": "<frozen _collections_abc>", "line": "360"}, {"class": null, "method": "send", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "138"}, {"class": null, "method": "_has_next", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "190"}, {"class": null, "method": "_has_next", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "162"}, {"class": null, "method": "_call_iter", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "281"}, {"class": null, "method": "_call_iter", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "261"}, {"class": null, "method": "<lambda>", "file": "/usr/local/spark/python/pyspark/sql/connect/client/reattach.py", "line": "163"}, {"class": null, "method": "__next__", "file": "/opt/conda/lib/python3.13/site-packages/grpc/_channel.py", "line": "538"}, {"class": null, "method": "_next", "file": "/opt/conda/lib/python3.13/site-packages/grpc/_channel.py", "line": "956"}]}}
omics_processing_set also failed: [TABLE_OR_VIEW_NOT_FOUND] The table or view `nmdc_arkin`.`omics_processing_set` cannot be found. Verify the spelling and correctness of the schema and catalog. If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog. To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS. SQLSTATE: 42P01; line 3 pos 17; 'Distinct +- 'Project ['sample_id, 'study_id] +- 'UnresolvedRelation [nmdc_arkin, omics_processing_set], [], false JVM stacktrace: org.apache.spark.sql.catalyst.ExtendedAnalysisException at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.tableNotFound(package.scala:91) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:306) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:284) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:252) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251) at scala.collection.immutable.Vector.foreach(Vector.scala:2125) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:251) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:251) at scala.collection.immutable.Vector.foreach(Vector.scala:2125) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:251) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:284) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:255) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:299) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:244) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:231) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:299) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$resolveInFixedPoint$1(HybridAnalyzer.scala:192) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:192) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:76) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:111) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:71) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:330) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:330) at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:110) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148) at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:278) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:278) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:277) at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:110) at scala.util.Try$.apply(Try.scala:217) at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378) at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439) at org.apache.spark.util.LazyTry.get(LazyTry.scala:58) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:121) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:80) at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$5(Dataset.scala:139) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:136) at org.apache.spark.sql.classic.SparkSession.$anonfun$sql$4(SparkSession.scala:499) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.classic.SparkSession.sql(SparkSession.scala:490) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.executeSQL(SparkConnectPlanner.scala:2764) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2608) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2499) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:322) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186) at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102) at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111) at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347) Could not link samples to studies — skipping study-level analysis
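Both guessed table names failed; enumerating the schema first avoids guessing. SHOW TABLES is standard Spark SQL (whether a sample-study link table exists in nmdc_arkin under another name is not verified here):
# List what actually exists in the schema before hard-coding table names
tables = spark.sql('SHOW TABLES IN nmdc_arkin').toPandas()
print(sorted(tables['tableName'].tolist()))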
7. Save Outputs¶
# Save per-sample prophage scores
sample_prophage.to_csv('../data/nmdc_prophage_prevalence.tsv', sep='\t', index=False)
print(f'Saved data/nmdc_prophage_prevalence.tsv: {len(sample_prophage):,} rows')
# Save module × abiotic correlations
module_corr_df.to_csv('../data/nmdc_module_by_environment.tsv', sep='\t', index=False)
print(f'Saved data/nmdc_module_by_environment.tsv: {len(module_corr_df):,} rows')
Saved data/nmdc_prophage_prevalence.tsv: 6,365 rows
Saved data/nmdc_module_by_environment.tsv: 105 rows
8. Figures¶
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
# Figure: multi-panel prophage vs abiotic
# Top correlations (by p-value) for overall score
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# Panel 1: Distribution of prophage scores
ax = axes[0, 0]
ax.hist(sample_prophage['prophage_score'], bins=50, color='#E91E63', alpha=0.8, edgecolor='white')
ax.set_xlabel('Prophage inference score')
ax.set_ylabel('Number of samples')
ax.set_title('Distribution of Prophage Scores')
ax.axvline(sample_prophage['prophage_score'].median(), color='black', linestyle='--', alpha=0.5)
# Panel 2: Matching coverage
ax = axes[0, 1]
ax.hist(sample_prophage['pct_matched'], bins=50, color='#4CAF50', alpha=0.8, edgecolor='white')
ax.set_xlabel('% abundance matched')
ax.set_ylabel('Number of samples')
ax.set_title('Pangenome Matching Coverage')
# Panels 3-6: Top 4 abiotic correlations
if len(overall_corr_df) > 0:
top_corr = overall_corr_df.head(4)
panel_positions = [(0, 2), (1, 0), (1, 1), (1, 2)]
for i, (_, corr_row) in enumerate(top_corr.iterrows()):
if i >= len(panel_positions):
break
r, c = panel_positions[i]
ax = axes[r, c]
col = corr_row['abiotic_variable']
vals = pd.to_numeric(prophage_abiotic[col], errors='coerce')
valid = vals.notna() & (vals != 0) & prophage_abiotic['prophage_score'].notna()
ax.scatter(vals[valid], prophage_abiotic.loc[valid, 'prophage_score'],
alpha=0.2, s=10, color='#E91E63')
clean_name = col.replace('annotations_', '').replace('_has_numeric_value', '')
ax.set_xlabel(clean_name)
ax.set_ylabel('Prophage score')
ax.set_title(f'rho={corr_row["spearman_rho"]:.3f}, p={corr_row["p_value"]:.1e}')
plt.suptitle('NMDC: Prophage Burden vs Environmental Variables', fontsize=13, y=1.02)
plt.tight_layout()
plt.savefig('../figures/nmdc_prophage_vs_abiotic.png', dpi=150, bbox_inches='tight')
plt.show()
print('Saved figures/nmdc_prophage_vs_abiotic.png')
Saved figures/nmdc_prophage_vs_abiotic.png
# Figure: Per-module abiotic correlation heatmap
if len(sig_module) > 0:
# Pivot: module × abiotic variable → rho
pivot = module_corr_df.pivot_table(
index='module_name', columns='abiotic_variable', values='spearman_rho'
)
# Clean column names
pivot.columns = [c.replace('annotations_', '').replace('_has_numeric_value', '')
for c in pivot.columns]
fig, ax = plt.subplots(figsize=(14, 6))
sns.heatmap(pivot, cmap='RdBu_r', center=0, annot=True, fmt='.2f', ax=ax,
cbar_kws={'label': 'Spearman rho'})
ax.set_title('Per-Module Correlation with Abiotic Variables')
ax.set_ylabel('')
plt.tight_layout()
plt.savefig('../figures/nmdc_module_abiotic_heatmap.png', dpi=150, bbox_inches='tight')
plt.show()
print('Saved figures/nmdc_module_abiotic_heatmap.png')
else:
print('No significant module-abiotic correlations to visualize')
Saved figures/nmdc_module_abiotic_heatmap.png
# Summary
print('='*60)
print('NB05 SUMMARY')
print('='*60)
print(f'NMDC samples scored: {len(sample_prophage):,}')
print(f'Median matching coverage: {sample_prophage["pct_matched"].median():.1f}%')
print(f'Genera matched to pangenome: {len(matched)}/{len(taxon_cols)}')
n_sig_overall = (overall_corr_df['p_value'] < 0.05).sum() if len(overall_corr_df) > 0 else 0
print(f'\nOverall prophage score significant correlations (p<0.05): {n_sig_overall}')
if len(overall_corr_df) > 0:
for _, row in overall_corr_df.head(5).iterrows():
clean = row['abiotic_variable'].replace('annotations_', '').replace('_has_numeric_value', '')
print(f' {clean}: rho={row["spearman_rho"]:.3f}, p={row["p_value"]:.2e}')
n_sig_module = len(sig_module) if len(module_corr_df) > 0 else 0
print(f'\nPer-module significant correlations (FDR<0.05): {n_sig_module}')
print(f'\nFiles saved:')
print(f' data/nmdc_prophage_prevalence.tsv ({len(sample_prophage):,} rows)')
print(f' data/nmdc_module_by_environment.tsv ({len(module_corr_df):,} rows)')
print(f' figures/nmdc_prophage_vs_abiotic.png')
============================================================
NB05 SUMMARY
============================================================
NMDC samples scored: 6,365
Median matching coverage: 87.2%
Genera matched to pangenome: 3014/3492

Overall prophage score significant correlations (p<0.05): 9
  ph: rho=0.474, p=6.99e-244
  temp: rho=0.399, p=2.26e-175
  depth_has_maximum_numeric_value: rho=0.352, p=2.08e-145
  tot_nitro_content: rho=0.314, p=1.19e-29
  carb_nitro_ratio: rho=-0.216, p=4.63e-11

Per-module significant correlations (FDR<0.05): 57

Files saved:
  data/nmdc_prophage_prevalence.tsv (6,365 rows)
  data/nmdc_module_by_environment.tsv (105 rows)
  figures/nmdc_prophage_vs_abiotic.png