-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
142 lines (116 loc) · 4.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import sys
import io
import uuid
import pytest
import itertools
import more_itertools
MIN_PYTHON = (3, 6)
if sys.version_info < MIN_PYTHON:
sys.exit('python %s.%s or later required' % MIN_PYTHON)
class ContextMarker():
def __init__(self, val, out):
self.val = val
self.out = out
def __enter__(self):
self.out.append(f'start_{self.val}')
return self
def __exit__(self, exception, value, traceback):
self.out.append(f'stop_{self.val}')
return False
def test_context_marker():
expected = ['start_z', 'working', 'stop_z']
buf = []
with ContextMarker('z', buf) as b:
buf.append('working')
assert buf == expected
class ScopedIterator():
def __init__(self, iterator, dispatch_table):
self.iter = more_itertools.peekable(iterator)
self.scope = self.scope_key = None
self.scope_dispatch = dispatch_table
self.sentinel = str(uuid.uuid4())
def __iter__(self):
return self
def __next__(self):
i = self.iter.peek(self.sentinel)
if i == self.sentinel:
# end of iteration stream
if self.scope is not None:
self.scope.__exit__(None, None, None)
self.scope = self.scope_key = None
elif self.scope is None and i in self.scope_dispatch:
# start of new scope
self.scope_key = i
self.scope = self.scope_dispatch[self.scope_key](self.scope_key)
self.scope.__enter__()
elif i != self.scope_key and i in self.scope_dispatch:
# change in scope
self.scope.__exit__(None, None, None)
self.scope_key = i
self.scope = self.scope_dispatch[self.scope_key](self.scope_key)
self.scope.__enter__()
elif i != self.scope_key and self.scope_key is not None and i not in self.scope_dispatch:
# stop scope, but don't start a new one
self.scope.__exit__(None, None, None)
self.scope = self.scope_key = None
return next(self.iter)
from collections import namedtuple
Fixture = namedtuple('Fixture', 'buf iterator expected dispatch_table')
def empty_stream():
return Fixture(list(), [], [], {})
def typical_stream():
'typical stream of sorted values, each value with an entry in the dispatch_table.'
def expected_vals():
return list(itertools.chain('A' * 3, 'B' * 2))
def vals_iter():
for v in expected_vals():
yield v
def expected_vals_with_markers():
return list(
itertools.chain.from_iterable((('start_A',),
'A' * 3,
('stop_A', 'start_B'),
'B' * 2,
('stop_B',))))
buf = list()
def a_scope_factory(marker):
return ContextMarker(marker, buf)
def b_scope_factory(marker):
return ContextMarker(marker, buf)
dispatch_table = { 'A': a_scope_factory,
'B': b_scope_factory }
return Fixture(buf, vals_iter(), expected_vals_with_markers(), dispatch_table)
def partial_coverage():
'a stream where the dispatch_table does not cover all elements'
def expected_vals():
return list(itertools.chain('B' * 1, 'C' * 3, 'D' * 2, 'E' * 1, 'F' * 2))
def vals_iter():
for v in expected_vals():
yield v
def expected_vals_with_markers():
return list(
itertools.chain.from_iterable((('B',
'start_C',),
'C' * 3,
('stop_C',),
'D' * 2,
('start_E', 'E', 'stop_E'),
'F' * 2)))
buf = list()
def c_scope_factory(marker):
return ContextMarker(marker, buf)
def e_scope_factory(marker):
return ContextMarker(marker, buf)
dispatch_table = { 'C': c_scope_factory,
'E': e_scope_factory }
return Fixture(buf, vals_iter(), expected_vals_with_markers(), dispatch_table)
@pytest.fixture
def fixtures():
return list((empty_stream(),
typical_stream(),
partial_coverage()))
def test_by_fixtures(fixtures):
for fix in fixtures:
for i in ScopedIterator(fix.iterator, fix.dispatch_table):
fix.buf.append(i)
assert fix.buf == fix.expected