mr_tools.py
import sys
class LineReader(object):
"""A simple class to read lines from a File (like stdin) that
supports pushing lines back into the buffer"""
def __init__(self, stream):
self.stream = stream
self.pushed_back = []
def readline(self):
if self.pushed_back:
return self.pushed_back.pop()
else:
return self.stream.readline()
def push_back(self, line):
self.pushed_back.append(line)
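# Illustrative sketch (not part of the original module): LineReader lets a
# consumer read one line ahead and then un-read it, which keyiter/valiter
# below rely on to detect key boundaries.
#
#   lr = LineReader(sys.stdin)
#   line = lr.readline()     # peek at the next line
#   lr.push_back(line)       # put it back; the next readline() returns it again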
def in_chunks(it, size=25):
chunk = []
it = iter(it)
try:
while True:
chunk.append(it.next())
if len(chunk) >= size:
yield chunk
chunk = []
except StopIteration:
if chunk:
yield chunk
def valiter(key, lr, firstline):
line = firstline
while line:
linevals = line.strip('\n').split('\t')
readkey, vals = linevals[0], linevals[1:]
if readkey == key:
yield vals
line = lr.readline()
else:
lr.push_back(line)
line = None
def keyiter(stream):
lr = LineReader(stream)
line = lr.readline()
while line:
key = line.strip('\n').split('\t',1)[0]
vi = valiter(key, lr, line)
yield key, vi
# read the rest of the valueiter before reading any more lines
try:
while vi.next():
pass
except StopIteration:
pass
line = lr.readline()
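# Illustrative sketch: keyiter groups consecutive lines that share the same
# tab-separated first field, so input must already be sorted by key (as it is
# after a Hadoop-streaming shuffle). For input lines
#   "a\t1", "a\t2", "b\t3"
# it yields ('a', <values ['1'], ['2']>) and then ('b', <values ['3']>).
# Each value iterator is drained before the next key is produced.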
def status(msg, **opts):
if opts:
msg = msg % opts
sys.stderr.write("%s\n" % msg)
def emit(vals):
print '\t'.join(str(val) for val in vals)
def emit_all(vals):
for val in vals:
emit(val)
class Storage(dict):
def __getattr__(self, attr):
return self[attr]
def format_dataspec(msg, specs):
# spec() =:= name | (name, fn)
# specs =:= [ spec() ]
ret = Storage()
for val, spec in zip(msg, specs):
if isinstance(spec, basestring):
name = spec
ret[name] = val
else:
name, fn = spec
ret[name] = fn(val)
return Storage(**ret)
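# Illustrative sketch: format_dataspec turns a split TSV row into an
# attribute-accessible Storage, applying the optional conversion functions.
# The row contents here are hypothetical:
#
#   row = ['123', 'link', '10', '2']
#   rec = format_dataspec(row, [('thing_id', int), 'thing_type',
#                               ('ups', int), ('downs', int)])
#   rec.thing_id == 123 and rec.thing_type == 'link'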
class dataspec_m(object):
def __init__(self, *specs):
self.specs = specs
def __call__(self, fn):
specs = self.specs
def wrapped_fn(args):
return fn(format_dataspec(args, specs))
return wrapped_fn
class dataspec_r(object):
def __init__(self, *specs):
self.specs = specs
def __call__(self, fn):
specs = self.specs
def wrapped_fn(key, msgs):
return fn(key, ( format_dataspec(msg, specs)
for msg in msgs ))
return wrapped_fn
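# Illustrative sketch: the dataspec decorators adapt mapper/reducer functions
# so they receive Storage records instead of raw field lists. A hypothetical
# mapper usable with mr_map below:
#
#   @dataspec_m(('thing_id', int), 'thing_type', ('ups', int), ('downs', int))
#   def score_mapper(thing):
#       yield (thing.thing_type, thing.ups - thing.downs)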
def mr_map(process, fd = sys.stdin):
for line in fd:
vals = line.strip('\n').split('\t')
for res in process(vals):
emit(res)
def mr_reduce(process, fd = sys.stdin):
for key, vals in keyiter(fd):
for res in process(key, vals):
emit(res)
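# Illustrative sketch: mr_map and mr_reduce are the driver loops of a
# streaming job. A typical (hypothetical) pipeline sorts between the two
# stages so that mr_reduce sees its keys grouped:
#
#   cat things.tsv | python my_mapper.py | sort | python my_reducer.py
#
# where my_mapper.py ends with mr_map(my_map_fn) and my_reducer.py ends with
# mr_reduce(my_reduce_fn).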
def mr_foldl(process, init, should_emit = False, fd = sys.stdin):
    acc = init
    for key, vals in keyiter(fd):
        acc = process(key, vals, acc)
    if should_emit:
        emit(acc)
    return acc
def mr_max(process, idx = 0, num = 10, emit = False, fd = sys.stdin):
"""a reducer that, in the process of reduction, only returns the
top N results"""
maxes = []
for key, vals in keyiter(fd):
for newvals in in_chunks(process(key, vals)):
for val in newvals:
                if len(maxes) < num or val[idx] > maxes[-1][idx]:
                    maxes.append(val)
maxes.sort(reverse=True)
maxes = maxes[:num]
if emit:
emit_all(maxes)
return maxes
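# Illustrative sketch: keep only the num largest results over the whole input,
# comparing on field idx of whatever `process` yields. The process function
# here is hypothetical:
#
#   def widest(key, vals):
#       for val in vals:
#           yield (int(val[0]), key)
#
#   top = mr_max(widest, idx=0, num=10)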
def mr_reduce_max_per_key(sort_key, post = None, num = 10, fd = sys.stdin):
def process(key, vals):
maxes = []
for val_chunk in in_chunks(vals, num):
maxes.extend(val_chunk)
maxes.sort(reverse=True, key=sort_key)
maxes = maxes[:num]
if post:
# if we were passed a "post" function, he takes
# responsibility for emitting
post(key, maxes)
else:
for item in maxes:
yield [key] + item
return mr_reduce(process, fd = fd)
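# Illustrative sketch: emit only the num best values per key, ranked by
# sort_key. Assuming each value's first field is a numeric score
# (a hypothetical layout):
#
#   mr_reduce_max_per_key(lambda val: float(val[0]), num=10)
#
# prints at most ten "<key>\t<score>\t..." lines for every key on stdin.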
def join_things(fields, deleted=False, spam=True):
"""A reducer that joins thing table dumps and data table dumps"""
def process(thing_id, vals):
data = {}
thing = None
for val in vals:
if val[0] == 'thing':
thing = format_dataspec(val,
['data_type', # e.g. 'thing'
'thing_type', # e.g. 'link'
'ups',
'downs',
'deleted',
'spam',
'timestamp'])
elif val[0] == 'data':
val = format_dataspec(val,
['data_type', # e.g. 'data'
'thing_type', # e.g. 'link'
'key', # e.g. 'sr_id'
'value'])
if val.key in fields:
data[val.key] = val.value
if (
# silently ignore if we didn't see the 'thing' row
thing is not None
            # remove spam and deleted as appropriate
and (deleted or thing.deleted == 'f')
and (spam or thing.spam == 'f')
# and silently ignore items that don't have all of the
# data that we need
and all(field in data for field in fields)):
yield ((thing_id, thing.thing_type, thing.ups, thing.downs,
thing.deleted, thing.spam, thing.timestamp)
+ tuple(data[field] for field in fields))
mr_reduce(process)
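# Illustrative sketch: join_things recombines a thing-table dump with a
# data-table dump that have been grouped by thing_id. The field names are
# hypothetical:
#
#   join_things(['url', 'sr_id'])
#
# emits one row per thing that has both a 'thing' line and all of the
# requested data keys, with the data values appended to the thing columns.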
def dataspec_m_rel(*fields):
return dataspec_m(*((('rel_id', int),
'rel_type',
('thing1_id', int),
('thing2_id', int),
'name',
('timestamp', float))
+ fields))
def dataspec_m_thing(*fields):
return dataspec_m(*((('thing_id', int),
'thing_type',
('ups', int),
('downs', int),
('deleted', lambda x: x == 't'),
('spam', lambda x: x == 't'),
('timestamp', float))
+ fields))
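# Illustrative sketch: dataspec_m_thing pre-binds the standard thing-table
# columns, so a mapper only declares any extra trailing fields. A hypothetical
# mapper counting spam rows:
#
#   @dataspec_m_thing()
#   def count_spam(thing):
#       if thing.spam:
#           yield ('spam', 1)
#
#   # mr_map(count_spam) then emits "spam\t1" for every spam row on stdin.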
if __name__ == '__main__':
for key, vals in keyiter(sys.stdin):
print key, vals
for val in vals:
print '\t', val