Upgrade Elasticsearch to 8 #33135

Merged · 7 commits · Aug 8, 2023
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml
@@ -2379,7 +2379,7 @@ elasticsearch:
elasticsearch_configs:
description: ~
options:
use_ssl:
Member:
Why this change?

Contributor Author:
In elasticsearch 7, use_ssl is an accepted parameter when constructing the Elasticsearch client. See the following source code:

https://github.com/elastic/elasticsearch-py/blob/7.14/elasticsearch/client/__init__.py#L113

However, in elasticsearch 8, the client no longer accepts a use_ssl parameter. See the following source code:

https://github.com/elastic/elasticsearch-py/blob/8.9/elasticsearch/_sync/client/__init__.py#L129

Therefore, to make the test suite compile with ES 8, I use http_compress as the argument instead (which is one of the accepted arguments for constructing the ES client).
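To illustrate the difference, a minimal sketch (the URL and flag values are assumptions for the example, not values from this PR):

```python
from elasticsearch import Elasticsearch

# elasticsearch-py 7.x accepted a bare host plus use_ssl:
#   client = Elasticsearch(["localhost:9200"], use_ssl=True)

# elasticsearch-py 8.x infers TLS from the URL scheme instead;
# http_compress remains an accepted keyword argument.
client = Elasticsearch("https://localhost:9200", http_compress=True)
```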

http_compress:
description: ~
version_added: 1.10.5
type: string
19 changes: 8 additions & 11 deletions airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@
# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
import elasticsearch
import pendulum
from elasticsearch.exceptions import ElasticsearchException, NotFoundError
from elasticsearch.exceptions import NotFoundError

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -89,7 +89,7 @@ def __init__(
json_fields: str,
host_field: str = "host",
offset_field: str = "offset",
host: str = "localhost:9200",
host: str = "http://localhost:9200",
frontend: str = "localhost:5601",
index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
@@ -101,8 +101,8 @@
super().__init__(base_log_folder, filename_template)
self.closed = False

self.client = elasticsearch.Elasticsearch(host.split(";"), **es_kwargs) # type: ignore[attr-defined]

self.client = elasticsearch.Elasticsearch(host, **es_kwargs) # type: ignore[attr-defined]
# in airflow.cfg, host of elasticsearch has to be http://dockerhostXxxx:9200
Member:
May I know what error we see if the protocol is not included in the configured value?

Contributor Author:
In ES 8, when constructing a new Elasticsearch client, a full URL including http:// or https:// is expected, and you will get a ValueError when calling __init__, like below:

[screenshot: ValueError traceback]
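A minimal sketch of the failure mode (the exact error wording may vary between 8.x releases):

```python
from elasticsearch import Elasticsearch

try:
    # A bare "host:port" string is rejected in 8.x with a ValueError,
    # roughly: "URL must include a 'scheme', 'host', and 'port' component".
    Elasticsearch("localhost:9200")
except ValueError as err:
    print(err)

Elasticsearch("http://localhost:9200")  # accepted
```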

if USE_PER_RUN_LOG_ID and log_id_template is not None:
warnings.warn(
"Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
@@ -292,27 +292,24 @@ def es_read(self, log_id: str, offset: int | str, metadata: dict) -> list | Elas
}

try:
max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
max_log_line = self.client.count(index=self.index_patterns, body=query)["count"] # type: ignore
Member:
Why do we have a type ignore here?

Contributor Author:
So, if we look at the official ES package, the count API of the Elasticsearch class doesn't actually declare body as a parameter. See this link: https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/_sync/client/__init__.py#L801

But the body parameter is still accepted, because there is a decorator at the beginning which modifies the function to accept body as an argument. See these few lines: https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/_sync/client/__init__.py#L798-L800

Therefore, without the type ignore, the provider's mypy pre-commit job fails, because mypy thinks body is not an accepted parameter (when it actually is).
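As a rough sketch (not the actual elasticsearch-py code) of why mypy cannot see the parameter: a signature-preserving decorator can accept body at runtime and remap it onto the real keyword arguments, while the declared signature that mypy checks never mentions it:

```python
from __future__ import annotations

from functools import wraps
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


def rewrite_body(func: F) -> F:
    """Accept `body` at runtime and unpack it into real keyword args."""

    @wraps(func)
    def wrapper(self, *args, body=None, **kwargs):
        if body is not None:
            kwargs.update(body)  # e.g. {"query": ...} becomes query=...
        return func(self, *args, **kwargs)

    return wrapper  # type: ignore[return-value]


class Client:
    @rewrite_body
    def count(self, *, index: str | None = None, query: dict | None = None) -> dict:
        return {"count": 0}


# Works at runtime, but mypy only sees count()'s declared signature
# (no `body`), so the call needs a type ignore to pass checks.
Client().count(index="_all", body={"query": {"match_all": {}}})  # type: ignore[call-arg]
```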

except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist", self.index_patterns)
raise e
except ElasticsearchException as e:
self.log.exception("Could not get current log size with log_id: %s", log_id)
raise e

logs: list[Any] | ElasticSearchResponse = []
if max_log_line != 0:
try:
query.update({"sort": [self.offset_field]})
res = self.client.search(
res = self.client.search( # type: ignore
Member:
Same question regarding the type ignore.

index=self.index_patterns,
body=query,
size=self.MAX_LINE_PER_PAGE,
from_=self.MAX_LINE_PER_PAGE * self.PAGE,
)
logs = ElasticSearchResponse(self, res)
except elasticsearch.exceptions.ElasticsearchException:
self.log.exception("Could not read log with log_id: %s", log_id)
except Exception as err:
Member:
Can we not narrow down the exception we catch? Is the previous exception no longer present? If so, have they added any similar class that we can use instead?

Having such a broad exception catch without re-raising it might lead to silent failures.

Contributor Author:
Yes, the exception elasticsearch.exceptions.ElasticsearchException is no longer present. Instead, new exception classes are defined, such as UnsupportedProductError, NotFoundError, and so on. See this file:

https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/exceptions.py

All of those errors can occur when calling the ES API, so maybe we should re-raise the exception after logging it to the error log?
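A hedged sketch of what a narrower catch could look like against the 8.x exception hierarchy, assuming ApiError covers the API-side failures (the client, query, and logger here are placeholders, not the handler's real attributes):

```python
import logging

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ApiError, NotFoundError

log = logging.getLogger(__name__)
client = Elasticsearch("http://localhost:9200")
query = {"query": {"match_all": {}}}

try:
    res = client.search(index="_all", body=query)
except NotFoundError:
    log.exception("The target index pattern does not exist")
    raise
except ApiError:
    # Other API-level failures (bad request, auth, unsupported product, ...)
    log.exception("Could not read log")
    raise
```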

self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err)

return logs

3 changes: 2 additions & 1 deletion airflow/providers/elasticsearch/provider.yaml
@@ -23,6 +23,7 @@ description: |

suspended: false
versions:
- 5.1.0
- 5.0.0
- 4.5.1
- 4.5.0
@@ -53,7 +54,7 @@ versions:
dependencies:
- apache-airflow>=2.4.0
- apache-airflow-providers-common-sql>=1.3.1
- elasticsearch>7,<7.15.0
- elasticsearch>8,<9

integrations:
- integration-name: Elasticsearch
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -358,7 +358,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.4.0",
"elasticsearch>7,<7.15.0"
"elasticsearch>8,<9"
],
"cross-providers-deps": [
"common.sql"
44 changes: 41 additions & 3 deletions tests/providers/elasticsearch/log/elasticmock/__init__.py
@@ -41,17 +41,55 @@
"""Elastic mock module used for testing"""
from functools import wraps
from unittest.mock import patch

from elasticsearch.client.utils import _normalize_hosts
from urllib.parse import unquote, urlparse

from .fake_elasticsearch import FakeElasticsearch

ELASTIC_INSTANCES: dict[str, FakeElasticsearch] = {}


def _normalize_hosts(hosts):
"""
Helper function to transform hosts argument to
:class:`~elasticsearch.Elasticsearch` to a list of dicts.
"""
# if hosts are empty, just defer to defaults down the line
if hosts is None:
return [{}]

hosts = [hosts]

out = []

for host in hosts:
if "://" not in host:
host = f"//{host}"

parsed_url = urlparse(host)
h = {"host": parsed_url.hostname}

if parsed_url.port:
h["port"] = parsed_url.port

if parsed_url.scheme == "https":
h["port"] = parsed_url.port or 443
h["use_ssl"] = True

if parsed_url.username or parsed_url.password:
h["http_auth"] = f"{unquote(parsed_url.username)}:{unquote(parsed_url.password)}"

if parsed_url.path and parsed_url.path != "/":
h["url_prefix"] = parsed_url.path

out.append(h)
else:
out.append(host)
return out


def _get_elasticmock(hosts=None, *args, **kwargs):
host = _normalize_hosts(hosts)[0]
elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}"
elastic_key = f"http://{host.get('host', 'localhost')}:{host.get('port', 9200)}"

if elastic_key in ELASTIC_INSTANCES:
connection = ELASTIC_INSTANCES.get(elastic_key)
26 changes: 11 additions & 15 deletions tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -20,10 +20,9 @@
import json

from elasticsearch import Elasticsearch
from elasticsearch.client.utils import query_params
from elasticsearch.exceptions import NotFoundError

from .utilities import get_random_id
from .utilities import MissingIndexException, get_random_id, query_params

#
# The MIT License (MIT)
@@ -53,7 +52,7 @@ class FakeElasticsearch(Elasticsearch):
__documents_dict = None

def __init__(self):
super().__init__()
super().__init__("http://localhost:9200")
self.__documents_dict = {}

@query_params()
@@ -327,9 +326,8 @@ def get_source(self, index, doc_type, id, params=None):
"version",
)
def count(self, index=None, doc_type=None, body=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index)
searchable_indexes = self._normalize_index_to_list(index, body)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)

i = 0
for searchable_index in searchable_indexes:
for document in self.__documents_dict[searchable_index]:
@@ -376,7 +374,7 @@ def count(self, index=None, doc_type=None, body=None, params=None, headers=None)
"version",
)
def search(self, index=None, doc_type=None, body=None, params=None, headers=None):
searchable_indexes = self._normalize_index_to_list(index)
searchable_indexes = self._normalize_index_to_list(index, body)

matches = self._find_match(index, doc_type, body)

@@ -446,7 +444,7 @@ def suggest(self, body, index=None):
return result_dict

def _find_match(self, index, doc_type, body):
searchable_indexes = self._normalize_index_to_list(index)
searchable_indexes = self._normalize_index_to_list(index, body)
searchable_doc_types = self._normalize_doc_type_to_list(doc_type)

must = body["query"]["bool"]["must"][0] # only support one must
@@ -477,19 +475,20 @@ def match_must_phrase(document, matches, must):
matches.append(document)

# Check index(es) exists.
def _validate_search_targets(self, targets):
def _validate_search_targets(self, targets, body):
# TODO: support allow_no_indices query parameter
matches = set()
for target in targets:
print(f"Loop over:::target = {target}")
if target == "_all" or target == "":
matches.update(self.__documents_dict)
elif "*" in target:
matches.update(fnmatch.filter(self.__documents_dict, target))
elif target not in self.__documents_dict:
raise NotFoundError(404, f"IndexMissingException[[{target}] missing]")
raise MissingIndexException(msg=f"IndexMissingException[[{target}] missing]", body=body)
return matches

def _normalize_index_to_list(self, index):
def _normalize_index_to_list(self, index, body):
# Ensure to have a list of index
if index is None:
searchable_indexes = self.__documents_dict.keys()
@@ -501,11 +500,8 @@ def _normalize_index_to_list(self, index):
# Is it the correct exception to use ?
raise ValueError("Invalid param 'index'")

return list(
self._validate_search_targets(
target for index in searchable_indexes for target in index.split(",")
)
)
generator = (target for index in searchable_indexes for target in index.split(","))
return list(self._validate_search_targets(generator, body))

@staticmethod
def _normalize_doc_type_to_list(doc_type):