Skip to content

Commit

Permalink
Revert "Celery experiment (#720)" (#726)
Browse files Browse the repository at this point in the history
This reverts commit 382e92c.
  • Loading branch information
danielhers authored Sep 12, 2017
1 parent 382e92c commit ab84dfb
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
5 changes: 0 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ node_js:

services:
- postgresql
- rabbitmq

sudo: false

Expand All @@ -24,14 +23,10 @@ before_script:
- psql -c 'create database anyway;' -U postgres
- export DATABASE_URL='postgresql://postgres@localhost/anyway'
- python models.py
- celery worker -A clusters_calculator -D

script:
- pylint -j $(nproc) *.py tests
- eslint static/js --ignore-path=static/js/.eslintignore
- python process.py
- python united.py --light
- pytest tests

after_script:
- kill $(cat celeryd.pid)
30 changes: 16 additions & 14 deletions clusters_calculator.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import itertools
from celery import Celery, group
from models import Marker
from static.pymapcluster import calculate_clusters
import logging
import concurrent.futures
import multiprocessing


celery_app = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost//')
def retrieve_clusters(**kwargs):
marker_boxes = divide_to_boxes(kwargs['ne_lat'], kwargs['ne_lng'], kwargs['sw_lat'], kwargs['sw_lng'])
result_futures = []
logging.info('number of cores: ' + str(multiprocessing.cpu_count()))
with concurrent.futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
for marker_box in marker_boxes:

@celery_app.task
def calculate_marker_box(kwargs, marker_box):
kwargs.update(marker_box)
markers_in_box = Marker.bounding_box_query(**kwargs).markers.all()
return calculate_clusters(markers_in_box, kwargs['zoom'])
kwargs.update(marker_box)
markers_in_box = Marker.bounding_box_query(**kwargs).markers.all()
result_futures.append(executor.submit(calculate_clusters, markers_in_box, kwargs['zoom']))

completed_futures = concurrent.futures.wait(result_futures)
result = []
for future in completed_futures.done:
result.extend(future.result())

def retrieve_clusters(**kwargs):
marker_boxes = divide_to_boxes(kwargs['ne_lat'], kwargs['ne_lng'], kwargs['sw_lat'], kwargs['sw_lng'])
job = group([calculate_marker_box.s(kwargs, marker_box) for marker_box in marker_boxes])
result = job.apply_async()
result.join()
return list(itertools.chain.from_iterable(result.get()))
return result


def divide_to_boxes(ne_lat, ne_lng, sw_lat, sw_lng):
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ Flask-Admin==1.1.0
Flask-Security==1.7.5
WTForms==2.0.2
sendgrid==1.4.0
futures==2.1.6
python-dateutil==2.4.2
alembic==0.8.2
apscheduler==2.1.2
Flask-Compress==1.3.0
rauth==0.7.2
requests==2.4.3
celery
requests==2.4.3

0 comments on commit ab84dfb

Please sign in to comment.