refactor: Extract link retrieval from WebRetriever, introduce LinkContentFetcher #5227

Merged
merged 18 commits on Jul 13, 2023
Changes from 15 commits
36 changes: 36 additions & 0 deletions examples/link_content_blog_post_summary.py
@@ -0,0 +1,36 @@
import os
from haystack.nodes import PromptNode, LinkContentFetcher, PromptTemplate
from haystack import Pipeline

openai_key = os.environ.get("OPENAI_API_KEY")
if not openai_key:
    raise ValueError("Please set the OPENAI_API_KEY environment variable")

retriever = LinkContentFetcher()
pt = PromptTemplate(
    "Given the paragraphs of the blog post, "
    "provide the main learnings and the final conclusion using short bullet points format."
    "\n\nParagraphs: {documents}"
)

prompt_node = PromptNode(
    "gpt-3.5-turbo-16k-0613",
    api_key=openai_key,
    max_length=512,
    default_prompt_template=pt,
    model_kwargs={"stream": True},
)

pipeline = Pipeline()
pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["Retriever"])

blog_posts = [
    "https://pythonspeed.com/articles/base-image-python-docker-images/",
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
]

for blog_post in blog_posts:
    print(f"Blog post summary: {blog_post}")
    pipeline.run(blog_post)
    print("\n\n\n")
1 change: 1 addition & 0 deletions haystack/nodes/__init__.py
@@ -39,6 +39,7 @@
    TfidfRetriever,
    TableTextRetriever,
    MultiModalRetriever,
    LinkContentFetcher,
    WebRetriever,
)

1 change: 1 addition & 0 deletions haystack/nodes/retriever/__init__.py
@@ -8,4 +8,5 @@
)
from haystack.nodes.retriever.multimodal import MultiModalRetriever
from haystack.nodes.retriever.sparse import BM25Retriever, FilterRetriever, TfidfRetriever
from haystack.nodes.retriever.link_content import LinkContentFetcher
from haystack.nodes.retriever.web import WebRetriever
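With these two __init__.py additions, the new node becomes importable from the usual package entry points. A quick sanity-check sketch (not from the PR):

# both import paths resolve to the same class after this change
from haystack.nodes import LinkContentFetcher
from haystack.nodes.retriever.link_content import LinkContentFetcher as _SameClass

assert LinkContentFetcher is _SameClass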
202 changes: 202 additions & 0 deletions haystack/nodes/retriever/link_content.py
@@ -0,0 +1,202 @@
import logging
from datetime import datetime
from http import HTTPStatus
from typing import Optional, Dict, List, Union, Callable, Any, Tuple
from urllib.parse import urlparse

import requests
from boilerpy3 import extractors
from requests import Response
from requests.exceptions import InvalidURL

from haystack import __version__
from haystack.nodes import PreProcessor, BaseComponent
from haystack.schema import Document, MultiLabel

logger = logging.getLogger(__name__)


def html_content_handler(response: Response, raise_on_failure: bool = False) -> Optional[str]:
"""
Extracts content from the response text using the boilerpy3 extractor.

    :param response: Response object from the request.
    :param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs
    """
    extractor = extractors.ArticleExtractor(raise_on_failure=raise_on_failure)
    content = ""
    try:
        content = extractor.get_content(response.text)
    except Exception as e:
        if raise_on_failure:
            raise e
    return content


def pdf_content_handler(response: Response, raise_on_failure: bool = False) -> Optional[str]:
    # TODO: implement this
    return None


class LinkContentFetcher(BaseComponent):
"""
LinkContentFetcher fetches content from a URL and converts it into a list of Document objects.

LinkContentFetcher supports the following content types:
- HTML

"""

outgoing_edges = 1

REQUEST_HEADERS = {
"accept": "*/*",
"User-Agent": f"haystack/LinkContentFetcher/{__version__}",
"Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
"referer": "https://www.google.com/",
}

    def __init__(self, processor: Optional[PreProcessor] = None, raise_on_failure: Optional[bool] = False):
        """
        Creates a LinkContentFetcher instance.

        :param processor: PreProcessor to apply to the extracted text
        :param raise_on_failure: A boolean indicating whether to raise an exception when a failure occurs
            during content extraction. If False, the error is simply logged and the program continues.
            Defaults to False.
        """
        super().__init__()
        self.processor = processor
        self.raise_on_failure = raise_on_failure
        self.handlers: Dict[str, Callable] = {"html": html_content_handler, "pdf": pdf_content_handler}

    def fetch(self, url: str, timeout: Optional[int] = 3, doc_kwargs: Optional[dict] = None) -> List[Document]:
        """
        Fetches content from a URL and converts it into a list of Document objects.

        :param url: URL to fetch content from.
        :param timeout: Timeout in seconds for the request.
        :param doc_kwargs: Optional kwargs to pass to the Document constructor.
        :return: List of Document objects.
        """
        if not url or not self._is_valid_url(url):
            raise InvalidURL("Invalid or missing URL: {}".format(url))

        doc_kwargs = doc_kwargs or {}
        extracted_doc: Dict[str, Union[str, dict]] = {
            "meta": {"url": url, "timestamp": int(datetime.utcnow().timestamp())}
        }
        extracted_doc.update(doc_kwargs)

        response = self._get_response(url, timeout=timeout)
        has_content = response.status_code == HTTPStatus.OK and response.text
        fetched_documents = []
        if has_content:
            handler = "html"  # will handle non-HTML content types soon, add content type resolution here
            if handler in self.handlers:
                extracted_content = self.handlers[handler](response, self.raise_on_failure)
                if extracted_content:
                    extracted_doc["content"] = extracted_content
                    logger.debug("%s handler extracted content from %s", handler, url)
                else:
                    logger.warning("%s handler failed to extract content from %s", handler, url)
                    text = extracted_doc.get("text", "")
                    extracted_doc["content"] = text
Member

I still don't understand the meaning of this part.
Can you please explain? Why do we want to provide a default text for the document?
If really useful, it should be documented in some way.

Member Author

Valid question. See https://github.com/deepset-ai/haystack/pull/5229/files#r1261301963 for details. Whatever we agree on here as a fallback key - I'm fine

Member

Now I see...

What puzzles me about the current strategy is that you can end up having both content and text (inside meta) for the same document.

How about explicitly naming it snippet_text and adding a short comment explaining this?
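For illustration only, a tiny standalone sketch of the behavior being discussed; _content_with_fallback and the snippet_text key are hypothetical, following the reviewer's proposal, and are not code from this PR:

def _content_with_fallback(extracted_content: str, doc_kwargs: dict) -> str:
    # if extraction failed, fall back to the search snippet passed in via doc_kwargs,
    # under the explicit "snippet_text" name instead of the generic "text"
    return extracted_content or doc_kwargs.get("snippet_text", "")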

            document = Document.from_dict(extracted_doc)

            if self.processor:
                fetched_documents = self.processor.process(documents=[document])
            else:
                fetched_documents = [document]
        return fetched_documents

    def run(
        self,
        query: Optional[str] = None,
        file_paths: Optional[List[str]] = None,
        labels: Optional[MultiLabel] = None,
        documents: Optional[List[Document]] = None,
        meta: Optional[dict] = None,
    ) -> Tuple[Dict, str]:
        """
        Fetches content from a URL specified by the query parameter and converts it into a list of Document objects.

        :param query: The query - a URL to fetch content from.
        :param file_paths: Not used.
        :param labels: Not used.
        :param documents: Not used.
        :param meta: Not used.

        :return: List of Document objects.
        """
        if not query:
            raise ValueError("LinkContentFetcher run requires the `query` parameter")
        documents = self.fetch(url=query)
        return {"documents": documents}, "output_1"

    def run_batch(
        self,
        queries: Optional[Union[str, List[str]]] = None,
        file_paths: Optional[List[str]] = None,
        labels: Optional[Union[MultiLabel, List[MultiLabel]]] = None,
        documents: Optional[Union[List[Document], List[List[Document]]]] = None,
        meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
        params: Optional[dict] = None,
        debug: Optional[bool] = None,
    ):
        """
        Takes a list of queries, where each query is expected to be a URL. For each query, the method
        fetches content from the specified URL and transforms it into a list of Document objects. The output is a list
        of these document lists, where each individual list of Document objects corresponds to the content retrieved
        from a specific query URL.

        :param queries: List of queries - URLs to fetch content from.
        :param file_paths: Not used.
        :param labels: Not used.
        :param documents: Not used.
        :param meta: Not used.
        :param params: Not used.
        :param debug: Not used.

        :return: List of lists of Document objects.
        """
        results = []
        if isinstance(queries, str):
            queries = [queries]
        elif not isinstance(queries, list):
            raise ValueError(
                "LinkContentFetcher run_batch requires the `queries` parameter to be Union[str, List[str]]"
            )
        for query in queries:
            results.append(self.fetch(url=query))

        return {"documents": results}, "output_1"

    def _get_response(self, url: str, timeout: Optional[int]) -> requests.Response:
        """
        Fetches content from a URL. Returns a response object.

        :param url: The URL to fetch content from.
        :param timeout: The timeout in seconds.
        :return: A response object.
        """
        try:
            response = requests.get(url, headers=LinkContentFetcher.REQUEST_HEADERS, timeout=timeout)
            response.raise_for_status()
        except Exception as e:
            if self.raise_on_failure:
                raise e

            logger.warning("Couldn't retrieve content from %s", url)
            response = requests.Response()
        return response

    def _is_valid_url(self, url: str) -> bool:
        """
        Checks if a URL is valid.

        :param url: The URL to check.
        :return: True if the URL is valid, False otherwise.
        """
        result = urlparse(url)
        # scheme is http or https and netloc is not empty
        return all([result.scheme in ["http", "https"], result.netloc])
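Taken together, the new node can also be used on its own, outside the blog-post example above. A minimal end-to-end sketch based on the code in this diff; the URL and PreProcessor settings are placeholders, not part of the PR:

from haystack.nodes import LinkContentFetcher, PreProcessor

processor = PreProcessor(split_length=200, split_overlap=0, split_respect_sentence_boundary=True)
fetcher = LinkContentFetcher(processor=processor, raise_on_failure=False)

# fetch() returns a (possibly empty) list of Documents, split by the PreProcessor
docs = fetcher.fetch(url="https://docs.haystack.deepset.ai/", timeout=5)

# run() is the Pipeline-facing entry point: the query is the URL to fetch
output, edge = fetcher.run(query="https://docs.haystack.deepset.ai/")
print(len(output["documents"]), edge)  # edge is always "output_1"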