Skip to content

Commit

Permalink
Fix: No replicas for donors in HCA (#6582)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Sep 27, 2024
1 parent 21efd5e commit 6b505f4
Show file tree
Hide file tree
Showing 10 changed files with 959 additions and 141 deletions.
11 changes: 8 additions & 3 deletions src/azul/indexer/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,21 @@ def transform(self,
log.info('Transforming %i entities in partition %s of bundle %s, version %s.',
num_entities, partition, bundle.uuid, bundle.version)
contributions = []
replicas = []
replicas_by_coords = {}
for transformer in transformers:
for document in transformer.transform(partition):
if isinstance(document, Contribution):
contributions.append(document)
elif isinstance(document, Replica):
replicas.append(document)
try:
dup = replicas_by_coords[document.coordinates]
except KeyError:
replicas_by_coords[document.coordinates] = document
else:
dup.hub_ids.extend(document.hub_ids)
else:
assert False, document
return contributions, replicas
return contributions, list(replicas_by_coords.values())

def create_indices(self, catalog: CatalogName):
es_client = ESClientFactory.get()
Expand Down
2 changes: 0 additions & 2 deletions src/azul/plugins/metadata/anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
BiosampleTransformer,
BundleTransformer,
DatasetTransformer,
DiagnosisTransformer,
DonorTransformer,
FileTransformer,
)
Expand Down Expand Up @@ -98,7 +97,6 @@ def transformer_types(self) -> Iterable[Type[BaseTransformer]]:
BiosampleTransformer,
BundleTransformer,
DatasetTransformer,
DiagnosisTransformer,
DonorTransformer,
FileTransformer,
)
Expand Down
92 changes: 31 additions & 61 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ class LinkedEntities:
def __getitem__(self, item: EntityType) -> set[EntityReference]:
return self.ancestors[item] | self.descendants[item]

def __iter__(self) -> Iterable[EntityReference]:
for entities in self.ancestors.values():
yield from entities
for entities in self.descendants.values():
yield from entities

@classmethod
def from_links(cls,
origin: EntityReference,
Expand Down Expand Up @@ -450,8 +456,8 @@ def _complete_dataset_keys(cls) -> AbstractSet[str]:

class SingletonTransformer(BaseTransformer, metaclass=ABCMeta):

def _contents(self) -> MutableJSON:
return dict(
def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
self._entities_by_type[activity_type]
for activity_type in self._activity_polymorphic_types
Expand All @@ -462,6 +468,7 @@ def _contents(self) -> MutableJSON:
donors=self._entities(self._donor, self._entities_by_type['donor']),
files=self._entities(self._file, self._entities_by_type['file'])
)
yield self._contribution(contents, entity.entity_id)

@classmethod
def field_types(cls) -> FieldTypes:
Expand All @@ -480,8 +487,11 @@ def _duos_types(cls) -> FieldTypes:
def _duos(self, dataset: EntityReference) -> MutableJSON:
return self._entity(dataset, self._duos_types())

def _is_duos(self, dataset: EntityReference) -> bool:
return 'description' in self.bundle.entities[dataset]

def _dataset(self, dataset: EntityReference) -> MutableJSON:
if 'description' in self.bundle.entities[dataset]:
if self._is_duos(dataset):
return self._duos(dataset)
else:
return super()._dataset(dataset)
Expand All @@ -500,23 +510,17 @@ class ActivityTransformer(BaseTransformer):
def entity_type(cls) -> str:
return 'activities'

def _transform(self,
entity: EntityReference
) -> Iterable[Contribution | Replica]:
def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=[self._activity(entity)],
biosamples=self._entities(self._biosample, linked['biosample']),
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=self._entities(self._donor, linked['donor']),
files=self._entities(self._file, files),
files=self._entities(self._file, linked['file'])
)
yield self._contribution(contents, entity.entity_id)
if config.enable_replicas:
hub_ids = [f.entity_id for f in files]
yield self._replica(entity, hub_ids)


class BiosampleTransformer(BaseTransformer):
Expand All @@ -525,11 +529,8 @@ class BiosampleTransformer(BaseTransformer):
def entity_type(cls) -> str:
return 'biosamples'

def _transform(self,
entity: EntityReference
) -> Iterable[Contribution | Replica]:
def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
linked[activity_type]
Expand All @@ -539,25 +540,9 @@ def _transform(self,
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=self._entities(self._donor, linked['donor']),
files=self._entities(self._file, files),
files=self._entities(self._file, linked['file']),
)
yield self._contribution(contents, entity.entity_id)
if config.enable_replicas:
hub_ids = [f.entity_id for f in files]
yield self._replica(entity, hub_ids)


class DiagnosisTransformer(BaseTransformer):

def _transform(self, entity: EntityReference) -> Iterable[Replica]:
if config.enable_replicas:
files = self._linked_entities(entity)['file']
hub_ids = [f.entity_id for f in files]
yield self._replica(entity, hub_ids)

@classmethod
def entity_type(cls) -> EntityType:
return 'diagnoses'


class BundleTransformer(SingletonTransformer):
Expand All @@ -570,10 +555,6 @@ def _singleton(self) -> EntityReference:
return EntityReference(entity_type='bundle',
entity_id=self.bundle.uuid)

def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
contents = self._contents()
yield self._contribution(contents, entity.entity_id)


class DatasetTransformer(SingletonTransformer):

Expand All @@ -587,18 +568,9 @@ def _singleton(self) -> EntityReference:
def _transform(self,
entity: EntityReference
) -> Iterable[Contribution | Replica]:
contents = self._contents()
yield self._contribution(contents, entity.entity_id)
if config.enable_replicas:
# Every file in a snapshot is linked to that snapshot's singular
# dataset, making an explicit list of hub IDs for the dataset both
# redundant and impractically large (we observe that for large
# snapshots, trying to track this many files in a single data structure
# causes a prohibitively high rate of conflicts during replica updates).
# Therefore, we leave the hub IDs field empty for datasets and rely on
# the tenet that every file is an implicit hub of its parent dataset.
hub_ids = []
yield self._replica(entity, hub_ids)
yield from super()._transform(entity)
if self._is_duos(entity):
yield self._replica(entity, [])


class DonorTransformer(BaseTransformer):
Expand All @@ -607,11 +579,8 @@ class DonorTransformer(BaseTransformer):
def entity_type(cls) -> str:
return 'donors'

def _transform(self,
entity: EntityReference
) -> Iterable[Contribution | Replica]:
def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
linked = self._linked_entities(entity)
files = linked['file']
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
linked[activity_type]
Expand All @@ -621,12 +590,9 @@ def _transform(self,
datasets=[self._dataset(self._only_dataset())],
diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
donors=[self._donor(entity)],
files=self._entities(self._file, files),
files=self._entities(self._file, linked['file']),
)
yield self._contribution(contents, entity.entity_id)
if config.enable_replicas:
hub_ids = [f.entity_id for f in files]
yield self._replica(entity, hub_ids)


class FileTransformer(BaseTransformer):
Expand All @@ -652,8 +618,12 @@ def _transform(self,
)
yield self._contribution(contents, entity.entity_id)
if config.enable_replicas:
# The result of the link traversal does not include the starting entity,
# so without this step the file itself wouldn't be included in its hubs
files = (entity, *linked['file'])
hub_ids = [f.entity_id for f in files]
yield self._replica(entity, hub_ids)
# The other hubs will be added when the indexer consolidates duplicate replicas.
yield self._replica(entity, [entity.entity_id])
for linked_entity in linked:
# Datasets are linked to every file in their snapshot, making an explicit list
# of hub IDs for the dataset both redundant and impractically large. Therefore,
# we leave the hub IDs field empty for datasets and rely on the tenet that every
# file is an implicit hub of its parent dataset.
yield self._replica(linked_entity,
hub_ids=[] if linked_entity.entity_type == 'dataset' else [entity.entity_id])
73 changes: 26 additions & 47 deletions src/azul/plugins/metadata/hca/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,13 @@ def visit(self, entity: api.Entity) -> None:
if not is_zarr or sub_name.endswith('.zattrs'):
self.files[entity.document_id] = entity

@property
def entities(self) -> Iterable[EntityReference]:
for entity_dict in vars(self).values():
for entity in entity_dict.values():
yield EntityReference(entity_type=entity.schema_name,
entity_id=str(entity.document_id))


ENTITY = TypeVar('ENTITY', bound=api.Entity)

Expand Down Expand Up @@ -1466,10 +1473,20 @@ def _transform(self,
additional_contents = self.matrix_stratification_values(file)
for entity_type, values in additional_contents.items():
contents[entity_type].extend(values)
yield self._contribution(contents, file.ref.entity_id)
file_id = file.ref.entity_id
yield self._contribution(contents, file_id)
if config.enable_replicas:
hub_ids = list(map(str, visitor.files))
yield self._replica(file.ref, hub_ids)
yield self._replica(self.api_bundle.ref,
# The other hubs will be added when the indexer consolidates duplicate replicas.
[file_id])
yield self._replica(one(self.api_bundle.projects.values()).ref,
# Projects are linked to every file in their snapshot, making an explicit list
# of hub IDs for the project both redundant and impractically large. Therefore,
# we leave the hub IDs field empty for projects and rely on the tenet that every
# file is an implicit hub of its parent project.
[])
for linked_entity in visitor.entities:
yield self._replica(linked_entity, [file_id])

def matrix_stratification_values(self, file: api.File) -> JSON:
"""
Expand Down Expand Up @@ -1541,7 +1558,7 @@ def _entities(self) -> Iterable[api.CellSuspension]:

def _transform(self,
cell_suspensions: Iterable[api.CellSuspension]
) -> Iterable[Contribution | Replica]:
) -> Iterable[Contribution]:
for cell_suspension in cell_suspensions:
samples: dict[str, Sample] = dict()
self._find_ancestor_samples(cell_suspension, samples)
Expand All @@ -1565,9 +1582,6 @@ def _transform(self,
dates=[self._date(cell_suspension)],
projects=[self._project(self._api_project)])
yield self._contribution(contents, cell_suspension.ref.entity_id)
if config.enable_replicas:
hub_ids = list(map(str, visitor.files))
yield self._replica(cell_suspension.ref, hub_ids)


class SampleTransformer(PartitionedTransformer):
Expand All @@ -1591,9 +1605,7 @@ def _entities(self) -> Iterable[Sample]:
self._find_ancestor_samples(file, samples)
return samples.values()

def _transform(self,
samples: Iterable[Sample]
) -> Iterable[Contribution | Replica]:
def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution]:
for sample in samples:
visitor = TransformerVisitor()
sample.accept(visitor)
Expand All @@ -1615,9 +1627,6 @@ def _transform(self,
dates=[self._date(sample)],
projects=[self._project(self._api_project)])
yield self._contribution(contents, sample.ref.entity_id)
if config.enable_replicas:
hub_ids = list(map(str, visitor.files))
yield self._replica(sample.ref, hub_ids)


class BundleAsEntity(DatedEntity):
Expand Down Expand Up @@ -1653,13 +1662,11 @@ def _dated_entities(self) -> Iterable[DatedEntity]:
def estimate(self, partition: BundlePartition) -> int:
return int(partition.contains(self._singleton_id))

def transform(self,
partition: BundlePartition
) -> Iterable[Contribution | Replica]:
def transform(self, partition: BundlePartition) -> Iterable[Contribution]:
if partition.contains(self._singleton_id):
yield from self._transform()
yield self._transform()

def _transform(self) -> Iterable[Contribution | Replica]:
def _transform(self) -> Contribution:
# Project entities are not explicitly linked in the graph. The mere
# presence of project metadata in a bundle indicates that all other
# entities in that bundle belong to that project. Because of that we
Expand Down Expand Up @@ -1717,18 +1724,7 @@ def _transform(self) -> Iterable[Contribution | Replica]:
contributed_analyses=contributed_analyses,
dates=[self._date(self._singleton_entity())],
projects=[self._project(self._api_project)])
yield self._contribution(contents, str(self._singleton_id))
if config.enable_replicas:
hub_ids = self._hub_ids(visitor)
yield self._replica(self._entity_ref(), hub_ids)

@abstractmethod
def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
raise NotImplementedError

@abstractmethod
def _entity_ref(self) -> EntityReference:
raise NotImplementedError
return self._contribution(contents, str(self._singleton_id))


class ProjectTransformer(SingletonTransformer):
Expand All @@ -1740,17 +1736,6 @@ def _singleton_entity(self) -> DatedEntity:
def entity_type(cls) -> str:
return 'projects'

def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
# Every file in a snapshot is linked to that snapshot's singular
# project, making an explicit list of hub IDs for the project both
# redundant and impractically large. Therefore, we leave the hub IDs
# field empty for projects and rely on the tenet that every file is an
# implicit hub of its parent project.
return []

def _entity_ref(self) -> EntityReference:
return self._api_project.ref


class BundleTransformer(SingletonTransformer):

Expand All @@ -1767,9 +1752,3 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
@classmethod
def entity_type(cls) -> str:
return 'bundles'

def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
return list(map(str, visitor.files))

def _entity_ref(self) -> EntityReference:
return self.api_bundle.ref
Loading

0 comments on commit 6b505f4

Please sign in to comment.