Support for async custom collector #13

Open
hanikesn opened this issue May 18, 2018 · 11 comments

@hanikesn

The upstream client supports adding custom collectors. Adding support for async collectors would make it easy to gather metrics over HTTP, for example.

@hynek
Owner

hynek commented May 25, 2018

What API do you have in mind? An async collect()? I don’t think that should be a slow operation?

@hanikesn
Author

The problem is that I need to query external services via HTTP to aggregate the metrics. We could also run the collection in a separate task, but I'd prefer doing it the async way.
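
For illustration, the "separate task" workaround mentioned here could look roughly like this. A minimal sketch: the session, the URL, and the gauge name are all illustrative, and the regular sync collector machinery then exposes the cached value without any async support:

import asyncio

import aiohttp
from prometheus_client import Gauge

# Hypothetical gauge that a background task keeps up to date; the normal
# synchronous exposition then serves it without any async collector support.
UPSTREAM_VALUE = Gauge("upstream_value", "Value fetched from an external service")

async def poll_metrics(session: aiohttp.ClientSession, url: str, interval: float = 15.0):
    # Periodically fetch the external service and cache the result in the gauge.
    while True:
        async with session.get(url) as resp:
            payload = await resp.json()
        UPSTREAM_VALUE.set(payload["value"])  # assumed response shape
        await asyncio.sleep(interval)

# Started once at application startup, e.g.:
# asyncio.create_task(poll_metrics(session, "http://backend/metrics.json"))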

@hynek
Owner

hynek commented May 28, 2018

My understanding is that the whole point of custom collectors is to collect data somewhere else (like you said in a separate thread) and then expose them using a custom collector.

prometheus_client calls it synchronously; I have no idea how you’d get that async. You’d have to run loop.run_until_complete on each collection? Bonus points for collect being a generator.

I’m not even arguing with you, I just don’t see a good way to achieve what you’d like to do?
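
For illustration, the run-until-complete bridge described above might look like this sketch (using asyncio.run in place of an explicit loop.run_until_complete, and assuming the collector owns a hypothetical coroutine fetch_samples()). It only works when collect() is called from a thread that is not already running an event loop, e.g. prometheus_client's own HTTP server thread:

import asyncio

from prometheus_client.core import GaugeMetricFamily

class BridgedCollector:
    def __init__(self, fetch_samples):
        # fetch_samples is an assumed coroutine function returning
        # a list of (name, value) pairs.
        self._fetch_samples = fetch_samples

    def collect(self):
        # Block on the coroutine, then yield metrics synchronously
        # (collect stays a generator, as noted above).
        samples = asyncio.run(self._fetch_samples())
        for name, value in samples:
            yield GaugeMetricFamily(name, "bridged sample", value=value)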

@hanikesn
Author

hanikesn commented May 29, 2018

So here's basically what I want to do:

from prometheus_client.core import CounterMetricFamily

class Collector(object):
    async def collect(self):
        # session is an aiohttp.ClientSession and url the endpoint to scrape,
        # both created elsewhere.
        async with session.get(url) as response:
            async for s in response.content:
                yield CounterMetricFamily('my_counter_total', 'Help text', labels=['foo'], value=s)

I'm not sure why loop.run_until_complete would be needed separately here. The server already runs in an event loop, and I think it should be possible to collect with an async for loop.

Edit: Also, according to the official documentation, metrics should be acquired just in time when possible: https://prometheus.io/docs/instrumenting/writing_exporters/#scheduling

@ofen

ofen commented Oct 11, 2020

One possible solution, building on @hanikesn's example above, is to override the generate_latest function and a few CollectorRegistry methods (turning the sync loops into async ones):

import copy

from prometheus_client.core import CollectorRegistry
from prometheus_client.utils import floatToGoString


class CustomRegistry(CollectorRegistry):
    async def collect(self):
        """Yields metrics from the collectors in the registry."""
        collectors = None
        ti = None
        with self._lock:
            collectors = copy.copy(self._collector_to_names)
            if self._target_info:
                ti = self._target_info_metric()
        if ti:
            yield ti
        for collector in collectors:
            async for metric in collector.collect():
                yield metric

    async def register(self, collector):
        """Add a collector to the registry."""
        with self._lock:
            names = await self._get_names(collector)
            duplicates = set(self._names_to_collectors).intersection(names)
            if duplicates:
                raise ValueError(
                    'Duplicated timeseries in CollectorRegistry: {0}'.format(
                        duplicates))
            for name in names:
                self._names_to_collectors[name] = collector
            self._collector_to_names[collector] = names

    async def _get_names(self, collector):
        """Get names of timeseries the collector produces."""
        desc_func = None
        # If there's a describe function, use it.
        try:
            desc_func = collector.describe
        except AttributeError:
            pass
        # Otherwise, if auto describe is enabled use the collect function.
        if not desc_func and self._auto_describe:
            desc_func = collector.collect

        if not desc_func:
            return []

        result = []
        type_suffixes = {
            'counter': ['_total', '_created'],
            'summary': ['', '_sum', '_count', '_created'],
            'histogram': ['_bucket', '_sum', '_count', '_created'],
            'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
            'info': ['_info'],
        }
        # NB: assumes the collector's describe()/collect() are async generators.
        async for metric in desc_func():
            for suffix in type_suffixes.get(metric.type, ['']):
                result.append(metric.name + suffix)
        return result

def sample_line(line):
    if line.labels:
        labelstr = '{{{0}}}'.format(','.join(
            ['{0}="{1}"'.format(
                k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"'))
                for k, v in sorted(line.labels.items())]))
    else:
        labelstr = ''
    timestamp = ''
    if line.timestamp is not None:
        # Convert to milliseconds.
        timestamp = ' {0:d}'.format(int(float(line.timestamp) * 1000))
    return '{0}{1} {2}{3}\n'.format(
        line.name, labelstr, floatToGoString(line.value), timestamp)

async def generate_latest(registry):
    """Returns the metrics from the registry in latest text format as a string."""
    output = []
    async for metric in registry.collect():
        try:
            mname = metric.name
            mtype = metric.type
            # Munging from OpenMetrics into Prometheus format.
            if mtype == 'counter':
                mname = mname + '_total'
            elif mtype == 'info':
                mname = mname + '_info'
                mtype = 'gauge'
            elif mtype == 'stateset':
                mtype = 'gauge'
            elif mtype == 'gaugehistogram':
                # A gauge histogram is really a gauge,
                # but this captures the structure better.
                mtype = 'histogram'
            elif mtype == 'unknown':
                mtype = 'untyped'

            output.append('# HELP {0} {1}\n'.format(
                mname, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
            output.append('# TYPE {0} {1}\n'.format(mname, mtype))

            om_samples = {}
            for s in metric.samples:
                for suffix in ['_created', '_gsum', '_gcount']:
                    if s.name == metric.name + suffix:
                        # OpenMetrics specific sample, put in a gauge at the end.
                        om_samples.setdefault(suffix, []).append(sample_line(s))
                        break
                else:
                    output.append(sample_line(s))
        except Exception as exception:
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        for suffix, lines in sorted(om_samples.items()):
            output.append('# HELP {0}{1} {2}\n'.format(metric.name, suffix, metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
            output.append('# TYPE {0}{1} gauge\n'.format(metric.name, suffix))
            output.extend(lines)
    return ''.join(output).encode('utf-8')

The custom collector class:

import re

from prometheus_client.core import (
    CounterMetricFamily,
    GaugeMetricFamily,
    SummaryMetricFamily,
)


class CustomCollector(object):
    def __init__(self, app):
        self.session = app['session']
        self.metrics_url = app['metrics_url']

    async def collect(self):
        resp = await self.session.get(self.metrics_url)
        payload = await resp.json()
        for name, v in payload['metrics'].items():
            # Normalize the upstream metric name to Prometheus conventions.
            name = name.replace('.', '_')
            name = name.replace('-', '_')
            name = re.sub('(?!^)([A-Z]+)', r'_\1', name).lower()
            kind = v['kind']
            if kind == 'Gauge':
                m = GaugeMetricFamily(name, name, labels=[])
            elif kind == 'Timer':
                m = SummaryMetricFamily(name, name, labels=[])
            elif kind == 'Counter':
                m = CounterMetricFamily(name, name, labels=[])
            else:
                continue  # skip kinds we don't know how to map
            for i in v['values']:
                tags = i['tags']
                labels = {tag['key']: tag['value'] for tag in tags}
                value = i['values'][0]['v']
                timestamp = i['values'][0]['t']
                m.add_sample(name, labels, value, timestamp=timestamp)
            yield m

# e.g. inside an async startup handler:
collector = CustomCollector(app)
registry = CustomRegistry(auto_describe=True)
await registry.register(collector)
await generate_latest(registry)
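
To serve this from an aiohttp app, a handler along these lines could work. A sketch: the "registry" app key and the route are assumptions:

from aiohttp import web
from prometheus_client import CONTENT_TYPE_LATEST

async def metrics_handler(request: web.Request) -> web.Response:
    # The CustomRegistry instance from above, stashed on the application.
    registry = request.app["registry"]
    body = await generate_latest(registry)  # the async generate_latest defined above
    return web.Response(body=body, headers={"Content-Type": CONTENT_TYPE_LATEST})

# app.router.add_get("/metrics", metrics_handler)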

@Yamakaky

Hello, is there a plan to have something similar to what @ofen suggests built in? I have the following setup:

  • aiohttp server with rest api + prometheus exporter (async)
  • prometheus-async (sync collector)
  • aiohttp client (async)

I'm not sure where to go from here that doesn't involve keeping a copy of @ofen's collector and registry and reimplementing server_stats.

Maybe have both collect and async_collect methods on the async collector; that way the async registry stays compatible with sync collectors.
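
For illustration, such a dual-interface collector could be sketched like this; the method names follow the suggestion above, and the sync path simply serves whatever the async path gathered last:

from prometheus_client.core import GaugeMetricFamily

class DualCollector:
    def __init__(self, session, url):
        self._session = session  # assumed aiohttp.ClientSession
        self._url = url
        self._cached = []        # metrics from the last async_collect() run

    def collect(self):
        # Sync interface for a plain CollectorRegistry: no I/O, just the cache.
        yield from self._cached

    async def async_collect(self):
        # Async interface an async-aware registry would prefer.
        resp = await self._session.get(self._url)
        payload = await resp.json()  # assumed shape: {"value": ...}
        metric = GaugeMetricFamily(
            "external_value", "Value from an external service", value=payload["value"]
        )
        self._cached = [metric]
        yield metric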

@hynek
Owner

hynek commented Aug 15, 2023

What API would y'all like?

@hynek closed this as not planned on Jan 26, 2024
@Yamakaky

What are you missing from what @hanikesn and @ofen are suggesting?

@hynek
Owner

hynek commented Jan 29, 2024

Well, I asked for an API y'all would like in August and got crickets, so I've assumed the interest has waned?

@Yamakaky

Yamakaky commented Feb 4, 2024

I think the main API addition would be an async generator collect_async() on the collector, maybe running both versions in sequence. If that's too complicated to implement, maybe a separate AsyncCollector type. Do you want a PR with a first draft of an implementation?
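
A registry-side sketch of "run both versions in sequence" could look like this. Purely illustrative: collect_async is the hypothetical method name from above, and the private attributes match those used in @ofen's snippet:

from prometheus_client.core import CollectorRegistry

class AsyncAwareRegistry(CollectorRegistry):
    async def collect(self):
        with self._lock:
            collectors = list(self._collector_to_names)
        for collector in collectors:
            # Sync collectors keep working unchanged ...
            for metric in collector.collect():
                yield metric
            # ... and collectors that also define collect_async() get awaited.
            collect_async = getattr(collector, "collect_async", None)
            if collect_async is not None:
                async for metric in collect_async():
                    yield metric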

@hynek
Owner

hynek commented Feb 7, 2024

Unless I'm missing something, we don't run any collect whatsoever. We just run prometheus_client's generate_latest.

So, unless we wanted to reimplement the official client, I guess the only way would be to somehow run generate_latest in a thread pool?
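
For what it's worth, offloading the stock generate_latest to a thread pool is straightforward. A minimal sketch using the default executor; it keeps the event loop responsive but does not make collect() itself async:

import asyncio

from prometheus_client import REGISTRY, generate_latest

async def render_metrics() -> bytes:
    # Run prometheus_client's synchronous generate_latest() in the default
    # thread pool so the event loop is not blocked while collectors run.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, generate_latest, REGISTRY)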
