02 Taxonomy Bridge Functional Features
Jupyter notebook from the Contamination Gradient vs Functional Potential in ENIGMA Communities project.
NB02: Taxonomy Bridge and Functional Features¶
Build ENIGMA genus -> pangenome genus bridge and compute genus-level functional proxies from eggNOG COG categories.
Inputs:
../data/community_taxon_counts.tsv
Outputs:
../data/taxon_bridge.tsv../data/taxon_functional_features.tsv
Feature Assumptions and Sensitivity¶
strict_single_cladeuses one representative clade per genus (largestno_genomes).relaxed_all_cladesaggregates across all matched clades per genus.- Mobilome proxy uses COG categories
XorLto avoid zero-variance collapse seen withXalone.
In [1]:
from pathlib import Path
import re
import pandas as pd
DATA_DIR = Path('../data')
community = pd.read_csv(DATA_DIR / 'community_taxon_counts.tsv', sep=' ')
print('Community rows:', len(community))
print('Unique ENIGMA genera:', community['genus'].nunique())
spark = get_spark_session()
print('Spark session ready')
Community rows: 41711 Unique ENIGMA genera: 1392
Spark session ready
In [2]:
def norm_genus(x: str) -> str:
if pd.isna(x):
return ''
x = str(x).strip().lower()
x = re.sub(r'^g__', '', x)
x = re.sub(r'[^a-z0-9]+', '_', x)
return x.strip('_')
community_genus = (
community[['genus']]
.dropna()
.drop_duplicates()
.assign(genus_norm=lambda d: d['genus'].map(norm_genus))
)
community_genus = community_genus[community_genus['genus_norm'] != '']
print('Community genus candidates:', len(community_genus))
Community genus candidates: 1392
In [3]:
species = spark.sql("""
SELECT DISTINCT *
FROM kbase_ke_pangenome.gtdb_species_clade
""").toPandas()
print('gtdb_species_clade columns:', species.columns.tolist())
species_col = None
for c in ['GTDB_species', 'gtdb_species', 'species', 'gtdb_species_name']:
if c in species.columns:
species_col = c
break
if species_col is None:
raise RuntimeError('Could not find species name column in gtdb_species_clade')
if 'gtdb_species_clade_id' not in species.columns:
raise RuntimeError('Expected gtdb_species_clade_id in gtdb_species_clade')
species = species[['gtdb_species_clade_id', species_col]].rename(columns={species_col: 'species_name'})
species['genus_guess'] = (
species['species_name']
.astype(str)
.str.replace('s__', '', regex=False)
.str.split('_')
.str[0]
)
species['genus_norm'] = species['genus_guess'].map(norm_genus)
species = species[species['genus_norm'] != ''].drop_duplicates()
print('Species rows with genus parsed:', len(species))
gtdb_species_clade columns: ['gtdb_species_clade_id', 'representative_genome_id', 'GTDB_species', 'GTDB_taxonomy', 'ANI_circumscription_radius', 'mean_intra_species_ANI', 'min_intra_species_ANI', 'mean_intra_species_AF', 'min_intra_species_AF', 'no_clustered_genomes_unfiltered', 'no_clustered_genomes_filtered'] Species rows with genus parsed: 27690
In [4]:
bridge = community_genus.merge(
species[['genus_norm', 'gtdb_species_clade_id', 'species_name']],
on='genus_norm',
how='left'
)
bridge['mapping_tier'] = bridge['gtdb_species_clade_id'].apply(lambda v: 'genus_exact' if pd.notna(v) else 'unmapped')
clade_counts = (
bridge[bridge['mapping_tier'] == 'genus_exact']
.groupby('genus_norm')['gtdb_species_clade_id']
.nunique()
.rename('n_species_clades_for_genus')
.reset_index()
)
bridge = bridge.merge(clade_counts, on='genus_norm', how='left')
bridge['n_species_clades_for_genus'] = bridge['n_species_clades_for_genus'].fillna(0).astype(int)
bridge['resolution_tier'] = bridge['n_species_clades_for_genus'].apply(
lambda n: 'species_proxy_unique' if n == 1 else ('multi_species_ambiguous' if n > 1 else 'unmapped')
)
mapped = bridge[bridge['mapping_tier'] == 'genus_exact'].copy()
print('Mapped genera:', mapped['genus'].nunique())
print('Unmapped genera:', (bridge['mapping_tier'] == 'unmapped').sum())
print('Unique-clade genera (species-proxy resolvable):', bridge.loc[bridge['resolution_tier'] == 'species_proxy_unique', 'genus'].nunique())
print('Ambiguous multi-clade genera:', bridge.loc[bridge['resolution_tier'] == 'multi_species_ambiguous', 'genus'].nunique())
bridge.to_csv(DATA_DIR / 'taxon_bridge.tsv', sep=' ', index=False)
print('Saved bridge table')
Mapped genera: 530 Unmapped genera: 862 Unique-clade genera (species-proxy resolvable): 150 Ambiguous multi-clade genera: 380 Saved bridge table
In [5]:
mapped_clades = mapped[['genus', 'genus_norm', 'gtdb_species_clade_id', 'species_name', 'n_species_clades_for_genus']].drop_duplicates()
if mapped_clades.empty:
raise RuntimeError('No mapped genera. Cannot compute functional features.')
spark.createDataFrame(mapped_clades[['genus_norm', 'gtdb_species_clade_id']].drop_duplicates()).createOrReplaceTempView('mapped_clades_tmp')
# Pull clade sizes to define a strict representative-clade mode.
clade_sizes = spark.sql("""
SELECT p.gtdb_species_clade_id, CAST(p.no_genomes AS DOUBLE) AS no_genomes
FROM kbase_ke_pangenome.pangenome p
JOIN mapped_clades_tmp m ON p.gtdb_species_clade_id = m.gtdb_species_clade_id
""").toPandas()
mapped_clades2 = mapped_clades.merge(clade_sizes, on='gtdb_species_clade_id', how='left')
mapped_clades2['no_genomes'] = pd.to_numeric(mapped_clades2['no_genomes'], errors='coerce').fillna(-1)
strict_clades = (
mapped_clades2
.sort_values(['genus_norm', 'no_genomes', 'gtdb_species_clade_id'], ascending=[True, False, True])
.drop_duplicates(subset=['genus_norm'])
[['genus', 'genus_norm', 'gtdb_species_clade_id']]
)
relaxed_clades = mapped_clades2[['genus', 'genus_norm', 'gtdb_species_clade_id']].drop_duplicates()
species_proxy_clades = (
mapped_clades2[mapped_clades2['n_species_clades_for_genus'] == 1]
[['genus', 'genus_norm', 'gtdb_species_clade_id']]
.drop_duplicates()
)
print('Strict clades:', len(strict_clades), 'Relaxed clades:', len(relaxed_clades))
print('Species-proxy clades (unique genus->single clade):', len(species_proxy_clades))
Strict clades: 530 Relaxed clades: 7380 Species-proxy clades (unique genus->single clade): 150
In [6]:
def build_mode_features(mode_name: str, clades_df: pd.DataFrame, clade_agg: pd.DataFrame) -> pd.DataFrame:
merged = clades_df[['genus', 'genus_norm', 'gtdb_species_clade_id']].drop_duplicates().merge(
clade_agg,
on='gtdb_species_clade_id',
how='left'
)
for c in ['total_ann', 'defense_ann', 'mobilome_ann', 'metabolism_ann']:
if c not in merged.columns:
merged[c] = 0.0
merged[c] = pd.to_numeric(merged[c], errors='coerce').fillna(0.0)
agg = merged.groupby(['genus', 'genus_norm'], as_index=False)[
['total_ann', 'defense_ann', 'mobilome_ann', 'metabolism_ann']
].sum()
denom = agg['total_ann'].replace(0, pd.NA)
agg['cog_defense_fraction'] = agg['defense_ann'] / denom
agg['cog_mobilome_fraction'] = agg['mobilome_ann'] / denom
agg['cog_metabolism_fraction'] = agg['metabolism_ann'] / denom
agg = agg.fillna(0.0)
agg['mapping_mode'] = mode_name
return agg[['genus', 'genus_norm', 'mapping_mode', 'cog_defense_fraction', 'cog_mobilome_fraction', 'cog_metabolism_fraction']].melt(
id_vars=['genus', 'genus_norm', 'mapping_mode'],
var_name='feature_name',
value_name='feature_value'
)
# Precompute clade-level annotation counts once on the union of clades used by any mapping mode.
all_clades = pd.concat([
relaxed_clades[['gtdb_species_clade_id']],
species_proxy_clades[['gtdb_species_clade_id']]
], ignore_index=True).drop_duplicates()
spark.createDataFrame(all_clades).createOrReplaceTempView('all_clades_tmp')
clade_agg = spark.sql("""
SELECT
gc.gtdb_species_clade_id,
COUNT(*) AS total_ann,
SUM(CASE WHEN e.COG_category LIKE '%V%' THEN 1 ELSE 0 END) AS defense_ann,
SUM(CASE WHEN e.COG_category RLIKE '.*[XL].*' THEN 1 ELSE 0 END) AS mobilome_ann,
SUM(CASE WHEN e.COG_category RLIKE '.*[CEGHIP].*' THEN 1 ELSE 0 END) AS metabolism_ann
FROM kbase_ke_pangenome.gene_cluster gc
JOIN all_clades_tmp a ON gc.gtdb_species_clade_id = a.gtdb_species_clade_id
JOIN kbase_ke_pangenome.eggnog_mapper_annotations e
ON gc.gene_cluster_id = e.query_name
WHERE e.COG_category IS NOT NULL AND e.COG_category != ''
GROUP BY gc.gtdb_species_clade_id
""").toPandas()
for c in ['total_ann', 'defense_ann', 'mobilome_ann', 'metabolism_ann']:
clade_agg[c] = pd.to_numeric(clade_agg[c], errors='coerce').fillna(0.0)
feats_strict = build_mode_features('strict_single_clade', strict_clades, clade_agg)
feats_relaxed = build_mode_features('relaxed_all_clades', relaxed_clades, clade_agg)
feats_species_proxy = build_mode_features('species_proxy_unique_genus', species_proxy_clades, clade_agg)
feats = pd.concat([feats_strict, feats_relaxed, feats_species_proxy], ignore_index=True)
feats.to_csv(DATA_DIR / 'taxon_functional_features.tsv', sep=' ', index=False)
print('Saved functional feature table')
print(feats.groupby('mapping_mode')['genus_norm'].nunique().rename('n_genera_with_features'))
print('Mobilome summary by mode:')
print(feats[feats['feature_name'] == 'cog_mobilome_fraction'].groupby('mapping_mode')['feature_value'].describe().to_string())
print(feats.head().to_string(index=False))
Saved functional feature table
mapping_mode
relaxed_all_clades 530
species_proxy_unique_genus 150
strict_single_clade 530
Name: n_genera_with_features, dtype: int64
Mobilome summary by mode:
count mean std min 25% 50% 75% max
mapping_mode
relaxed_all_clades 530.0 0.063832 0.018455 0.025568 0.050273 0.060508 0.074542 0.144654
species_proxy_unique_genus 150.0 0.059951 0.019456 0.030033 0.046328 0.056552 0.068978 0.144654
strict_single_clade 530.0 0.074394 0.032015 0.024374 0.053188 0.067206 0.087020 0.281397
genus genus_norm mapping_mode feature_name feature_value
67-14 67_14 strict_single_clade cog_defense_fraction 0.012452
Abiotrophia abiotrophia strict_single_clade cog_defense_fraction 0.039560
Acetoanaerobium acetoanaerobium strict_single_clade cog_defense_fraction 0.022270
Acetobacterium acetobacterium strict_single_clade cog_defense_fraction 0.026019
Achromobacter achromobacter strict_single_clade cog_defense_fraction 0.018887