Granularity split (#162)
* Split granularity into estimation_window and frequency

* Remove mentions of granularity

* flake8
LadyChristina authored Oct 6, 2024
1 parent 0ddcf6e commit 414b057
Showing 11 changed files with 310 additions and 235 deletions.
15 changes: 10 additions & 5 deletions config.yaml
@@ -35,14 +35,19 @@ analyze_flags:

# The timeframe for which an analysis should be performed.
# Each date is a string of the form YYYY-MM-DD.
# If granularity is also set, then the analysis will run on the timeframe of the two farthest snapshots.
timeframe:
start_date: 2010-01-01
start_date: 2011-01-01
end_date: 2023-12-31

# The granularity for the analysis when two dates are provided in the --snapshot_dates argument (which are then interpreted as start and end dates).
# It can be one of: "day", "week", "month", "year", or empty. If empty, then only the snapshots for the given dates will be analyzed.
granularity: "month"
# The number of days to use for the estimation window, i.e. how many days of blocks to use for each data point.
# If left empty, then the entire timeframe will be used (only valid when combined with an empty frequency).
estimation_window: 30

# How frequently to sample the data, in days.
# If left empty, then only one data point will be analyzed (snapshot instead of longitudinal analysis), but this is
# only valid when combined with an empty estimation_window.
frequency: 30 # todo maybe add hardcoded values for day, week, month, year (in the code that parses this) + for the estimation window


input_directories: # Paths to directories that contain raw input data
- ./input
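As a rough illustration of how the new estimation_window and frequency settings interact, the following sketch (not part of this commit; the dates mirror the configured timeframe and both values are assumed to be set) counts how many data points a given combination produces:

# Sketch only (not part of this commit): count the data points that a given
# estimation_window / frequency combination yields for the configured timeframe.
from datetime import date

start, end = date(2011, 1, 1), date(2023, 12, 31)
estimation_window, frequency = 30, 30  # values from the config above

total_days = (end - start).days + 1
if total_days >= estimation_window:
    n_points = (total_days - estimation_window) // frequency + 1
else:
    n_points = 0  # window larger than the timeframe -> no data points
print(n_points)  # one data point per non-overlapping 30-day window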
80 changes: 43 additions & 37 deletions consensus_decentralization/aggregate.py
@@ -1,6 +1,5 @@
import logging
from collections import defaultdict
from dateutil.rrule import rrule, MONTHLY, WEEKLY, YEARLY, DAILY
import datetime
import consensus_decentralization.helper as hlp

@@ -56,77 +55,84 @@ def aggregate(self, timeframe_start, timeframe_end):
return blocks_per_entity


def divide_timeframe(timeframe, granularity):
def divide_timeframe(timeframe, estimation_window, frequency):
"""
Divides the timeframe into smaller timeframes of the given granularity
Divides the timeframe into smaller timeframes based on the given estimation_window and frequency. Each smaller
timeframe is estimation_window days long, and the start (or end) date of each smaller timeframe is frequency
days apart from that of the previous one. The last timeframe does not necessarily end on the end date of the
original timeframe; it may end some days earlier, so that all timeframes produced have equal length.
If estimation_window or frequency is None, the timeframe is not divided and the list contains a single tuple
with the start and end dates of the original timeframe.
:param timeframe: a tuple of (start_date, end_date) where each date is a datetime.date object.
:param granularity: the granularity that will be used for the analysis. It can be one of: day, week, month, year, all
:return: a list of tuples of (start_date, end_date) where each date is a datetime.date object and each tuple
corresponds to a timeframe of the given granularity
:raises ValueError: if the timeframe is not valid (i.e. end date precedes start_date) or if the granularity is not
one of: day, week, month, year
:param estimation_window: int or None. The number of days to include in each time chunk. If None, the entire
timeframe will be considered as one chunk.
:param frequency: int or None. The number of days between each sample start date. If None, only one sample will be
considered, spanning the entire timeframe (i.e. it needs to be combined with None estimation_window).
:returns: a list of tuples of (start_date, end_date) where each date is a datetime.date object. If the estimation
window is larger than the timeframe, then an empty list is returned.
:raises ValueError: if the timeframe is not valid (i.e. end date precedes start_date)
"""
timeframe_start, timeframe_end = timeframe
if timeframe_end < timeframe_start:
raise ValueError(f'Invalid timeframe: {timeframe}')
if granularity == 'day':
start_dates = [dt.date() for dt in rrule(freq=DAILY, dtstart=timeframe_start, until=timeframe_end)]
end_dates = start_dates
elif granularity == 'week':
start_dates = [dt.date() for dt in rrule(freq=WEEKLY, dtstart=timeframe_start, until=timeframe_end)]
end_dates = [dt - datetime.timedelta(days=1) for dt in start_dates[1:]] + [timeframe_end]
elif granularity == 'month':
start_dates = [dt.date() for dt in rrule(freq=MONTHLY, dtstart=timeframe_start.replace(day=1), until=timeframe_end)]
start_dates[0] = timeframe_start
end_dates = [dt - datetime.timedelta(days=1) for dt in start_dates[1:]] + [timeframe_end]
elif granularity == 'year':
start_dates = [dt.date() for dt in rrule(freq=YEARLY, dtstart=timeframe_start.replace(month=1, day=1), until=timeframe_end)]
start_dates[0] = timeframe_start
end_dates = [dt - datetime.timedelta(days=1) for dt in start_dates[1:]] + [timeframe_end]
else:
# no need to divide the timeframe
start_dates = [timeframe_start]
end_dates = [timeframe_end]
return list(zip(start_dates, end_dates))
if estimation_window is None:
return [(timeframe_start, timeframe_end)]
time_chunks = []
first_window_day = timeframe_start
last_window_day = timeframe_start + datetime.timedelta(days=estimation_window - 1)
while last_window_day <= timeframe_end:
time_chunks.append((first_window_day, last_window_day))
first_window_day += datetime.timedelta(days=frequency)
last_window_day += datetime.timedelta(days=frequency)
return time_chunks
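A possible usage sketch for the new divide_timeframe (the import path follows this file; the dates are arbitrary examples rather than values from the commit):

# Sketch only: how divide_timeframe behaves for a few window/frequency combinations.
import datetime
from consensus_decentralization.aggregate import divide_timeframe

timeframe = (datetime.date(2023, 1, 1), datetime.date(2023, 3, 31))

# Non-overlapping 30-day windows: (Jan 1 - Jan 30), (Jan 31 - Mar 1), (Mar 2 - Mar 31)
print(divide_timeframe(timeframe, estimation_window=30, frequency=30))

# Overlapping (sliding) windows: 30 days of data, sampled every 7 days
print(divide_timeframe(timeframe, estimation_window=30, frequency=7))

# No division: the whole timeframe as a single chunk
print(divide_timeframe(timeframe, estimation_window=None, frequency=None))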


def aggregate(project, output_dir, timeframe, aggregate_by, force_aggregate):
def aggregate(project, output_dir, timeframe, estimation_window, frequency, force_aggregate):
"""
Aggregates the results of the mapping process for the given project and timeframe. The results are saved in a csv
file in the project's output directory. Note that the output file is created (just with the headers) even if there
is no data to aggregate.
:param project: the name of the project
:param output_dir: the path to the general output directory
:param timeframe: a tuple of (start_date, end_date) where each date is a datetime.date object
:param aggregate_by: the granularity that will be used for the analysis. It can be one of: day, week, month,
year, all
:param estimation_window: int or None. The number of days to use for aggregating the data (i.e. counting all the
blocks produced by each entity within estimation_window days). If None, the entire timeframe will be considered
as one chunk.
:param frequency: int or None. The number of days between consecutive data points in the analysis. If None, only
one data point will be considered, spanning the entire timeframe (i.e. it needs to be combined with a None
estimation_window).
:param force_aggregate: bool. If True, then the aggregation will be performed, regardless of whether aggregated
data for the project and specified granularity already exist
data for the project and specified window / frequency already exist
:returns: a list of (start_date, end_date) tuples that correspond to the time chunks of the aggregation, or None
if no aggregation took place (the corresponding output file already existed and force_aggregate was set to False)
"""
if estimation_window is not None:
if timeframe[0] + datetime.timedelta(days=estimation_window - 1) > timeframe[1]:
raise ValueError('The estimation window is too large for the given timeframe')

project_io_dir = output_dir / project
aggregator = Aggregator(project, project_io_dir)

filename = hlp.get_blocks_per_entity_filename(aggregate_by=aggregate_by, timeframe=timeframe)
filename = hlp.get_blocks_per_entity_filename(timeframe=timeframe, estimation_window=estimation_window, frequency=frequency)
output_file = aggregator.aggregated_data_dir / filename

if not output_file.is_file() or force_aggregate:
logging.info(f'Aggregating {project} data..')
timeframe_chunks = divide_timeframe(timeframe=timeframe, granularity=aggregate_by)
timeframe_chunk_starts = hlp.format_time_chunks(time_chunks=timeframe_chunks, granularity=aggregate_by)
timeframe_chunks = divide_timeframe(timeframe=timeframe, estimation_window=estimation_window, frequency=frequency)
representative_dates = hlp.get_representative_dates(time_chunks=timeframe_chunks)
blocks_per_entity = defaultdict(dict)
for i, chunk in enumerate(timeframe_chunks):
chunk_start, chunk_end = chunk
t_chunk = timeframe_chunk_starts[i]
chunk_blocks_per_entity = aggregator.aggregate(chunk_start, chunk_end)
for entity, blocks in chunk_blocks_per_entity.items():
blocks_per_entity[entity][t_chunk] = blocks
blocks_per_entity[entity][representative_dates[i]] = blocks

hlp.write_blocks_per_entity_to_file(
output_dir=aggregator.aggregated_data_dir,
blocks_per_entity=blocks_per_entity,
time_chunks=timeframe_chunk_starts,
dates=representative_dates,
filename=filename
)
return timeframe_chunks
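For context, a hedged sketch of how the updated aggregate entry point might be invoked, assuming the package is installed and mapped data for the project already exists; the project name, output directory and parameter values are illustrative assumptions, not part of this commit:

# Sketch only: invoking the updated aggregate() with a 30-day window sampled every 30 days.
import datetime
from pathlib import Path
from consensus_decentralization.aggregate import aggregate

chunks = aggregate(
    project='bitcoin',                    # hypothetical project name
    output_dir=Path('./output'),          # hypothetical output directory
    timeframe=(datetime.date(2011, 1, 1), datetime.date(2023, 12, 31)),
    estimation_window=30,                 # 30 days of blocks per data point
    frequency=30,                         # one data point every 30 days
    force_aggregate=False,                # reuse existing aggregated data if present
)
# Returns the list of (start_date, end_date) chunks, or None if the output file
# already existed and force_aggregate was False.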
38 changes: 19 additions & 19 deletions consensus_decentralization/analyze.py
@@ -43,50 +43,50 @@ def analyze(projects, aggregated_data_filename, output_dir):
logging.info(f'Calculating {project} metrics')
aggregate_output[project] = {}
aggregated_data_dir = output_dir / project / 'blocks_per_entity'
time_chunks, blocks_per_entity = hlp.get_blocks_per_entity_from_file(aggregated_data_dir / aggregated_data_filename)
for time_chunk in time_chunks:
aggregate_output[project][time_chunk] = {}
dates, blocks_per_entity = hlp.get_blocks_per_entity_from_file(aggregated_data_dir / aggregated_data_filename)
for date in dates:
aggregate_output[project][date] = {}

chunks_with_blocks = set()
dates_with_blocks = set()
for block_values in blocks_per_entity.values():
for tchunk, nblocks in block_values.items():
for date, nblocks in block_values.items():
if nblocks > 0:
chunks_with_blocks.add(tchunk)
dates_with_blocks.add(date)

for row_index, time_chunk in enumerate(time_chunks):
time_chunk_blocks_per_entity = {}
for row_index, date in enumerate(dates):
date_blocks_per_entity = {}
if column_index == 0:
for metric_name, _, _ in metric_params:
csv_contents[metric_name].append([time_chunk])
if time_chunk in chunks_with_blocks:
csv_contents[metric_name].append([date])
if date in dates_with_blocks:
for entity, block_values in blocks_per_entity.items():
try:
time_chunk_blocks_per_entity[entity] = block_values[time_chunk]
date_blocks_per_entity[entity] = block_values[date]
except KeyError:
time_chunk_blocks_per_entity[entity] = 0
sorted_time_chunk_blocks = sorted(time_chunk_blocks_per_entity.values(), reverse=True)
date_blocks_per_entity[entity] = 0
sorted_date_blocks = sorted(date_blocks_per_entity.values(), reverse=True)

for metric_name, metric, param in metric_params:
func = eval(f'compute_{metric}')
if param:
result = func(sorted_time_chunk_blocks, param)
result = func(sorted_date_blocks, param)
else:
result = func(sorted_time_chunk_blocks)
result = func(sorted_date_blocks)
csv_contents[metric_name][row_index + 1].append(result)
aggregate_output[project][time_chunk][metric_name] = result
aggregate_output[project][date][metric_name] = result

for metric in metric_names:
with open(output_dir / f'{metric}.csv', 'w') as f:
csv_writer = csv.writer(f)
csv_writer.writerows(csv_contents[metric])

clustering_flag = hlp.get_config_data()['analyze_flags']['clustering']
aggregate_csv_output = [['ledger', 'snapshot_date', 'clustering'] + metric_names]
aggregate_csv_output = [['ledger', 'date', 'clustering'] + metric_names]
for project, timeframes in aggregate_output.items():
for time_chunk, results in timeframes.items():
for date, results in timeframes.items():
metric_values = [results[metric] for metric in metric_names]
if any(metric_values):
aggregate_csv_output.append([project, time_chunk, clustering_flag] + metric_values)
aggregate_csv_output.append([project, date, clustering_flag] + metric_values)
with open(output_dir / 'output.csv', 'w') as f:
csv_writer = csv.writer(f)
csv_writer.writerows(aggregate_csv_output)
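To make the shape of the aggregate output concrete, a small sketch that reads output.csv back in; the file location is an assumption, while the columns follow the header constructed above (['ledger', 'date', 'clustering'] plus the metric names):

# Sketch only: reading the aggregate output written by analyze().
import csv
from pathlib import Path

output_file = Path('./output') / 'output.csv'  # assumed location of the analysis output
with open(output_file) as f:
    for row in csv.DictReader(f):
        # one row per (ledger, date) pair with at least one non-zero metric
        print(row['ledger'], row['date'], row['clustering'])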

