Skip to content

Commit

Permalink
Add dereference pushdown support in ElasticSearch.
Browse files Browse the repository at this point in the history
`equals` and `hashcode` are overridden in most of the `DecoderDescriptor` as `DecoderDescriptor` is a member of `ElasticsearchColumnHandle`, and `TestElasticsearchProjectionPushdownPlans` requires comparison between two `ElasticsearchColumnHandle`.
  • Loading branch information
striderarun committed Oct 25, 2024
1 parent fbd316a commit 42fd5aa
Show file tree
Hide file tree
Showing 29 changed files with 1,851 additions and 17 deletions.
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/connector/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ The following table details all general configuration properties:
queries. Some deployments map Elasticsearch ports to a random public port
and enabling this property can help in these cases.
- `false`
* - `elasticsearch.projection-pushdown-enabled`
- Read only projected fields from row columns while performing `SELECT` queries
- `true`
:::

### Authentication
Expand Down
7 changes: 7 additions & 0 deletions plugin/trino-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,13 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.elasticsearch;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.elasticsearch.client.IndexMetadata;
import io.trino.plugin.elasticsearch.decoders.IdColumnDecoder;
import io.trino.plugin.elasticsearch.decoders.ScoreColumnDecoder;
Expand Down Expand Up @@ -86,7 +87,7 @@ public ColumnMetadata getMetadata()
public ColumnHandle getColumnHandle()
{
return new ElasticsearchColumnHandle(
name,
ImmutableList.of(name),
type,
elasticsearchType,
decoderDescriptor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
*/
package io.trino.plugin.elasticsearch;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.elasticsearch.client.IndexMetadata;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;

import java.util.List;

import static java.util.Objects.requireNonNull;

public record ElasticsearchColumnHandle(
String name,
List<String> path,
Type type,
IndexMetadata.Type elasticsearchType,
DecoderDescriptor decoderDescriptor,
Expand All @@ -29,12 +34,18 @@ public record ElasticsearchColumnHandle(
{
public ElasticsearchColumnHandle
{
requireNonNull(name, "name is null");
path = ImmutableList.copyOf(path);
requireNonNull(type, "type is null");
requireNonNull(elasticsearchType, "elasticsearchType is null");
requireNonNull(decoderDescriptor, "decoderDescriptor is null");
}

@JsonIgnore
public String name()
{
return Joiner.on('.').join(path);
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public enum Security
private String truststorePassword;
private boolean ignorePublishAddress;
private boolean verifyHostnames = true;
private boolean projectionPushdownEnabled = true;

private Security security;

Expand Down Expand Up @@ -342,6 +343,19 @@ public ElasticsearchConfig setIgnorePublishAddress(boolean ignorePublishAddress)
return this;
}

public boolean isProjectionPushdownEnabled()
{
return projectionPushdownEnabled;
}

@Config("elasticsearch.projection-pushdown-enabled")
@ConfigDescription("Read only required fields from a row type")
public ElasticsearchConfig setProjectionPushdownEnabled(boolean projectionPushDownEnabled)

Check failure on line 353 in plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java

View workflow job for this annotation

GitHub Actions / error-prone-checks

Found the field 'projectionPushdownEnabled' with the same name as the parameter 'projectionPushDownEnabled' but with different capitalization.

Check failure on line 353 in plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java

View workflow job for this annotation

GitHub Actions / error-prone-checks

Found the field 'projectionPushdownEnabled' with the same name as the parameter 'projectionPushDownEnabled' but with different capitalization.
{
this.projectionPushdownEnabled = projectionPushDownEnabled;
return this;
}

@NotNull
public Optional<Security> getSecurity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.bootstrap.LifeCycleManager;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSourceProvider;
Expand All @@ -24,10 +25,13 @@
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.function.table.ConnectorTableFunction;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;

import java.util.List;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
Expand All @@ -41,6 +45,7 @@ public class ElasticsearchConnector
private final ElasticsearchPageSourceProvider pageSourceProvider;
private final NodesSystemTable nodesSystemTable;
private final Set<ConnectorTableFunction> connectorTableFunctions;
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
public ElasticsearchConnector(
Expand All @@ -49,14 +54,18 @@ public ElasticsearchConnector(
ElasticsearchSplitManager splitManager,
ElasticsearchPageSourceProvider pageSourceProvider,
NodesSystemTable nodesSystemTable,
Set<ConnectorTableFunction> connectorTableFunctions)
Set<ConnectorTableFunction> connectorTableFunctions,
Set<SessionPropertiesProvider> sessionPropertiesProviders)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.nodesSystemTable = requireNonNull(nodesSystemTable, "nodesSystemTable is null");
this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "connectorTableFunctions is null"));
this.sessionProperties = sessionPropertiesProviders.stream()
.flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
.collect(toImmutableList());
}

@Override
Expand Down Expand Up @@ -96,6 +105,12 @@ public Set<ConnectorTableFunction> getTableFunctions()
return connectorTableFunctions;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
}

@Override
public final void shutdown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.elasticsearch.client.ElasticsearchClient;
import io.trino.plugin.elasticsearch.ptf.RawQuery;
import io.trino.spi.function.table.ConnectorTableFunction;
Expand Down Expand Up @@ -49,6 +50,7 @@ protected void setup(Binder binder)
newOptionalBinder(binder, AwsSecurityConfig.class);
newOptionalBinder(binder, PasswordConfig.class);

newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(ElasticsearchSessionProperties.class).in(Scopes.SINGLETON);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(RawQuery.class).in(Scopes.SINGLETON);

install(conditionalModule(
Expand Down
Loading

0 comments on commit 42fd5aa

Please sign in to comment.