Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pika): adding support for channel.consume instrumentation #2397

Merged
merged 14 commits into from
Apr 21, 2024

Conversation

galbash
Copy link
Contributor

@galbash galbash commented Apr 8, 2024

Description

An attempt at adding support for pika's channel.consume function (today we only support basic_get and basic_consume.
Fixes #1432

Currently works only as a global instrumentation (using PikaInstrumentor.instrument()) and not for a per-channel instrumentation (using PikaInstrumentor.instrument_channel(c)). This is due to an implementation difficulty.

Type of change

Please delete options that are not relevant.

  • New feature (non-breaking change which adds functionality)

How Has This Been Tested?

  • Added unit tests
  • compared code since pika 0.12 to make sure it works the same way
  • Tested against the following real-world scenario code (replicates my use case):
import pika
import requests
from pika.adapters.blocking_connection import BlockingChannel


def do_consume(channel: BlockingChannel):
    method, properties, body = next(channel.consume("test_queue", auto_ack=True, ))
    return method, properties, body

def main():
    connection = pika.BlockingConnection()
    channel = connection.channel()
    while True:
        method, property, body = do_consume(channel)
        print(method, property, body)
        res = requests.get('https://example.com')


if __name__ == '__main__':
  main()
  • Tested against the following real-world scenario code (replicates doc's use case):
import pika
import requests
from pika.adapters.blocking_connection import BlockingChannel

def main():
    connection = pika.BlockingConnection()
    channel = connection.channel()
    for method, property, body in channel.consume("test_queue", auto_ack=True):
        print(method, property, body)
        res = requests.get('https://example.com')

if __name__ == '__main__':
  main()

Does This PR Require a Core Repo Change?

  • No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

@galbash galbash marked this pull request as draft April 8, 2024 11:47
@galbash galbash force-pushed the pika-consume branch 2 times, most recently from 6201424 to 336478a Compare April 8, 2024 13:40
@galbash galbash marked this pull request as ready for review April 8, 2024 15:24
@galbash galbash requested a review from a team April 8, 2024 15:24
Copy link
Member

@shalevr shalevr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for fixing this!

@shalevr shalevr merged commit ad06e70 into open-telemetry:main Apr 21, 2024
258 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

trace_id and span_id are 0 with channel.consume() generator function
5 participants