Pattern 1: Single Query with IN Clause for Moderate Species Lists

[cog_analysis] For 10-100 species, a single query with IN clause outperforms sequential queries:

# Get target species
species_list = ['s__Species1--RS_GCF_123', 's__Species2--RS_GCF_456', ...]  # 32 species

# Create IN clause
species_in_clause = "', '".join(species_list)

# Single query for all species
query = f"""
SELECT
    gc.gtdb_species_clade_id,
    gc.is_core,
    ann.COG_category,
    COUNT(*) as gene_count
FROM kbase_ke_pangenome.gene_cluster gc
JOIN kbase_ke_pangenome.gene_genecluster_junction j
    ON gc.gene_cluster_id = j.gene_cluster_id
JOIN kbase_ke_pangenome.eggnog_mapper_annotations ann
    ON j.gene_id = ann.query_name
WHERE gc.gtdb_species_clade_id IN ('{species_in_clause}')
    AND ann.COG_category IS NOT NULL
GROUP BY gc.gtdb_species_clade_id, gc.is_core, ann.COG_category
"""

result_df = spark.sql(query)  # Keep as Spark DataFrame; call .toPandas() only for final plotting

Performance: For 32 species analyzing COG distributions:
- Sequential queries (96 queries × 3 gene classes): ~30 minutes
- Single IN clause query: ~6 minutes (5x speedup)

Key insight: Let Spark handle parallelization internally rather than doing sequential queries. The overhead of 96 separate query round-trips dominates execution time, not the data transfer.

When to use:
- 10-100 species in your analysis
- Each species has moderate data volume (<1M rows per species)
- Query involves JOINs or aggregations

When NOT to use:
- >100 species (IN clause becomes unwieldy)
- Species with >10K genomes each (use per-species iteration instead)

Pattern 2: Per-Species Iteration (For Large Species or Many Species)

For very large species or >100 species total, iterate:

# Get target species list
target_species = [row.gtdb_species_clade_id for row in spark.sql("""
    SELECT gtdb_species_clade_id
    FROM kbase_ke_pangenome.pangenome
    WHERE no_genomes >= 50
""").collect()]

# Process one species at a time
results = []
for species_id in target_species:
    # Use LIKE to avoid -- comment issue
    species_prefix = species_id.split('--')[0]

    df = spark.sql(f"""
        SELECT genome_id, COUNT(*) as n_genes
        FROM kbase_ke_pangenome.gene g
        JOIN kbase_ke_pangenome.genome gm ON g.genome_id = gm.genome_id
        WHERE gm.gtdb_species_clade_id LIKE '{species_prefix}%'
        GROUP BY genome_id
    """)

    # .toPandas() is fine here since we already aggregated to a small result per species
    pdf = df.toPandas()
    pdf['species'] = species_id
    results.append(pdf)

final = pd.concat(results)

Pattern 2: Chunked Genome Queries

When you have a list of genome IDs, process in chunks:

def chunk_list(lst, chunk_size):
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]

genome_ids = [...]  # Your list of genome IDs
CHUNK_SIZE = 500

all_results = []
for chunk in chunk_list(genome_ids, CHUNK_SIZE):
    genome_list = ','.join([f"'{g}'" for g in chunk])

    result = spark.sql(f"""
        SELECT * FROM kbase_ke_pangenome.gene
        WHERE genome_id IN ({genome_list})
    """)

    all_results.append(result)

# Union Spark DataFrames instead of concatenating pandas frames
from functools import reduce
final = reduce(lambda a, b: a.union(b), all_results)
# Only convert to pandas if you need local processing
# final_pd = final.toPandas()

Pattern 3: Pagination for Large Results

-- First page
SELECT * FROM kbase_ke_pangenome.gene_cluster
WHERE gtdb_species_clade_id LIKE 's__Escherichia_coli%'
ORDER BY gene_cluster_id
LIMIT 10000 OFFSET 0

-- Second page
SELECT * FROM kbase_ke_pangenome.gene_cluster
WHERE gtdb_species_clade_id LIKE 's__Escherichia_coli%'
ORDER BY gene_cluster_id
LIMIT 10000 OFFSET 10000

Important: Always include ORDER BY for consistent pagination.

Pattern 4: Aggregation Before Collection

Do all filtering, joins, and aggregations in Spark. Only call .toPandas() on the final, small result:

# GOOD: Aggregate in Spark, collect small summary
summary = spark.sql("""
    SELECT
        gtdb_species_clade_id,
        COUNT(*) as n_clusters,
        SUM(CASE WHEN is_core = 1 THEN 1 ELSE 0 END) as n_core
    FROM kbase_ke_pangenome.gene_cluster
    GROUP BY gtdb_species_clade_id
""")

# Convert to pandas only for plotting or export (27K rows — fine)
summary_pd = summary.toPandas()

# BAD: Collect all 132M rows then aggregate in pandas
# all_clusters = spark.sql("SELECT * FROM gene_cluster").toPandas()  # DON'T DO THIS

Recommended Batch Sizes

Operation Batch Size Rationale
ANI queries 1 species Species can have 10K+ genomes = 100M+ pairs
Gene queries 100-500 genomes ~3K genes/genome = 300K-1.5M rows
Gene cluster queries 1 species 5K-500K clusters per species
Annotation lookups 10K cluster IDs JOIN is fast with index
Pathway queries 1K genomes ~1K pathways/genome

Table-Specific Strategies

`genome_ani` (421M rows)

ANI is stored as directional pairs within species. Always filter by species:

# Get genomes for target species first
genomes = [row.genome_id for row in spark.sql("""
    SELECT genome_id FROM kbase_ke_pangenome.genome
    WHERE gtdb_species_clade_id LIKE 's__Klebsiella_pneumoniae%'
""").collect()]

genome_list = ','.join([f"'{g}'" for g in genomes])

# Query ANI only for those genomes — keep as Spark DataFrame
ani = spark.sql(f"""
    SELECT genome1_id, genome2_id, ANI, AF
    FROM kbase_ke_pangenome.genome_ani
    WHERE genome1_id IN ({genome_list})
      AND genome2_id IN ({genome_list})
""")
# .toPandas() only if species is small enough for local analysis

`gene` and `gene_genecluster_junction` (1B+ rows)

Never query without a filter:

# GOOD: Filter by genome
genes = spark.sql("""
    SELECT gene_id, genome_id
    FROM kbase_ke_pangenome.gene
    WHERE genome_id = 'RS_GCF_000005845.2'
""")

# GOOD: Filter by cluster
junction = spark.sql("""
    SELECT gene_id, gene_cluster_id
    FROM kbase_ke_pangenome.gene_genecluster_junction
    WHERE gene_cluster_id IN ('cluster1', 'cluster2', 'cluster3')
""")

# BAD: Full scan
# all_genes = spark.sql("SELECT * FROM kbase_ke_pangenome.gene")  # 1 BILLION ROWS!

`gapmind_pathways` (305M rows)

Filter by genome or pathway:

# Get pathways for specific genomes
pathways = spark.sql("""
    SELECT genome_id, pathway, score_category, score_simplified
    FROM kbase_ke_pangenome.gapmind_pathways
    WHERE genome_id IN ('GCF_000005845.2', 'GCF_000006765.1')
""")

# Get all genomes with a specific pathway
arginine = spark.sql("""
    SELECT genome_id, score, score_category
    FROM kbase_ke_pangenome.gapmind_pathways
    WHERE pathway = 'arginine'
      AND metabolic_category = 'amino_acid'
      AND score_simplified = 1
""")

REST API vs Direct Spark

REST API (`https://hub.berdl.kbase.us/apis/mcp/`)

Good for:
- Simple queries returning <1M rows
- Schema exploration (/tables/list, /tables/schema)
- Row counts (/tables/count)
- Quick samples (/tables/sample)

Limitations:
- 504 timeout on queries taking >60s
- No streaming for large results
- Transient 503 errors during cluster restarts

Direct Spark SQL (Preferred)

Required for:
- JOINs across large tables
- Aggregations on billions of rows
- Results >1M rows
- Complex window functions
- Iterative analysis

How to get a Spark session:

There are three environments, each with a different import pattern:

1. BERDL JupyterHub notebooks (no import needed):

# Built-in — injected by /configs/ipython_startup/00-notebookutils.py
spark = get_spark_session()

2. BERDL JupyterHub CLI / Python scripts (explicit import):

from berdl_notebook_utils.setup_spark_session import get_spark_session
spark = get_spark_session()

3. Local machine (requires .venv-berdl + proxy chain):

from get_spark_session import get_spark_session  # scripts/get_spark_session.py
spark = get_spark_session()

The local scripts/get_spark_session.py creates a remote Spark Connect session through the proxy chain. See scripts/README.md and .claude/skills/berdl-query/SKILL.md for setup.

Common mistake: Using from get_spark_session import get_spark_session on the BERDL cluster (ImportError) or using berdl_notebook_utils locally (not installed). Match the import to the environment.

spark = get_spark_session()

# Full Spark SQL capabilities
result = spark.sql("""
    SELECT
        g.gtdb_species_clade_id,
        AVG(m.checkm_completeness) as avg_completeness,
        COUNT(*) as n_genomes
    FROM kbase_ke_pangenome.genome g
    JOIN kbase_ke_pangenome.gtdb_metadata m ON g.genome_id = m.accession
    GROUP BY g.gtdb_species_clade_id
    HAVING COUNT(*) >= 10
""")

# Write to parquet for later use
result.write.parquet('/path/to/output/species_quality.parquet')

Anti-Patterns to Avoid

1. Large IN Clauses

# BAD: 10,000+ values in IN clause
genome_list = ','.join([f"'{g}'" for g in all_genomes])  # 10K+ items
query = f"SELECT * FROM gene WHERE genome_id IN ({genome_list})"

# GOOD: Use temporary table or iterate
spark.createDataFrame([(g,) for g in all_genomes], ['genome_id']).createOrReplaceTempView('target_genomes')
query = """
    SELECT g.* FROM kbase_ke_pangenome.gene g
    JOIN target_genomes t ON g.genome_id = t.genome_id
"""

2. Cross-Species Gene JOINs

# BAD: JOIN across all species (billions of rows)
query = """
    SELECT * FROM kbase_ke_pangenome.gene g1
    JOIN kbase_ke_pangenome.gene g2 ON g1.gene_id = g2.gene_id
"""

# GOOD: Filter to specific species first
query = """
    SELECT * FROM kbase_ke_pangenome.gene g
    JOIN kbase_ke_pangenome.genome gm ON g.genome_id = gm.genome_id
    WHERE gm.gtdb_species_clade_id LIKE 's__Escherichia_coli%'
"""

3. Collecting Before Filtering

# BAD: Collect all 132M rows, filter in pandas
df = spark.sql("SELECT * FROM kbase_ke_pangenome.gene_cluster").toPandas()
core_only = df[df['is_core'] == 1]

# GOOD: Filter in Spark, keep as Spark DataFrame
core_only = spark.sql("""
    SELECT * FROM kbase_ke_pangenome.gene_cluster
    WHERE is_core = 1
    AND gtdb_species_clade_id = 's__Escherichia_coli--RS_GCF_000005845.2'
""")
# Only .toPandas() if the result is small enough for local processing

Other Large Databases

Fitness Browser (`kescience_fitnessbrowser`)

Table Rows Strategy
genefitness 27,410,721 Filter by orgId
cofit 13,656,145 Filter by orgId and locusId
ortholog ~1.15M (32 orgs) Filter by orgId1 and/or orgId2 via IN clause
genedomain millions Filter by orgId

ICA Pipeline Performance (`fitness_modules`)

[fitness_modules] Robust ICA runtime scales with genes × experiments × components × runs:

Organism Genes Experiments Components Runs Time
DvH 2,741 757 80 50 82 min
Putida 4,778 300 80 30 78 min
Methanococcus_S2 1,244 371 80 50 13 min
SynE 1,899 129 41 30 3 min
Kang 2,003 108 40 30 5 min

Key performance factors:
- Component/experiment ratio: Keep ≤ 40%. Higher ratios cause FastICA convergence failures, and each failed run hits max_iter (very slow). SB2B (68 comp / 190 exps = 36%) took 25 min; dropping to 40% cap would have been fine.
- Pairwise cosine matrix: Size = (n_runs × n_components)². At 50 runs × 80 components = 4000 vectors → 16M entries, manageable. At 100 runs × 100 components = 10,000 vectors → 100M entries, very slow.
- Practical limit: Cap at 80 components, 30-50 runs. Total time for 32 organisms: ~4-5 hours.
- Ortholog extraction: 1.15M BBH pairs for 32 organisms takes ~2 minutes via Spark. The graph construction (networkx connected components) takes ~30 seconds.

Genomes (`kbase_genomes`)

Table Rows Strategy
feature 1,011,650,903 Filter by genome via junction tables
encoded_feature 1,011,650,903 Filter by genome via junction tables
name 1,046,526,298 Filter by name or entity_id
protein 253,173,194 Filter by protein_id
All junction tables ~1B each Always filter by one entity

Biochemistry (`kbase_msd_biochemistry`)

Table Rows Strategy
reaction_similarity 671M+ Always filter by reaction_id
Other tables <100K Safe to scan

UMAP on Large Embedding Datasets

[env_embedding_explorer] UMAP with metric='cosine' on 83K 64-dim vectors did not complete in >60 min on a single-CPU pod. Two optimizations brought this to ~7 min:

  1. L2-normalize + euclidean: normalize(embeddings, norm='l2') then metric='euclidean'. Equivalent topology for cosine similarity.
  2. Subsample fit + full transform: Fit on 20K random subsample, then reducer.transform() all 83K points. The transform step is fast (~18s) because it only needs the pre-computed nearest neighbor graph from the fit.
from sklearn.preprocessing import normalize
import umap

emb_normed = normalize(embeddings, norm='l2')
fit_idx = np.random.choice(len(emb_normed), 20_000, replace=False)

reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, metric='euclidean', random_state=42)
reducer.fit(emb_normed[fit_idx])
coords_2d = reducer.transform(emb_normed)  # all 83K points

AlphaEarth Table is Small — Safe to Collect

[env_embedding_explorer] The alphaearth_embeddings_all_years table (83K rows × 77 columns) is small enough for .toPandas() directly. Extraction takes ~30 seconds. No need for chunking or filtering.

Per-Species ANI Extraction is Fast; Gene Clusters are 18x Slower

[ecotype_env_reanalysis] Extracting data for 224 species (25K genomes total) with per-species WHERE genome_id IN (...) queries:

Data Table(s) Total time Per species Bottleneck
ANI pairs genome_ani (421M rows) ~8 min ~2s IN clause filter on indexed column
Gene clusters gene × gene_genecluster_junction (1B × 1B) ~120 min ~32s Two-table join, neither partitioned by genome_id

Gene cluster extraction is 18x slower because it joins two billion-row unpartitioned tables. The Spark optimizer must scan both tables for each species query. Potential optimizations:
- Partition gene by genome_id (requires lakehouse rebuild)
- Cache the join result and filter per-species from the cached DataFrame
- Use BROADCAST hints with a temp view of target genome IDs (see cofitness_coinheritance pitfall)

K. pneumoniae exceeded maxResultSize: This species (250 genomes × ~5,500 genes) produced results exceeding Spark's 1GB spark.driver.maxResultSize. For very large species, chunk the genome list or increase the driver memory limit.

[bakta_reannotation] Broadcast joins fail on uniprot_identifier (2.5B rows): Any multi-way join touching kbase_uniprot.uniprot_identifier can exceed maxResultSize via broadcast exchange, even if the final result is small. Fix: disable auto-broadcast before the join:

spark.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
# ... run your join ...
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = 10485760")  # restore default

This forces sort-merge joins which stream data rather than materializing entire tables in the driver.


GapMind Pathway Aggregation Performance

[pangenome_pathway_geography] Aggregating 305M rows from gapmind_pathways to species-level statistics:

Runtime: 10-15 minutes on BERDL JupyterHub with the corrected query

Key optimizations:
1. Two-stage aggregation: First aggregate genome-pathway pairs (GROUP BY genome_id, pathway), then aggregate to species level. This reduces intermediate result size.
2. CASE expressions for scoring: Compute numeric scores inline rather than joining lookup tables
3. Single query for all species: Process all 27,690 species in one Spark job rather than iterating per-species

Query structure:

-- Stage 1: Score each pathway step, take MAX per genome-pathway (handles multiple rows)
WITH best_scores AS (
    SELECT clade_name, genome_id, pathway,
           MAX(CASE score_category WHEN 'complete' THEN 5 ... END) as best_score
    FROM gapmind_pathways
    GROUP BY clade_name, genome_id, pathway
),
-- Stage 2: Count complete pathways per genome
genome_stats AS (
    SELECT clade_name, genome_id,
           SUM(CASE WHEN best_score >= 5 THEN 1 ELSE 0 END) as complete_pathways
    FROM best_scores
    GROUP BY clade_name, genome_id
)
-- Stage 3: Aggregate to species level
SELECT clade_name,
       AVG(complete_pathways) as mean_complete,
       STDDEV(complete_pathways) as std_complete
FROM genome_stats
GROUP BY clade_name

Why this works: The GROUP BY in stage 1 collapses the multiple rows per genome-pathway pair (305M rows → ~27.6M unique pairs), then stage 2 aggregates to per-genome stats (~293K rows), then stage 3 produces the final species-level output (27.7K rows). Each stage dramatically reduces data volume before the next.

Anti-pattern to avoid: Do NOT filter to score_category = 'present' — this category doesn't exist in the data. Always use the score hierarchy: complete > likely_complete > steps_missing_low > steps_missing_medium > not_present.


Performance Checklist

Before running a query:

  • [ ] Is the target table >10M rows? If yes, add filters
  • [ ] Am I using LIMIT for exploration queries?
  • [ ] Am I keeping data as Spark DataFrames until the final step?
  • [ ] Am I only calling .toPandas() on small, aggregated results?
  • [ ] Am I aggregating in Spark before collecting?
  • [ ] Am I iterating per-species/per-organism for cross-entity analysis?
  • [ ] Am I using pagination for large result sets?
  • [ ] For REST API: Is expected result <1M rows?
  • [ ] For REST API: Is query simple (no complex JOINs)?
  • [ ] Am I casting string columns to numeric types where needed?