-
Notifications
You must be signed in to change notification settings - Fork 0
/
make-graph.py
105 lines (86 loc) · 3.68 KB
/
make-graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
Generates a random directed graph. The number of outbound edges per node is
distributed normally.
The graph is saved in a file with the following format:
<number of nodes>\n
<number of edges>\n
one row for each edge u-->v: u<space>v
The total number of processes used is n workers + 1 writer + 1 master.
Best performance can be achieved by setting n to the CPU count - 1, because
the writer and the master don't have significant CPU loads, so they can
share the same CPU.
The size of the nodes queue, with which the master passes work packs
to the workers, affects performance as well. It trades off
processing speed for Ctrl-C responsiveness, as the workers have to
process all the packs already in the queue before encountering the stop
signal.
"""
import click
import random
import multiprocessing as mp
import signal
import sys
def worker_process(in_queue, out_queue, num_nodes):
    """Turn ``(node, num_edges)`` work packs into edge-list text rows.

    Reads packs from ``in_queue`` until a ``None`` sentinel arrives. For each
    pack, draws ``num_edges`` distinct targets from ``range(num_nodes)`` and
    puts a single string on ``out_queue`` holding one ``"<node> <target>"``
    line per edge (lines joined by ``'\\n'``, no trailing newline). Targets
    are sampled from all nodes, so ``node`` itself may be drawn (self-loop).
    Packs with ``num_edges == 0`` produce no row.

    Raises ValueError (from ``random.sample``) if ``num_edges`` exceeds
    ``num_nodes``; the master clamps edge counts so this should not happen.
    """
    # ignore keyboard interrupts, the master will take care of them
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # A forked child inherits the parent's RNG state. Python versions before
    # 3.7 (this script uses Python 2 syntax) do not reseed after fork, so
    # without this every worker would draw the identical sequence of samples.
    random.seed()
    all_nodes = range(num_nodes)  # built once and reused for every pack
    for node, num_edges in iter(in_queue.get, None):
        edges = random.sample(all_nodes, num_edges)
        if edges:  # some nodes won't have outbound edges
            out_queue.put('\n'.join('%d %d' % (node, v) for v in edges))
def writer_process(in_queue, fp):
    """Drain text rows from ``in_queue`` into ``fp`` until ``None`` arrives.

    Every row is written followed by a newline. Progress is reported on
    stderr: a running count every 100 rows and a spinner character every 5
    rows, each ending in ``\\r`` so they keep overwriting the same terminal
    line. The count is of rows received (with ``worker_process`` producing
    them, one row per node that had at least one edge). ``fp`` is flushed
    once the sentinel has been received.
    """
    # ignore keyboard interrupts, the master will take care of them
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    spinner = ['\\', '|', '/', '-']
    for count, line in enumerate(iter(in_queue.get, None), 1):
        fp.write(line)
        fp.write('\n')
        if count % 100 == 0:
            sys.stderr.write(' %d Nodes processed\r' % count)
        # the leading space in the counter above is the slot the spinner uses
        if count % (len(spinner) + 1) == 0:
            sys.stderr.write('%s\r' % spinner[count % len(spinner)])
    fp.flush()
# CLI entry point. Draws an out-degree for every node, writes the header
# (node count, edge count), then fans (node, out-degree) packs out to
# `processes` worker processes; a single writer process appends their edge
# rows to OUT_FILE.
# NOTE: on Ctrl-C the master stops enqueueing, so fewer edges than the
# header's edge count end up in the file.
@click.command()
@click.argument('out-file', type=click.File('w'))
@click.option('--nodes', '-n', default=10, help='How many nodes the graph will have')
@click.option('--degree-avg', '-d', default=4,
              help='Average value of outbound edges per node')
@click.option('--degree-stdev', '-D', default=1,
              help='Standard deviation of outbound edge count per node')
# At least one worker is required: with zero workers nobody drains the nodes
# queue, so the master either deadlocks or writes an empty graph.
@click.option('--processes', '-p', default=max(1, mp.cpu_count() - 1),
              type=click.IntRange(min=1),
              help='How many worker processes to use to generate the graph')
@click.option('--queue-size', '-q', default=1000, help='Size of the nodes queue.')
def main(out_file, nodes, degree_avg, degree_stdev, processes, queue_size):
    """
    Generates a random directed graph. The number of outbound edges per node
    is distributed normally.
    """
    # Status messages go to stderr: OUT_FILE may be '-' (stdout), and anything
    # printed to stdout would then corrupt the graph file.
    sys.stderr.write('Generating edge counts...\n')
    # Round rather than truncate: int() truncates toward zero, which would put
    # the mean out-degree about 0.5 below --degree-avg. The clamp to
    # [0, nodes] keeps random.sample from being asked for more targets than
    # there are nodes.
    edges = [
        max(0, min(nodes, int(round(random.normalvariate(degree_avg, degree_stdev)))))
        for _ in range(nodes)
    ]
    total_edges = sum(edges)
    sys.stderr.write('Graph will have %d edges\n' % total_edges)
    out_file.write('%d\n%d\n' % (nodes, total_edges))
    out_file.flush()  # do not buffer: the writer process appends after this header

    in_queue, out_queue = mp.Queue(queue_size), mp.Queue()
    workers = [mp.Process(target=worker_process, args=(in_queue, out_queue, nodes))
               for _ in range(processes)]
    writer = mp.Process(target=writer_process, args=(out_queue, out_file))
    for proc in workers:
        proc.start()
    writer.start()

    try:
        for node, num_edges in enumerate(edges):
            in_queue.put((node, num_edges))
    except KeyboardInterrupt:
        sys.stderr.write('Workers will stop after they process the next %d nodes\n'
                         % queue_size)

    # One stop sentinel per worker; the writer reads out_queue, not in_queue.
    for _ in workers:
        in_queue.put(None)
    for proc in workers:
        proc.join()
    # Every worker has finished producing rows, so the writer can be stopped.
    out_queue.put(None)
    writer.join()
if __name__ == '__main__':
    # Run the click command when executed as a script; click parses sys.argv.
    main()