Upgrade Elasticsearch to 8 #33135

Merged · 7 commits · Aug 8, 2023
92 changes: 0 additions & 92 deletions airflow/config_templates/config.yml
@@ -2299,98 +2299,6 @@ kerberos:
      type: boolean
      example: ~
      default: "True"
elasticsearch:
  description: ~
  options:
    host:
      description: |
        Elasticsearch host
      version_added: 1.10.4
      type: string
      example: ~
      default: ""
    log_id_template:
      description: |
        Format of the log_id, which is used to query for a given task's logs
      version_added: 1.10.4
      type: string
      example: ~
      is_template: true
      default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
    end_of_log_mark:
      description: |
        Used to mark the end of a log stream for a task
      version_added: 1.10.4
      type: string
      example: ~
      default: "end_of_log"
    frontend:
      description: |
        Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
        Code will construct log_id using the log_id template from the argument above.
        NOTE: scheme will default to https if one is not provided
      version_added: 1.10.4
      type: string
      example: "http://localhost:5601/app/kibana#/discover\
        ?_a=(columns:!(message),query:(language:kuery,query:'log_id: \"{log_id}\"'),sort:!(log.offset,asc))"
      default: ""
    write_stdout:
      description: |
        Write the task logs to the stdout of the worker, rather than the default files
      version_added: 1.10.4
      type: string
      example: ~
      default: "False"
    json_format:
      description: |
        Instead of the default log formatter, write the log lines as JSON
      version_added: 1.10.4
      type: string
      example: ~
      default: "False"
    json_fields:
      description: |
        Log fields to also attach to the json output, if enabled
      version_added: 1.10.4
      type: string
      example: ~
      default: "asctime, filename, lineno, levelname, message"
    host_field:
      description: |
        The field where host name is stored (normally either `host` or `host.name`)
      version_added: 2.1.1
      type: string
      example: ~
      default: "host"
    offset_field:
      description: |
        The field where offset is stored (normally either `offset` or `log.offset`)
      version_added: 2.1.1
      type: string
      example: ~
      default: "offset"
    index_patterns:
      description: |
        Comma separated list of index patterns to use when searching for logs (default: `_all`).
      version_added: 2.6.0
      type: string
      example: something-*
      default: "_all"
elasticsearch_configs:
  description: ~
  options:
    use_ssl:
Reviewer (Member): Why this change?

Author (Contributor): In Elasticsearch 7, use_ssl is an accepted parameter when constructing the Elasticsearch client. See the following source code:

https://github.com/elastic/elasticsearch-py/blob/7.14/elasticsearch/client/__init__.py#L113

However, in Elasticsearch 8 the client no longer accepts the use_ssl parameter. See the following source code:

https://github.com/elastic/elasticsearch-py/blob/8.9/elasticsearch/_sync/client/__init__.py#L129

Therefore, to make the test suite work with ES 8, I use http_compress as the argument instead (one of the accepted arguments when constructing the ES client).
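A minimal sketch of the difference, assuming elasticsearch-py 7.x and 8.x respectively (the host URLs here are illustrative):

```python
# Illustrative only: how the accepted constructor kwargs changed between
# elasticsearch-py 7.x and 8.x.
from elasticsearch import Elasticsearch

# On 7.x this was accepted:
#   Elasticsearch(["localhost:9200"], use_ssl=False)
# On 8.x use_ssl is gone, so the same kwarg raises TypeError:
#   Elasticsearch("http://localhost:9200", use_ssl=False)

# http_compress is still an accepted kwarg on 8.x, which is why the
# test suite now passes it instead of use_ssl:
client = Elasticsearch("http://localhost:9200", http_compress=True)
```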

      description: ~
      version_added: 1.10.5
      type: string
      example: ~
      default: "False"
    verify_certs:
      description: ~
      version_added: 1.10.5
      type: string
      example: ~
      default: "True"
sensors:
  description: ~
  options:
6 changes: 6 additions & 0 deletions airflow/providers/elasticsearch/CHANGELOG.rst
@@ -27,6 +27,12 @@
Changelog
---------

.. note::
   Upgrade to Elasticsearch 8. The ElasticsearchTaskHandler and ElasticsearchSQLHook now use the
   Elasticsearch 8 package. As explained at https://elasticsearch-py.readthedocs.io/en/stable,
   Elasticsearch language clients are only backwards compatible with default distributions, and no
   guarantees are made otherwise, so we recommend upgrading your Elasticsearch database to version 8
   to ensure compatibility with the language client.

5.0.0
.....

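A quick way to sanity-check an existing deployment against this change, assuming only the standard library (illustrative, not part of the provider):

```python
# Illustrative only: confirm the installed elasticsearch client is the
# 8.x series this provider now targets.
from importlib.metadata import version

es_version = version("elasticsearch")
if not es_version.startswith("8."):
    raise RuntimeError(f"Expected elasticsearch 8.x, found {es_version}")
```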
19 changes: 8 additions & 11 deletions airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@
# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
import elasticsearch
import pendulum
from elasticsearch.exceptions import ElasticsearchException, NotFoundError
from elasticsearch.exceptions import NotFoundError

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -89,7 +89,7 @@ def __init__(
        json_fields: str,
        host_field: str = "host",
        offset_field: str = "offset",
        host: str = "localhost:9200",
        host: str = "http://localhost:9200",
        frontend: str = "localhost:5601",
        index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
        es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
@@ -101,8 +101,8 @@
        super().__init__(base_log_folder, filename_template)
        self.closed = False

        self.client = elasticsearch.Elasticsearch(host.split(";"), **es_kwargs)  # type: ignore[attr-defined]

        self.client = elasticsearch.Elasticsearch(host, **es_kwargs)  # type: ignore[attr-defined]
        # in airflow.cfg, the elasticsearch host must include the scheme, e.g. http://dockerhostXxxx:9200
Reviewer (Member): May I know what error we see if the protocol is not included in the set value?

Author (Contributor): In ES 8, constructing a new Elasticsearch client expects a full URL including the http/https scheme; otherwise calling __init__ raises a ValueError, as in the screenshot below:

[screenshot: ValueError raised by Elasticsearch.__init__ when the host has no scheme]
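A minimal reproduction, assuming elasticsearch-py 8.x (the exact error message may vary):

```python
# Illustrative only, assuming elasticsearch-py 8.x.
from elasticsearch import Elasticsearch

try:
    Elasticsearch("localhost:9200")  # no http:// or https:// scheme
except ValueError as err:
    print(f"ValueError: {err}")  # 8.x rejects scheme-less hosts at construction

client = Elasticsearch("http://localhost:9200")  # a full URL works
```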

        if USE_PER_RUN_LOG_ID and log_id_template is not None:
            warnings.warn(
                "Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
@@ -292,27 +292,24 @@ def es_read(self, log_id: str, offset: int | str, metadata: dict) -> list | ElasticSearchResponse:
        }

        try:
            max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
            max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]  # type: ignore
Reviewer (Member): Why do we have a type ignore here?

Author (Contributor): If we look at the official ES package, the count API on the Elasticsearch class doesn't actually declare body as a parameter. See this link: https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/_sync/client/__init__.py#L801

But the body parameter is still accepted, because a decorator at the top of the function rewrites the call to accept body as an argument. See these lines: https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/_sync/client/__init__.py#L798-L800

Therefore, without the type ignore, the provider mypy pre-commit job fails, because mypy thinks body is not an accepted parameter (which it actually is).
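A minimal sketch of the mismatch, assuming elasticsearch-py 8.x and an illustrative index name:

```python
# Illustrative only: why the call needs a type ignore under elasticsearch-py 8.x.
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")
query = {"query": {"match_all": {}}}

# The static signature of count() does not declare `body`, so mypy flags the
# keyword below; at runtime a parameter-rewriting decorator on count() splats
# the body dict into the accepted arguments and the call succeeds.
max_log_line = client.count(index="my-index", body=query)["count"]  # type: ignore
```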

        except NotFoundError as e:
            self.log.exception("The target index pattern %s does not exist", self.index_patterns)
            raise e
        except ElasticsearchException as e:
            self.log.exception("Could not get current log size with log_id: %s", log_id)
            raise e

        logs: list[Any] | ElasticSearchResponse = []
        if max_log_line != 0:
            try:
                query.update({"sort": [self.offset_field]})
                res = self.client.search(
                res = self.client.search(  # type: ignore
Reviewer (Member): Same question regarding the type ignore.

                    index=self.index_patterns,
                    body=query,
                    size=self.MAX_LINE_PER_PAGE,
                    from_=self.MAX_LINE_PER_PAGE * self.PAGE,
                )
                logs = ElasticSearchResponse(self, res)
            except elasticsearch.exceptions.ElasticsearchException:
                self.log.exception("Could not read log with log_id: %s", log_id)
            except Exception as err:
Reviewer (Member): Can we not narrow down the exception we catch? Is the previous exception no longer present? If so, have they added any similar class that we can use instead?

Having such a broad exception catch without re-raising might lead to silent failures.

Author (Contributor): Yes, the exception elasticsearch.exceptions.ElasticsearchException is no longer present. Instead, new exception classes are defined, such as UnsupportedProductError, NotFoundError, and so on. See this file:

https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/exceptions.py

And it feels like any of those errors can occur when calling the ES API. So maybe we should re-raise the exception after logging it to the error log?
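A sketch of what a narrower catch could look like, assuming ApiError and TransportError are the broad base classes in elasticsearch-py 8.x's elasticsearch.exceptions (worth verifying against the file above); this is an illustration, not the code merged here:

```python
# Illustrative alternative, not the code merged in this PR.
import logging

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ApiError, TransportError

log = logging.getLogger(__name__)
client = Elasticsearch("http://localhost:9200")


def read_logs(log_id: str, index_patterns: str, query: dict):
    try:
        return client.search(index=index_patterns, body=query)
    except ApiError:
        # the server answered with an error status (bad query, missing index, ...)
        log.exception("Could not read log with log_id: %s", log_id)
        raise
    except TransportError:
        # connection-level failure (timeout, unreachable node, ...)
        log.exception("Could not reach Elasticsearch for log_id: %s", log_id)
        raise
```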

                self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err)

        return logs

96 changes: 95 additions & 1 deletion airflow/providers/elasticsearch/provider.yaml
@@ -53,7 +53,7 @@ versions:
dependencies:
  - apache-airflow>=2.4.0
  - apache-airflow-providers-common-sql>=1.3.1
  - elasticsearch>7,<7.15.0
  - elasticsearch>8,<9

integrations:
  - integration-name: Elasticsearch
@@ -72,3 +72,97 @@ connection-types:

logging:
  - airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler

Reviewer (Member): On top of moving the [elasticsearch] section (which I think makes sense), it should be accompanied by adding a link to the elasticsearch provider configuration in the Airflow documentation (following #32777).

When we have more of those, we might want to do it automatically, but for now we need to add it "manually".

config:
  elasticsearch:
    description: ~
    options:
      host:
        description: |
          Elasticsearch host
        version_added: 1.10.4
        type: string
        example: ~
        default: ""
      log_id_template:
        description: |
          Format of the log_id, which is used to query for a given task's logs
        version_added: 1.10.4
        type: string
        example: ~
        is_template: true
        default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
      end_of_log_mark:
        description: |
          Used to mark the end of a log stream for a task
        version_added: 1.10.4
        type: string
        example: ~
        default: "end_of_log"
      frontend:
        description: |
          Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
          Code will construct log_id using the log_id template from the argument above.
          NOTE: scheme will default to https if one is not provided
        version_added: 1.10.4
        type: string
        example: "http://localhost:5601/app/kibana#/discover\
          ?_a=(columns:!(message),query:(language:kuery,query:'log_id: \"{log_id}\"'),sort:!(log.offset,asc))"
        default: ""
      write_stdout:
        description: |
          Write the task logs to the stdout of the worker, rather than the default files
        version_added: 1.10.4
        type: string
        example: ~
        default: "False"
      json_format:
        description: |
          Instead of the default log formatter, write the log lines as JSON
        version_added: 1.10.4
        type: string
        example: ~
        default: "False"
      json_fields:
        description: |
          Log fields to also attach to the json output, if enabled
        version_added: 1.10.4
        type: string
        example: ~
        default: "asctime, filename, lineno, levelname, message"
      host_field:
        description: |
          The field where host name is stored (normally either `host` or `host.name`)
        version_added: 2.1.1
        type: string
        example: ~
        default: "host"
      offset_field:
        description: |
          The field where offset is stored (normally either `offset` or `log.offset`)
        version_added: 2.1.1
        type: string
        example: ~
        default: "offset"
      index_patterns:
        description: |
          Comma separated list of index patterns to use when searching for logs (default: `_all`).
        version_added: 2.6.0
        type: string
        example: something-*
        default: "_all"
  elasticsearch_configs:
    description: ~
    options:
      http_compress:
        description: ~
        version_added: 1.10.5
        type: string
        example: ~
        default: "False"
      verify_certs:
        description: ~
        version_added: 1.10.5
        type: string
        example: ~
        default: "True"
18 changes: 18 additions & 0 deletions docs/apache-airflow-providers-elasticsearch/configurations-ref.rst
@@ -0,0 +1,18 @@
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.
.. include:: ../exts/includes/providers-configurations-ref.rst
3 changes: 2 additions & 1 deletion docs/apache-airflow-providers-elasticsearch/index.rst
@@ -43,6 +43,7 @@
   :maxdepth: 1
   :caption: References

   Configuration <configurations-ref>
   Python API <_api/airflow/providers/elasticsearch/index>

.. toctree::
@@ -103,7 +104,7 @@ PIP package Version required
========================================  ==================
``apache-airflow``                        ``>=2.4.0``
``apache-airflow-providers-common-sql``  ``>=1.3.1``
``elasticsearch``                         ``>7,<7.15.0``
``elasticsearch``                         ``>8,<9``
========================================  ==================

Cross provider package dependencies
Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow/configurations-ref.rst
@@ -41,6 +41,7 @@ in the provider's documentation. The pre-installed providers that you may want t
* :doc:`Configuration Reference for SMTP Provider <apache-airflow-providers-smtp:configurations-ref>`
* :doc:`Configuration Reference for IMAP Provider <apache-airflow-providers-imap:configurations-ref>`
* :doc:`Configuration Reference for OpenLineage Provider <apache-airflow-providers-openlineage:configurations-ref>`
* :doc:`Configuration Reference for Elasticsearch Provider <apache-airflow-providers-elasticsearch:configurations-ref>`

.. note::
   For more information see :doc:`/howto/set-config`.
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -358,7 +358,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.4.0",
"elasticsearch>7,<7.15.0"
"elasticsearch>8,<9"
],
"cross-providers-deps": [
"common.sql"
44 changes: 41 additions & 3 deletions tests/providers/elasticsearch/log/elasticmock/__init__.py
@@ -41,17 +41,55 @@
"""Elastic mock module used for testing"""
from functools import wraps
from unittest.mock import patch

from elasticsearch.client.utils import _normalize_hosts
from urllib.parse import unquote, urlparse

from .fake_elasticsearch import FakeElasticsearch

ELASTIC_INSTANCES: dict[str, FakeElasticsearch] = {}


def _normalize_hosts(hosts):
    """
    Helper function to transform the hosts argument of
    :class:`~elasticsearch.Elasticsearch` into a list of dicts.
    """
    # if hosts are empty, just defer to defaults down the line
    if hosts is None:
        return [{}]

    # passed in just one string
    if isinstance(hosts, str):
        hosts = [hosts]

    out = []

    for host in hosts:
        if isinstance(host, str):
            if "://" not in host:
                host = f"//{host}"

            parsed_url = urlparse(host)
            h = {"host": parsed_url.hostname}

            if parsed_url.port:
                h["port"] = parsed_url.port

            if parsed_url.scheme == "https":
                h["port"] = parsed_url.port or 443
                h["use_ssl"] = True

            if parsed_url.username or parsed_url.password:
                h["http_auth"] = f"{unquote(parsed_url.username)}:{unquote(parsed_url.password)}"

            if parsed_url.path and parsed_url.path != "/":
                h["url_prefix"] = parsed_url.path

            out.append(h)
        else:
            out.append(host)
    return out
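

# Illustrative example (not part of the mock): what the helper produces.
#   _normalize_hosts("https://user:secret@es.example.com:9243/prefix")
#   -> [{"host": "es.example.com", "port": 9243, "use_ssl": True,
#        "http_auth": "user:secret", "url_prefix": "/prefix"}]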


def _get_elasticmock(hosts=None, *args, **kwargs):
    host = _normalize_hosts(hosts)[0]
    elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}"
    elastic_key = f"http://{host.get('host', 'localhost')}:{host.get('port', 9200)}"

    if elastic_key in ELASTIC_INSTANCES:
        connection = ELASTIC_INSTANCES.get(elastic_key)
Expand Down