NB01: ENIGMA Extraction and QC
Jupyter notebook from the Contamination Gradient vs Functional Potential in ENIGMA Communities project.
Extract overlap-ready geochemistry, community, and metadata tables for downstream modeling.
Requires BERDL JupyterHub with built-in get_spark_session().
Outputs:
../data/geochemistry_sample_matrix.tsv
../data/community_taxon_counts.tsv
../data/sample_location_metadata.tsv
In [1]:
from pathlib import Path
import re
import pandas as pd
DATA_DIR = Path('../data')
DATA_DIR.mkdir(parents=True, exist_ok=True)
spark = get_spark_session()
print('Spark session ready')
Spark session ready
In [2]:
tables = spark.sql('SHOW TABLES IN enigma_coral').toPandas()
table_names = set(tables['tableName'].tolist())
print(f"ENIGMA tables discovered: {len(table_names)}")
community_brick = 'ddt_brick0000476' if 'ddt_brick0000476' in table_names else 'ddt_brick0000459'
print('Using community count table:', community_brick)
for t in ['ddt_brick0000010', community_brick, 'ddt_brick0000454', 'sdt_sample', 'sdt_community', 'sdt_location']:
    print(f"\n=== {t} ===")
    spark.sql(f'DESCRIBE enigma_coral.{t}').show(200, truncate=False)
ENIGMA tables discovered: 47
Using community count table: ddt_brick0000476

=== ddt_brick0000010 ===
+-----------------------------------+---------+-------------------------------------------------------------------------------------------------------------------------+
|col_name |data_type|comment |
+-----------------------------------+---------+-------------------------------------------------------------------------------------------------------------------------+
|sdt_sample_name |string |{"description": "environmental sample ID", "type": "foreign_key", "references": "sdt_sample.sdt_sample_name"} |
|molecule_from_list_sys_oterm_id |string |{"description": "molecule from list, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"}|
|molecule_from_list_sys_oterm_name |string |{"description": "molecule from list"} |
|molecule_molecular_weight_dalton |double |{"description": "molecular weight", "unit": "dalton"} |
|molecule_algorithm_parameter |string |{"description": "algorithm parameter"} |
|molecule_detection_limit_micromolar|double |{"description": "detection limit", "unit": "micromolar"} |
|physiochemical_state |string |{"description": "physiochemical state"} |
|replicate_series_count_unit |int |{"description": "replicate series", "unit": "count unit"} |
|concentration_micromolar |double |{"description": "concentration", "unit": "micromolar"} |
+-----------------------------------+---------+-------------------------------------------------------------------------------------------------------------------------+
=== ddt_brick0000476 ===
+---------------------------+---------+--------------------------------------------------------------------------------------------------------+
|col_name |data_type|comment |
+---------------------------+---------+--------------------------------------------------------------------------------------------------------+
|sdt_asv_name |string |{"description": "ASV ID", "type": "foreign_key", "references": "sdt_asv.sdt_asv_name"} |
|sdt_community_name |string |{"description": "community ID", "type": "foreign_key", "references": "sdt_community.sdt_community_name"}|
|replicate_series_count_unit|int |{"description": "replicate series", "unit": "count unit"} |
|count_count_unit |int |{"description": "count", "unit": "count unit"} |
+---------------------------+---------+--------------------------------------------------------------------------------------------------------+
=== ddt_brick0000454 ===
+------------------------------+---------+----------------------------------------------------------------------------------------------------------------------+
|col_name |data_type|comment |
+------------------------------+---------+----------------------------------------------------------------------------------------------------------------------+
|sdt_asv_name |string |{"description": "ASV ID", "type": "foreign_key", "references": "sdt_asv.sdt_asv_name"} |
|taxonomic_level_sys_oterm_id |string |{"description": "taxonomic level, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"}|
|taxonomic_level_sys_oterm_name|string |{"description": "taxonomic level"} |
|sdt_taxon_name |string |{"description": "taxon ID", "type": "foreign_key", "references": "sdt_taxon.sdt_taxon_name"} |
+------------------------------+---------+----------------------------------------------------------------------------------------------------------------------+
=== sdt_sample ===
+--------------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|col_name |data_type|comment |
+--------------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|sdt_sample_id |string |{"description": "Unique identifier for the sample (Primary key)", "type": "primary_key"} |
|sdt_sample_name |string |{"description": "Unique name of the sample", "type": "unique_key"} |
|sdt_location_name |string |{"description": "Location where the sample was collected (Foreign key)", "type": "foreign_key", "references": "sdt_location.sdt_location_name"} |
|depth_meter |double |{"description": "For below-ground samples, the average distance below ground level in meters where the sample was taken", "unit": "meter"} |
|elevation_meter |double |{"description": "For above-ground samples, the average distance above ground level in meters where the sample was taken", "unit": "meter"} |
|date |string |{"description": "YYYY[-MM[-DD]]"} |
|time |string |{"description": "HH[:MM[:SS]] [AM|PM]"} |
|timezone |string |{"description": "ISO8601 compliant format, ie. UTC-7"} |
|material_sys_oterm_id |string |{"description": "Material type of the sample, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"} |
|material_sys_oterm_name |string |{"description": "Material type of the sample"} |
|env_package_sys_oterm_id |string |{"description": "MIxS environmental package classification of the sample, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"}|
|env_package_sys_oterm_name|string |{"description": "MIxS environmental package classification of the sample"} |
|sdt_sample_description |string |{"description": "Free-form description or notes about the sample"} |
+--------------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
=== sdt_community ===
+-----------------------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|col_name |data_type |comment |
+-----------------------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|sdt_community_id |string |{"description": "Unique internal identifier for the community (Primary key)", "type": "primary_key"} |
|sdt_community_name |string |{"description": "Unique name of the community", "type": "unique_key"} |
|community_type_sys_oterm_id |string |{"description": "Type of community, e.g., isolate or enrichment, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"} |
|community_type_sys_oterm_name|string |{"description": "Type of community, e.g., isolate or enrichment"} |
|sdt_sample_name |string |{"description": "Reference to the Sample from which the community was obtained.", "type": "foreign_key", "references": "sdt_sample.sdt_sample_name"} |
|parent_sdt_community_name |string |{"description": "Reference to the name of a parent community, establishing hierarchical relationships", "type": "foreign_key", "references": "sdt_community.sdt_community_name"} |
|sdt_condition_name |string |{"description": "Reference to the experimental or environmental condition associated with the community", "type": "foreign_key", "references": "sdt_condition.sdt_condition_name"}|
|defined_sdt_strain_names |array<string>|{"description": "List of strains that comprise the community, if the community is defined", "type": "foreign_key", "references": "sdt_strain.sdt_strain_name"} |
|sdt_community_description |string |{"description": "Free-text field providing additional details or notes about the community"} |
+-----------------------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
=== sdt_location ===
+------------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|col_name |data_type|comment |
+------------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|sdt_location_id |string |{"description": "Unique identifier for the location (Primary key)", "type": "primary_key"} |
|sdt_location_name |string |{"description": "Unique name of the location", "type": "unique_key"} |
|latitude_degree |double |{"description": "Latitude of the location in decimal degrees", "unit": "degree"} |
|longitude_degree |double |{"description": "Longitude of the location in decimal degrees", "unit": "degree"} |
|continent_sys_oterm_id |string |{"description": "Continent where the location is situated, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"} |
|continent_sys_oterm_name|string |{"description": "Continent where the location is situated"} |
|country_sys_oterm_id |string |{"description": "Country of the location, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"} |
|country_sys_oterm_name |string |{"description": "Country of the location"} |
|region |string |{"description": "Specific local region name(s)"} |
|biome_sys_oterm_id |string |{"description": "Biome classification of the location, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"} |
|biome_sys_oterm_name |string |{"description": "Biome classification of the location"} |
|feature_sys_oterm_id |string |{"description": "Environmental or geographic feature at the location, ontology term CURIE", "type": "foreign_key", "references": "sys_oterm.sys_oterm_id"}|
|feature_sys_oterm_name |string |{"description": "Environmental or geographic feature at the location"} |
+------------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
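The column comments in these DESCRIBE outputs embed JSON metadata (description, unit, foreign-key references). If that metadata is needed programmatically, it can be parsed with a small helper; a minimal sketch (the `parse_comment` name is ours, not part of the notebook or the BERDL API):

```python
import json

def parse_comment(comment):
    """Parse a column comment that may contain a JSON metadata blob."""
    try:
        return json.loads(comment)
    except (TypeError, ValueError):
        # Missing or non-JSON comments yield no metadata.
        return {}

meta = parse_comment('{"description": "concentration", "unit": "micromolar"}')
print(meta.get('unit'))  # micromolar
```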
In [3]:
geochem_raw = spark.sql("""
SELECT
sdt_sample_name,
molecule_from_list_sys_oterm_name AS molecule,
CAST(concentration_micromolar AS DOUBLE) AS concentration_micromolar
FROM enigma_coral.ddt_brick0000010
WHERE concentration_micromolar IS NOT NULL
""").toPandas()
geochem_raw = geochem_raw.dropna(subset=['sdt_sample_name', 'molecule'])
print(f"Geochemistry raw rows: {len(geochem_raw):,}")
print(f"Samples: {geochem_raw['sdt_sample_name'].nunique():,}")
print(f"Molecules: {geochem_raw['molecule'].nunique():,}")
geochem = geochem_raw.pivot_table(
    index='sdt_sample_name',
    columns='molecule',
    values='concentration_micromolar',
    aggfunc='mean'
)
geochem.columns = [re.sub(r'[^a-z0-9]+', '_', c.lower()).strip('_').replace('_atom', '') for c in geochem.columns]
geochem = geochem.reset_index()
print('Geochemistry matrix shape:', geochem.shape)
target_cols = [c for c in geochem.columns if any(k in c for k in ['uranium', 'chromium', 'nickel', 'zinc', 'copper', 'cadmium', 'lead', 'arsenic', 'mercury'])]
print('Contaminant columns:', target_cols[:20])
Geochemistry raw rows: 52,884
Samples: 117
Molecules: 48
Geochemistry matrix shape: (117, 49)
Contaminant columns: ['arsenic', 'cadmium', 'chromium', 'copper', 'lead', 'nickel', 'uranium', 'zinc']
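The column-name cleanup and the `aggfunc='mean'` choice in the pivot step can be checked in isolation on a toy input (the molecule labels and values below are illustrative, not rows from the table):

```python
import re
import pandas as pd

# Same sanitization as applied to the pivoted column names above.
def clean_molecule(name):
    return re.sub(r'[^a-z0-9]+', '_', name.lower()).strip('_').replace('_atom', '')

print(clean_molecule('Uranium atom'))    # uranium
print(clean_molecule('Nitrate (NO3-)'))  # nitrate_no3

# aggfunc='mean' averages replicate measurements per sample/molecule pair.
toy = pd.DataFrame({
    'sdt_sample_name': ['S1', 'S1', 'S2'],
    'molecule': ['Uranium atom', 'Uranium atom', 'Uranium atom'],
    'concentration_micromolar': [2.0, 4.0, 1.0],
})
wide = toy.pivot_table(index='sdt_sample_name', columns='molecule',
                       values='concentration_micromolar', aggfunc='mean')
print(wide.loc['S1', 'Uranium atom'])  # 3.0
```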
In [4]:
geochem_samples = geochem[['sdt_sample_name']].drop_duplicates()
spark.createDataFrame(geochem_samples).createOrReplaceTempView('geochem_samples_tmp')
overlap_communities = spark.sql("""
SELECT DISTINCT
c.sdt_community_name,
c.sdt_sample_name
FROM enigma_coral.sdt_community c
JOIN geochem_samples_tmp g ON c.sdt_sample_name = g.sdt_sample_name
WHERE c.sdt_community_name IS NOT NULL
""")
print('Overlap communities:', overlap_communities.count())
print('Overlap samples:', overlap_communities.select('sdt_sample_name').distinct().count())
overlap_communities.createOrReplaceTempView('overlap_comms_tmp')
# Keep heavy operations in Spark; aggregate before collecting to driver
asv_counts_spark = spark.sql(f"""
SELECT
b.sdt_asv_name,
b.sdt_community_name,
o.sdt_sample_name,
SUM(CAST(regexp_replace(CAST(b.count_count_unit AS STRING), '[^0-9.-]', '') AS DOUBLE)) AS read_count
FROM enigma_coral.{community_brick} b
JOIN overlap_comms_tmp o ON b.sdt_community_name = o.sdt_community_name
WHERE b.count_count_unit IS NOT NULL
GROUP BY b.sdt_asv_name, b.sdt_community_name, o.sdt_sample_name
HAVING SUM(CAST(regexp_replace(CAST(b.count_count_unit AS STRING), '[^0-9.-]', '') AS DOUBLE)) > 0
""")
print('ASV grouped rows (Spark):', asv_counts_spark.count())
print('Unique ASVs (Spark):', asv_counts_spark.select('sdt_asv_name').distinct().count())
Overlap communities: 220
Overlap samples: 108
ASV grouped rows (Spark): 132356
Unique ASVs (Spark): 47528
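The nested `regexp_replace`/`CAST` expression above strips any non-numeric characters from the count field before casting to double. A plain-Python equivalent, as a sketch (the `clean_count` helper and the sample inputs are illustrative, not values from the brick):

```python
import re

def clean_count(raw):
    """Mirror CAST(regexp_replace(CAST(x AS STRING), '[^0-9.-]', '') AS DOUBLE)."""
    cleaned = re.sub(r'[^0-9.-]', '', str(raw))
    try:
        return float(cleaned)
    except ValueError:
        # Nothing numeric left after cleaning -> NULL in Spark, None here.
        return None

print(clean_count('1,234'))  # 1234.0
print(clean_count(57))       # 57.0
print(clean_count('n/a'))    # None
```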
In [5]:
asv_genus_spark = spark.sql("""
SELECT DISTINCT
sdt_asv_name,
sdt_taxon_name AS genus
FROM enigma_coral.ddt_brick0000454
WHERE taxonomic_level_sys_oterm_name = 'Genus'
""")
asv_phylum_spark = spark.sql("""
SELECT DISTINCT
sdt_asv_name,
sdt_taxon_name AS phylum
FROM enigma_coral.ddt_brick0000454
WHERE taxonomic_level_sys_oterm_name = 'Phylum'
""")
asv_tax_spark = asv_genus_spark.join(asv_phylum_spark, on='sdt_asv_name', how='left')
community_taxon_spark = (
    asv_counts_spark
    .join(asv_tax_spark, on='sdt_asv_name', how='left')
    .fillna({'genus': 'unclassified', 'phylum': 'unclassified'})
)
# Final export table: sample/community/genus/phylum counts
community_taxon = (
    community_taxon_spark
    .groupBy('sdt_sample_name', 'sdt_community_name', 'genus', 'phylum')
    .sum('read_count')
    .withColumnRenamed('sum(read_count)', 'read_count')
    .toPandas()
)
print('Community+taxonomy rows (aggregated):', len(community_taxon))
print('Unique genera:', community_taxon['genus'].nunique())
Community+taxonomy rows (aggregated): 41711
Unique genera: 1392
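The left join plus `fillna` above keeps ASVs that lack a genus or phylum assignment and labels them 'unclassified' rather than dropping them. A toy pandas version of the same logic (the ASV and taxon names below are made up):

```python
import pandas as pd

counts = pd.DataFrame({'sdt_asv_name': ['asv1', 'asv2'],
                       'read_count': [10.0, 5.0]})
tax = pd.DataFrame({'sdt_asv_name': ['asv1'],
                    'genus': ['Geobacter'],
                    'phylum': ['Desulfobacterota']})

# Left join preserves asv2 even though it has no taxonomy row.
merged = counts.merge(tax, on='sdt_asv_name', how='left')
merged[['genus', 'phylum']] = merged[['genus', 'phylum']].fillna('unclassified')
print(merged.loc[1, 'genus'])  # unclassified
```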
In [6]:
sample_meta = spark.sql("""
SELECT DISTINCT
s.sdt_sample_name,
s.sdt_location_name,
s.date,
s.depth_meter,
l.latitude_degree,
l.longitude_degree,
l.region
FROM enigma_coral.sdt_sample s
LEFT JOIN enigma_coral.sdt_location l
ON s.sdt_location_name = l.sdt_location_name
""").toPandas()
valid_samples = set(community_taxon['sdt_sample_name'])
geochem_f = geochem[geochem['sdt_sample_name'].isin(valid_samples)].copy()
community_f = community_taxon[community_taxon['sdt_sample_name'].isin(valid_samples)].copy()
sample_meta_f = sample_meta[sample_meta['sdt_sample_name'].isin(valid_samples)].copy()
print('Filtered geochemistry shape:', geochem_f.shape)
print('Filtered community rows:', len(community_f))
print('Filtered sample metadata rows:', len(sample_meta_f))
Filtered geochemistry shape: (108, 49)
Filtered community rows: 41711
Filtered sample metadata rows: 108
In [7]:
geochem_f.to_csv(DATA_DIR / 'geochemistry_sample_matrix.tsv', sep='\t', index=False)
community_f.to_csv(DATA_DIR / 'community_taxon_counts.tsv', sep='\t', index=False)
sample_meta_f.to_csv(DATA_DIR / 'sample_location_metadata.tsv', sep='\t', index=False)
print('Saved outputs:')
print(' -', (DATA_DIR / 'geochemistry_sample_matrix.tsv').resolve())
print(' -', (DATA_DIR / 'community_taxon_counts.tsv').resolve())
print(' -', (DATA_DIR / 'sample_location_metadata.tsv').resolve())
print('\nSummary')
print('Samples with both geochem+community:', geochem_f['sdt_sample_name'].nunique())
print('Communities:', community_f['sdt_community_name'].nunique())
if 'sdt_asv_name' in community_f.columns:
    print('ASVs:', community_f['sdt_asv_name'].nunique())
print('Genera:', community_f['genus'].nunique())
Saved outputs:
 - /home/psdehal/pangenome_science/BERIL-research-observatory/projects/enigma_contamination_functional_potential/data/geochemistry_sample_matrix.tsv
 - /home/psdehal/pangenome_science/BERIL-research-observatory/projects/enigma_contamination_functional_potential/data/community_taxon_counts.tsv
 - /home/psdehal/pangenome_science/BERIL-research-observatory/projects/enigma_contamination_functional_potential/data/sample_location_metadata.tsv

Summary
Samples with both geochem+community: 108
Communities: 212
Genera: 1392
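Downstream notebooks should read these files back with `sep='\t'`. As a quick sanity check on the tab-separated round trip, a minimal in-memory sketch (using a made-up one-row frame, not the actual output files):

```python
import io
import pandas as pd

df = pd.DataFrame({'sdt_sample_name': ['S1'], 'uranium': [3.0]})

# Write and re-read through a buffer with the same separator used for export.
buf = io.StringIO()
df.to_csv(buf, sep='\t', index=False)
buf.seek(0)
back = pd.read_csv(buf, sep='\t')
print(back.equals(df))  # True
```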