Skip to content

Commit

Permalink
Fix LLM Agents Kafka pipeline (#1793)
Browse files Browse the repository at this point in the history
* Simple agents pipeline moved to `common.py` to be shared with the kafka pipeline
* Add `--bootstrap_servers` and `--topic` flags.
* Fix mis-named "Upload rate" monitor description with "Agent rate"

Closes #1791

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1793
  • Loading branch information
dagardner-nv authored Jul 3, 2024
1 parent 3e067c2 commit 84efdc6
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 106 deletions.
1 change: 0 additions & 1 deletion docs/source/extra_info/known_issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,5 @@ limitations under the License.

- TrainAEStage fails with a Segmentation fault ([#1641](https://github.com/nv-morpheus/Morpheus/issues/1641))
- vdb_upload example pipeline triggers an internal error in Triton ([#1649](https://github.com/nv-morpheus/Morpheus/issues/1649))
- LLM Agents Kafka pipeline is broken ([#1791](https://github.com/nv-morpheus/Morpheus/issues/1791))

Refer to [open issues in the Morpheus project](https://github.com/nv-morpheus/Morpheus/issues)
10 changes: 8 additions & 2 deletions examples/llm/agents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ python examples/llm/main.py agents simple [OPTIONS]

### Run example (Kafka Pipeline):

> **Warning**: The Kafka Agents pipeline is currently broken [#1791](https://github.com/nv-morpheus/Morpheus/issues/1791)
The Kafka Example in the Morpheus LLM Agents demonstrates a streaming implementation, utilizing Kafka messages to
facilitate the near real-time processing of LLM queries. This example is similar to the Simple example but makes use of
a KafkaSourceStage to stream and retrieve messages from the Kafka topic
Expand Down Expand Up @@ -202,6 +200,14 @@ python examples/llm/main.py agents kafka [OPTIONS]
- **Description**: The name of the model to use in OpenAI.
- **Default**: `gpt-3.5-turbo-instruct`

- `--bootstrap_servers TEXT`
- **Description**: The Kafka bootstrap servers to connect to, if undefined the client will attempt to infer the bootstrap servers from the environment.
- **Default**: `auto`

- `--topic TEXT`
- **Description**: The Kafka topic to listen to for input messages.
- **Default**: `input`

- `--help`
- **Description**: Show the help message with options and commands details.

Expand Down
81 changes: 81 additions & 0 deletions examples/llm/agents/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.agent import AgentExecutor
from langchain.llms.openai import OpenAI

from morpheus.config import Config
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage

logger = logging.getLogger(__name__)


def _build_agent_executor(model_name: str) -> AgentExecutor:
    """
    Create a zero-shot ReAct LangChain agent executor for the given OpenAI model,
    equipped with the SerpAPI search and llm-math tools.
    """
    openai_llm = OpenAI(model=model_name, temperature=0.0, client=None)

    agent_tools = load_tools(["serpapi", "llm-math"], llm=openai_llm)

    return initialize_agent(agent_tools, openai_llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)


def _build_engine(model_name: str) -> LLMEngine:
    """
    Assemble the LLM engine: an extracter node feeding a LangChain agent node,
    whose output is routed back onto the message via a simple task handler.
    """
    engine = LLMEngine()

    engine.add_node("extracter", node=ExtracterNode())

    agent_node = LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name))
    engine.add_node("agent", inputs=["/extracter"], node=agent_node)

    engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler())

    return engine


def build_common_pipeline(config: Config, pipe: LinearPipeline, task_payload: dict,
                          model_name: str) -> InMemorySinkStage:
    """
    Append the stages shared by the simple and kafka agent pipelines.

    Must be called after the source stage has been set on ``pipe``. Returns the
    in-memory sink stage so the caller can retrieve the collected results.
    """
    deserialize = DeserializeStage(config,
                                   message_type=ControlMessage,
                                   task_type="llm_engine",
                                   task_payload=task_payload)
    pipe.add_stage(deserialize)

    pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

    pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))

    sink = pipe.add_stage(InMemorySinkStage(config))

    # delayed_start: don't report until the first agent response arrives
    pipe.add_stage(MonitorStage(config, description="Agent rate", unit="events", delayed_start=True))

    return sink
62 changes: 10 additions & 52 deletions examples/llm/agents/kafka_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,22 @@
import logging
import time

from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.agent import AgentExecutor
from langchain.llms.openai import OpenAIChat

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage

logger = logging.getLogger(__name__)


def _build_agent_executor(model_name: str) -> AgentExecutor:

llm = OpenAIChat(model_name=model_name, model_kwargs={"temperature": 0.0}, client=None)

tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

return agent_executor


def _build_engine(model_name: str) -> LLMEngine:
from .common import build_common_pipeline

engine = LLMEngine()

engine.add_node("extracter", node=ExtracterNode())

engine.add_node("agent",
inputs=[("/extracter")],
node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name)))

engine.add_task_handler(inputs=["/extracter"], handler=SimpleTaskHandler())

return engine
logger = logging.getLogger(__name__)


def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: int, model_name: str) -> float:
def pipeline(num_threads: int,
pipeline_batch_size: int,
model_max_batch_size: int,
model_name: str,
bootstrap_servers: str,
topic: str) -> float:
config = Config()
config.mode = PipelineModes.OTHER

Expand All @@ -78,18 +45,9 @@ def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: i

pipe = LinearPipeline(config)

pipe.set_source(KafkaSourceStage(config, bootstrap_servers="auto", input_topic=["input"]))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))

# pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))

sink = pipe.add_stage(InMemorySinkStage(config))
pipe.set_source(KafkaSourceStage(config, bootstrap_servers=bootstrap_servers, input_topic=[topic]))

# pipe.add_stage(MonitorStage(config, description="Upload rate", unit="events", delayed_start=True))
sink = build_common_pipeline(config=config, pipe=pipe, task_payload=completion_task, model_name=model_name)

start_time = time.time()

Expand Down
14 changes: 14 additions & 0 deletions examples/llm/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ def simple(**kwargs):
default='gpt-3.5-turbo-instruct',
help="The name of the model to use in OpenAI",
)
@click.option(
"--bootstrap_servers",
required=True,
type=str,
default='auto',
help="The Kafka bootstrap servers to connect to",
)
@click.option(
"--topic",
required=True,
type=str,
default='input',
help="The Kafka topic to listen to for input messages",
)
def kafka(**kwargs):

from .kafka_pipeline import pipeline as _pipeline
Expand Down
54 changes: 3 additions & 51 deletions examples/llm/agents/simple_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,17 @@
import logging
import time

from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.agent import AgentExecutor
from langchain.llms.openai import OpenAI

import cudf

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.concat_df import concat_dataframes

logger = logging.getLogger(__name__)


def _build_agent_executor(model_name: str) -> AgentExecutor:

llm = OpenAI(model=model_name, temperature=0.0, client=None)

tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

return agent_executor


def _build_engine(model_name: str) -> LLMEngine:
from .common import build_common_pipeline

engine = LLMEngine()

engine.add_node("extracter", node=ExtracterNode())

engine.add_node("agent",
inputs=[("/extracter")],
node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name)))

engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler())

return engine
logger = logging.getLogger(__name__)


def pipeline(
Expand Down Expand Up @@ -95,16 +56,7 @@ def pipeline(

pipe.set_source(InMemorySourceStage(config, dataframes=source_dfs, repeat=repeat_count))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))

sink = pipe.add_stage(InMemorySinkStage(config))

pipe.add_stage(MonitorStage(config, description="Upload rate", unit="events", delayed_start=True))
sink = build_common_pipeline(config=config, pipe=pipe, task_payload=completion_task, model_name=model_name)

start_time = time.time()

Expand Down

0 comments on commit 84efdc6

Please sign in to comment.