01 Data Extraction
Jupyter notebook from the Metabolic Capability vs Metabolic Dependency project.
NB01: Data Extraction & Assembly¶
Extract all Spark-dependent data for the Metabolic Capability vs Dependency analysis.
Requires BERDL JupyterHub — get_spark_session() must be available.
Outputs¶
- data/organism_mapping.csv — FB organism → GTDB clade mapping
- data/fb_gapmind_scores.csv — GapMind pathway scores for FB organisms' reference genomes
- data/fb_eggnog_annotations.csv — eggNOG functional annotations (EC, KEGG) for FB-linked gene clusters
- data/fb_fitness_stats.csv — Per-gene fitness summary statistics for FB organisms
- data/fb_essential_genes.csv — Putative essential genes (in gene table but absent from genefitness)
- data/species_pathway_summary.csv — Species-level GapMind aggregates for all 27K species
Key pitfalls¶
- GapMind has multiple rows per genome-pathway pair → must MAX aggregate
- FB fitness values are strings → must CAST to FLOAT
- GapMind genome_id format may differ from pangenome genome table → check and reconcile
- Gene cluster IDs are species-specific → use EC/KEGG for cross-species comparison
import pandas as pd
import numpy as np
from pathlib import Path
# get_spark_session() is injected by the BERDL JupyterHub runtime (see the
# notebook header) -- this cell fails anywhere else.
spark = get_spark_session()
print(f"Spark version: {spark.version}")
# All extracted CSVs are written here (one level above the notebook dir).
DATA_DIR = Path('../data')
DATA_DIR.mkdir(exist_ok=True)
1. Load Fitness Browser Organisms¶
# Pull the complete Fitness Browser organism roster, one row per organism.
organism_query = """
SELECT orgId, genus, species, strain, taxonomyId, division
FROM kescience_fitnessbrowser.organism
ORDER BY genus, species
"""
fb_orgs = spark.sql(organism_query).toPandas()
print(f"Total FB organisms: {len(fb_orgs)}")
fb_orgs.head(10)
2. Match FB Organisms to GTDB Species Clades¶
Use NCBI taxonomy IDs to link FB organisms to GTDB pangenome clades.
This is a simplified version of the full DIAMOND-based mapping from
conservation_vs_fitness; it covers the majority of organisms.
# Collect the distinct NCBI taxonomy IDs carried by FB organisms; these are
# the join keys into GTDB metadata in the next cell.
taxid_series = fb_orgs['taxonomyId'].dropna().astype(int)
tax_ids = taxid_series.unique().tolist()
tax_id_str = ','.join(map(str, tax_ids))
print(f"Unique FB taxonomy IDs: {len(tax_ids)}")
# Match against GTDB metadata.
# Both the species-level taxid and the (possibly strain-level) ncbi_taxid are
# tried, since FB records may carry either granularity.
# NOTE(review): the CASTs suggest gtdb_metadata stores taxids as strings --
# confirm against the table schema.
taxid_matches = spark.sql(f"""
SELECT DISTINCT
CAST(m.ncbi_species_taxid AS INT) as ncbi_species_taxid,
CAST(m.ncbi_taxid AS INT) as ncbi_taxid,
m.accession as genome_id,
g.gtdb_species_clade_id
FROM kbase_ke_pangenome.gtdb_metadata m
JOIN kbase_ke_pangenome.genome g ON m.accession = g.genome_id
WHERE CAST(m.ncbi_species_taxid AS INT) IN ({tax_id_str})
OR CAST(m.ncbi_taxid AS INT) IN ({tax_id_str})
""").toPandas()
print(f"Taxid matches: {len(taxid_matches)} genome-clade pairs")
print(f"Unique clades: {taxid_matches['gtdb_species_clade_id'].nunique()}")
# Map FB organisms to clades.
# For every FB organism with a taxonomy ID, record each GTDB clade whose
# metadata matched that taxid (species- or strain-level). An organism may map
# to several clades here; the "resolve" cell below picks one per organism.
MAPPING_COLUMNS = [
    'orgId', 'genus', 'species', 'strain', 'taxonomyId',
    'gtdb_species_clade_id', 'representative_genome_id', 'n_clade_genomes',
]
mapping_rows = []
for _, org in fb_orgs.iterrows():
    tid = org['taxonomyId']
    if pd.isna(tid):
        continue  # no taxid -> cannot be matched; reported as unmatched below
    tid = int(tid)
    hits = taxid_matches[
        (taxid_matches['ncbi_species_taxid'] == tid) |
        (taxid_matches['ncbi_taxid'] == tid)
    ]
    for clade in hits['gtdb_species_clade_id'].unique():
        clade_genomes = hits[hits['gtdb_species_clade_id'] == clade]
        mapping_rows.append({
            'orgId': org['orgId'],
            'genus': org['genus'],
            'species': org['species'],
            'strain': org['strain'],
            'taxonomyId': tid,
            'gtdb_species_clade_id': clade,
            # First matched genome stands in as the clade representative.
            'representative_genome_id': clade_genomes['genome_id'].iloc[0],
            'n_clade_genomes': len(clade_genomes)
        })
# Fix: pass explicit columns so the column lookups below work even when no
# organism matched -- pd.DataFrame([]) would otherwise have NO columns and
# org_mapping['orgId'] would raise KeyError.
org_mapping = pd.DataFrame(mapping_rows, columns=MAPPING_COLUMNS)
print(f"Mapped {org_mapping['orgId'].nunique()} FB organisms to {org_mapping['gtdb_species_clade_id'].nunique()} clades")
print(f"\nUnmatched organisms:")
unmatched = set(fb_orgs['orgId']) - set(org_mapping['orgId'])
for oid in sorted(unmatched):
    row = fb_orgs[fb_orgs['orgId'] == oid].iloc[0]
    print(f" {oid}: {row['genus']} {row['species']} (taxid={row['taxonomyId']})")
# Resolve multi-clade organisms: when an organism's taxid matched more than
# one GTDB clade, keep the clade with the most genomes (a proxy for the
# best-studied, most representative clade).
clades_per_org = org_mapping.groupby('orgId')['gtdb_species_clade_id'].nunique()
multi_clade = clades_per_org[clades_per_org > 1]
if multi_clade.empty:
    org_mapping_resolved = org_mapping.copy()
else:
    print(f"Resolving {len(multi_clade)} multi-clade organisms:")
    kept_rows = []
    for org_id in org_mapping['orgId'].unique():
        candidates = org_mapping[org_mapping['orgId'] == org_id]
        if len(candidates) == 1:
            kept_rows.append(candidates.iloc[0])
            continue
        # Stable sort -> ties on n_clade_genomes break by original row order.
        winner = candidates.sort_values('n_clade_genomes', ascending=False).iloc[0]
        print(f" {org_id}: {len(candidates)} clades -> chose {winner['gtdb_species_clade_id']} ({winner['n_clade_genomes']} genomes)")
        kept_rows.append(winner)
    org_mapping_resolved = pd.DataFrame(kept_rows)
print(f"\nFinal mapping: {len(org_mapping_resolved)} organisms -> {org_mapping_resolved['gtdb_species_clade_id'].nunique()} clades")
org_mapping_resolved[['orgId', 'genus', 'species', 'gtdb_species_clade_id', 'representative_genome_id']]
3. Check GapMind genome_id Format¶
The GapMind table may use bare NCBI accessions (e.g., GCF_000005845.2)
while the pangenome genome table uses GTDB-prefixed IDs (e.g., RS_GCF_000005845.2).
We need to reconcile these.
# Check GapMind genome_id format with a sample.
# (See "Key pitfalls": GapMind may use bare NCBI accessions while the
# pangenome genome table uses RS_/GB_-prefixed GTDB IDs.)
gapmind_sample = spark.sql("""
SELECT DISTINCT genome_id
FROM kbase_ke_pangenome.gapmind_pathways
LIMIT 20
""").toPandas()
print("Sample GapMind genome_ids:")
for gid in gapmind_sample['genome_id'].tolist():
    print(f" {gid}")
# Check if they have GTDB prefix; has_prefix drives the ID lookup built in
# the next cell.
has_prefix = gapmind_sample['genome_id'].str.startswith(('RS_', 'GB_')).any()
print(f"\nHas GTDB prefix (RS_/GB_): {has_prefix}")
# Check overlap with pangenome genome table.
# NOTE(review): the "/5" below assumes the sample holds at least 5 IDs.
sample_ids = "','".join(gapmind_sample['genome_id'].tolist()[:5])
genome_check = spark.sql(f"""
SELECT genome_id FROM kbase_ke_pangenome.genome
WHERE genome_id IN ('{sample_ids}')
""").toPandas()
print(f"\nDirect match with genome table: {len(genome_check)}/5")
if len(genome_check) == 0:
    # Try stripping prefix from genome table IDs (diagnostic print only;
    # the actual reconciliation happens in the next cell).
    print("Checking if GapMind uses bare accessions...")
    bare_check = spark.sql(f"""
SELECT genome_id,
CASE
WHEN genome_id LIKE 'RS_%' THEN SUBSTRING(genome_id, 4)
WHEN genome_id LIKE 'GB_%' THEN SUBSTRING(genome_id, 4)
ELSE genome_id
END as bare_id
FROM kbase_ke_pangenome.genome
LIMIT 5
""").toPandas()
    print("Genome table IDs vs bare:")
    print(bare_check)
# Build genome_id lookup based on format detection.
# Map each organism's representative genome to whichever ID form GapMind
# uses, keeping a reverse map (GapMind ID -> GTDB-prefixed ID) for later
# joins back to the genome table.
target_genomes = org_mapping_resolved['representative_genome_id'].tolist()
if has_prefix:
    # GapMind IDs already match the genome table -> identity mapping.
    gapmind_genome_ids = target_genomes
    genome_id_map = {g: g for g in target_genomes}
    print(f"GapMind uses GTDB-prefixed IDs (same as genome table)")
else:
    # GapMind uses bare accessions -> drop the 3-char RS_/GB_ prefix.
    gapmind_genome_ids = [
        g[3:] if g.startswith(('RS_', 'GB_')) else g
        for g in target_genomes
    ]
    genome_id_map = dict(zip(gapmind_genome_ids, target_genomes))
    print(f"Using bare accessions for GapMind queries")
print(f"Target genomes for GapMind extraction: {len(gapmind_genome_ids)}")
print(f"Sample: {gapmind_genome_ids[:3]}")
4. Extract GapMind Pathway Scores for FB Organisms¶
For each FB organism's reference genome, extract the best GapMind score per pathway. GapMind has multiple rows per genome-pathway pair (one per step), so we take MAX.
# Build IN clause for GapMind query.
genome_in = "','".join(gapmind_genome_ids)
# Extract best pathway score per genome-pathway pair.
# GapMind has multiple rows per (genome, pathway) -- see "Key pitfalls" -- so:
#  1) scored: rank each row's score_category on an ordinal 5..1 scale
#     (complete=5 .. not_present=1, unknown=0).
#  2) ROW_NUMBER keeps the single best row per (genome_id, pathway),
#     breaking rank ties by the raw score.
#  3) After WHERE rn = 1 each GROUP BY group holds exactly one row, so the
#     MAX()/FIRST() aggregates simply pass that best row's values through.
gapmind_fb = spark.sql(f"""
WITH scored AS (
SELECT
genome_id,
pathway,
metabolic_category,
clade_name,
score_category,
CAST(score AS FLOAT) as score,
CAST(nHi AS INT) as nHi,
CAST(nMed AS INT) as nMed,
CAST(nLo AS INT) as nLo,
CASE score_category
WHEN 'complete' THEN 5
WHEN 'likely_complete' THEN 4
WHEN 'steps_missing_low' THEN 3
WHEN 'steps_missing_medium' THEN 2
WHEN 'not_present' THEN 1
ELSE 0
END as score_rank
FROM kbase_ke_pangenome.gapmind_pathways
WHERE genome_id IN ('{genome_in}')
)
SELECT
genome_id,
pathway,
metabolic_category,
clade_name,
MAX(score_rank) as best_score_rank,
MAX(score) as best_score,
MAX(nHi) as max_nHi,
MAX(nMed) as max_nMed,
MAX(nLo) as max_nLo,
FIRST(score_category) as best_score_category
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY genome_id, pathway
ORDER BY score_rank DESC, score DESC
) as rn
FROM scored
)
WHERE rn = 1
GROUP BY genome_id, pathway, metabolic_category, clade_name
""").toPandas()
print(f"GapMind scores extracted: {len(gapmind_fb)} genome-pathway pairs")
print(f"Genomes: {gapmind_fb['genome_id'].nunique()}")
print(f"Pathways: {gapmind_fb['pathway'].nunique()}")
print(f"\nScore category distribution:")
print(gapmind_fb['best_score_category'].value_counts())
print(f"\nMetabolic category:")
print(gapmind_fb['metabolic_category'].value_counts())
# Translate GapMind genome_ids back to GTDB-prefixed IDs, then attach the
# FB orgId via the representative-genome mapping built earlier.
gapmind_fb['gtdb_genome_id'] = gapmind_fb['genome_id'].map(genome_id_map)
genome_to_org = (
    org_mapping_resolved
    .set_index('representative_genome_id')['orgId']
    .to_dict()
)
gapmind_fb['orgId'] = gapmind_fb['gtdb_genome_id'].map(genome_to_org)
# Rows left without an orgId indicate genomes we queried but never mapped.
n_unmapped = int(gapmind_fb['orgId'].isna().sum())
if n_unmapped > 0:
    print(f"WARNING: {n_unmapped} rows without orgId mapping")
    print("Unmapped genome_ids:")
    print(gapmind_fb[gapmind_fb['orgId'].isna()]['genome_id'].unique())
else:
    print(f"All {len(gapmind_fb)} rows mapped to FB organisms")
# Summary per organism.
# Fix: "complete" now means best_score_rank >= 5, matching the convention
# used in Sections 7 and 9 (the original counted >= 4 here, silently folding
# 'likely_complete' pathways into the complete count). The >= 4 count is
# still reported separately as n_likely_complete.
org_summary = gapmind_fb.groupby('orgId').agg(
    n_pathways=('pathway', 'nunique'),
    n_complete=('best_score_rank', lambda x: (x >= 5).sum()),
    n_likely_complete=('best_score_rank', lambda x: (x >= 4).sum()),
    n_absent=('best_score_rank', lambda x: (x <= 1).sum())
).reset_index()
org_summary['pct_complete'] = (org_summary['n_complete'] / org_summary['n_pathways'] * 100).round(1)
print(f"\nPer-organism pathway completeness:")
print(org_summary.sort_values('pct_complete', ascending=False).to_string(index=False))
5. Extract eggNOG Annotations for FB-Linked Clades¶
Get EC numbers, KEGG orthologs, and KEGG pathway assignments for gene clusters in the FB-linked species clades. These will be used to map genes to metabolic pathways.
# Get unique clade IDs for our mapped organisms.
target_clades = org_mapping_resolved['gtdb_species_clade_id'].unique().tolist()
clade_in = "','".join(target_clades)
print(f"Extracting eggNOG annotations for {len(target_clades)} clades...")
# LEFT JOIN keeps gene clusters with no eggNOG hit (annotation columns come
# back NULL), so the coverage percentages printed below are over ALL clusters.
# Gene cluster IDs are species-specific (see "Key pitfalls"); the EC/KEGG
# columns are what enable cross-species comparison downstream.
eggnog = spark.sql(f"""
SELECT
gc.gene_cluster_id,
gc.gtdb_species_clade_id,
gc.is_core,
gc.is_auxiliary,
gc.is_singleton,
e.EC,
e.KEGG_ko,
e.KEGG_Pathway,
e.COG_category,
e.Description
FROM kbase_ke_pangenome.gene_cluster gc
LEFT JOIN kbase_ke_pangenome.eggnog_mapper_annotations e
ON gc.gene_cluster_id = e.query_name
WHERE gc.gtdb_species_clade_id IN ('{clade_in}')
""").toPandas()
print(f"Gene clusters extracted: {len(eggnog):,}")
print(f"With EC annotation: {eggnog['EC'].notna().sum():,} ({eggnog['EC'].notna().mean()*100:.1f}%)")
print(f"With KEGG_ko: {eggnog['KEGG_ko'].notna().sum():,} ({eggnog['KEGG_ko'].notna().mean()*100:.1f}%)")
print(f"With KEGG_Pathway: {eggnog['KEGG_Pathway'].notna().sum():,} ({eggnog['KEGG_Pathway'].notna().mean()*100:.1f}%)")
6. Extract Fitness Data for FB Organisms¶
Get per-gene fitness summary statistics. We aggregate across all experimental conditions:
- Mean absolute fitness (overall importance)
- Number of conditions with significant fitness effect (|fit| > 1, |t| > 4)
- Min/max fitness (extreme effects)
Also identify putative essential genes (protein-coding genes absent from genefitness).
# Get mapped orgIds.
target_orgIds = org_mapping_resolved['orgId'].tolist()
orgId_in = "','".join(target_orgIds)
print(f"Extracting fitness data for {len(target_orgIds)} organisms...")
# Aggregate fitness per gene across all experimental conditions.
# fit/t are stored as strings in FB (see "Key pitfalls"), hence the CASTs.
# "Significant" follows the thresholds stated in the section header:
# |fit| > 1 AND |t| > 4.
fitness_stats = spark.sql(f"""
SELECT
orgId,
locusId,
COUNT(*) as n_experiments,
AVG(CAST(fit AS FLOAT)) as mean_fit,
AVG(ABS(CAST(fit AS FLOAT))) as mean_abs_fit,
MIN(CAST(fit AS FLOAT)) as min_fit,
MAX(CAST(fit AS FLOAT)) as max_fit,
SUM(CASE WHEN ABS(CAST(fit AS FLOAT)) > 1 AND ABS(CAST(t AS FLOAT)) > 4 THEN 1 ELSE 0 END) as n_sig_important,
SUM(CASE WHEN CAST(fit AS FLOAT) < -1 AND ABS(CAST(t AS FLOAT)) > 4 THEN 1 ELSE 0 END) as n_sig_sick,
SUM(CASE WHEN CAST(fit AS FLOAT) > 1 AND ABS(CAST(t AS FLOAT)) > 4 THEN 1 ELSE 0 END) as n_sig_benefit
FROM kescience_fitnessbrowser.genefitness
WHERE orgId IN ('{orgId_in}')
GROUP BY orgId, locusId
""").toPandas()
print(f"Fitness stats: {len(fitness_stats):,} gene entries")
print(f"Organisms: {fitness_stats['orgId'].nunique()}")
print(f"\nPer-organism gene counts:")
print(fitness_stats.groupby('orgId').size().describe())
# Identify putative essential genes: in gene table (type=1, CDS) but NOT in
# genefitness. These genes had no viable transposon insertions -> likely
# essential.
# Anti-join pattern: LEFT JOIN the distinct (orgId, locusId) pairs that DO
# have fitness data, then keep rows where the join found nothing
# (gf.locusId IS NULL).
essential_genes = spark.sql(f"""
SELECT g.orgId, g.locusId, g.type, g.scaffoldId, g.begin, g.end
FROM kescience_fitnessbrowser.gene g
LEFT JOIN (
SELECT DISTINCT orgId, locusId
FROM kescience_fitnessbrowser.genefitness
WHERE orgId IN ('{orgId_in}')
) gf ON g.orgId = gf.orgId AND g.locusId = gf.locusId
WHERE g.orgId IN ('{orgId_in}')
AND g.type = '1' -- CDS only
AND gf.locusId IS NULL
""").toPandas()
print(f"Putative essential genes: {len(essential_genes):,}")
print(f"Organisms: {essential_genes['orgId'].nunique()}")
# Per-organism essentiality rate: putative-essential CDS over total CDS.
total_cds = spark.sql(f"""
SELECT orgId, COUNT(*) as n_cds
FROM kescience_fitnessbrowser.gene
WHERE orgId IN ('{orgId_in}') AND type = '1'
GROUP BY orgId
""").toPandas()
ess_counts = (
    essential_genes
    .groupby('orgId')
    .size()
    .reset_index(name='n_essential')
)
# Organisms with zero essential genes drop out of ess_counts; the left merge
# plus fillna(0) restores them with a count of 0.
ess_summary = total_cds.merge(ess_counts, on='orgId', how='left').fillna(0)
ess_summary['n_essential'] = ess_summary['n_essential'].astype(int)
ess_summary['pct_essential'] = (ess_summary['n_essential'] / ess_summary['n_cds'] * 100).round(1)
print(f"\nEssentiality rates:")
print(f" Median: {ess_summary['pct_essential'].median():.1f}%")
print(f" Range: {ess_summary['pct_essential'].min():.1f}% - {ess_summary['pct_essential'].max():.1f}%")
7. Extract Species-Level GapMind Summary (All 27K Species)¶
For the cross-species analysis (NB04), we need pathway completeness aggregated to the species level for all 27,690 species. This is a large aggregation over 305M rows — expected runtime ~10-15 minutes.
%%time
# Two-stage aggregation for performance (see performance.md):
#  1) best_scores: collapse GapMind's per-step rows to ONE best score per
#     (genome, pathway), using the same 5..1 ordinal ranking as Section 4
#     (MAX aggregate, per the "Key pitfalls" note).
#  2) genome_stats: per-genome pathway counts, where complete = rank 5,
#     likely_complete = rank >= 4, absent = rank <= 1.
# The outer SELECT then averages those per-genome counts per species clade.
species_pathway_summary = spark.sql("""
WITH best_scores AS (
SELECT
clade_name,
genome_id,
pathway,
metabolic_category,
MAX(CASE score_category
WHEN 'complete' THEN 5
WHEN 'likely_complete' THEN 4
WHEN 'steps_missing_low' THEN 3
WHEN 'steps_missing_medium' THEN 2
WHEN 'not_present' THEN 1
ELSE 0
END) as best_score
FROM kbase_ke_pangenome.gapmind_pathways
GROUP BY clade_name, genome_id, pathway, metabolic_category
),
genome_stats AS (
SELECT
clade_name,
genome_id,
COUNT(DISTINCT pathway) as n_pathways,
SUM(CASE WHEN best_score >= 5 THEN 1 ELSE 0 END) as n_complete,
SUM(CASE WHEN best_score >= 4 THEN 1 ELSE 0 END) as n_likely_complete,
SUM(CASE WHEN best_score <= 1 THEN 1 ELSE 0 END) as n_absent
FROM best_scores
GROUP BY clade_name, genome_id
)
SELECT
clade_name,
COUNT(DISTINCT genome_id) as n_genomes,
AVG(n_pathways) as mean_n_pathways,
AVG(n_complete) as mean_complete,
STDDEV(n_complete) as std_complete,
AVG(n_likely_complete) as mean_likely_complete,
STDDEV(n_likely_complete) as std_likely_complete,
AVG(n_absent) as mean_absent,
MIN(n_complete) as min_complete,
MAX(n_complete) as max_complete
FROM genome_stats
GROUP BY clade_name
""").toPandas()
print(f"Species pathway summary: {len(species_pathway_summary):,} species")
print(f"\nDistribution of mean complete pathways:")
print(species_pathway_summary['mean_complete'].describe())
# Also get pangenome openness metrics for all species.
pangenome_stats = spark.sql("""
SELECT
gtdb_species_clade_id,
CAST(no_genomes AS INT) as no_genomes,
CAST(no_gene_clusters AS INT) as no_gene_clusters,
CAST(no_core AS INT) as no_core,
CAST(no_aux_genome AS INT) as no_aux,
CAST(no_singleton_gene_clusters AS INT) as no_singleton
FROM kbase_ke_pangenome.pangenome
""").toPandas()
# Derived metrics: core and singleton fractions of the cluster repertoire,
# plus the auxiliary-genome fraction as an "openness" proxy.
n_clusters = pangenome_stats['no_gene_clusters']
pangenome_stats['pct_core'] = (pangenome_stats['no_core'] / n_clusters * 100).round(2)
pangenome_stats['pct_singleton'] = (pangenome_stats['no_singleton'] / n_clusters * 100).round(2)
pangenome_stats['openness'] = (pangenome_stats['no_aux'] / n_clusters).round(4)
print(f"Pangenome stats: {len(pangenome_stats):,} species")
print(pangenome_stats[['no_genomes', 'no_gene_clusters', 'pct_core', 'pct_singleton', 'openness']].describe())
8. Save All Outputs¶
# Persist every extracted table to DATA_DIR as CSV, echoing row counts.
def _save_csv(df, filename, thousands=False):
    """Write df to DATA_DIR/filename (no index) and print the row count."""
    df.to_csv(DATA_DIR / filename, index=False)
    rows = f"{len(df):,}" if thousands else f"{len(df)}"
    print(f"Saved: {filename} ({rows} rows)")

_save_csv(org_mapping_resolved, 'organism_mapping.csv')
_save_csv(gapmind_fb, 'fb_gapmind_scores.csv')
_save_csv(eggnog, 'fb_eggnog_annotations.csv', thousands=True)
_save_csv(fitness_stats, 'fb_fitness_stats.csv', thousands=True)
_save_csv(essential_genes, 'fb_essential_genes.csv', thousands=True)
_save_csv(species_pathway_summary, 'species_pathway_summary.csv', thousands=True)
_save_csv(pangenome_stats, 'pangenome_stats.csv', thousands=True)
9. Summary¶
print("=" * 60)
print("NB01 DATA EXTRACTION SUMMARY")
print("=" * 60)
print(f"FB organisms mapped: {org_mapping_resolved['orgId'].nunique()}/{len(fb_orgs)}")
print(f"GTDB clades: {org_mapping_resolved['gtdb_species_clade_id'].nunique()}")
print(f"GapMind scores: {len(gapmind_fb)} genome-pathway pairs")
print(f" Complete pathways: {(gapmind_fb['best_score_rank'] >= 5).sum()}")
print(f" Likely complete: {(gapmind_fb['best_score_rank'] == 4).sum()}")
print(f" Absent: {(gapmind_fb['best_score_rank'] <= 1).sum()}")
print(f"eggNOG annotations: {len(eggnog):,} gene clusters")
print(f" With EC numbers: {eggnog['EC'].notna().sum():,}")
print(f"Fitness stats: {len(fitness_stats):,} genes")
print(f"Essential genes: {len(essential_genes):,}")
print(f"Species pathway summary: {len(species_pathway_summary):,} species")
print(f"Pangenome stats: {len(pangenome_stats):,} species")
print("=" * 60)
print(f"\nAll outputs saved to {DATA_DIR}/")
print("Next: Run NB02 (pathway-gene linking) on JupyterHub")