-
Notifications
You must be signed in to change notification settings - Fork 0
/
factories.py
88 lines (68 loc) · 2.32 KB
/
factories.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import os
import csv
import json
from typing import Protocol
from openpyxl import load_workbook
class BillProcessor(Protocol):
    """Structural interface for any object that can report a bill total."""

    def calculate_total(self) -> float:
        """Return the total amount of the bill."""
class JsonBillProcessor:
    """Bill processor that reads its line items from a JSON file.

    The parsed document is iterated item by item, and each item is indexed
    with the ``"Quantity"`` and ``"Price"`` keys.
    """

    def __init__(self, filename):
        """Remember *filename* and parse it immediately."""
        self.filename = filename
        self._doc = self._load_doc()

    def _load_doc(self):
        """Parse the JSON file at ``self.filename`` and return the result."""
        with open(self.filename, "r") as handle:
            return json.load(handle)

    def calculate_total(self) -> float:
        """Return the sum of ``Quantity * Price`` over all loaded items."""
        total = 0
        for item in self._doc:
            total = total + item["Quantity"] * item["Price"]
        return total
class CsvBillProcessor:
    """Bill processor that reads its line items from a CSV file.

    The first CSV line is used as the header (``csv.DictReader`` default);
    every following record must provide ``"Qty"`` and ``"Price"`` columns
    whose text can be converted with ``float``.
    """

    def __init__(self, filename):
        """Remember *filename* and load all of its rows immediately."""
        self.filename = filename
        self._rows = self._load_rows()

    def _load_rows(self):
        """Return every record of the CSV file as a list of dicts."""
        # newline="" hands line-ending handling to the csv module, which is
        # required for quoted fields that contain embedded newlines.
        with open(self.filename, "r", newline="") as f:
            return list(csv.DictReader(f))

    def calculate_total(self) -> float:
        """Return the sum of ``Qty * Price`` over all loaded rows.

        Raises:
            KeyError: if a row lacks the ``"Qty"`` or ``"Price"`` column.
            ValueError: if a cell cannot be parsed as a float.
        """
        # The rows live in ``self._rows``; the previous code read a
        # non-existent ``self._doc`` attribute and always failed.
        return sum(float(row["Qty"]) * float(row["Price"]) for row in self._rows)
class XlsxBillProcessor:
    """Bill processor that reads its line items from an Excel workbook.

    Only the first worksheet is read. Its first row is treated as the header
    and every later row becomes a dict keyed by those header values.
    """

    def __init__(self, filename):
        """Remember *filename* and load the worksheet rows immediately."""
        self.filename = filename
        self._rows = self._load_workbook()

    def _load_workbook(self):
        """Return the data rows of the first worksheet as a list of dicts.

        An empty worksheet yields an empty list. Rows whose cells are all
        empty are skipped, because they carry no quantity or price to total.
        """
        workbook = load_workbook(self.filename)
        ws = workbook.worksheets[0]
        # values_only=True yields plain cell values instead of Cell objects.
        rows = ws.iter_rows(values_only=True)
        header = next(rows, None)
        if header is None:
            return []
        return [
            dict(zip(header, values))
            for values in rows
            if any(value is not None for value in values)
        ]

    def calculate_total(self) -> float:
        """Return the sum of ``qty * Price`` over all loaded rows.

        Raises:
            KeyError: if a row lacks the ``"qty"`` or ``"Price"`` key.
        """
        # NOTE(review): the key is lowercase "qty" while "Price" is
        # capitalised - confirm this matches the spreadsheet header row.
        # The rows live in ``self._rows``; the previous code read a
        # non-existent ``self._doc`` attribute and always failed.
        return sum(row["qty"] * row["Price"] for row in self._rows)
class BillProcessorFactory:
    """Create the bill processor that matches a file's extension."""

    def create_bill_processor(self, filename):
        """Return a processor instance for *filename*.

        The extension is title-cased and looked up in this module's globals
        as ``<Ext>BillProcessor`` (for example ``.json`` resolves to
        ``JsonBillProcessor``).

        Raises:
            ValueError: if the file has no extension or no matching
                processor class exists in this module.
        """
        _, ext = os.path.splitext(filename)
        ext = ext[1:].title()
        # Without an extension the lookup name would be plain "BillProcessor",
        # which is the Protocol itself and cannot be instantiated.
        if not ext:
            raise ValueError("Unsupported format")
        processor_cls = globals().get(f"{ext}BillProcessor")
        # Only a class can be instantiated; anything else is not a processor.
        if not isinstance(processor_cls, type):
            raise ValueError("Unsupported format")
        return processor_cls(filename)
def calculate_total_monthly_bills(bills: list[BillProcessor]) -> float:
    """Return the combined total of every bill in *bills*."""
    grand_total = 0
    for processor in bills:
        grand_total = grand_total + processor.calculate_total()
    return grand_total