Add mocked test & benchmark for LLM agents pipeline #1424

Merged
Changes from all commits
Commits
44 commits
4eaac3f
List of UN member nations based upon https://www.un.org/dgacm/sites/w…
dagardner-nv Dec 5, 2023
0c43db8
Add heading row
dagardner-nv Dec 5, 2023
357e657
fix heading row case
dagardner-nv Dec 5, 2023
b5f4b4a
Add flags to read in the input from a file, and optionally shuffle th…
dagardner-nv Dec 6, 2023
9e8d4ca
Move nemo and openai fixtures up a level to be shared with benchmarks
dagardner-nv Dec 6, 2023
575c805
Fix integration test, responses are prefixed with text
dagardner-nv Dec 6, 2023
a8876db
Extend existing mock_chat_completion and mock_nemollm fixtures to ran…
dagardner-nv Dec 6, 2023
8e0f99c
Call asyncio.get_event_loop() rather than using the event_loop fixtur…
dagardner-nv Dec 7, 2023
e58aeb5
Add missing --run_benchmark flag
dagardner-nv Dec 7, 2023
2918bd6
Add benchmark for the completion pipeline
dagardner-nv Dec 7, 2023
477ca6d
Rename benchmark to avoid name clash
dagardner-nv Dec 7, 2023
a03e659
Remove unused importd
dagardner-nv Dec 7, 2023
fe12b9c
Specify string as a regex string
dagardner-nv Dec 7, 2023
e58cd92
Rename benchmark to match existing names
dagardner-nv Dec 7, 2023
ed212ae
Rename test
dagardner-nv Dec 7, 2023
9a60b14
WIP: update docker run
dagardner-nv Dec 7, 2023
4a0ec7e
WIP: benchmark for vdb upload pipeline
dagardner-nv Dec 7, 2023
f97432d
WIP
dagardner-nv Dec 8, 2023
6e01776
WIP
dagardner-nv Dec 8, 2023
5771492
Cleanups
dagardner-nv Dec 11, 2023
2325315
Update docs
dagardner-nv Dec 11, 2023
4ee5e47
Replace usage of random numbers with a fixed sleep based on average o…
dagardner-nv Dec 11, 2023
5acbb2e
Remove unused import
dagardner-nv Dec 11, 2023
bc675da
Merge branch 'david-24.03-llm-bench' into david-24.03-llm-bench-vdb_u…
dagardner-nv Dec 11, 2023
64e52a0
Merge branch 'branch-24.03' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Dec 11, 2023
9a9d5c7
Move populate_milvus helper method to be shared
dagardner-nv Dec 11, 2023
49acded
Fix LLMService type-hints
dagardner-nv Dec 11, 2023
e072be9
Benchmark for stand-alone rag pipeline
dagardner-nv Dec 11, 2023
6093394
WIP
dagardner-nv Dec 12, 2023
7c58396
Move the impl for the import_mod fixture to it's own method allowing …
dagardner-nv Dec 12, 2023
132f9be
WIP: not working
dagardner-nv Dec 12, 2023
62cb2f4
Don't use the load_tools method for invoking serpapi, as it makes it …
dagardner-nv Dec 12, 2023
7861d06
Add comment explaining the explicit tool creation
dagardner-nv Dec 12, 2023
45a7e28
Revert "Move the impl for the import_mod fixture to it's own method a…
dagardner-nv Dec 12, 2023
11d98e6
Add benchmark for agent pipeline
dagardner-nv Dec 12, 2023
5d331b5
Remove out of date comment
dagardner-nv Dec 13, 2023
a620d33
Ignore unused arguments to mocked methods
dagardner-nv Dec 13, 2023
4b196a1
wip
dagardner-nv Dec 13, 2023
05225f0
Merge branch 'branch-24.03' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Dec 13, 2023
1f3b3a9
Allow mocked sleep times to be overidden with an env variable
dagardner-nv Dec 13, 2023
604872a
WIP
dagardner-nv Dec 13, 2023
b9a4335
Merge branch 'david-24.03-llm-bench-vdb_upload' into david-24.03-llm-…
dagardner-nv Dec 13, 2023
69423f0
Mocked sleep times are now fixtures which can be overriden with an en…
dagardner-nv Dec 13, 2023
c32f599
Merge branch 'branch-24.03' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Dec 14, 2023
36 changes: 29 additions & 7 deletions tests/benchmarks/conftest.py
@@ -77,15 +77,38 @@ def pytest_benchmark_update_json(config, benchmarks, output_json):
bench['stats']['median_throughput_bytes'] = (byte_count * repeat) / bench['stats']['median']


@pytest.fixture(name="mock_openai_request_time")
def mock_openai_request_time_fixture():
return float(os.environ.get("MOCK_OPENAI_REQUEST_TIME", 1.265))


@pytest.fixture(name="mock_nemollm_request_time")
def mock_nemollm_request_time_fixture():
return float(os.environ.get("MOCK_NEMOLLM_REQUEST_TIME", 0.412))


@pytest.fixture(name="mock_web_scraper_request_time")
def mock_web_scraper_request_time_fixture():
return float(os.environ.get("MOCK_WEB_SCRAPER_REQUEST_TIME", 0.5))


@pytest.fixture(name="mock_feedparser_request_time")
def mock_feedparser_request_time_fixture():
return float(os.environ.get("MOCK_FEEDPARSER_REQUEST_TIME", 0.5))


@pytest.fixture(name="mock_serpapi_request_time")
def mock_serpapi_request_time_fixture():
return float(os.environ.get("MOCK_SERPAPI_REQUEST_TIME", 1.7))


@pytest.mark.usefixtures("openai")
@pytest.fixture(name="mock_chat_completion")
-def mock_chat_completion_fixture(mock_chat_completion: mock.MagicMock):
-sleep_time = float(os.environ.get("MOCK_OPENAI_REQUEST_TIME", 1.265))
+def mock_chat_completion_fixture(mock_chat_completion: mock.MagicMock, mock_openai_request_time: float):

async def sleep_first(*args, **kwargs):
# Sleep time is based on average request time
-await asyncio.sleep(sleep_time)
+await asyncio.sleep(mock_openai_request_time)
return mock.DEFAULT

mock_chat_completion.acreate.side_effect = sleep_first
@@ -95,13 +118,12 @@ async def sleep_first(*args, **kwargs):

@pytest.mark.usefixtures("nemollm")
@pytest.fixture(name="mock_nemollm")
-def mock_nemollm_fixture(mock_nemollm: mock.MagicMock):
-sleep_time = float(os.environ.get("MOCK_NEMOLLM_REQUEST_TIME", 0.412))
+def mock_nemollm_fixture(mock_nemollm: mock.MagicMock, mock_nemollm_request_time: float):

# The generate function is a blocking call that returns a future when return_type="async"
async def sleep_first(fut: asyncio.Future, value: typing.Any = mock.DEFAULT):
# Sleep time is based on average request time
-await asyncio.sleep(sleep_time)
+await asyncio.sleep(mock_nemollm_request_time)
fut.set_result(value)

def create_future(*args, **kwargs) -> asyncio.Future:
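The change above replaces the hard-coded os.environ reads inside the mock fixtures with dedicated request-time fixtures, so the simulated latencies can be tuned via environment variables or overridden per test. A minimal sketch of a module-local override (not part of this diff; the fixture name mirrors the conftest.py definition above, and the 0.05 s value is an arbitrary example):

import pytest


# pytest resolves the closest fixture definition, so placing this in a single
# benchmark module shadows the conftest.py fixture for that module only.
@pytest.fixture(name="mock_openai_request_time")
def mock_openai_request_time_fixture() -> float:
    # Use a fixed 50 ms simulated OpenAI round-trip instead of the 1.265 s average.
    return 0.05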
174 changes: 174 additions & 0 deletions tests/benchmarks/test_bench_agents_simple_pipeline.py
@@ -0,0 +1,174 @@
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import collections.abc
import os
import typing
from unittest import mock

import langchain
import pytest
from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.tools import Tool
from langchain.utilities import serpapi

import cudf

from morpheus.config import Config
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage


def _build_agent_executor(model_name: str):

llm = langchain.OpenAI(model=model_name, temperature=0, cache=False)

# Explicitly construct the serpapi tool, loading it via load_tools makes it too difficult to mock
tools = [
Tool(
name="Search",
description="",
func=serpapi.SerpAPIWrapper().run,
coroutine=serpapi.SerpAPIWrapper().arun,
)
]
tools.extend(load_tools(["llm-math"], llm=llm))

agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

return agent_executor


def _build_engine(model_name: str):

engine = LLMEngine()

engine.add_node("extracter", node=ExtracterNode())

engine.add_node("agent",
inputs=[("/extracter")],
node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name)))

engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler())

return engine


def _run_pipeline(config: Config, source_dfs: list[cudf.DataFrame], model_name: str = "test_model"):
completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["questions"]}}

pipe = LinearPipeline(config)

pipe.set_source(InMemorySourceStage(config, dataframes=source_dfs))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))

pipe.add_stage(InMemorySinkStage(config))

pipe.run()


@pytest.mark.usefixtures("openai", "restore_environ")
@pytest.mark.use_python
@pytest.mark.benchmark
@mock.patch("langchain.utilities.serpapi.SerpAPIWrapper.aresults")
@mock.patch("langchain.OpenAI._agenerate", autospec=True) # autospec is needed as langchain will inspect the function
def test_agents_simple_pipe(mock_openai_agenerate: mock.AsyncMock,
mock_serpapi_aresults: mock.AsyncMock,
mock_openai_request_time: float,
mock_serpapi_request_time: float,
benchmark: collections.abc.Callable[[collections.abc.Callable], typing.Any],
config: Config):
os.environ.update({'OPENAI_API_KEY': 'test_api_key', 'SERPAPI_API_KEY': 'test_api_key'})

from langchain.schema import Generation
from langchain.schema import LLMResult

assert serpapi.SerpAPIWrapper().aresults is mock_serpapi_aresults

model_name = "test_model"

mock_responses = [
LLMResult(generations=[[
Generation(text="I should use a search engine to find information about unittests.\n"
"Action: Search\nAction Input: \"unittests\"",
generation_info={
'finish_reason': 'stop', 'logprobs': None
})
]],
llm_output={
'token_usage': {}, 'model_name': model_name
}),
LLMResult(generations=[[
Generation(text="I now know the final answer.\nFinal Answer: 3.99.",
generation_info={
'finish_reason': 'stop', 'logprobs': None
})
]],
llm_output={
'token_usage': {}, 'model_name': model_name
})
]

async def _mock_openai_agenerate(self, *args, **kwargs): # pylint: disable=unused-argument
nonlocal mock_responses
call_count = getattr(self, '_unittest_call_count', 0)
response = mock_responses[call_count % 2]

# The OpenAI object will raise a ValueError if we attempt to set the attribute directly or use setattr
self.__dict__['_unittest_call_count'] = call_count + 1
await asyncio.sleep(mock_openai_request_time)
return response

mock_openai_agenerate.side_effect = _mock_openai_agenerate

async def _mock_serpapi_aresults(*args, **kwargs): # pylint: disable=unused-argument
await asyncio.sleep(mock_serpapi_request_time)
return {
'answer_box': {
'answer': '25 years', 'link': 'http://unit.test', 'people_also_search_for': []
},
'inline_people_also_search_for': [],
'knowledge_graph': {},
'organic_results': [],
'pagination': {},
'related_questions': [],
'related_searches': [],
'search_information': {},
'search_metadata': {},
'search_parameters': {},
'serpapi_pagination': None
}

mock_serpapi_aresults.side_effect = _mock_serpapi_aresults

source_df = cudf.DataFrame(
{"questions": ["Who is Leo DiCaprio's girlfriend? What is her current age raised to the 0.43 power?"]})

benchmark(_run_pipeline, config, source_dfs=[source_df], model_name=model_name)
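
For reference, a sketch of how the new agents benchmark might be run in isolation (an assumption rather than part of the diff: the --run_benchmark opt-in flag is the one referenced in the commit history, and the MOCK_* variables feed the request-time fixtures defined in conftest.py):

import os
import sys

import pytest

# Tune the simulated latencies before pytest collects the fixtures; unset
# variables fall back to the measured averages in conftest.py.
os.environ.setdefault("MOCK_OPENAI_REQUEST_TIME", "1.265")
os.environ.setdefault("MOCK_SERPAPI_REQUEST_TIME", "1.7")

sys.exit(pytest.main(["--run_benchmark", "tests/benchmarks/test_bench_agents_simple_pipeline.py"]))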
6 changes: 2 additions & 4 deletions tests/benchmarks/test_bench_vdb_upload_pipeline.py
@@ -98,6 +98,8 @@ def _run_pipeline(config: Config,
@mock.patch('requests.Session')
def test_vdb_upload_pipe(mock_requests_session: mock.MagicMock,
mock_feedparser_http_get: mock.MagicMock,
+mock_web_scraper_request_time: float,
+mock_feedparser_request_time: float,
benchmark: collections.abc.Callable[[collections.abc.Callable], typing.Any],
config: Config,
milvus_server_uri: str,
@@ -106,8 +108,6 @@ def test_vdb_upload_pipe(mock_requests_session: mock.MagicMock,
with open(os.path.join(TEST_DIRS.tests_data_dir, 'service/cisa_web_responses.json'), encoding='utf-8') as fh:
web_responses = json.load(fh)

-mock_web_scraper_request_time = float(os.environ.get("MOCK_WEB_SCRAPER_REQUEST_TIME", 0.5))

def mock_get_fn(url: str):
mock_response = mock.MagicMock()
mock_response.ok = True
@@ -119,8 +119,6 @@ def mock_get_fn(url: str):
mock_requests_session.return_value = mock_requests_session
mock_requests_session.get.side_effect = mock_get_fn

-mock_feedparser_request_time = float(os.environ.get("MOCK_FEEDPARSER_REQUEST_TIME", 0.5))

def mock_feedparser_http_get_fn(*args, **kwargs): # pylint: disable=unused-argument
time.sleep(mock_feedparser_request_time)
# The RSS Parser expects a bytes string