01 Organism Mapping
Jupyter notebook from the Conservation vs Fitness -- Linking FB Genes to Pangenome Clusters project.
NB01: Map Fitness Browser Organisms to Pangenome Clades¶
Map each of the 48 FB organisms to one or more pangenome species clades using three complementary strategies:
- NCBI taxid — match FB `taxonomyId` against `gtdb_metadata.ncbi_taxid` / `ncbi_species_taxid`
- NCBI organism name — match genus + species against `gtdb_metadata.ncbi_organism_name` (catches GTDB-renamed organisms)
- Scaffold accession — match FB scaffold IDs against pangenome `gene.gene_id` prefixes
Requires BERDL JupyterHub — get_spark_session() must be available.
Output: data/organism_mapping.tsv
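All three strategies emit rows with the same fields, which is what lets section 5 simply concatenate and deduplicate them. A minimal sketch of one output record — field names are taken from this notebook's mapping dicts, the values are purely illustrative:

```python
# One row per (FB organism, pangenome clade) pair. Field names match the
# notebook's mapping dicts; the values below are hypothetical examples.
record = {
    "orgId": "Putida",                     # FB organism identifier (illustrative)
    "genus": "Pseudomonas",
    "species": "putida",
    "strain": "KT2440",
    "taxonomyId": 160488,
    "gtdb_species_clade_id": "s__Pseudomonas_E_alloputida",  # illustrative clade ID
    "pg_genome_id": "GCF_000007565.2",     # representative pangenome genome
    "match_method": "taxid",               # 'taxid', 'ncbi_name', or 'scaffold'
}
print(sorted(record))
```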
Key notes¶
- GTDB renames many organisms (e.g. P. putida → Pseudomonas_E alloputida), so we must match on NCBI names/taxids
- FB organism may map to multiple clades (GTDB splits finer than NCBI) — resolved later by DIAMOND hit count
- E. coli (Keio) is expected to be absent — the main `s__Escherichia_coli` clade was too large for the pangenome
In [ ]:
import pandas as pd
import numpy as np
from pathlib import Path
spark = get_spark_session()
print(f"Spark version: {spark.version}")
DATA_DIR = Path('../data')
DATA_DIR.mkdir(exist_ok=True)
1. Load Fitness Browser Organisms¶
In [ ]:
fb_orgs = spark.sql("""
SELECT orgId, genus, species, strain, taxonomyId
FROM kescience_fitnessbrowser.organism
ORDER BY genus, species
""").toPandas()
print(f"Total FB organisms: {len(fb_orgs)}")
fb_orgs.head(10)
In [ ]:
# Get scaffold IDs per organism for Strategy 3
fb_scaffolds = spark.sql("""
SELECT DISTINCT orgId, scaffoldId
FROM kescience_fitnessbrowser.gene
""").toPandas()
print(f"Total scaffold entries: {len(fb_scaffolds)}")
print(f"Organisms with scaffolds: {fb_scaffolds['orgId'].nunique()}")
# Show sample scaffold IDs to understand format
print("\nSample scaffold IDs:")
for org in fb_scaffolds['orgId'].unique()[:5]:
    scaffs = fb_scaffolds[fb_scaffolds['orgId'] == org]['scaffoldId'].tolist()
    print(f"  {org}: {scaffs[:3]}")
2. Strategy 1: NCBI Taxid Matching¶
In [ ]:
# Collect all unique FB taxonomy IDs
tax_ids = fb_orgs['taxonomyId'].dropna().unique().tolist()
tax_id_str = ','.join([str(int(t)) for t in tax_ids])
taxid_matches = spark.sql(f"""
SELECT DISTINCT
m.ncbi_taxid,
m.ncbi_species_taxid,
m.accession,
g.gtdb_species_clade_id
FROM kbase_ke_pangenome.gtdb_metadata m
JOIN kbase_ke_pangenome.genome g ON m.accession = g.genome_id
WHERE CAST(m.ncbi_species_taxid AS INT) IN ({tax_id_str})
OR CAST(m.ncbi_taxid AS INT) IN ({tax_id_str})
""").toPandas()
print(f"Taxid matches: {len(taxid_matches)} genome-clade pairs")
print(f"Unique clades: {taxid_matches['gtdb_species_clade_id'].nunique()}")
# Map back to FB orgIds
taxid_mapping = []
for _, org in fb_orgs.iterrows():
    tid = org['taxonomyId']
    if pd.isna(tid):
        continue
    tid = int(tid)
    hits = taxid_matches[
        (taxid_matches['ncbi_species_taxid'].astype(str).str.strip() == str(tid)) |
        (taxid_matches['ncbi_taxid'].astype(str).str.strip() == str(tid))
    ]
    for clade in hits['gtdb_species_clade_id'].unique():
        pg_genomes = hits[hits['gtdb_species_clade_id'] == clade]['accession'].tolist()
        taxid_mapping.append({
            'orgId': org['orgId'],
            'genus': org['genus'],
            'species': org['species'],
            'strain': org['strain'],
            'taxonomyId': tid,
            'gtdb_species_clade_id': clade,
            'pg_genome_id': pg_genomes[0],  # representative genome
            'match_method': 'taxid'
        })
taxid_df = pd.DataFrame(taxid_mapping)
taxid_orgs = taxid_df['orgId'].nunique() if len(taxid_df) > 0 else 0
print(f"\nFB organisms matched by taxid: {taxid_orgs}/48")
3. Strategy 2: NCBI Organism Name Matching¶
The ncbi_organism_name field in gtdb_metadata preserves pre-GTDB nomenclature,
so searching by genus+species here catches organisms that GTDB has renamed.
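The SQL patterns below take the form `'%{genus}%{species}%'`, where `%` matches any run of characters — so intervening words and strain suffixes in `ncbi_organism_name` don't break the match. A small pure-Python analogue of that LIKE semantics (organism names here are illustrative, not queried from the table):

```python
import re

def like_match(pattern: str, text: str) -> bool:
    """Pure-Python analogue of SQL LIKE, where '%' matches any run of characters."""
    regex = ".*".join(re.escape(part) for part in pattern.split("%"))
    return re.fullmatch(regex, text) is not None

# The notebook builds patterns like '%{genus}%{species}%'; these names are examples.
pattern = "%" + "Pseudomonas" + "%" + "putida" + "%"
print(like_match(pattern, "Pseudomonas putida KT2440"))   # strain suffix still matches
print(like_match(pattern, "Pseudomonas fluorescens"))     # different species: no match
```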
In [ ]:
name_mapping = []
for _, org in fb_orgs.iterrows():
    orgId = org['orgId']
    genus = org['genus']
    species = org['species']
    strain = org['strain']
    if pd.isna(genus) or pd.isna(species):
        continue
    # Try genus + species match
    query = f"""
        SELECT DISTINCT m.accession, g.gtdb_species_clade_id, m.ncbi_organism_name
        FROM kbase_ke_pangenome.gtdb_metadata m
        JOIN kbase_ke_pangenome.genome g ON m.accession = g.genome_id
        WHERE m.ncbi_organism_name LIKE '%{genus}%{species}%'
    """
    hits = spark.sql(query).toPandas()
    # If strain is available, also try with strain for a more specific match
    if not pd.isna(strain) and str(strain).strip() and len(hits) > 0:
        strain_query = f"""
            SELECT DISTINCT m.accession, g.gtdb_species_clade_id, m.ncbi_organism_name
            FROM kbase_ke_pangenome.gtdb_metadata m
            JOIN kbase_ke_pangenome.genome g ON m.accession = g.genome_id
            WHERE m.ncbi_organism_name LIKE '%{genus}%{species}%{strain}%'
        """
        strain_hits = spark.sql(strain_query).toPandas()
        if len(strain_hits) > 0:
            hits = strain_hits  # prefer strain-specific match
    for clade in hits['gtdb_species_clade_id'].unique():
        clade_hits = hits[hits['gtdb_species_clade_id'] == clade]
        name_mapping.append({
            'orgId': orgId,
            'genus': genus,
            'species': species,
            'strain': strain,
            'taxonomyId': org['taxonomyId'],
            'gtdb_species_clade_id': clade,
            'pg_genome_id': clade_hits['accession'].iloc[0],
            'match_method': 'ncbi_name'
        })
name_df = pd.DataFrame(name_mapping)
name_orgs = name_df['orgId'].nunique() if len(name_df) > 0 else 0
print(f"FB organisms matched by NCBI name: {name_orgs}/48")
# Show which new organisms this strategy found beyond taxid
if len(taxid_df) > 0 and len(name_df) > 0:
    new_by_name = set(name_df['orgId']) - set(taxid_df['orgId'])
    print(f"New organisms found only by NCBI name (not taxid): {len(new_by_name)}")
    if new_by_name:
        for oid in sorted(new_by_name):
            row = name_df[name_df['orgId'] == oid].iloc[0]
            print(f"  {oid}: {row['genus']} {row['species']}")
4. Strategy 3: Scaffold Accession Matching¶
For FB organisms with RefSeq-style scaffold IDs (NC_, NZ_, CP, AE prefixes),
search for pangenome genes whose gene_id starts with the same accession.
This works because pangenome gene IDs embed the scaffold accession.
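In plain Python terms, the `LIKE '{scaffold}%'` query is a prefix match on `gene_id`. A minimal sketch with made-up gene IDs in the style described above (not real pangenome identifiers):

```python
# Hypothetical gene IDs in the style described above: the scaffold
# accession is embedded as a prefix of gene_id.
gene_ids = [
    "NC_002947.4_00010",    # illustrative only, not real pangenome IDs
    "NC_002947.4_00020",
    "NZ_CP012345.1_00010",
]

scaffold = "NC_002947"  # hypothetical FB scaffoldId for the main chromosome

# Equivalent of SQL: WHERE gene_id LIKE '{scaffold}%'
hits = [g for g in gene_ids if g.startswith(scaffold)]
print(len(hits))
```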
In [ ]:
scaffold_mapping = []
# Only try scaffolds with RefSeq-style prefixes
refseq_prefixes = ('NC_', 'NZ_', 'CP', 'AE', 'AP', 'BA', 'BX', 'CR', 'FN', 'FP', 'HE', 'AL', 'AM')
for _, org in fb_orgs.iterrows():
    orgId = org['orgId']
    org_scaffolds = fb_scaffolds[fb_scaffolds['orgId'] == orgId]['scaffoldId'].tolist()
    # Filter to RefSeq-style scaffolds
    refseq_scaffolds = [s for s in org_scaffolds if any(s.startswith(p) for p in refseq_prefixes)]
    if not refseq_scaffolds:
        continue
    # Try the first scaffold (usually the main chromosome)
    scaffold = refseq_scaffolds[0]
    query = f"""
        SELECT DISTINCT g.genome_id, g.gtdb_species_clade_id
        FROM kbase_ke_pangenome.gene gene_t
        JOIN kbase_ke_pangenome.genome g ON gene_t.genome_id = g.genome_id
        WHERE gene_t.gene_id LIKE '{scaffold}%'
    """
    hits = spark.sql(query).toPandas()
    for clade in hits['gtdb_species_clade_id'].unique():
        clade_hits = hits[hits['gtdb_species_clade_id'] == clade]
        scaffold_mapping.append({
            'orgId': orgId,
            'genus': org['genus'],
            'species': org['species'],
            'strain': org['strain'],
            'taxonomyId': org['taxonomyId'],
            'gtdb_species_clade_id': clade,
            'pg_genome_id': clade_hits['genome_id'].iloc[0],
            'match_method': 'scaffold'
        })
scaffold_df = pd.DataFrame(scaffold_mapping)
scaffold_orgs = scaffold_df['orgId'].nunique() if len(scaffold_df) > 0 else 0
print(f"FB organisms matched by scaffold accession: {scaffold_orgs}/48")
# Show which new organisms this strategy found
already_matched = set()
if len(taxid_df) > 0:
    already_matched |= set(taxid_df['orgId'])
if len(name_df) > 0:
    already_matched |= set(name_df['orgId'])
if len(scaffold_df) > 0:
    new_by_scaffold = set(scaffold_df['orgId']) - already_matched
    print(f"New organisms found only by scaffold (not taxid/name): {len(new_by_scaffold)}")
    if new_by_scaffold:
        for oid in sorted(new_by_scaffold):
            row = scaffold_df[scaffold_df['orgId'] == oid].iloc[0]
            print(f"  {oid}: {row['genus']} {row['species']}")
5. Combine All Strategies¶
In [ ]:
# Combine all mappings, keeping match_method to show provenance
all_mappings = pd.concat([taxid_df, name_df, scaffold_df], ignore_index=True)
# Deduplicate: same orgId + clade should only appear once, prefer taxid > scaffold > ncbi_name
method_priority = {'taxid': 0, 'scaffold': 1, 'ncbi_name': 2}
all_mappings['priority'] = all_mappings['match_method'].map(method_priority)
all_mappings = all_mappings.sort_values('priority').drop_duplicates(
subset=['orgId', 'gtdb_species_clade_id'], keep='first'
).drop(columns='priority').sort_values(['orgId', 'gtdb_species_clade_id'])
print(f"Combined mapping: {len(all_mappings)} orgId x clade pairs")
print(f"Unique FB organisms matched: {all_mappings['orgId'].nunique()}/48")
print(f"Unique pangenome clades: {all_mappings['gtdb_species_clade_id'].nunique()}")
print(f"\nMatch method breakdown:")
print(all_mappings['match_method'].value_counts())
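The sort-then-`drop_duplicates(keep='first')` idiom above keeps only the highest-priority match method for each (orgId, clade) pair. The same logic in plain Python, on toy data, makes the effect explicit:

```python
method_priority = {"taxid": 0, "scaffold": 1, "ncbi_name": 2}

# Toy mapping rows: the same (orgId, clade) pair was found by two methods.
rows = [
    {"orgId": "orgA", "clade": "c1", "match_method": "ncbi_name"},
    {"orgId": "orgA", "clade": "c1", "match_method": "taxid"},
    {"orgId": "orgB", "clade": "c2", "match_method": "scaffold"},
]

best = {}
for row in sorted(rows, key=lambda r: method_priority[r["match_method"]]):
    # first row seen per key is the highest-priority one
    best.setdefault((row["orgId"], row["clade"]), row)

print(sorted(r["match_method"] for r in best.values()))
```

For (orgA, c1) the taxid match wins over the ncbi_name match, mirroring the DataFrame dedup above.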
6. Quality Control¶
In [ ]:
# Unmatched organisms
matched_orgs = set(all_mappings['orgId'])
unmatched = fb_orgs[~fb_orgs['orgId'].isin(matched_orgs)]
print(f"=== UNMATCHED ORGANISMS ({len(unmatched)}/48) ===")
if len(unmatched) > 0:
    for _, row in unmatched.iterrows():
        print(f"  {row['orgId']:25s} {row['genus']} {row['species']} "
              f"(strain={row['strain']}, taxid={row['taxonomyId']})")
else:
    print("  All organisms matched!")
# Note E. coli absence
ecoli_orgs = fb_orgs[fb_orgs['species'].str.contains('coli', case=False, na=False)]
if len(ecoli_orgs) > 0:
    for _, row in ecoli_orgs.iterrows():
        status = 'MATCHED' if row['orgId'] in matched_orgs else 'UNMATCHED'
        print(f"\nE. coli check: {row['orgId']} ({row['genus']} {row['species']} "
              f"{row['strain']}) — {status}")
        if status == 'UNMATCHED':
            print("  Expected: main s__Escherichia_coli clade absent from pangenome (too large)")
In [ ]:
# Multi-clade organisms
clades_per_org = all_mappings.groupby('orgId')['gtdb_species_clade_id'].nunique().reset_index()
clades_per_org.columns = ['orgId', 'n_clades']
multi_clade = clades_per_org[clades_per_org['n_clades'] > 1]
print(f"=== MULTI-CLADE ORGANISMS ({len(multi_clade)}) ===")
for _, row in multi_clade.iterrows():
    orgId = row['orgId']
    org_info = fb_orgs[fb_orgs['orgId'] == orgId].iloc[0]
    clades = all_mappings[all_mappings['orgId'] == orgId]['gtdb_species_clade_id'].tolist()
    print(f"  {orgId} ({org_info['genus']} {org_info['species']}): {row['n_clades']} clades")
    for c in clades:
        method = all_mappings[(all_mappings['orgId'] == orgId) &
                              (all_mappings['gtdb_species_clade_id'] == c)]['match_method'].iloc[0]
        print(f"    - {c} (via {method})")
print(f"\nSingle-clade organisms: {len(clades_per_org) - len(multi_clade)}")
In [ ]:
# Cross-check: show GTDB species names for all matched clades
# to verify they are taxonomically reasonable
unique_clades = all_mappings['gtdb_species_clade_id'].unique().tolist()
clade_str = "','".join(unique_clades)
clade_info = spark.sql(f"""
SELECT gtdb_species_clade_id, GTDB_species,
COUNT(*) as n_genomes
FROM kbase_ke_pangenome.gtdb_species_clade
WHERE gtdb_species_clade_id IN ('{clade_str}')
GROUP BY gtdb_species_clade_id, GTDB_species
""").toPandas()
print("=== CLADE CROSS-CHECK ===")
summary = all_mappings[['orgId', 'genus', 'species', 'gtdb_species_clade_id']].drop_duplicates()
summary = summary.merge(clade_info, on='gtdb_species_clade_id', how='left')
print(summary[['orgId', 'genus', 'species', 'gtdb_species_clade_id',
'GTDB_species', 'n_genomes']].to_string(index=False))
7. Save Organism Mapping¶
In [ ]:
output_path = DATA_DIR / 'organism_mapping.tsv'
all_mappings.to_csv(output_path, sep='\t', index=False)
print(f"Saved: {output_path}")
print(f" {len(all_mappings)} rows (orgId x clade pairs)")
print(f" {all_mappings['orgId'].nunique()} unique FB organisms")
print(f" {all_mappings['gtdb_species_clade_id'].nunique()} unique pangenome clades")
In [ ]:
print("=" * 60)
print("NB01 SUMMARY")
print("=" * 60)
print(f"FB organisms: {len(fb_orgs)}")
print(f"Matched: {all_mappings['orgId'].nunique()}")
print(f"Unmatched: {len(unmatched)}")
print(f"Multi-clade: {len(multi_clade)}")
print(f"Unique clades: {all_mappings['gtdb_species_clade_id'].nunique()}")
print(f"Match methods: {dict(all_mappings['match_method'].value_counts())}")
print(f"\nOutput: {output_path}")
print("=" * 60)