diff --git a/reader.py b/reader.py
index 5845775..cbe0fc5 100644
--- a/reader.py
+++ b/reader.py
@@ -1,15 +1,23 @@
 import os
+import sys
 import time
 import logging
 import requests
+import threading
+from copy import copy
+from Queue import Queue
+from datetime import datetime
 from homura import download as fetch
 from tempfile import mkdtemp
-from datetime import datetime
 from collections import OrderedDict
 
 logger = logging.getLogger('landsat8.meta')
 
 
+class ReachedEndOfProcess(Exception):
+    pass
+
+
 def convert_date(value):
     return datetime.strptime(value, '%Y-%m-%d').date()
 
@@ -28,11 +36,27 @@
     return open(dpath, 'r')
 
 
+def row_processor(record, date, dst, writers):
+
+    path = os.path.join(dst, str(date.year), str(date.month), str(date.day))
+
+    logger.info('processing %s' % record['sceneID'])
+    for w in writers:
+        w(path, record)
+
+
 def csv_reader(dst, writers, start_date=None, end_date=None, url=None,
-               download=False, download_path=None):
+               download=False, download_path=None, num_worker_threads=1):
     """ Reads landsat8 metadata from a csv file stored on USGS servers
     and applys writer functions on the data """
 
+    threaded = False
+    threads = []
+
+    if num_worker_threads > 0:
+        threaded = True
+        queue = Queue()
+
     if not url:
         url = 'http://landsat.usgs.gov/metadata_service/bulk_metadata_files/LANDSAT_8.csv'
 
@@ -50,43 +74,72 @@ def csv_reader(dst, writers, start_date=None, end_date=None, url=None,
     r = requests.get(url, stream=True)
     liner = r.iter_lines
 
+    if start_date:
+        start_date = convert_date(start_date)
+
+    if end_date:
+        end_date = convert_date(end_date)
+
     header = None
-    start_write = False
+    counter = 0
     for line in liner():
         row = line.split(',')
 
         # first line is the header
         if not header:
             header = row
+            continue
 
         # other lines have values
         else:
-            for j, v in enumerate(row):
-                try:
-                    row[j] = float(v)
-                except ValueError:
-                    pass
-
-            # generate the record
-            record = OrderedDict(zip(header, row))
-
-            # apply filter
-            # if there is an enddate, stops the process when the end date is reached
-            if not end_date:
-                start_write = True
-
-            if end_date and record['acquisitionDate'] == end_date:
-                start_write = True
-
-            if start_date and record['acquisitionDate'] == start_date:
-                break
-
-            # if condition didn't match, generate path and apply writers and go to the next line
-            if start_write:
-                date = convert_date(record['acquisitionDate'])
-                path = os.path.join(dst, str(date.year), str(date.month), str(date.day))
-
-                logger.info('processing %s' % record['sceneID'])
-                for w in writers:
-                    w(path, record)
+            write = False
+
+            for j, v in enumerate(row):
+                try:
+                    row[j] = float(v)
+                except ValueError:
+                    pass
+
+            # generate the record
+            record = OrderedDict(zip(header, row))
+            date = convert_date(record['acquisitionDate'])
+
+            # apply filter
+            # if there is an enddate, stops the process when the end date is reached
+            if not end_date:
+                write = True
+
+            if end_date and date <= end_date:
+                write = True
+
+            if start_date and date < start_date:
+                break
+
+            if write:
+                if threaded:
+                    new_record = copy(record)
+                    queue.put([new_record, date, dst, writers])
+                    counter += 1
+                else:
+                    row_processor(record, date, dst, writers)
+
+            if threaded and counter > 500:
+
+                def worker():
+                    while not queue.empty():
+                        args = queue.get()
+                        try:
+                            row_processor(*args)
+                        except Exception:
+                            exc = sys.exc_info()
+                            logger.error('%s | %s scene skipped due to error: %s' % (threading.current_thread().name,
+                                                                                    args[0]['sceneID'],
+                                                                                    exc[1].__str__()))
+                        queue.task_done()
+
+                if not threads:
+                    for i in range(num_worker_threads):
+                        t = threading.Thread(target=worker)
+                        t.start()
+                        threads.append(t)