Skip to content

Commit

Permalink
[r] Fix: No replicas for donors in HCA (#6582)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Sep 20, 2024
1 parent e4bbbe5 commit 0aa044e
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 67 deletions.
15 changes: 12 additions & 3 deletions src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Mapping,
Sequence,
)
import itertools
from itertools import (
groupby,
)
Expand Down Expand Up @@ -308,13 +309,21 @@ def transform(self,
# The cast is necessary because unzip()'s type stub doesn't
# support heterogeneous tuples.
transforms = cast(
tuple[Iterable[Optional[Contribution]], Iterable[Optional[Replica]]],
tuple[Iterable[Contribution], Iterable[list[Replica]]],
unzip(transformer.transform(partition))
)
if transforms:
contributions_part, replicas_part = transforms
contributions.extend(filter(None, contributions_part))
replicas.extend(filter(None, replicas_part))
contributions.extend(contributions_part)
replicas_by_coords = {}
for replica in itertools.chain.from_iterable(replicas_part):
try:
dup = replicas_by_coords[replica.coordinates]
except KeyError:
replicas_by_coords[replica.coordinates] = replica
else:
dup.hub_ids.extend(replica.hub_ids)
replicas.extend(replicas_by_coords.values())
return contributions, replicas

def create_indices(self, catalog: CatalogName):
Expand Down
2 changes: 1 addition & 1 deletion src/azul/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
JSON,
)

Transform = tuple[Optional[Contribution], Optional[Replica]]
Transform = tuple[Contribution, list[Replica]]


@attr.s(frozen=True, kw_only=True, auto_attribs=True)
Expand Down
2 changes: 0 additions & 2 deletions src/azul/plugins/metadata/anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
BiosampleTransformer,
BundleTransformer,
DatasetTransformer,
DiagnosisTransformer,
DonorTransformer,
FileTransformer,
)
Expand Down Expand Up @@ -98,7 +97,6 @@ def transformer_types(self) -> Iterable[Type[BaseTransformer]]:
BiosampleTransformer,
BundleTransformer,
DatasetTransformer,
DiagnosisTransformer,
DonorTransformer,
FileTransformer,
)
Expand Down
73 changes: 30 additions & 43 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,13 @@ def _list_entities(self) -> Iterable[EntityReference]:
def _transform(self, entity: EntityReference) -> Transform:
raise NotImplementedError

def _add_replica(self,
contribution: JSON | None,
entity: EntityReference,
hub_ids: list[EntityID]
) -> Transform:
no_replica = not config.enable_replicas or self.entity_type() == 'bundles'
def _result(self,
contribution: JSON,
entity: EntityReference,
) -> Transform:
return (
None if contribution is None else self._contribution(contribution, entity),
None if no_replica else self._replica(self.bundle.entities[entity],
entity,
hub_ids)
self._contribution(contribution, entity),
[]
)

def _pluralize(self, entity_type: str) -> str:
Expand Down Expand Up @@ -526,17 +522,15 @@ def entity_type(cls) -> str:

def _transform(self, entity: EntityReference) -> Transform:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=[self._activity(entity)],
biosamples=self._entities(self._biosample, linked['biosample']),
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=self._entities(self._donor, linked['donor']),
files=self._entities(self._file, files),
files=self._entities(self._file, linked['file'])
)
hub_ids = [f.entity_id for f in files]
return self._add_replica(contents, entity, hub_ids)
return self._result(contents, entity)


class BiosampleTransformer(BaseTransformer):
Expand All @@ -547,7 +541,6 @@ def entity_type(cls) -> str:

def _transform(self, entity: EntityReference) -> Transform:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
linked[activity_type]
Expand All @@ -557,22 +550,9 @@ def _transform(self, entity: EntityReference) -> Transform:
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=self._entities(self._donor, linked['donor']),
files=self._entities(self._file, files),
files=self._entities(self._file, linked['file']),
)
hub_ids = [f.entity_id for f in files]
return self._add_replica(contents, entity, hub_ids)


class DiagnosisTransformer(BaseTransformer):

def _transform(self, entity: EntityReference) -> Transform:
files = self._linked_entities(entity)['file']
hub_ids = [f.entity_id for f in files]
return self._add_replica(None, entity, hub_ids)

@classmethod
def entity_type(cls) -> EntityType:
return 'diagnoses'
return self._result(contents, entity)


class BundleTransformer(SingletonTransformer):
Expand All @@ -587,8 +567,7 @@ def _singleton(self) -> EntityReference:

def _transform(self, entity: EntityReference) -> Transform:
contents = self._contents()
hub_ids = [f.entity_id for f in self._entities_by_type['file']]
return self._add_replica(contents, entity, hub_ids)
return self._result(contents, entity)


class DatasetTransformer(SingletonTransformer):
Expand All @@ -609,8 +588,7 @@ def _transform(self, entity: EntityReference) -> Transform:
# causes a prohibitively high rate of conflicts during replica updates).
# Therefore, we leave the hub IDs field empty for datasets and rely on
# the tenet that every file is an implicit hub of its parent dataset.
hub_ids = []
return self._add_replica(contents, entity, hub_ids)
return self._result(contents, entity)


class DonorTransformer(BaseTransformer):
Expand All @@ -621,7 +599,6 @@ def entity_type(cls) -> str:

def _transform(self, entity: EntityReference) -> Transform:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
linked[activity_type]
Expand All @@ -631,10 +608,9 @@ def _transform(self, entity: EntityReference) -> Transform:
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=[self._donor(entity)],
files=self._entities(self._file, files),
files=self._entities(self._file, linked['file']),
)
hub_ids = [f.entity_id for f in files]
return self._add_replica(contents, entity, hub_ids)
return self._result(contents, entity)


class FileTransformer(BaseTransformer):
Expand All @@ -656,8 +632,19 @@ def _transform(self, entity: EntityReference) -> Transform:
donors=self._entities(self._donor, linked['donor']),
files=[self._file(entity)],
)
# The result of the link traversal does not include the starting entity,
# so without this step the file itself wouldn't be included in its hubs
files = (entity, *linked['file'])
hub_ids = [f.entity_id for f in files]
return self._add_replica(contents, entity, hub_ids)
return self._result(contents, entity)

def _result(self,
            contribution: JSON,
            entity: EntityReference,
            ) -> Transform:
    """
    Pair this file's contribution with the replicas for the other entities
    in the bundle. After this change, the file transformer is the single
    place where replicas are emitted, so each bundle entity yields one
    replica per file pass instead of one per transformer type.
    """
    contribution, replicas = super()._result(contribution, entity)
    # The base implementation always returns an empty replica list; all
    # replicas are created below.
    assert len(replicas) == 0, replicas
    if config.enable_replicas:
        hub_type = 'file'
        for ref, content in self.bundle.entities.items():
            # Skip bundle entities, and skip any file other than the one
            # currently being transformed — every other file produces its
            # own replica when it is the subject of this transformer.
            if ref.entity_type != 'bundle' and (ref.entity_type != hub_type or ref == entity):
                # Datasets get an empty hub list: every file is an implicit
                # hub of its parent dataset, and listing all files here
                # would cause a prohibitively high rate of conflicts during
                # replica updates. All other entities use their linked
                # files as hubs.
                hubs = [] if ref.entity_type == 'dataset' else self._linked_entities(ref)[hub_type]
                replica = self._replica(content, ref, hub_ids=[h.entity_id for h in hubs])
                replicas.append(replica)
    return contribution, replicas
52 changes: 34 additions & 18 deletions src/azul/plugins/metadata/hca/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,22 +501,15 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
else:
return SimpleAggregator()

def _add_replica(self,
contribution: JSON,
entity: Union[api.Entity, DatedEntity],
hub_ids: list[EntityID]
) -> Transform:
def _result(self,
contribution: JSON,
entity: Union[api.Entity, DatedEntity],
hub_ids: list[EntityID]
) -> Transform:
entity_ref = EntityReference(entity_id=str(entity.document_id),
entity_type=self.entity_type())
if not config.enable_replicas:
replica = None
elif self.entity_type() == 'bundles':
links = self.bundle.links
replica = self._replica(links, entity_ref, hub_ids)
else:
assert isinstance(entity, api.Entity), entity
replica = self._replica(entity.json, entity_ref, hub_ids)
return self._contribution(contribution, entity_ref), replica
replicas = []
return self._contribution(contribution, entity_ref), replicas

def _find_ancestor_samples(self,
entity: api.LinkedEntity,
Expand Down Expand Up @@ -1472,7 +1465,30 @@ def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution]:
for entity_type, values in additional_contents.items():
contents[entity_type].extend(values)
hub_ids = list(map(str, visitor.files))
yield self._add_replica(contents, file, hub_ids)
yield self._result(contents, file, hub_ids)

def _result(self,
            contribution: MutableJSON,
            entity: Union[api.Entity, DatedEntity],
            hub_ids: list[EntityID]
            ) -> Transform:
    """
    Pair this file's contribution with replicas for every entity in the
    API bundle. Emitting all replicas from the file transformer is what
    restores replicas for entity types (e.g. donors) that previously had
    no transformer emitting them.
    """
    contribution, replicas = super()._result(contribution, entity, hub_ids)
    # The superclass returns an empty replica list; replicas are built here.
    assert len(replicas) == 0, replicas
    if config.enable_replicas:
        for replica_entity in self.api_bundle.entities.values():
            # NOTE(review): the `hub_ids` parameter is intentionally
            # rebound below; the caller's value is only used by the
            # superclass call above.
            if isinstance(replica_entity, (api.Project, api.Bundle)):
                # Projects and bundles are linked to every file in the
                # bundle, so all files are hubs.
                hub_ids = list(map(str, self.api_bundle.files))
            elif isinstance(replica_entity, api.LinkedEntity):
                # For linked entities, the hubs are the files reachable
                # from the entity via the link graph, in both directions.
                visitor = TransformerVisitor()
                replica_entity.accept(visitor)
                replica_entity.ancestors(visitor)
                hub_ids = list(map(str, visitor.files))
            else:
                # Every bundle entity is expected to be a Project, Bundle,
                # or LinkedEntity; anything else is a programming error.
                assert False, replica_entity
            ref = EntityReference(entity_type=replica_entity.schema_name,
                                  entity_id=str(replica_entity.document_id))
            replicas.append(self._replica(replica_entity.json, ref, hub_ids))
    return contribution, replicas

def matrix_stratification_values(self, file: api.File) -> JSON:
"""
Expand Down Expand Up @@ -1568,7 +1584,7 @@ def _transform(self,
dates=[self._date(cell_suspension)],
projects=[self._project(self._api_project)])
hub_ids = list(map(str, visitor.files))
yield self._add_replica(contents, cell_suspension, hub_ids)
yield self._result(contents, cell_suspension, hub_ids)


class SampleTransformer(PartitionedTransformer):
Expand Down Expand Up @@ -1614,7 +1630,7 @@ def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution]:
dates=[self._date(sample)],
projects=[self._project(self._api_project)])
hub_ids = list(map(str, visitor.files))
yield self._add_replica(contents, sample, hub_ids)
yield self._result(contents, sample, hub_ids)


class BundleAsEntity(DatedEntity):
Expand Down Expand Up @@ -1715,7 +1731,7 @@ def _transform(self) -> Transform:
dates=[self._date(self._singleton_entity())],
projects=[self._project(self._api_project)])
hub_ids = self._hub_ids(visitor)
return self._add_replica(contents, self._singleton_entity(), hub_ids)
return self._result(contents, self._singleton_entity(), hub_ids)

@abstractmethod
def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
Expand Down

0 comments on commit 0aa044e

Please sign in to comment.