-
Notifications
You must be signed in to change notification settings - Fork 1
/
mrtask_f.py
43 lines (33 loc) · 1.55 KB
/
mrtask_f.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#Task 4
# (f) How does revenue vary over time? Calculate the average trip revenue per month - analysing it by hour of the day (day vs night) and the day of the week (weekday vs weekend).
from mrjob.job import MRJob # importing mrjob library
from datetime import datetime # importing datetime library
class AverageRevenueOverTime(MRJob): # extending the MRJob class
def parse_datetime(self, datetime_str): # parse datetime function
formats = ['%d-%m-%Y %H:%M:%S', '%d-%m-%Y %H:%M', '%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S']
for fmt in formats:
try:
return datetime.strptime(datetime_str, fmt)
except ValueError:
pass
raise ValueError('no valid date format found')
def mapper(self, _, line): # mapper function
# Skip the header line
if not line.startswith('VendorID'):
fields = line.split(',')
revenue = float(fields[16])
pickup_datetime = self.parse_datetime(fields[1])
month = pickup_datetime.month
hour = pickup_datetime.hour
weekday = pickup_datetime.weekday()
yield (month, hour, weekday), revenue
def reducer(self, key, values): # reducer function
total_revenue = 0
num_trips = 0
for revenue in values:
total_revenue += revenue
num_trips += 1
average_revenue = total_revenue / num_trips
yield key, average_revenue
if __name__ == '__main__': # main function
AverageRevenueOverTime.run() # calling the run function