02 Extract Cluster Reps
Jupyter notebook from the Conservation vs Fitness -- Linking FB Genes to Pangenome Clusters project.
NB02: Extract Cluster Rep FASTAs & Download FB Proteins¶
Two tasks:
- Download FB protein sequences from
fit.genomics.lbl.gov/cgi_data/aaseqsand split into per-organism FASTAs - Extract pangenome cluster rep sequences from Spark for each mapped species clade
Requires BERDL JupyterHub — get_spark_session() must be available.
Inputs: data/organism_mapping.tsv (from NB01)
Outputs:
data/fb_fastas/<orgId>.fasta— per-organism FB protein FASTAsdata/species_fastas/<clade_id>.fasta— per-species cluster rep FASTAs
In [ ]:
import pandas as pd
import os
import subprocess
from pathlib import Path
spark = get_spark_session()
print(f"Spark version: {spark.version}")
DATA_DIR = Path('../data')
FB_FASTA_DIR = DATA_DIR / 'fb_fastas'
SPECIES_FASTA_DIR = DATA_DIR / 'species_fastas'
FB_FASTA_DIR.mkdir(parents=True, exist_ok=True)
SPECIES_FASTA_DIR.mkdir(parents=True, exist_ok=True)
In [ ]:
# Load organism mapping from NB01
mapping = pd.read_csv(DATA_DIR / 'organism_mapping.tsv', sep='\t')
print(f"Loaded {len(mapping)} mapping rows")
print(f"Unique organisms: {mapping['orgId'].nunique()}")
print(f"Unique clades: {mapping['gtdb_species_clade_id'].nunique()}")
mapped_orgs = sorted(mapping['orgId'].unique())
mapped_clades = sorted(mapping['gtdb_species_clade_id'].unique())
1. Download & Split FB Protein Sequences¶
Download the official amino acid sequences from the Fitness Browser website
and split into per-organism FASTA files. Header format: >orgId:locusId
In [ ]:
import urllib.request
AASEQS_URL = 'https://fit.genomics.lbl.gov/cgi_data/aaseqs'
aaseqs_path = DATA_DIR / 'fb_aaseqs_all.fasta'
if not aaseqs_path.exists():
print(f"Downloading FB protein sequences from {AASEQS_URL}...")
urllib.request.urlretrieve(AASEQS_URL, aaseqs_path)
print(f"Downloaded to {aaseqs_path}")
else:
print(f"Already downloaded: {aaseqs_path}")
# Count sequences
n_seqs = sum(1 for line in open(aaseqs_path) if line.startswith('>'))
print(f"Total FB protein sequences: {n_seqs:,}")
In [ ]:
# Split into per-organism FASTAs
# Header format: >orgId:locusId
org_files = {} # orgId -> file handle
org_counts = {} # orgId -> sequence count
with open(aaseqs_path) as f:
current_org = None
for line in f:
if line.startswith('>'):
# Parse header: >orgId:locusId
header = line[1:].strip()
orgId = header.split(':')[0]
current_org = orgId
if orgId not in org_files:
fasta_path = FB_FASTA_DIR / f"{orgId}.fasta"
org_files[orgId] = open(fasta_path, 'w')
org_counts[orgId] = 0
org_files[orgId].write(line)
org_counts[orgId] += 1
else:
if current_org and current_org in org_files:
org_files[current_org].write(line)
# Close all files
for fh in org_files.values():
fh.close()
print(f"Split {sum(org_counts.values()):,} sequences into {len(org_counts)} organism FASTAs")
print(f"\nSequences per organism:")
for org in sorted(org_counts.keys()):
mapped_flag = '*' if org in mapped_orgs else ' '
print(f" {mapped_flag} {org:25s} {org_counts[org]:>6,} sequences")
print(f"\n* = mapped to pangenome clade")
In [ ]:
# QC: check all mapped organisms have FASTA files
missing_fb = [org for org in mapped_orgs if org not in org_counts]
if missing_fb:
print(f"WARNING: {len(missing_fb)} mapped organisms missing from aaseqs download:")
for org in missing_fb:
print(f" {org}")
else:
print(f"All {len(mapped_orgs)} mapped organisms have FB protein FASTAs")
2. Extract Pangenome Cluster Rep Sequences¶
For each unique clade from the organism mapping, extract the representative protein
sequences from the gene_cluster table (which has inline faa_sequence).
In [ ]:
# Get expected cluster counts per clade for QC
clade_str = "','".join(mapped_clades)
expected_counts = spark.sql(f"""
SELECT gtdb_species_clade_id, no_gene_clusters
FROM kbase_ke_pangenome.pangenome
WHERE gtdb_species_clade_id IN ('{clade_str}')
""").toPandas()
print(f"Expected cluster counts for {len(expected_counts)} clades:")
print(expected_counts.to_string(index=False))
In [ ]:
extraction_stats = []
for i, clade_id in enumerate(mapped_clades):
fasta_path = SPECIES_FASTA_DIR / f"{clade_id}.fasta"
# Skip if already extracted
if fasta_path.exists() and fasta_path.stat().st_size > 0:
n_existing = sum(1 for line in open(fasta_path) if line.startswith('>'))
extraction_stats.append({
'clade_id': clade_id, 'n_clusters': n_existing, 'status': 'cached'
})
print(f" [{i+1}/{len(mapped_clades)}] {clade_id}: {n_existing:,} clusters (cached)")
continue
# Extract from Spark
cluster_reps = spark.sql(f"""
SELECT gene_cluster_id, faa_sequence
FROM kbase_ke_pangenome.gene_cluster
WHERE gtdb_species_clade_id = '{clade_id}'
AND faa_sequence IS NOT NULL AND faa_sequence != ''
""").toPandas()
if len(cluster_reps) == 0:
print(f" [{i+1}/{len(mapped_clades)}] {clade_id}: WARNING - no clusters with sequences!")
extraction_stats.append({
'clade_id': clade_id, 'n_clusters': 0, 'status': 'empty'
})
continue
# Write FASTA
with open(fasta_path, 'w') as f:
for _, row in cluster_reps.iterrows():
f.write(f">{row['gene_cluster_id']}\n")
# Wrap sequence at 70 chars
seq = row['faa_sequence']
for j in range(0, len(seq), 70):
f.write(seq[j:j+70] + '\n')
extraction_stats.append({
'clade_id': clade_id, 'n_clusters': len(cluster_reps), 'status': 'extracted'
})
print(f" [{i+1}/{len(mapped_clades)}] {clade_id}: {len(cluster_reps):,} clusters")
stats_df = pd.DataFrame(extraction_stats)
print(f"\nDone. Extracted {stats_df['n_clusters'].sum():,} total cluster rep sequences")
3. Quality Control¶
In [ ]:
# Compare extracted counts vs expected from pangenome table
qc = stats_df.merge(
expected_counts,
left_on='clade_id', right_on='gtdb_species_clade_id',
how='left'
)
qc['pct_extracted'] = (qc['n_clusters'] / qc['no_gene_clusters'] * 100).round(1)
print("=== CLUSTER COUNT QC ===")
print(f"Clades with >98% clusters extracted: "
f"{(qc['pct_extracted'] >= 98).sum()}/{len(qc)}")
# Flag any with significant discrepancy
low_pct = qc[qc['pct_extracted'] < 95]
if len(low_pct) > 0:
print(f"\nWARNING: {len(low_pct)} clades with <95% extraction rate:")
print(low_pct[['clade_id', 'n_clusters', 'no_gene_clusters', 'pct_extracted']].to_string(index=False))
else:
print("All clades have >=95% extraction rate")
print(f"\nCluster count summary:")
print(qc[['clade_id', 'n_clusters', 'no_gene_clusters', 'pct_extracted', 'status']].to_string(index=False))
In [ ]:
import matplotlib.pyplot as plt
# Sequence length distributions for a few representative species
sample_clades = mapped_clades[:min(6, len(mapped_clades))]
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
axes = axes.flatten()
for idx, clade_id in enumerate(sample_clades):
fasta_path = SPECIES_FASTA_DIR / f"{clade_id}.fasta"
if not fasta_path.exists():
continue
# Parse sequence lengths
lengths = []
current_len = 0
with open(fasta_path) as f:
for line in f:
if line.startswith('>'):
if current_len > 0:
lengths.append(current_len)
current_len = 0
else:
current_len += len(line.strip())
if current_len > 0:
lengths.append(current_len)
short_name = clade_id.split('__')[1].split('--')[0] if '__' in clade_id else clade_id[:30]
axes[idx].hist(lengths, bins=50, edgecolor='black', alpha=0.7)
axes[idx].set_title(f"{short_name}\n(n={len(lengths):,}, med={pd.Series(lengths).median():.0f} aa)")
axes[idx].set_xlabel('Protein length (aa)')
axes[idx].set_ylabel('Count')
# Hide unused subplots
for idx in range(len(sample_clades), len(axes)):
axes[idx].set_visible(False)
plt.suptitle('Cluster Rep Protein Length Distributions', fontsize=14)
plt.tight_layout()
plt.savefig('../figures/cluster_rep_lengths.png', dpi=150)
plt.show()
In [ ]:
# Verify all mapped species have non-empty FASTAs
empty_fastas = []
for clade_id in mapped_clades:
fasta_path = SPECIES_FASTA_DIR / f"{clade_id}.fasta"
if not fasta_path.exists() or fasta_path.stat().st_size == 0:
empty_fastas.append(clade_id)
if empty_fastas:
print(f"WARNING: {len(empty_fastas)} clades with missing/empty FASTAs:")
for c in empty_fastas:
print(f" {c}")
else:
print(f"All {len(mapped_clades)} mapped clades have non-empty FASTA files")
In [ ]:
print("=" * 60)
print("NB02 SUMMARY")
print("=" * 60)
print(f"FB proteins downloaded: {n_seqs:,} sequences")
print(f"Per-organism FASTAs: {len(org_counts)} organisms")
print(f"Mapped organisms with FASTAs: {len(mapped_orgs)}")
print(f"Pangenome clades extracted: {len(mapped_clades)}")
print(f"Total cluster rep sequences: {stats_df['n_clusters'].sum():,}")
print(f"\nOutputs:")
print(f" FB FASTAs: {FB_FASTA_DIR}/")
print(f" Species FASTAs: {SPECIES_FASTA_DIR}/")
print("=" * 60)