-
Notifications
You must be signed in to change notification settings - Fork 801
/
multiprocess.py
170 lines (149 loc) · 7.39 KB
/
multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from collections import defaultdict
import glob
import json
import os
import warnings
from .metrics import Gauge
from .metrics_core import Metric
from .mmap_dict import MmapedDict
from .samples import Sample
from .utils import floatToGoString
try: # Python3
FileNotFoundError
except NameError: # Python >= 2.5
FileNotFoundError = IOError
class MultiProcessCollector:
"""Collector for files for multi-process mode."""
def __init__(self, registry, path=None):
if path is None:
# This deprecation warning can go away in a few releases when removing the compatibility
if 'prometheus_multiproc_dir' in os.environ and 'PROMETHEUS_MULTIPROC_DIR' not in os.environ:
os.environ['PROMETHEUS_MULTIPROC_DIR'] = os.environ['prometheus_multiproc_dir']
warnings.warn("prometheus_multiproc_dir variable has been deprecated in favor of the upper case naming PROMETHEUS_MULTIPROC_DIR", DeprecationWarning)
path = os.environ.get('PROMETHEUS_MULTIPROC_DIR')
if not path or not os.path.isdir(path):
raise ValueError('env PROMETHEUS_MULTIPROC_DIR is not set or not a directory')
self._path = path
if registry:
registry.register(self)
@staticmethod
def merge(files, accumulate=True):
"""Merge metrics from given mmap files.
By default, histograms are accumulated, as per prometheus wire format.
But if writing the merged data back to mmap files, use
accumulate=False to avoid compound accumulation.
"""
metrics = MultiProcessCollector._read_metrics(files)
return MultiProcessCollector._accumulate_metrics(metrics, accumulate)
@staticmethod
def _read_metrics(files):
metrics = {}
key_cache = {}
def _parse_key(key):
val = key_cache.get(key)
if not val:
metric_name, name, labels, help_text = json.loads(key)
labels_key = tuple(sorted(labels.items()))
val = key_cache[key] = (metric_name, name, labels, labels_key, help_text)
return val
for f in files:
parts = os.path.basename(f).split('_')
typ = parts[0]
try:
file_values = MmapedDict.read_all_values_from_file(f)
except FileNotFoundError:
if typ == 'gauge' and parts[1].startswith('live'):
# Files for 'live*' gauges can be deleted between the glob of collect
# and now (via a mark_process_dead call) so don't fail if
# the file is missing
continue
raise
for key, value, timestamp, _ in file_values:
metric_name, name, labels, labels_key, help_text = _parse_key(key)
metric = metrics.get(metric_name)
if metric is None:
metric = Metric(metric_name, help_text, typ)
metrics[metric_name] = metric
if typ == 'gauge':
pid = parts[2][:-3]
metric._multiprocess_mode = parts[1]
metric.add_sample(name, labels_key + (('pid', pid),), value, timestamp)
else:
# The duplicates and labels are fixed in the next for.
metric.add_sample(name, labels_key, value)
return metrics
@staticmethod
def _accumulate_metrics(metrics, accumulate):
for metric in metrics.values():
samples = defaultdict(float)
sample_timestamps = defaultdict(float)
buckets = defaultdict(lambda: defaultdict(float))
samples_setdefault = samples.setdefault
for s in metric.samples:
name, labels, value, timestamp, exemplar, native_histogram_value = s
if metric.type == 'gauge':
without_pid_key = (name, tuple(l for l in labels if l[0] != 'pid'))
if metric._multiprocess_mode in ('min', 'livemin'):
current = samples_setdefault(without_pid_key, value)
if value < current:
samples[without_pid_key] = value
elif metric._multiprocess_mode in ('max', 'livemax'):
current = samples_setdefault(without_pid_key, value)
if value > current:
samples[without_pid_key] = value
elif metric._multiprocess_mode in ('sum', 'livesum'):
samples[without_pid_key] += value
elif metric._multiprocess_mode in ('mostrecent', 'livemostrecent'):
current_timestamp = sample_timestamps[without_pid_key]
timestamp = float(timestamp or 0)
if current_timestamp < timestamp:
samples[without_pid_key] = value
sample_timestamps[without_pid_key] = timestamp
else: # all/liveall
samples[(name, labels)] = value
elif metric.type == 'histogram':
# A for loop with early exit is faster than a genexpr
# or a listcomp that ends up building unnecessary things
for l in labels:
if l[0] == 'le':
bucket_value = float(l[1])
# _bucket
without_le = tuple(l for l in labels if l[0] != 'le')
buckets[without_le][bucket_value] += value
break
else: # did not find the `le` key
# _sum/_count
samples[(name, labels)] += value
else:
# Counter and Summary.
samples[(name, labels)] += value
# Accumulate bucket values.
if metric.type == 'histogram':
for labels, values in buckets.items():
acc = 0.0
for bucket, value in sorted(values.items()):
sample_key = (
metric.name + '_bucket',
labels + (('le', floatToGoString(bucket)),),
)
if accumulate:
acc += value
samples[sample_key] = acc
else:
samples[sample_key] = value
if accumulate:
samples[(metric.name + '_count', labels)] = acc
# Convert to correct sample format.
metric.samples = [Sample(name_, dict(labels), value) for (name_, labels), value in samples.items()]
return metrics.values()
def collect(self):
files = glob.glob(os.path.join(self._path, '*.db'))
return self.merge(files, accumulate=True)
_LIVE_GAUGE_MULTIPROCESS_MODES = {m for m in Gauge._MULTIPROC_MODES if m.startswith('live')}
def mark_process_dead(pid, path=None):
"""Do bookkeeping for when one process dies in a multi-process setup."""
if path is None:
path = os.environ.get('PROMETHEUS_MULTIPROC_DIR', os.environ.get('prometheus_multiproc_dir'))
for mode in _LIVE_GAUGE_MULTIPROCESS_MODES:
for f in glob.glob(os.path.join(path, f'gauge_{mode}_{pid}.db')):
os.remove(f)