01 Data Extraction
Jupyter notebook from the Metabolic Capability vs Dependency project.
NB01: Data Extraction¶
Requires BERDL JupyterHub — get_spark_session() only available there.
Purpose: Extract and cache all data needed for the Metabolic Capability vs Dependency analysis. Leverages upstream project data where available (essential_metabolome, essential_genome).
v2 changes (2026-02-19):
- Added GapMind-FB organism coverage verification (essential_metabolome found only 7/45 covered)
- Added condition-type fitness extraction (per field_vs_lab_fitness methodology)
- Added KEGG KO-based annotations (more precise than KEGG pathway maps)
- Fixed: large extracts stream results with Spark (`.write.csv()` / row iteration) instead of `.toPandas()` to avoid OOM
- Upstream data check: reuse fb_genome_mapping from essential_metabolome
Outputs:
data/gapmind_genome_pathway_status.csv — Per-genome pathway completeness (best score, scope=all)
data/gapmind_core_pathway_status.csv — Per-genome pathway completeness (core genes only)
data/species_pathway_summary/ — Species-level pathway completion fractions (partitioned Spark output)
data/pangenome_stats.csv — Pangenome openness metrics
data/fb_fitness_summary.csv — Per-gene fitness summary for FB organisms
data/fb_fitness_by_condition_type.csv — Per-gene fitness stratified by condition type
data/fb_kegg_annotations.csv — KEGG KO + pathway annotations for metabolic gene clusters
data/fb_essential_genes.csv — Putative essential genes (no transposon insertions)
data/gapmind_fb_coverage.csv — GapMind coverage for each FB organism
# --- Notebook setup: Spark session, output directory, upstream data inventory ---
# NOTE(review): get_spark_session() is injected by the BERDL JupyterHub kernel
# (see notebook header) — it is not imported anywhere in this file.
spark = get_spark_session()

import os
import pandas as pd

# All cached extracts land here
DATA_DIR = '../data'
os.makedirs(DATA_DIR, exist_ok=True)

# Sibling projects whose cached data we can reuse instead of re-querying BERDL
UPSTREAM = {
    'essential_metabolome': '../../essential_metabolome/data',
    'essential_genome': '../../essential_genome/data',
    'conservation_vs_fitness': '../../conservation_vs_fitness/data',
    'fitness_effects_conservation': '../../fitness_effects_conservation/data',
}

print('Spark session ready')
print(f'Output directory: {os.path.abspath(DATA_DIR)}')
print()

# Inventory each upstream project's data directory: file names and sizes
for project, path in UPSTREAM.items():
    if not os.path.exists(path):
        print(f' {project}: NOT FOUND locally (may need JupyterHub extraction)')
        continue
    files = os.listdir(path)
    print(f' {project}: {len(files)} files available')
    for f in sorted(files):
        size = os.path.getsize(os.path.join(path, f)) / 1e6
        print(f' {f} ({size:.1f} MB)')
print()

# Reuse the manually curated FB -> GTDB genome mapping from essential_metabolome
# if present; otherwise fall back to querying BERDL later.
fb_genome_map_path = os.path.join(UPSTREAM['essential_metabolome'], 'fb_genome_mapping_manual.tsv')
if os.path.exists(fb_genome_map_path):
    fb_genome_map = pd.read_csv(fb_genome_map_path, sep='\t')
    print(f'Loaded FB-genome mapping: {len(fb_genome_map)} organisms')
    print(fb_genome_map[['orgId', 'genome_id']].head())
else:
    fb_genome_map = None
    print('FB-genome mapping not found — will query from BERDL')
1a. GapMind-FB Organism Coverage Check¶
Critical: essential_metabolome found only 7 of 45 FB organisms had GapMind data. Verify coverage for all 48 FB organisms before designing the analysis.
# 1a. GapMind coverage check for Fitness Browser organisms, cached to CSV.
# essential_metabolome previously found only 7/45 FB organisms covered, so
# re-verify coverage before designing the analysis.
output_path = f'{DATA_DIR}/gapmind_fb_coverage.csv'
if os.path.exists(output_path):
    print(f'Already exists: {output_path}, loading...')
    coverage = pd.read_csv(output_path)
else:
    print('Checking GapMind coverage for FB organisms...')

    # All FB organisms that have protein-coding genes (type = 1)
    fb_orgs = spark.sql("""
        SELECT DISTINCT orgId FROM kescience_fitnessbrowser.gene
        WHERE CAST(type AS INT) = 1
    """).toPandas()

    # Every distinct GTDB species clade present in GapMind
    gapmind_clades = spark.sql("""
        SELECT DISTINCT clade_name FROM kbase_ke_pangenome.gapmind_pathways
    """).toPandas()

    # Per-clade genome and pathway counts, all-gene scope only
    gapmind_genome_counts = spark.sql("""
        SELECT clade_name, COUNT(DISTINCT genome_id) as n_genomes,
               COUNT(DISTINCT pathway) as n_pathways
        FROM kbase_ke_pangenome.gapmind_pathways
        WHERE sequence_scope = 'all'
        GROUP BY clade_name
    """).toPandas()

    print(f'Total FB organisms: {len(fb_orgs)}')
    print(f'Total GapMind clades: {len(gapmind_clades)}')
    print(f'GapMind clades with data: {len(gapmind_genome_counts)}')

    if fb_genome_map is not None:
        # GapMind genome IDs carry no GTDB accession prefix — strip RS_/GB_
        # from our mapping so the two ID spaces line up.
        fb_genome_map['gapmind_genome_id'] = fb_genome_map['genome_id'].str.replace(
            r'^(RS_|GB_)', '', regex=True)
        gapmind_genomes = spark.sql("""
            SELECT DISTINCT genome_id FROM kbase_ke_pangenome.gapmind_pathways
        """).toPandas()
        fb_genome_map['in_gapmind'] = fb_genome_map['gapmind_genome_id'].isin(
            set(gapmind_genomes['genome_id']))
        coverage = fb_genome_map[['orgId', 'genome_id', 'gapmind_genome_id', 'in_gapmind']]
        n_covered = coverage['in_gapmind'].sum()
        print(f'\nFB organisms with GapMind data: {n_covered}/{len(coverage)}')
        print(f'Coverage rate: {n_covered/len(coverage)*100:.1f}%')
        print('\nCovered:')
        print(coverage[coverage['in_gapmind']])
        print('\nNOT covered:')
        print(coverage[~coverage['in_gapmind']])
    else:
        # Without a per-organism mapping we can only cache the per-clade
        # counts so the notebook still produces a coverage artifact.
        print('No FB-genome mapping available — cannot check per-organism coverage')
        coverage = gapmind_genome_counts
    coverage.to_csv(output_path, index=False)
    print(f'\nSaved: {output_path}')
1. GapMind Pathway Status (All Genes)¶
Aggregate gapmind_pathways to get the best pathway score per genome × pathway.
The table has multiple rows per pair (different sequence scopes and alternative routes).
We take the maximum score across all routes within each sequence scope.
# 1. Per-genome GapMind pathway status (all-genes scope), cached to CSV.
# The raw table has multiple rows per genome × pathway (different sequence
# scopes and alternative routes); take the best score per route within scope.
output_path = f'{DATA_DIR}/gapmind_genome_pathway_status.csv'
if os.path.exists(output_path):
    print(f'Already exists: {output_path}, skipping')
else:
    print('Extracting GapMind pathway status (scope=all)...')
    gapmind_all = spark.sql("""
        SELECT
            genome_id,
            clade_name,
            pathway,
            metabolic_category,
            MAX(score) as best_score,
            MAX(score_simplified) as is_complete,
            MAX(nHi) as max_nHi,
            MAX(CASE WHEN score_category = 'complete' THEN 1 ELSE 0 END) as is_fully_complete,
            MAX(CASE WHEN score_category = 'likely_complete' THEN 1 ELSE 0 END) as is_likely_complete
        FROM kbase_ke_pangenome.gapmind_pathways
        WHERE sequence_scope = 'all'
        GROUP BY genome_id, clade_name, pathway, metabolic_category
    """)
    n_rows = gapmind_all.count()
    print(f'Aggregated rows: {n_rows:,}')
    # ~293K genomes × 80 pathways ≈ 23.4M rows — far too large to collect with
    # .toPandas() on the driver (v2 fix: the changelog promised Spark-side
    # extraction to avoid OOM). Stream the result row by row instead and
    # write the CSV incrementally, keeping driver memory flat.
    import csv
    with open(output_path, 'w', newline='') as fh:
        writer = csv.writer(fh)
        writer.writerow(gapmind_all.columns)
        for row in gapmind_all.toLocalIterator():
            writer.writerow(row)
    print(f'Saved: {output_path} ({n_rows:,} rows)')
2. GapMind Pathway Status (Core Genes Only)¶
Same aggregation but for sequence_scope = 'core' only.
Pathways that are complete with all genes but incomplete with core genes only
indicate that accessory genes are required — these are interesting for the BQH analysis.
# 2. Per-genome GapMind pathway status restricted to core genes
# (sequence_scope = 'core'). Pathways complete with all genes but incomplete
# with core genes alone imply accessory genes are required — key for the BQH
# analysis downstream.
output_path = f'{DATA_DIR}/gapmind_core_pathway_status.csv'
if os.path.exists(output_path):
    print(f'Already exists: {output_path}, skipping')
else:
    print('Extracting GapMind pathway status (scope=core)...')
    gapmind_core = spark.sql("""
        SELECT
            genome_id,
            clade_name,
            pathway,
            metabolic_category,
            MAX(score) as best_score_core,
            MAX(score_simplified) as is_complete_core
        FROM kbase_ke_pangenome.gapmind_pathways
        WHERE sequence_scope = 'core'
        GROUP BY genome_id, clade_name, pathway, metabolic_category
    """)
    n_rows = gapmind_core.count()
    # Same per-genome × pathway scale as the scope='all' extract (tens of
    # millions of rows) — stream to CSV rather than .toPandas() to avoid
    # driver OOM (v2 fix), mirroring the all-genes extraction above.
    import csv
    with open(output_path, 'w', newline='') as fh:
        writer = csv.writer(fh)
        writer.writerow(gapmind_core.columns)
        for row in gapmind_core.toLocalIterator():
            writer.writerow(row)
    print(f'Saved: {output_path} ({n_rows:,} rows)')
3. Species-Level Pathway Summary¶
Aggregate pathway completeness to species level: for each species × pathway, what fraction of genomes have the pathway complete?
# 3. Species-level pathway summary: for each species × pathway, the fraction
# of the species' genomes with the pathway complete. Cached to a single CSV.
# (The unused partitioned-directory path variable from v1 has been removed —
# the output has always been this one CSV.)
output_csv = f'{DATA_DIR}/species_pathway_summary.csv'
if os.path.exists(output_csv):
    print(f'Already exists: {output_csv}, skipping')
else:
    print('Computing species-level pathway summary...')
    print('NOTE: This produces ~27K species × 80 pathways = ~2M rows.')
    # The heavy per-genome aggregation happens in Spark; only the ~2M-row
    # species-level result is collected.
    # Fix: AVG over is_complete needs the same CAST(... AS INT) the SUM uses —
    # averaging a raw boolean column fails in Spark SQL.
    species_summary = spark.sql("""
        WITH genome_pathway AS (
            SELECT
                genome_id,
                clade_name,
                pathway,
                metabolic_category,
                MAX(score_simplified) as is_complete
            FROM kbase_ke_pangenome.gapmind_pathways
            WHERE sequence_scope = 'all'
            GROUP BY genome_id, clade_name, pathway, metabolic_category
        )
        SELECT
            clade_name,
            pathway,
            metabolic_category,
            COUNT(*) as n_genomes,
            SUM(CAST(is_complete AS INT)) as n_complete,
            AVG(CAST(is_complete AS INT)) as frac_complete
        FROM genome_pathway
        GROUP BY clade_name, pathway, metabolic_category
    """)
    n_rows = species_summary.count()
    print(f'Aggregated rows: {n_rows:,}')
    # ~2M aggregated rows is safe to collect on the driver, unlike the
    # per-genome extracts above.
    species_summary_pd = species_summary.toPandas()
    species_summary_pd.to_csv(output_csv, index=False)
    print(f'Saved: {output_csv} ({len(species_summary_pd):,} rows)')
    n_species = species_summary_pd['clade_name'].nunique()
    n_pathways = species_summary_pd['pathway'].nunique()
    print(f'Species: {n_species:,}, Pathways: {n_pathways}')
4. Pangenome Stats¶
Extract pangenome openness metrics for all species.
# 4. Pangenome openness metrics for every species, cached to CSV.
output_path = f'{DATA_DIR}/pangenome_stats.csv'
if os.path.exists(output_path):
    print(f'Already exists: {output_path}, skipping')
else:
    print('Extracting pangenome stats...')
    count_cols = ['no_genomes', 'no_gene_clusters', 'no_core',
                  'no_aux_genome', 'no_singleton_gene_clusters']
    pangenome_pd = spark.sql("""
        SELECT
            gtdb_species_clade_id,
            no_genomes,
            no_gene_clusters,
            no_core,
            no_aux_genome,
            no_singleton_gene_clusters
        FROM kbase_ke_pangenome.pangenome
    """).toPandas()
    # Counts may arrive as strings from the table — coerce to numeric
    # (unparseable values become NaN rather than raising).
    for col in count_cols:
        pangenome_pd[col] = pd.to_numeric(pangenome_pd[col], errors='coerce')
    # Openness metrics: fraction of gene clusters that are singleton,
    # core, and accessory respectively.
    total_clusters = pangenome_pd['no_gene_clusters']
    pangenome_pd['frac_singleton'] = pangenome_pd['no_singleton_gene_clusters'] / total_clusters
    pangenome_pd['frac_core'] = pangenome_pd['no_core'] / total_clusters
    pangenome_pd['frac_accessory'] = pangenome_pd['no_aux_genome'] / total_clusters
    pangenome_pd.to_csv(output_path, index=False)
    print(f'Saved: {output_path} ({len(pangenome_pd):,} rows)')
5. Fitness Browser — Gene Fitness Summary¶
For each FB organism, extract per-gene fitness summary statistics across all conditions. v2: Added fitness breadth metrics per fitness_effects_conservation methodology: breadth = fraction of conditions where gene has |fitness| > 1.
# 5. Per-gene fitness summary across all conditions for every FB organism.
# v2: breadth metrics (fraction of conditions with |fitness| past a threshold)
# per the fitness_effects_conservation methodology.
output_path = f'{DATA_DIR}/fb_fitness_summary.csv'
if os.path.exists(output_path):
    print(f'Already exists: {output_path}, skipping')
else:
    print('Extracting FB fitness summary...')
    # fit is stored as text — cast to FLOAT before any arithmetic.
    summary_sql = """
        SELECT
            orgId,
            locusId,
            COUNT(*) as n_conditions,
            AVG(CAST(fit AS FLOAT)) as mean_fitness,
            PERCENTILE_APPROX(CAST(fit AS FLOAT), 0.5) as median_fitness,
            MIN(CAST(fit AS FLOAT)) as min_fitness,
            MAX(CAST(fit AS FLOAT)) as max_fitness,
            STDDEV(CAST(fit AS FLOAT)) as std_fitness,
            SUM(CASE WHEN ABS(CAST(fit AS FLOAT)) > 1.0 THEN 1 ELSE 0 END) as n_strong_phenotype,
            SUM(CASE WHEN CAST(fit AS FLOAT) < -1.0 THEN 1 ELSE 0 END) as n_sick,
            SUM(CASE WHEN CAST(fit AS FLOAT) > 1.0 THEN 1 ELSE 0 END) as n_beneficial,
            SUM(CASE WHEN ABS(CAST(fit AS FLOAT)) > 2.0 THEN 1 ELSE 0 END) as n_very_strong,
            -- v2: explicit breadth metrics
            SUM(CASE WHEN ABS(CAST(fit AS FLOAT)) > 1.0 THEN 1 ELSE 0 END) / COUNT(*) as fitness_breadth,
            SUM(CASE WHEN CAST(fit AS FLOAT) < -1.0 THEN 1 ELSE 0 END) / COUNT(*) as sick_breadth,
            SUM(CASE WHEN CAST(fit AS FLOAT) < -2.0 THEN 1 ELSE 0 END) / COUNT(*) as severe_sick_breadth
        FROM kescience_fitnessbrowser.genefitness
        GROUP BY orgId, locusId
    """
    fb_fitness_pd = spark.sql(summary_sql).toPandas()
    fb_fitness_pd.to_csv(output_path, index=False)
    print(f'Saved: {output_path} ({len(fb_fitness_pd):,} rows)')
    print(f'Organisms: {fb_fitness_pd["orgId"].nunique()}')
    print(f'\nFitness breadth distribution:')
    print(fb_fitness_pd['fitness_breadth'].describe())
5b. Condition-Type Fitness (v2)¶
New in v2: Per field_vs_lab_fitness and core_gene_tradeoffs, condition type matters. Genes important under stress may appear neutral in rich media. Extract experiment metadata and compute per-gene fitness by condition type.
# 5b (v2). Per-gene fitness stratified by condition type (carbon source,
# nitrogen source, stress, other), per the field_vs_lab_fitness methodology.
output_path = f'{DATA_DIR}/fb_fitness_by_condition_type.csv'
if os.path.exists(output_path):
    print(f'Already exists: {output_path}, skipping')
else:
    print('Extracting condition-type fitness...')
    # Classify experiments by condition type using the experiment table.
    # Note: table is 'experiment' (not 'exps'); columns are expName, expGroup,
    # condition_1. See docs/pitfalls.md [pathway_capability_dependency].
    # Fix: the original pH pattern was LOWER(condition_1) LIKE '%pH%', which
    # can never match — a lowered string cannot contain a capital H — so
    # pH-stress experiments silently fell into 'other'. Match the raw column
    # for the literal 'pH' instead ('%ph%' on the lowered text would also hit
    # phosphate/phenyl... compounds).
    cond_fitness = spark.sql("""
        WITH exp_types AS (
            SELECT
                expName,
                orgId,
                CASE
                    WHEN LOWER(condition_1) LIKE '%carbon%'
                         OR LOWER(expGroup) LIKE '%carbon%'
                         OR LOWER(condition_1) LIKE '%glucose%'
                         OR LOWER(condition_1) LIKE '%sugar%'
                         OR LOWER(condition_1) LIKE '%acetate%'
                         OR LOWER(condition_1) LIKE '%lactate%'
                         OR LOWER(condition_1) LIKE '%glycerol%'
                    THEN 'carbon_source'
                    WHEN LOWER(condition_1) LIKE '%nitrogen%'
                         OR LOWER(expGroup) LIKE '%nitrogen%'
                         OR LOWER(condition_1) LIKE '%ammonium%'
                         OR LOWER(condition_1) LIKE '%amino acid%'
                    THEN 'nitrogen_source'
                    WHEN LOWER(condition_1) LIKE '%stress%'
                         OR LOWER(condition_1) LIKE '%metal%'
                         OR LOWER(condition_1) LIKE '%antibiotic%'
                         OR LOWER(condition_1) LIKE '%salt%'
                         OR condition_1 LIKE '%pH%'
                         OR LOWER(condition_1) LIKE '%temperature%'
                         OR LOWER(condition_1) LIKE '%oxidative%'
                         OR LOWER(condition_1) LIKE '%osmotic%'
                    THEN 'stress'
                    ELSE 'other'
                END as condition_type
            FROM kescience_fitnessbrowser.experiment
        )
        SELECT
            gf.orgId,
            gf.locusId,
            et.condition_type,
            COUNT(*) as n_conditions,
            AVG(CAST(gf.fit AS FLOAT)) as mean_fitness,
            PERCENTILE_APPROX(CAST(gf.fit AS FLOAT), 0.5) as median_fitness,
            MIN(CAST(gf.fit AS FLOAT)) as min_fitness,
            SUM(CASE WHEN CAST(gf.fit AS FLOAT) < -1.0 THEN 1 ELSE 0 END) as n_sick,
            SUM(CASE WHEN ABS(CAST(gf.fit AS FLOAT)) > 1.0 THEN 1 ELSE 0 END) as n_strong
        FROM kescience_fitnessbrowser.genefitness gf
        JOIN exp_types et ON gf.expName = et.expName AND gf.orgId = et.orgId
        GROUP BY gf.orgId, gf.locusId, et.condition_type
    """)
    cond_fitness_pd = cond_fitness.toPandas()
    cond_fitness_pd.to_csv(output_path, index=False)
    print(f'Saved: {output_path} ({len(cond_fitness_pd):,} rows)')
    print(f'\nCondition type distribution:')
    print(cond_fitness_pd.groupby('condition_type')['n_conditions'].sum())
    print(f'\nOrganism × condition type pairs: {cond_fitness_pd.groupby(["orgId", "condition_type"]).ngroups}')
6. FB-Pangenome Link Table¶
Check if the link table exists from the conservation_vs_fitness project.
If not available locally, we'll need to rebuild it or copy from JupyterHub.
# 6. Look for an existing FB-pangenome link table among sibling projects,
# preferring local data, then conservation_vs_fitness, then the
# essential_genome ortholog groups as a fallback starting point.
link_paths = [
    '../data/fb_pangenome_link.tsv',
    '../../conservation_vs_fitness/data/fb_pangenome_link.tsv',
    '../../essential_genome/data/all_ortholog_groups.csv',
]
found = None
for p in link_paths:
    if not os.path.exists(p):
        continue
    found = p
    print(f'Found link data: {p}')
    sep = '\t' if p.endswith('.tsv') else ','
    df = pd.read_csv(p, sep=sep)
    print(f' Rows: {len(df):,}')
    print(f' Columns: {list(df.columns)}')
    # NOTE(review): assumes column 1 holds the organism ID in every candidate
    # file — verify this holds for all_ortholog_groups.csv in particular.
    print(f' Organisms: {df.iloc[:, 1].nunique() if len(df.columns) > 1 else "?"}')
    break
if not found:
    print('No link table found locally.')
    print('The link table must be rebuilt or copied from the conservation_vs_fitness project.')
    print('For now, we will use the ortholog groups from essential_genome as a starting point.')
7. KEGG Annotations for Metabolic Gene Clusters¶
Extract eggNOG annotations (KEGG orthologs/KOs and EC numbers) for gene clusters.
v2 change: Extract KEGG_ko (KO numbers) in addition to KEGG_Pathway map IDs. KOs provide more precise functional assignment than broad KEGG pathway maps. This addresses the lossy KEGG map → GapMind pathway mapping from v1.
# 7. KEGG KO / EC / pathway annotations for metabolic gene clusters.
# v2: also extract KEGG_ko (orthologs) and broaden the filter to metabolic
# COG categories (E/G/C) so genes lacking a KEGG_Pathway assignment are kept.
output_path = f'{DATA_DIR}/fb_kegg_annotations.csv'
if os.path.exists(output_path):
    print(f'Already exists: {output_path}, skipping')
else:
    print('Extracting KEGG KO + pathway annotations for metabolic genes...')
    print('This queries the 93M-row eggnog_mapper_annotations table...')
    kegg_annot = spark.sql("""
        SELECT
            query_name as gene_cluster_id,
            EC,
            KEGG_ko,
            KEGG_Pathway,
            COG_category,
            Description
        FROM kbase_ke_pangenome.eggnog_mapper_annotations
        WHERE (
            -- Amino acid biosynthesis/metabolism KEGG pathways
            KEGG_Pathway LIKE '%map00220%' -- Arginine biosynthesis
            OR KEGG_Pathway LIKE '%map00260%' -- Glycine, serine, threonine
            OR KEGG_Pathway LIKE '%map00270%' -- Cysteine and methionine
            OR KEGG_Pathway LIKE '%map00290%' -- Valine, leucine, isoleucine biosynthesis
            OR KEGG_Pathway LIKE '%map00300%' -- Lysine biosynthesis
            OR KEGG_Pathway LIKE '%map00330%' -- Arginine and proline metabolism
            OR KEGG_Pathway LIKE '%map00340%' -- Histidine metabolism
            OR KEGG_Pathway LIKE '%map00350%' -- Tyrosine metabolism
            OR KEGG_Pathway LIKE '%map00360%' -- Phenylalanine metabolism
            OR KEGG_Pathway LIKE '%map00380%' -- Tryptophan metabolism
            OR KEGG_Pathway LIKE '%map00400%' -- Phe, Tyr, Trp biosynthesis
            OR KEGG_Pathway LIKE '%map00250%' -- Alanine, aspartate, glutamate
            -- Carbon source utilization KEGG pathways
            OR KEGG_Pathway LIKE '%map00010%' -- Glycolysis
            OR KEGG_Pathway LIKE '%map00020%' -- TCA cycle
            OR KEGG_Pathway LIKE '%map00030%' -- Pentose phosphate
            OR KEGG_Pathway LIKE '%map00040%' -- Pentose and glucuronate
            OR KEGG_Pathway LIKE '%map00051%' -- Fructose and mannose
            OR KEGG_Pathway LIKE '%map00052%' -- Galactose
            OR KEGG_Pathway LIKE '%map00500%' -- Starch and sucrose
            OR KEGG_Pathway LIKE '%map00520%' -- Amino sugar and nucleotide sugar
            OR KEGG_Pathway LIKE '%map00620%' -- Pyruvate metabolism
            OR KEGG_Pathway LIKE '%map00630%' -- Glyoxylate and dicarboxylate
            OR KEGG_Pathway LIKE '%map00640%' -- Propanoate metabolism
            OR KEGG_Pathway LIKE '%map00650%' -- Butanoate metabolism
            OR KEGG_Pathway LIKE '%map00660%' -- C5-Branched dibasic acid
            OR KEGG_Pathway LIKE '%map00562%' -- Inositol phosphate
            -- v2: Also capture genes with metabolic KO annotations
            -- even if they lack KEGG_Pathway assignment
            OR COG_category LIKE '%E%' -- Amino acid transport and metabolism
            OR COG_category LIKE '%G%' -- Carbohydrate transport and metabolism
            OR COG_category LIKE '%C%' -- Energy production and conversion
        )
        AND (KEGG_ko != '-' OR EC != '-' OR KEGG_Pathway != '-')
    """)
    # The broad COG E/G/C filters can match tens of millions of rows, so
    # collecting with .toPandas() risks driver OOM (v2 changelog). Persist
    # the filtered result once, stream it to CSV row by row, and compute the
    # summary statistics on the Spark side.
    kegg_annot.persist()
    import csv
    n_rows = 0
    with open(output_path, 'w', newline='') as fh:
        writer = csv.writer(fh)
        writer.writerow(kegg_annot.columns)
        for row in kegg_annot.toLocalIterator():
            writer.writerow(row)
            n_rows += 1
    print(f'Saved: {output_path} ({n_rows:,} rows)')
    # Summary stats via Spark aggregation (cheap thanks to the persisted result)
    kegg_annot.createOrReplaceTempView('kegg_annot_tmp')
    stats = spark.sql("""
        SELECT
            COUNT(DISTINCT gene_cluster_id) as n_clusters,
            SUM(CASE WHEN KEGG_ko != '-' THEN 1 ELSE 0 END) as n_ko,
            SUM(CASE WHEN EC != '-' THEN 1 ELSE 0 END) as n_ec,
            SUM(CASE WHEN KEGG_Pathway != '-' THEN 1 ELSE 0 END) as n_pathway
        FROM kegg_annot_tmp
    """).toPandas().iloc[0]
    print(f'Unique gene clusters: {stats["n_clusters"]:,}')
    print(f'\nWith KEGG_ko: {stats["n_ko"]:,}')
    print(f'With EC: {stats["n_ec"]:,}')
    print(f'With KEGG_Pathway: {stats["n_pathway"]:,}')
    cog_top = spark.sql("""
        SELECT COG_category, COUNT(*) as n
        FROM kegg_annot_tmp
        GROUP BY COG_category
        ORDER BY n DESC
        LIMIT 10
    """).toPandas()
    print(f'\nCOG category distribution:')
    print(cog_top)
    kegg_annot.unpersist()
8. Identify Essential Genes (No Fitness Data = Putative Essential)¶
Genes present in the FB gene table but absent from genefitness are putative essentials (no viable transposon insertions). These represent the strongest form of dependency.
# 8. Putative essential genes: protein-coding genes present in the FB gene
# table with zero rows in genefitness — no viable transposon insertions, the
# strongest form of dependency.
output_path = f'{DATA_DIR}/fb_essential_genes.csv'
if os.path.exists(output_path):
    print(f'Already exists: {output_path}, skipping')
else:
    print('Identifying putative essential genes...')
    # Anti-join: keep genes whose (orgId, locusId) never appears in genefitness
    essentials_sql = """
        SELECT g.orgId, g.locusId, g.sysName, g.gene as gene_name, g.desc as gene_desc
        FROM kescience_fitnessbrowser.gene g
        LEFT JOIN (
            SELECT DISTINCT orgId, locusId FROM kescience_fitnessbrowser.genefitness
        ) gf ON g.orgId = gf.orgId AND g.locusId = gf.locusId
        WHERE CAST(g.type AS INT) = 1 -- protein-coding genes only
        AND gf.locusId IS NULL -- no fitness data = putative essential
    """
    essentials_pd = spark.sql(essentials_sql).toPandas()
    essentials_pd.to_csv(output_path, index=False)
    print(f'Saved: {output_path} ({len(essentials_pd):,} putative essential genes)')
    print(f'Per organism:')
    print(essentials_pd.groupby('orgId').size().describe())
9. Summary¶
List all generated files and their sizes.
# 9. Inventory every generated CSV: on-disk size and column count.
import glob

print('Generated data files:')
print('=' * 60)
for f in sorted(glob.glob(f'{DATA_DIR}/*.csv')):
    size_mb = os.path.getsize(f) / 1e6
    header = pd.read_csv(f, nrows=0)  # header row only — cheap column count
    print(f' {os.path.basename(f):45s} {size_mb:8.1f} MB cols={len(header.columns)}')
print('\nReady for NB02 (Tier 1 classification) and NB03 (Tier 2 conservation).')