The asyncio integration breaks Faust #10323

Open
j00bar opened this issue Aug 21, 2024 · 2 comments

Comments

@j00bar

j00bar commented Aug 21, 2024

Summary of problem

I'm trying to instrument workers for Faust, the Python stream-processing library (a successor of sorts to Celery) developed at Robinhood.

When ddtrace's asyncio integration is enabled, the Faust worker does not start up properly:

2024-08-21T16:07:35.278760 [info     ] Configured ddtrace instrumentation for 1 integration(s). The following modules have been patched: asyncio [ddtrace._monkey] func_name=patch lineno=267 module=_monkey
starting➢ 2024-08-21T16:07:35.279820 [info     ] [^Worker]: Starting...         [faust.worker] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364271 [info     ] [^-SaladShooterFaustApp]: Starting... [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364537 [info     ] [^--DatadogMonitor]: Starting... [faust.sensors.datadog] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364807 [info     ] [^--Producer]: Starting...     [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.365019 [info     ] [^---ProducerBuffer]: Starting... [faust.transport.producer] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.375441 [info     ] [^--CacheBackend]: Starting... [faust.web.cache.backends.base] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.375681 [info     ] [^--Web]: Starting...          [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.389261 [info     ] [^---Server]: Starting...      [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.390258 [info     ] [^--Consumer]: Starting...     [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.391236 [info     ] [^---AIOKafkaConsumerThread]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.398984 [info     ] [^--LeaderAssignor]: Starting... [faust.assignor.leader_assignor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.400007 [info     ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.403748 [info     ] [^--ReplyConsumer]: Starting... [faust.agents.replies] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.403987 [info     ] [^--AgentManager]: Starting... [faust.agents.manager] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.404264 [info     ] [^---Agent: saladshooter.stre[.]ingest_event]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.407193 [info     ] [^---Agent: saladshoote[.]record_event_to_pg]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.408952 [info     ] [^---Conductor]: Starting...   [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.409188 [info     ] [^--TableManager]: Starting... [faust.tables.manager] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.409804 [info     ] [^---Conductor]: Waiting for agents to start... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.410273 [info     ] [^---Conductor]: Waiting for tables to be registered... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.411220 [info     ] [^---Recovery]: Starting...    [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.412044 [debug    ] [^-SaladShooterFaustApp]: Started. [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.413783 [info     ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.417529 [info     ] [^--Producer]: Creating topic 'saladshooter_uiaction_ingestion' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
/Users/jag/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py:1921: RuntimeWarning: coroutine 'flight_recorder._waiting' was never awaited
  handle = self._ready.popleft()
Object allocated at (most recent call last):
  File "/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/utils/logging.py", lineno 732
    self._fut = asyncio.ensure_future(self._waiting(), loop=self.loop)
2024-08-21T16:07:36.452266 [info     ] Executing _on_partitions_assigned [faust.app.base] func_name=_on_partitions_assigned lineno=1745 module=base
2024-08-21T16:07:36.454319 [info     ] generation id 1 app consumers id 1 [faust.tables.recovery] func_name=on_rebalance lineno=254 module=recovery
2024-08-21T16:07:36.459239 [info     ] [^---Recovery]: Seek stream partitions to committed offsets. [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.467772 [info     ] [^---Recovery]: Resuming flow... [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.468228 [info     ] [^---Fetcher]: Starting...     [faust.transport.consumer] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.469210 [info     ] [^---Recovery]: Worker ready   [faust.tables.recovery] func_name=log lineno=284 module=logging
 😊
/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/services.py:726: RuntimeWarning: coroutine 'Event.wait' was never awaited
  pass
Object allocated at (most recent call last):
  File "/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/services.py", lineno 723
    self._stopped.wait(), timeout=want_seconds(n)

Note the two RuntimeWarnings reporting that coroutines were never awaited (`flight_recorder._waiting` and `Event.wait`). After this point the Faust process keeps running, but none of the loops that consume events are actually running.
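For context, CPython emits this RuntimeWarning when a coroutine object is finalized without ever having been awaited or scheduled; the "Object allocated at" trace is added when tracemalloc is enabled. The minimal, stdlib-only sketch below reproduces the warning in isolation using a hypothetical `waiting()` coroutine. It only illustrates what the warning means and does not reproduce the ddtrace/Faust interaction.

```python
import asyncio
import gc
import warnings


async def waiting() -> None:
    # Hypothetical stand-in for a coroutine such as mode's flight_recorder._waiting.
    await asyncio.sleep(0)


def create_and_drop() -> None:
    # Calling a coroutine function only creates a coroutine object; its body
    # does not run until it is awaited or wrapped in a Task.
    coro = waiting()
    del coro  # dropped without ever being awaited or scheduled


def capture_never_awaited_warnings() -> list[str]:
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        create_and_drop()
        gc.collect()  # make sure the coroutine object has been finalized
    return [str(w.message) for w in caught if issubclass(w.category, RuntimeWarning)]


if __name__ == "__main__":
    for message in capture_never_awaited_warnings():
        print(message)
```

The warning therefore points at the line where the coroutine object was created, not at the code that should have awaited it.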

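Both warnings come from mode, a service/worker library by the same authors as Faust (installed here as mode-streaming 0.4.1). According to the tracebacks above, the unawaited coroutines are created at mode/utils/logging.py, line 732 (`asyncio.ensure_future(self._waiting(), loop=self.loop)`), and at mode/services.py, line 723 (`self._stopped.wait()`, passed along with `timeout=want_seconds(n)`).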
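The second warning names `Event.wait` created next to a timeout, which matches a common pattern: sleep for up to n seconds, but wake early if a stop event is set, usually written with `asyncio.wait_for`. The sketch below is a stdlib-only version of that pattern; `interruptible_sleep` is a hypothetical name, not mode's API. In a healthy event loop `wait_for` drives the `Event.wait()` coroutine, so a "never awaited" warning for it suggests the coroutine was created but never started.

```python
import asyncio


async def interruptible_sleep(stopped: asyncio.Event, seconds: float) -> bool:
    """Sleep for up to `seconds`; return True if `stopped` is set first.

    Hypothetical helper illustrating the wait_for(event.wait(), timeout) pattern.
    """
    try:
        # wait_for awaits the Event.wait() coroutine and cancels it
        # if the timeout expires first.
        await asyncio.wait_for(stopped.wait(), timeout=seconds)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple:
    stopped = asyncio.Event()
    first = await interruptible_sleep(stopped, 0.01)  # event not set: times out
    stopped.set()
    second = await interruptible_sleep(stopped, 1.0)  # event set: returns at once
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (False, True)
```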

With the asyncio integration disabled, everything works and the output is:

2024-08-21T16:12:43.720566 [info     ] Configured ddtrace instrumentation for 0 integration(s). The following modules have been patched:  [ddtrace._monkey] func_name=patch lineno=267 module=_monkey
starting➢ 2024-08-21T16:12:43.721608 [info     ] [^Worker]: Starting...         [faust.worker] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.803652 [info     ] [^-SaladShooterFaustApp]: Starting... [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.803908 [info     ] [^--DatadogMonitor]: Starting... [faust.sensors.datadog] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.804161 [info     ] [^--Producer]: Starting...     [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.804359 [info     ] [^---ProducerBuffer]: Starting... [faust.transport.producer] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.813663 [info     ] [^--CacheBackend]: Starting... [faust.web.cache.backends.base] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.813904 [info     ] [^--Web]: Starting...          [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.827304 [info     ] [^---Server]: Starting...      [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.828269 [info     ] [^--Consumer]: Starting...     [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.829140 [info     ] [^---AIOKafkaConsumerThread]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.836763 [info     ] [^--LeaderAssignor]: Starting... [faust.assignor.leader_assignor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.837944 [info     ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842077 [info     ] [^--ReplyConsumer]: Starting... [faust.agents.replies] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842338 [info     ] [^--AgentManager]: Starting... [faust.agents.manager] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842608 [info     ] [^---Agent: saladshooter.stre[.]ingest_event]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.845350 [info     ] [^---Agent: saladshoote[.]record_event_to_pg]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847106 [info     ] [^---Conductor]: Starting...   [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847334 [info     ] [^--TableManager]: Starting... [faust.tables.manager] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847936 [info     ] [^---Conductor]: Waiting for agents to start... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.848366 [info     ] [^---Conductor]: Waiting for tables to be registered... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.849170 [info     ] [^---Recovery]: Starting...    [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.849724 [debug    ] [^-SaladShooterFaustApp]: Started. [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.850974 [info     ] [^--Producer]: Creating topic 'saladshooter_uiaction_ingestion' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.853770 [info     ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.879137 [info     ] Executing _on_partitions_assigned [faust.app.base] func_name=_on_partitions_assigned lineno=1745 module=base
2024-08-21T16:12:44.880802 [info     ] generation id 3 app consumers id 3 [faust.tables.recovery] func_name=on_rebalance lineno=254 module=recovery
2024-08-21T16:12:44.883283 [info     ] [^---Recovery]: Seek stream partitions to committed offsets. [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.891936 [info     ] [^---Recovery]: Resuming flow... [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.892406 [info     ] [^---Fetcher]: Starting...     [faust.transport.consumer] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.893529 [info     ] [^---Recovery]: Worker ready   [faust.tables.recovery] func_name=log lineno=284 module=logging
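To switch between the two runs above without editing code, one option is to gate the `ddtrace.patch(asyncio=True)` call from the reproduction on an environment variable. The name `ENABLE_DD_ASYNCIO` is a hypothetical choice for this sketch, not a ddtrace setting.

```python
import os
from typing import Mapping, Optional

# Hypothetical variable name chosen for this sketch; ddtrace does not read it.
TOGGLE_VAR = "ENABLE_DD_ASYNCIO"


def asyncio_patching_requested(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when the toggle variable is set to a truthy value."""
    env = os.environ if environ is None else environ
    return env.get(TOGGLE_VAR, "").strip().lower() in {"1", "true", "yes"}


def configure_tracing() -> None:
    if asyncio_patching_requested():
        # Imported lazily so this function never imports ddtrace in the "off" run.
        import ddtrace

        ddtrace.patch(asyncio=True)
```

`configure_tracing()` would replace the direct `ddtrace.patch` call in `main()`; running with `ENABLE_DD_ASYNCIO=1` gives the failing run and running without it gives the baseline.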

Which version of dd-trace-py are you using?

dd-trace-py==2.11.1

Which version of pip are you using?

pip==24.2

Which libraries and their versions are you using?

Output of `pip freeze`:

aioconsole==0.7.1
aiodogstatsd==0.16.0.post0
aiohappyeyeballs==2.3.2
aiohttp==3.10.0
aiohttp-cors==0.7.0
aiokafka==0.10.0
aiomonitor==0.7.0
aiosignal==1.3.1
alembic==1.13.2
annotated-types==0.7.0
anyio==4.4.0
aredis==1.1.8
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
build==1.2.1
bytecode==0.15.1
casefy==0.1.7
certifi==2024.7.4
charset-normalizer==3.3.2
click==8.1.7
colorlog==6.8.2
cramjam==2.8.3
croniter==2.0.7
dacite==1.8.1
dataclasses-avroschema==0.60.2
datadog==0.49.1
ddtrace==2.11.1
Deprecated==1.2.14
dill==0.3.8
dnspython==2.6.1
email_validator==2.2.0
envier==0.5.2
fastapi==0.111.1
fastapi-cli==0.0.4
fastavro==1.9.5
faust-streaming==0.11.2
faust-streaming-rocksdb==0.9.3
frozenlist==1.4.1
greenlet==3.0.3
h11==0.14.0
httpcore==1.0.5
httptools==0.6.1
httpx==0.27.0
humanize==4.10.0
idna==3.7
importlib_metadata==8.0.0
Inflector==3.1.1
iniconfig==2.0.0
intervaltree==3.1.0
janus==1.0.0
Jinja2==3.1.4
Mako==1.3.5
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
mode-streaming==0.4.1
molotov==2.6
multidict==6.0.5
multiprocess==0.70.16
mypy==1.11.1
mypy-extensions==1.0.0
opentelemetry-api==1.26.0
opentracing==2.4.0
orjson==3.10.6
packaging==24.1
pip-tools==7.4.1
pluggy==1.5.0
prompt_toolkit==3.0.47
protobuf==5.27.3
psycopg2-binary==2.9.9
pydantic==2.8.2
pydantic-settings==2.4.0
pydantic_core==2.20.1
Pygments==2.18.0
pyproject_hooks==1.1.0
pytest==8.3.2
pytest-asyncio==0.23.8
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-multipart==0.0.9
python-snappy==0.7.2
pytz==2024.1
PyYAML==5.3.1
redis==5.0.8
requests==2.32.3
rich==13.7.1
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
sortedcontainers==2.4.0
SQLAlchemy==2.0.31
starlette==0.37.2
structlog==24.4.0
terminaltables==3.1.10
trafaret==2.1.1
typer==0.12.3
typing_extensions==4.12.2
urllib3==2.2.2
uvicorn==0.30.3
uvloop==0.19.0
venusian==3.1.0
watchfiles==0.22.0
wcwidth==0.2.13
websockets==12.0
wrapt==1.16.0
xmltodict==0.13.0
yarl==1.9.4
zipp==3.20.0
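When the full freeze is too long to compare at a glance, the packages most relevant to this report can be printed with the standard library's `importlib.metadata`. The package selection below is an assumption about what matters here, not an official checklist.

```python
from importlib.metadata import PackageNotFoundError, version

# Assumed-relevant distributions for this report (an illustrative choice).
RELEVANT = ("ddtrace", "faust-streaming", "mode-streaming", "aiokafka", "uvloop")


def relevant_versions(names: tuple = RELEVANT) -> dict[str, str]:
    """Map each distribution name to its installed version, or 'not installed'."""
    result: dict[str, str] = {}
    for name in names:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = "not installed"
    return result


if __name__ == "__main__":
    for name, ver in relevant_versions().items():
        print(f"{name}=={ver}")
```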

How can we reproduce your problem?

You can set up a simple Kafka-compatible broker (Redpanda) using Docker Compose:

services:
  redpanda:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # Address the broker advertises to clients that connect to the Kafka API.
      # Use the internal addresses to connect to the Redpanda brokers
      # from inside the same Docker network.
      # Use the external addresses to connect to the Redpanda brokers
      # from outside the Docker network.
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      # Address the broker advertises to clients that connect to the HTTP Proxy.
      - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      # Redpanda brokers use the RPC API to communicate with each other internally.
      - --rpc-addr redpanda:33145
      - --advertise-rpc-addr redpanda:33145
      # Mode dev-container uses well-known configuration properties for development in containers.
      - --mode dev-container
      # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
      - --smp 1
      - --default-log-level=info
    image: docker.redpanda.com/redpandadata/redpanda:latest
    volumes:
      - redpanda:/var/lib/redpanda/data
    networks:
      - saladshooter
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644
  redpanda-console:
    image: docker.redpanda.com/redpandadata/console:latest
    networks:
      - saladshooter
    entrypoint: /bin/sh
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda:9092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda
volumes:
  redpanda:
    driver: local
networks:
  saladshooter:
    driver: bridge
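Before starting the worker, it can help to confirm that the external Kafka listener published by the compose file (`localhost:19092`) accepts connections. The helper below is a minimal sketch: it only checks that a TCP connection succeeds, not that the Kafka protocol handshake or topic creation works.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 19092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only checks that something is listening; it does not speak the Kafka protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # connection refused, timeout, name resolution failure, ...
        return False


if __name__ == "__main__":
    # 19092 is the external listener mapped in the compose file above.
    print("broker reachable:", broker_reachable())
```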

Then create a placeholder Faust application such as:

from typing import Any
import faust

app = faust.App(
    "saladshooter",
    broker="kafka://localhost:19092/",
    store="memory://")

ingestion_topic = app.topic("saladshooter_uiaction_ingestion", internal=True)

@app.agent(ingestion_topic)
async def ingestor(event_stream: faust.StreamT[Any]) -> None:
    async for event in event_stream:
        print(event)

def main() -> None:
    # I set up structlog configuration here.
    
    import ddtrace
    ddtrace.patch(asyncio=True)
    
    faust.Worker(app, loglevel="info").execute_from_commandline()

if __name__ == "__main__":
    main()
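To help narrow down whether the problem lies in basic task scheduling or in how Faust/mode schedule their coroutines, a stdlib-only smoke test mirroring the agent's `async for` consume loop could be run once as-is and once after `ddtrace.patch(asyncio=True)`. This is a hypothetical diagnostic: if it passes in both runs, the issue is more likely specific to the mode code paths named in the warnings.

```python
import asyncio


async def consume(queue: asyncio.Queue, seen: list) -> None:
    # Mirrors the agent's `async for event in event_stream` loop.
    while True:
        item = await queue.get()
        if item is None:  # sentinel: stop consuming
            return
        seen.append(item)


async def smoke_test(n: int = 5) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list = []
    # ensure_future is the same scheduling call shown in the first warning's traceback.
    task = asyncio.ensure_future(consume(queue, seen))
    for i in range(n):
        await queue.put(i)
    await queue.put(None)
    await asyncio.wait_for(task, timeout=5)
    return seen


if __name__ == "__main__":
    # For the comparison run, uncomment these two lines (requires ddtrace):
    # import ddtrace
    # ddtrace.patch(asyncio=True)
    print(asyncio.run(smoke_test()))
```

When scheduling works, this prints `[0, 1, 2, 3, 4]`; a timeout or a missing item would point at task scheduling itself.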
@emmettbutler
Collaborator

Thank you for the detailed replication code @j00bar. We'll look into it.

cc @majorgreys

@j00bar
Author

j00bar commented Aug 22, 2024

@emmettbutler Grateful to you for taking a look! It's got to be tough maintaining a wrapper library with practically infinite use cases and so much domain knowledge to absorb, so I'm glad I could make this one easier to look into.
