-
Notifications
You must be signed in to change notification settings - Fork 0
/
subject.py
99 lines (83 loc) · 1.78 KB
/
subject.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class Subject:
    """A small push-based subject: values go in via ``next`` and fan out to subscribers.

    Subscribers are either plain callables (invoked with every value) or
    dicts that may carry ``'next'``, ``'complete'`` and ``'error'`` callables.
    Operators registered through ``pipe`` are applied, in order, to each value
    before it is delivered. When constructed with ``cached > 0`` the subject
    remembers the last ``cached`` raw values and replays them to each new
    subscriber.

    Attributes:
        operators: Callables applied in order to every emitted value.
        subs: Registered subscribers (callables or handler dicts).
        cachedItems: Most recent raw values, at most ``cachedNum`` of them.
        cachedNum: Replay-cache capacity; ``0`` disables caching.
        completed: ``True`` once ``complete`` has been called.
    """

    # Only immutable defaults live on the class. The list-valued state is
    # created per instance in __init__ so that instances never share
    # operators, subscribers, or cached values.
    cachedNum = 0
    completed = False

    def __init__(self, *, cached=0, init=()):
        """Create a subject.

        Args:
            cached: Number of most recent values to keep for replay. Values
                that are not greater than zero disable caching.
            init: Initial values used to pre-fill the cache; only the last
                ``cached`` of them are kept. Ignored when caching is disabled.
        """
        self.operators = []
        self.subs = []
        self.cachedItems = []
        self.cachedNum = 0
        self.completed = False
        if cached > 0:
            self.cachedNum = cached
            # Copy so later changes to the caller's sequence cannot leak in.
            self.cachedItems = list(init)[-cached:]

    def pipe(self, *ops):
        """Append operators to the pipeline and return ``self`` for chaining.

        Args:
            *ops: Callables taking one value and returning the transformed value.

        Returns:
            This subject.
        """
        self.operators.extend(ops)
        return self

    def next(self, value):
        """Cache ``value``, run it through the operators and notify subscribers.

        Does nothing once the subject has completed.

        Args:
            value: The raw value to emit.
        """
        if self.completed:
            return
        # The cache stores the raw value; operators are applied at delivery time.
        self.__putInCache(value)
        value = self.__applyOperators(value)
        for sub in self.subs:
            self.__deliver(sub, value)

    def complete(self):
        """Mark the subject as completed and call every ``'complete'`` handler.

        Only the first call has any effect. Plain-callable subscribers are not
        notified because they have no completion handler.
        """
        if self.completed:
            return
        self.completed = True
        for sub in self.subs:
            if isinstance(sub, dict) and 'complete' in sub:
                sub['complete']()

    def error(self, message):
        """Report an error to every subscriber that has an ``'error'`` handler.

        Does nothing once the subject has completed.

        Args:
            message: The error payload handed to each handler.

        Raises:
            Exception: With ``message`` when no subscriber handles errors.
        """
        if self.completed:
            return
        handled = False
        for sub in self.subs:
            if isinstance(sub, dict) and 'error' in sub:
                handled = True
                sub['error'](message)
        if not handled:
            raise Exception(message)

    def subscribe(self, fn):
        """Register a subscriber and replay cached values to it.

        Args:
            fn: A callable taking one value, or a dict with optional
                ``'next'``, ``'complete'`` and ``'error'`` callables.
        """
        self.subs.append(fn)
        if self.completed or self.cachedNum <= 0:
            return
        # Replay goes to the new subscriber only and must not pass through
        # next(): that would re-broadcast old values to existing subscribers
        # and push them into the cache a second time. Iterate over a snapshot
        # because the subscriber may emit new values while being replayed to.
        for cached_value in list(self.cachedItems):
            self.__deliver(fn, self.__applyOperators(cached_value))

    def __deliver(self, sub, value):
        """Hand an already-transformed value to a single subscriber."""
        if isinstance(sub, dict):
            # Handler dicts without a 'next' entry simply ignore values.
            if 'next' in sub:
                sub['next'](value)
        else:
            sub(value)

    def __applyOperators(self, value):
        """Return ``value`` after passing it through every operator in order."""
        for op in self.operators:
            value = op(value)
        return value

    def __putInCache(self, value):
        """Remember ``value``, evicting the oldest entries beyond ``cachedNum``."""
        if self.cachedNum <= 0:
            # Caching disabled: keep nothing.
            return
        self.cachedItems.append(value)
        overflow = len(self.cachedItems) - self.cachedNum
        if overflow > 0:
            del self.cachedItems[:overflow]