feat: improve LangWatch integration by introducing langchain callbacks on the tracing service, and component and workflow span types (#3094)

* Improve LangWatch integration by introducing langchain callbacks on the tracing service, and component and workflow span types. Bump LangWatch to v0.1.14

* [autofix.ci] apply automated fixes

* Fix type checks

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
rogeriochaves and autofix-ci[bot] authored Aug 1, 2024
1 parent 1dd840c commit 916fca4
Showing 21 changed files with 94 additions and 67 deletions.
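
The change repeated across the diff below follows one pattern: each component asks the tracing service for LangChain callback handlers and threads them into every invoke/ainvoke call through the config argument. Roughly, the pattern looks like this (a minimal sketch for orientation only; MyComponent and its wiring are illustrative, not code from this commit):

from langchain.callbacks.base import BaseCallbackHandler


class MyComponent:
    """Illustrative stand-in for a Langflow component using the new hook."""

    def __init__(self, tracing_service=None):
        self._tracing_service = tracing_service

    def get_langchain_callbacks(self) -> list[BaseCallbackHandler]:
        # Delegate to the tracing service when one is attached;
        # otherwise run with no extra callbacks.
        if self._tracing_service:
            return self._tracing_service.get_langchain_callbacks()
        return []

    def invoke_chain(self, chain, input_value: str):
        # Passing the handlers via LangChain's RunnableConfig lets
        # tracers (LangWatch, etc.) observe every run.
        return chain.invoke(
            {"input": input_value},
            config={"callbacks": self.get_langchain_callbacks()},
        )
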
16 changes: 6 additions & 10 deletions poetry.lock


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -99,7 +99,7 @@ langchain-nvidia-ai-endpoints = "0.1.6"
langchain-google-calendar-tools = "^0.0.1"
langchain-milvus = "^0.1.1"
crewai = {extras = ["tools"], version = "^0.36.0"}
langwatch = "^0.1.10"
langwatch = "^0.1.14"
langsmith = "^0.1.86"
yfinance = "^0.2.40"
langchain-google-community = "1.0.7"
9 changes: 7 additions & 2 deletions src/backend/base/langflow/base/agents/agent.py
@@ -98,7 +98,9 @@ async def run_agent(self, agent: AgentExecutor) -> Text:
self.chat_history = self.get_chat_history_data()
if self.chat_history:
input_dict["chat_history"] = data_to_messages(self.chat_history)
-        result = await agent.ainvoke(input_dict, config={"callbacks": [AgentAsyncHandler(self.log)]})
+        result = await agent.ainvoke(
+            input_dict, config={"callbacks": [AgentAsyncHandler(self.log)] + self.get_langchain_callbacks()}
+        )
self.status = result
if "output" not in result:
raise ValueError("Output key not found in result. Tried 'output'.")
@@ -141,7 +143,10 @@ async def run_agent(
input_dict: dict[str, str | list[BaseMessage]] = {"input": self.input_value}
if self.chat_history:
input_dict["chat_history"] = data_to_messages(self.chat_history)
-        result = await runnable.ainvoke(input_dict, config={"callbacks": [AgentAsyncHandler(self.log)]})
+
+        result = await runnable.ainvoke(
+            input_dict, config={"callbacks": [AgentAsyncHandler(self.log)] + self.get_langchain_callbacks()}
+        )
self.status = result
if "output" not in result:
raise ValueError("Output key not found in result. Tried 'output'.")
6 changes: 5 additions & 1 deletion src/backend/base/langflow/base/models/model.py
@@ -164,7 +164,11 @@ def get_chat_result(
inputs: Union[list, dict] = messages or {}
try:
runnable = runnable.with_config( # type: ignore
{"run_name": self.display_name, "project_name": self._tracing_service.project_name} # type: ignore
{
"run_name": self.display_name,
"project_name": self._tracing_service.project_name, # type: ignore
"callbacks": self.get_langchain_callbacks(),
}
)
if stream:
return runnable.stream(inputs) # type: ignore
@@ -28,7 +28,7 @@ def invoke_chain(self) -> Message:
else:
chain = ConversationChain(llm=self.llm, memory=self.memory)

result = chain.invoke({"input": self.input_value})
result = chain.invoke({"input": self.input_value}, config={"callbacks": self.get_langchain_callbacks()})
if isinstance(result, dict):
result = result.get(chain.output_key, "") # type: ignore

@@ -20,7 +20,9 @@ class LLMCheckerChainComponent(LCChainComponent):

def invoke_chain(self) -> Message:
chain = LLMCheckerChain.from_llm(llm=self.llm)
-        response = chain.invoke({chain.input_key: self.input_value})
+        response = chain.invoke(
+            {chain.input_key: self.input_value}, config={"callbacks": self.get_langchain_callbacks()}
+        )
result = response.get(chain.output_key, "")
result = str(result)
self.status = result
4 changes: 3 additions & 1 deletion src/backend/base/langflow/components/chains/LLMMathChain.py
@@ -23,7 +23,9 @@ class LLMMathChainComponent(LCChainComponent):

def invoke_chain(self) -> Message:
chain = LLMMathChain.from_llm(llm=self.llm)
-        response = chain.invoke({chain.input_key: self.input_value})
+        response = chain.invoke(
+            {chain.input_key: self.input_value}, config={"callbacks": self.get_langchain_callbacks()}
+        )
result = response.get(chain.output_key, "")
result = str(result)
self.status = result
2 changes: 1 addition & 1 deletion src/backend/base/langflow/components/chains/RetrievalQA.py
@@ -52,7 +52,7 @@ def invoke_chain(self) -> Message:
return_source_documents=True,
)

result = runnable.invoke({"query": self.input_value})
result = runnable.invoke({"query": self.input_value}, config={"callbacks": self.get_langchain_callbacks()})

source_docs = self.to_data(result.get("source_documents", []))
result_str = str(result.get("result", ""))
4 changes: 3 additions & 1 deletion src/backend/base/langflow/components/chains/SQLGenerator.py
@@ -43,7 +43,9 @@ def invoke_chain(self) -> Message:
raise ValueError("Prompt must contain `{question}` to be used with Natural Language to SQL.")
sql_query_chain = create_sql_query_chain(llm=self.llm, db=self.db, prompt=prompt_template, k=self.top_k)
query_writer: Runnable = sql_query_chain | {"query": lambda x: x.replace("SQLQuery:", "").strip()}
-        response = query_writer.invoke({"question": self.input_value})
+        response = query_writer.invoke(
+            {"question": self.input_value}, config={"callbacks": self.get_langchain_callbacks()}
+        )
query = response.get("query")
self.status = query
return query
@@ -17,7 +17,10 @@ def build(self, llm: LanguageModel, question: str, context: str, retries: int =
chain = prompt | llm
error_message = ""
for i in range(retries):
-            result = chain.invoke(dict(question=question, context=context, error_message=error_message))
+            result = chain.invoke(
+                dict(question=question, context=context, error_message=error_message),
+                config={"callbacks": self.get_langchain_callbacks()},
+            )
if isinstance(result, BaseMessage):
content = result.content
elif isinstance(result, str):
@@ -46,7 +46,7 @@ def build_base_retriever(self) -> Retriever:  # type: ignore[type-var]

async def search_documents(self) -> List[Data]: # type: ignore
retriever = self.build_base_retriever()
-        documents = await retriever.ainvoke(self.search_query)
+        documents = await retriever.ainvoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})
data = self.to_data(documents)
self.status = data
return data
@@ -58,7 +58,7 @@ def build_base_retriever(self) -> Retriever:  # type: ignore[type-var]

async def search_documents(self) -> List[Data]: # type: ignore
retriever = self.build_base_retriever()
-        documents = await retriever.ainvoke(self.search_query)
+        documents = await retriever.ainvoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})
data = self.to_data(documents)
self.status = data
return data
@@ -64,7 +64,7 @@ def build(

if not isinstance(query, str):
raise ValueError(f"Query type {type(query)} not supported.")
-        documents = self_query_retriever.invoke(input=input_text)
+        documents = self_query_retriever.invoke(input=input_text, config={"callbacks": self.get_langchain_callbacks()})
data = [Data.from_document(document) for document in documents]
self.status = data
return data
@@ -120,7 +120,7 @@ def generate_response(
rerank_config=rerank_config,
)
rag = vectara.as_rag(config)
-        response = rag.invoke(self.search_query)
+        response = rag.invoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})

text_output = response["answer"]

@@ -28,6 +28,7 @@
from langflow.graph.vertex.base import Vertex
from langflow.services.storage.service import StorageService
from langflow.services.tracing.service import TracingService
+from langchain.callbacks.base import BaseCallbackHandler


class CustomComponent(BaseComponent):
@@ -528,3 +529,8 @@ def post_code_processing(self, new_frontend_node: dict, current_frontend_node: d
frontend_node=new_frontend_node, raw_frontend_node=current_frontend_node
)
return frontend_node

+    def get_langchain_callbacks(self) -> List["BaseCallbackHandler"]:
+        if self._tracing_service:
+            return self._tracing_service.get_langchain_callbacks()
+        return []
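
The TracingService.get_langchain_callbacks() call that this helper delegates to is not part of the excerpt above. Assuming the service keeps a registry of active tracers, each exposing the get_langchain_callback() hook added in tracing/base.py below, the aggregation could plausibly look like this (a sketch, not the actual service code; TracingServiceSketch is a made-up name):

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from langchain.callbacks.base import BaseCallbackHandler


class TracingServiceSketch:
    """Hypothetical aggregation over registered tracers."""

    def __init__(self) -> None:
        self._tracers: dict[str, Any] = {}  # tracer name -> BaseTracer instance

    def get_langchain_callbacks(self) -> list["BaseCallbackHandler"]:
        callbacks: list["BaseCallbackHandler"] = []
        for tracer in self._tracers.values():
            callback = tracer.get_langchain_callback()
            # Tracers that do not integrate via LangChain callbacks
            # (LangSmith below, for instance) return None and are skipped.
            if callback is not None:
                callbacks.append(callback)
        return callbacks
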
4 changes: 3 additions & 1 deletion src/backend/base/langflow/schema/data.py
@@ -136,7 +136,9 @@ def to_lc_message(
contents = [{"type": "text", "text": text}]
for file_path in files:
image_template = ImagePromptTemplate()
-                image_prompt_value: ImagePromptValue = image_template.invoke(input={"path": file_path})  # type: ignore
+                image_prompt_value: ImagePromptValue = image_template.invoke(
+                    input={"path": file_path}, config={"callbacks": self.get_langchain_callbacks()}
+                )  # type: ignore
contents.append({"type": "image_url", "image_url": image_prompt_value.image_url})
human_message = HumanMessage(content=contents) # type: ignore
else:
4 changes: 3 additions & 1 deletion src/backend/base/langflow/schema/message.py
@@ -165,7 +165,9 @@ async def get_file_content_dicts(self):
content_dicts.append(file.to_content_dict())
else:
image_template = ImagePromptTemplate()
-                image_prompt_value: ImagePromptValue = image_template.invoke(input={"path": file})
+                image_prompt_value: ImagePromptValue = image_template.invoke(
+                    input={"path": file}, config={"callbacks": self.get_langchain_callbacks()}
+                )  # type: ignore
content_dicts.append({"type": "image_url", "image_url": image_prompt_value.image_url})
return content_dicts

5 changes: 5 additions & 0 deletions src/backend/base/langflow/services/tracing/base.py
@@ -6,6 +6,7 @@

if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
+    from langchain.callbacks.base import BaseCallbackHandler


class BaseTracer(ABC):
@@ -49,3 +50,7 @@ def end(
metadata: dict[str, Any] | None = None,
):
raise NotImplementedError

+    @abstractmethod
+    def get_langchain_callback(self) -> Optional["BaseCallbackHandler"]:
+        raise NotImplementedError
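
A backend that traces through LangChain callbacks satisfies this contract by returning a handler instance. The LangWatch tracer added by this commit is not shown in the excerpt, so the following is a deliberately generic sketch under that assumption (RecordingCallbackHandler and SketchTracer are invented names):

from typing import Any, Optional

from langchain.callbacks.base import BaseCallbackHandler


class RecordingCallbackHandler(BaseCallbackHandler):
    """Toy handler that records LLM events; a real tracer would emit spans."""

    def __init__(self, record) -> None:
        self._record = record

    def on_llm_start(self, serialized: dict[str, Any], prompts: list[str], **kwargs: Any) -> None:
        self._record({"event": "llm_start", "prompts": prompts})

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        self._record({"event": "llm_end"})


class SketchTracer:
    def __init__(self) -> None:
        self.events: list[dict] = []

    def get_langchain_callback(self) -> Optional[BaseCallbackHandler]:
        return RecordingCallbackHandler(self.events.append)
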
4 changes: 4 additions & 0 deletions src/backend/base/langflow/services/tracing/langsmith.py
@@ -13,6 +13,7 @@

if TYPE_CHECKING:
from langflow.graph.vertex.base import Vertex
+    from langchain.callbacks.base import BaseCallbackHandler


class LangSmithTracer(BaseTracer):
@@ -158,3 +159,6 @@ def end(
self._run_tree.end(outputs=outputs, error=self._error_to_string(error))
self._run_tree.post()
self._run_link = self._run_tree.get_url()

+    def get_langchain_callback(self) -> Optional["BaseCallbackHandler"]:
+        return None
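
Returning None here looks deliberate rather than a stub: the LangSmith tracer already reports through its own RunTree (posted in end() above), so it opts out of the LangChain-callback path, and the aggregating service can simply skip tracers that return None.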