Support Bigscape v2 data #234

Merged · 13 commits · Apr 4, 2024
10 changes: 8 additions & 2 deletions src/nplinker/arranger.py
@@ -236,6 +236,7 @@ def arrange_bigscape(self) -> None:
        - Check if the default BiG-SCAPE data directory exists.
        - Check if the clustering file "mix_clustering_c{config.bigscape.cutoff}.tsv" exists in the
          BiG-SCAPE data directory.
        - Check if the 'data_sqlite.db' file exists in the BiG-SCAPE data directory.
        """
        pass_validation = False
        if config.mode == "podp":
@@ -445,6 +446,10 @@ def validate_bigscape(bigscape_dir: Path) -> None:
"mix_clustering_c{config.bigscape.cutoff}.tsv" where {config.bigscape.cutoff} is the
bigscape cutoff value set in the config file.

Alternatively, the directory can contain the BiG-SCAPE database file generated by BiG-SCAPE v2.
At the moment, all the family assignments in the database will be used, so this database should
contain results from a single run with the desired cutoff.

Args:
bigscape_dir: Path to the BiG-SCAPE data directory.

@@ -455,5 +460,6 @@ def validate_bigscape(bigscape_dir: Path) -> None:
raise FileNotFoundError(f"BiG-SCAPE data directory not found at {bigscape_dir}")

clustering_file = bigscape_dir / f"mix_clustering_c{config.bigscape.cutoff}.tsv"
if not clustering_file.exists():
raise FileNotFoundError(f"BiG-SCAPE clustering file not found: {clustering_file}")
database_file = bigscape_dir / "data_sqlite.db"
    if not clustering_file.exists() and not database_file.exists():
        raise FileNotFoundError(f"BiG-SCAPE data not found in {clustering_file} or {database_file}")
3 changes: 2 additions & 1 deletion src/nplinker/genomics/bigscape/__init__.py
@@ -1,8 +1,9 @@
import logging
from .bigscape_loader import BigscapeGCFLoader
from .bigscape_loader import BigscapeV2GCFLoader
from .runbigscape import run_bigscape


logging.getLogger(__name__).addHandler(logging.NullHandler())

__all__ = ["BigscapeGCFLoader", "run_bigscape"]
__all__ = ["BigscapeGCFLoader", "BigscapeV2GCFLoader", "run_bigscape"]
76 changes: 76 additions & 0 deletions src/nplinker/genomics/bigscape/bigscape_loader.py
@@ -1,5 +1,6 @@
from __future__ import annotations
import csv
import sqlite3
from os import PathLike
from nplinker.logconfig import LogConfig
from ..abc import GCFLoaderBase
@@ -59,3 +60,78 @@ def _parse_gcf(cluster_file: str) -> list[GCF]:

# register as virtual class to prevent metaclass conflicts
GCFLoaderBase.register(BigscapeGCFLoader)


class BigscapeV2GCFLoader:
    def __init__(self, db_file: str | PathLike, /) -> None:
        """Build a loader for a BiG-SCAPE v2 database file.

        Args:
            db_file: Path to the BiG-SCAPE v2 database file.

        Attributes:
            db_file: Path to the BiG-SCAPE v2 database file.
        """
        self.db_file = str(db_file)
        self._gcf_list = self._parse_gcf(self.db_file)
    def get_gcfs(self, keep_mibig_only: bool = False, keep_singleton: bool = False) -> list[GCF]:
        """Get all GCF objects.

        Args:
            keep_mibig_only: True to keep GCFs that contain only MIBiG BGCs.
            keep_singleton: True to keep singleton GCFs. A singleton GCF
                is a GCF that contains only one BGC.

        Returns:
            a list of GCF objects.
        """
        gcf_list = self._gcf_list
        if not keep_mibig_only:
            gcf_list = [gcf for gcf in gcf_list if not gcf.has_mibig_only()]
        if not keep_singleton:
            gcf_list = [gcf for gcf in gcf_list if not gcf.is_singleton()]
        return gcf_list

    @staticmethod
    def _parse_gcf(db_file: str) -> list[GCF]:
        """Get GCF objects from the database.

        Args:
            db_file: Path to the sqlite3 database file.

        Returns:
            A list of GCF objects.
        """
        gcf_dict: dict[str, GCF] = {}

        with sqlite3.connect(db_file) as connection:
            cursor = connection.cursor()

            query = """
                SELECT gbk.path, bgc_record_family.family_id FROM bgc_record_family
                JOIN bgc_record ON bgc_record.id = bgc_record_family.record_id
                JOIN gbk ON gbk.id = bgc_record.gbk_id
            """

            results = cursor.execute(query).fetchall()

            for result in results:
                gbk_path, family_id = result

                # take the filename of the gbk path as the bgc_id
                bgc_id: str = gbk_path.split("/")[-1]
                # remove extension
                bgc_id = bgc_id.rsplit(".", 1)[0]

                if family_id not in gcf_dict:
                    gcf_dict[family_id] = GCF(family_id)
                gcf_dict[family_id].bgc_ids.add(bgc_id)

        return list(gcf_dict.values())


# register as virtual class to prevent metaclass conflicts
GCFLoaderBase.register(BigscapeV2GCFLoader)
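
A brief usage sketch of the new loader (the database path is hypothetical; the GCF attributes used are the ones populated by _parse_gcf above). The loader reads every gbk.path / family_id pair from the v2 SQLite database, groups BGC ids by family, and get_gcfs then drops MIBiG-only and singleton families unless asked to keep them:

from nplinker.genomics.bigscape import BigscapeV2GCFLoader

# hypothetical path to a BiG-SCAPE v2 output database
loader = BigscapeV2GCFLoader("output/bigscape/data_sqlite.db")

# defaults: drop GCFs that contain only MIBiG BGCs and GCFs with a single BGC
gcfs = loader.get_gcfs(keep_mibig_only=False, keep_singleton=False)
for gcf in gcfs:
    print(gcf, len(gcf.bgc_ids))  # bgc_ids holds the GBK filenames parsed from gbk.path
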
18 changes: 16 additions & 2 deletions src/nplinker/loader.py
@@ -3,7 +3,7 @@
from nplinker import globals
from nplinker.config import config
from nplinker.genomics.antismash import AntismashBGCLoader
from nplinker.genomics.bigscape import BigscapeGCFLoader
from nplinker.genomics.bigscape import BigscapeGCFLoader, BigscapeV2GCFLoader
from nplinker.genomics.mibig import MibigLoader
from nplinker.genomics.utils import add_bgc_to_gcf
from nplinker.genomics.utils import add_strain_to_bgc
@@ -159,7 +159,21 @@ def _load_genomics(self):
        bigscape_cluster_file = (
            globals.BIGSCAPE_DEFAULT_PATH / f"mix_clustering_c{config.bigscape.cutoff}.tsv"
        )
        raw_gcfs = BigscapeGCFLoader(bigscape_cluster_file).get_gcfs()
        bigscape_db_file = globals.BIGSCAPE_DEFAULT_PATH / "data_sqlite.db"

        # switch depending on which file is found; prefer the v1 clustering file if both exist
        if bigscape_cluster_file.exists():
            loader = BigscapeGCFLoader(bigscape_cluster_file)
            logger.debug(f"Loading BigSCAPE cluster file {bigscape_cluster_file}")
        elif bigscape_db_file.exists():
            loader = BigscapeV2GCFLoader(bigscape_db_file)
            logger.debug(f"Loading BigSCAPE database file {bigscape_db_file}")
        else:
            raise FileNotFoundError(
                f"BigSCAPE cluster file {bigscape_cluster_file} or database file {bigscape_db_file} not found."
            )

        raw_gcfs = loader.get_gcfs()

        # Step 5: add BGC objects to GCF
        all_gcfs_with_bgc, _, _ = add_bgc_to_gcf(all_bgcs_with_strain, raw_gcfs)
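
Since one directory may hold outputs from both BiG-SCAPE versions, a small sketch of the precedence implemented above (paths and cutoff are hypothetical, and a plain Path stands in for globals.BIGSCAPE_DEFAULT_PATH): the v1 clustering file wins whenever both files exist.

from pathlib import Path

bigscape_dir = Path("output/bigscape")  # stand-in for globals.BIGSCAPE_DEFAULT_PATH
cluster_file = bigscape_dir / "mix_clustering_c0.30.tsv"  # v1 output, hypothetical cutoff
db_file = bigscape_dir / "data_sqlite.db"  # v2 output

if cluster_file.exists():
    print("v1 clustering file found: BigscapeGCFLoader will be used")
elif db_file.exists():
    print("v2 database found: BigscapeV2GCFLoader will be used")
else:
    print(f"no BiG-SCAPE data found in {bigscape_dir}")
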
Binary file added tests/unit/data/bigscape/mix/data_sqlite.db
33 changes: 33 additions & 0 deletions tests/unit/genomics/test_bigscape_loader.py
@@ -2,6 +2,7 @@
from nplinker.genomics import GCF
from nplinker.genomics.abc import GCFLoaderBase
from nplinker.genomics.bigscape import BigscapeGCFLoader
from nplinker.genomics.bigscape.bigscape_loader import BigscapeV2GCFLoader
from .. import DATA_DIR


@@ -37,3 +38,35 @@ def test_parse_gcf(self, loader):
        assert len(gcf_list) == 5
        for gcf in gcf_list:
            assert isinstance(gcf, GCF)


class TestBigscapeV2GCFLoader:
    @pytest.fixture
    def loader(self):
        db_file = DATA_DIR / "bigscape" / "mix" / "data_sqlite.db"
        loader = BigscapeV2GCFLoader(db_file)
        yield loader

    def test_abc(self, loader):
        assert issubclass(BigscapeV2GCFLoader, GCFLoaderBase)
        assert isinstance(loader, GCFLoaderBase)

    def test_init(self, loader):
        assert loader.db_file == str(DATA_DIR / "bigscape" / "mix" / "data_sqlite.db")

    @pytest.mark.parametrize(
        "keep_mibig_only, keep_singleton, expected",
        [(False, False, 1), (True, False, 2), (False, True, 2), (True, True, 4)],
    )
    def test_get_gcfs_v2(self, loader, keep_mibig_only, keep_singleton, expected):
        gcfs = loader.get_gcfs(keep_mibig_only, keep_singleton)
        assert isinstance(gcfs, list)
        assert len(gcfs) == expected
        assert isinstance(gcfs[0], GCF)

    def test_parse_gcf_v2(self, loader):
        gcf_list = BigscapeV2GCFLoader._parse_gcf(loader.db_file)
        assert isinstance(gcf_list, list)
        assert len(gcf_list) == 4
        for gcf in gcf_list:
            assert isinstance(gcf, GCF)