Skip to content
This repository has been archived by the owner on Aug 15, 2018. It is now read-only.

Add "Population Context" features #56

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions tmlib/models/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ def __init__(self, partition_key, mapobject_id, values, tpoint=None):
self.tpoint = tpoint
self.values = values

@classmethod
def _append_value(cls, connection, instance, key, val):
if not isinstance(instance, FeatureValues):
raise TypeError(
'Object must have type tmlib.models.feature.FeatureValues'
)
connection.execute('''SELECT master_modify_multiple_shards('UPDATE feature_values SET values = values || hstore('%(key)s','%(val)s') WHERE mapobject_id = %(mapobject_id)s');''',{'mapobject_id': instance.mapobject_id, 'key': key, 'val':val})


@classmethod
def _add(cls, connection, instance):
if not isinstance(instance, FeatureValues):
Expand All @@ -150,13 +159,13 @@ def _add(cls, connection, instance):
)
connection.execute('''
INSERT INTO feature_values AS v (
parition_key, values, mapobject_id, tpoint
partition_key, values, mapobject_id, tpoint
)
VALUES (
%(partition_key)s, %(values)s, %(mapobject_id)s, %(tpoint)s
)
ON CONFLICT
ON CONSTRAINT feature_values_mapobject_id_tpoint_key
-- ON CONSTRAINT feature_values_mapobject_id_fkey
DO UPDATE
SET values = v.values || %(values)s
WHERE v.mapobject_id = %(mapobject_id)s
Expand Down Expand Up @@ -188,6 +197,6 @@ def _bulk_ingest(cls, connection, instances):

def __repr__(self):
return (
'<FeatureValues(id=%r, tpoint=%r, mapobject_id=%r)>'
% (self.id, self.tpoint, self.mapobject_id)
'<FeatureValues(partition_key=%r, tpoint=%r, mapobject_id=%r)>'
% (self.partition_key, self.tpoint, self.mapobject_id)
)
17 changes: 17 additions & 0 deletions tmlib/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,23 @@ def add(self, instance):
else:
self._session.add(instance)

def append_value(self, instance, key, val):
'''Append a "key"=>"val" pair to an istance of FeatureValues class.

Parameters
----------
instance: tmlib.models.feature.FeatureValues instance
key: string
value: string
'''

cls = instance.__class__
if isinstance(instance, DistributedExperimentModel):
connection = self._session.get_bind()
with connection.connection.cursor() as c:
cls._append_value(c,instance, key, val)


def add_all(self, instances):
'''Adds multiple instances of a model class.

Expand Down
13 changes: 10 additions & 3 deletions tmlib/workflow/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class CanonicalWorkflowDependencies(WorkflowDependencies):
'pyramid_creation':
['illuminati'],
'image_analysis':
['jterator']
['jterator','popcon']
}

#: collections.OrderedDict[str, Set[str]]: dependencies between workflow stages
Expand Down Expand Up @@ -140,8 +140,15 @@ class CanonicalWorkflowDependencies(WorkflowDependencies):
},
'imextract': {
'metaconfig'
}
},

'jterator':{

},
'popcon' : {
'jterator'
}
}


class MultiplexingWorkflowDependencies(CanonicalWorkflowDependencies):
Expand Down Expand Up @@ -170,7 +177,7 @@ class MultiplexingWorkflowDependencies(CanonicalWorkflowDependencies):
'pyramid_creation':
['illuminati'],
'image_analysis':
['jterator']
['jterator','popcon']
}


Expand Down
47 changes: 47 additions & 0 deletions tmlib/workflow/popcon/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# TmLibrary - TissueMAPS library for distibuted image analysis routines.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''Workflow step for building and running image analysis pipelines.

The objective of image analysis is to identify meaningful objects (e.g. "cells")
in the images and extract features for the identified objects.
The `jterator` step provides users an interface to combine individual modules
available via the `JtModules repository <https://github.com/TissueMAPS/JtModules>`_
into custom image analysis *pipelines* in a
`CellProfiler <http://cellprofiler.org/>`_-like manner.
Outlines of segmented objects and extracted feature values can be stored for
further analyis or interactive visualization in the viewer.

'''
from tmlib import __version__

__dependencies__ = {'jterator'}

__fullname__ = 'Image analysis pipeline engine'

__description__ = (
'Application of a sequence of algorithms to a set of wells '
'to calculate population context for the identified objects.'
)

__logo__ = '''

__ __ __ __ __
|__)/ \|__)/ / \|\ |
| \__/| \__\__/| \|
{name} ({version})
{fullname}
https://github.com/TissueMAPS/TmLibrary
'''.format(name=__name__, version=__version__, fullname=__fullname__)

196 changes: 196 additions & 0 deletions tmlib/workflow/popcon/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
# TmLibrary - TissueMAPS library for distibuted image analysis routines.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import sys
import shutil
import logging
import subprocess
import numpy as np
import pandas as pd
import collections
import shapely.geometry
import shapely.ops
from cached_property import cached_property
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql import func
from sqlalchemy.dialects.postgresql import FLOAT
from psycopg2 import ProgrammingError
from psycopg2.extras import Json
from gc3libs.quantity import Duration, Memory

import tmlib.models as tm
from tmlib.utils import autocreate_directory_property
from tmlib.utils import flatten
from tmlib.readers import TextReader
from tmlib.readers import ImageReader
from tmlib.writers import TextWriter
from tmlib.models.types import ST_GeomFromText
from tmlib.workflow.api import WorkflowStepAPI
from tmlib.errors import PipelineDescriptionError
from tmlib.errors import JobDescriptionError
from tmlib.workflow.jobs import SingleRunPhase
from tmlib.workflow.jterator.jobs import DebugRunJob
from tmlib.workflow import register_step_api
from tmlib import cfg

from tmlib.workflow.popcon.lcc import LocalCC

logger = logging.getLogger(__name__)


@register_step_api('popcon')
class CrowdingEstimator(WorkflowStepAPI):

'''Class for running image analysis pipelines.'''

def __init__(self, experiment_id):
'''
Parameters
----------
experiment_id: int
ID of the processed experiment
'''
super(CrowdingEstimator, self).__init__(experiment_id)

def create_run_batches(self, args):
'''Creates job descriptions for parallel computing.

Parameters
----------
args: tmlib.workflow.jterator.args.BatchArguments
step-specific arguments

Returns
-------
generator
job descriptions
'''
with tm.utils.ExperimentSession(self.experiment_id) as session:
# Distribute wells randomly. Thereby we achieve a certain level
# of load balancing in case wells have different number of cells,
# for example.
wells = session.query(tm.Well.id).order_by(func.random()).all()
well_ids = [w.id for w in wells]
batches = self._create_batches(well_ids, args.batch_size)
for j, batch in enumerate(batches):
yield {
'id': j + 1, # job IDs are one-based!
'well_ids': batch,
'extract_object': args.extract_object,
'assign_object': args.assign_object
}
def delete_previous_job_output(self):
'''Deletes all instances of
:class:`MapobjectType <tmlib.models.mapobject.MapobjectType>`
that were generated by a prior run of the same pipeline as well as all
children instances for the processed experiment.
'''
pass

def run_job(self, batch, assume_clean_state):
'''Runs the pipeline, i.e. executes modules sequentially. After
Parameters
----------
batch: dict
job description
assume_clean_state: bool, optional
assume that output of previous runs has already been cleaned up
'''

for well_id in batch['well_ids']:
logger.info('process well %d', well_id)
with tm.utils.ExperimentSession(self.experiment_id) as session:
sites = session.query(tm.Site.id, tm.Site.height, tm.Site.width, tm.Site.y, tm.Site.x).\
filter_by(well_id=well_id).all()

sites_id = [i[0] for i in sites] #list of site ids for this well

logger.debug(
'well_id: %s has site_id: %s'
, well_id, len(sites))

wellY = sites[0][1]*len(set([i[3] for i in sites]))
wellX = sites[0][2]*len(set([i[4]for i in sites]))
extract_mapobject_type_id = session.query(tm.MapobjectType.id).\
filter_by(name=batch['extract_object']).one()[0]

logger.debug(
'id of %s is: %s '
, batch['extract_object'], extract_mapobject_type_id)

extract_seg_layer_id = session.query(tm.SegmentationLayer.id).\
filter_by(mapobject_type_id=extract_mapobject_type_id).one()[0]

extract_centroids = session.query(tm.MapobjectSegmentation.geom_centroid,tm.MapobjectSegmentation.mapobject_id,tm.MapobjectSegmentation.label,tm.MapobjectSegmentation.partition_key).\
filter(tm.MapobjectSegmentation.segmentation_layer_id==extract_seg_layer_id).\
filter(tm.MapobjectSegmentation.partition_key.in_(sites_id) ).all()

logger.debug(
'well_id: %s has %s sites and a total of %s centroids'
, well_id, len(sites), len(extract_centroids))

assign_mapobject_type_id = session.query(tm.MapobjectType.id).\
filter_by(name=batch['assign_object']).one()[0]
assign_seg_layer_id = session.query(tm.SegmentationLayer.id).\
filter_by(mapobject_type_id=assign_mapobject_type_id).one()[0]
assign_centroids = session.query(tm.MapobjectSegmentation.geom_centroid,tm.MapobjectSegmentation.mapobject_id,tm.MapobjectSegmentation.label,tm.MapobjectSegmentation.partition_key).\
filter(tm.MapobjectSegmentation.segmentation_layer_id==assign_seg_layer_id).\
filter(tm.MapobjectSegmentation.partition_key.in_(sites_id) ).all()

logger.info('Calculating LCC for well_id %s', well_id)
logger.info('Instantiating LCC for extract_object')
lcc_extract = LocalCC(extract_centroids, wellY, wellX)
logger.info('df lcc_extract: %s',lcc_extract.df.head())
logger.info(
'wellX: %s, wellY: %s, diagonal: %s'
, lcc_extract.wellX, lcc_extract.wellY, lcc_extract.well_diagonal)

real_lcc = lcc_extract.gen_real_distances()
random_lcc = lcc_extract.gen_random_distances()
lcc = lcc_extract.get_lcc(real_lcc,random_lcc)

lcc_extract.df['lcc'] = lcc_extract.df['lcc'].round()
logger.info('Instantiating LCC for assign_object')
lcc_assign = LocalCC(assign_centroids, wellY, wellX)

# remap y,x coordinates of extract object with mapobject_id
# of assign object
logger.info(
'assign mapobject_id of %s to %s'
, batch['assign_object'], batch['extract_object'])

#logger.debug(
# 'assign: %s extract: %s', lcc_assign.df['mapobject_id'], lcc_extract.df['mapobject_id'])

lcc_extract.df['mapobject_id'] = lcc_assign.df['mapobject_id']

feature_name = 'LocalCellCrowding_{}'.format(batch['extract_object'])
feature = session.get_or_create(
tm.Feature, name=feature_name,
mapobject_type_id=assign_mapobject_type_id,
is_aggregate=False)


for index, row in lcc_extract.df.iterrows():
feature_value = session.query(tm.FeatureValues).filter_by(mapobject_id= int(row['mapobject_id']) ).one()
session.append_value(feature_value,str(feature.id),row['lcc'].astype(str))
session.commit()




def collect_job_output(self, batch):
pass
45 changes: 45 additions & 0 deletions tmlib/workflow/popcon/args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# TmLibrary - TissueMAPS library for distibuted image analysis routines.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from tmlib.workflow.args import Argument
from tmlib.workflow.args import BatchArguments
from tmlib.workflow.args import SubmissionArguments
from tmlib.workflow import register_step_batch_args
from tmlib.workflow import register_step_submission_args


@register_step_batch_args('popcon')
class PopconBatchArguments(BatchArguments):

extract_object = Argument(
type=str, short_flag='e',
help='Object that should be processed, e.g. Nuclei'
)

assign_object = Argument(
type=str, short_flag='a',
help='Object that the extract_object get assigned to, e.g. Cells'
)


batch_size = Argument(
type=int, help='number of wells that should be processed per job',
default=100, flag='batch-size', short_flag='b'
)


@register_step_submission_args('popcon')
class PopconSubmissionArguments(SubmissionArguments):

pass
Loading