Skip to content

Commit

Permalink
Support Serialized Presto Page in Thrift UDF service
Browse files Browse the repository at this point in the history
PrestoThriftBlock has limited type support (for array types it only supports
array<bigint>, and it has no support for map types). The overall performance of
Presto SerializedPage is also better than that of PrestoThriftBlock.

Benchmark                                         (typeSignature)  Mode  Cnt        Score        Error  Units
BenchmarkThriftPageSerDe.testSerializedPageSerde          boolean  avgt   20   166566.169 ±  18267.340  ns/op
BenchmarkThriftPageSerDe.testSerializedPageSerde           bigint  avgt   20   216654.607 ±  32235.276  ns/op
BenchmarkThriftPageSerDe.testSerializedPageSerde          varchar  avgt   20  1250560.212 ± 176070.250  ns/op
BenchmarkThriftPageSerDe.testSerializedPageSerde    array<bigint>  avgt   20   488109.753 ±  55165.876  ns/op
BenchmarkThriftPageSerDe.testSerializedPageSerde   map<int, real>  avgt   20   633627.296 ±  73473.657  ns/op
BenchmarkThriftPageSerDe.testThriftPageSerde              boolean  avgt   20   335448.969 ±  22666.853  ns/op
BenchmarkThriftPageSerDe.testThriftPageSerde               bigint  avgt   20   432328.947 ±  46902.591  ns/op
BenchmarkThriftPageSerDe.testThriftPageSerde              varchar  avgt   20  1658012.817 ± 196150.353  ns/op
BenchmarkThriftPageSerDe.testThriftPageSerde        array<bigint>  avgt   20  1291312.373 ± 128077.209  ns/op
BenchmarkThriftPageSerDe.testThriftPageSerde       map<int, real>  avgt   20        2.971 ±      0.373  ns/op
  • Loading branch information
rongrong authored and Rongrong Zhong committed Dec 12, 2020
1 parent b49294b commit 4d5afce
Show file tree
Hide file tree
Showing 14 changed files with 471 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class Page
* Visible to give trusted classes like {@link PageBuilder} access to a constructor that doesn't
* defensively copy the blocks
*/
static Page wrapBlocksWithoutCopy(int positionCount, Block[] blocks)
public static Page wrapBlocksWithoutCopy(int positionCount, Block[] blocks)
{
return new Page(false, positionCount, blocks);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.drift.client.address.SimpleAddressSelectorConfig;
import com.facebook.presto.functionNamespace.SqlInvokedFunctionNamespaceManagerConfig;
import com.facebook.presto.functionNamespace.execution.thrift.SimpleAddressThriftSqlFunctionExecutionModule;
import com.facebook.presto.functionNamespace.execution.thrift.ThriftSqlFunctionExecutionConfig;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.facebook.presto.spi.function.RoutineCharacteristics.Language;
import com.google.common.collect.ImmutableMap;
Expand All @@ -39,15 +40,18 @@ protected void setup(Binder binder)
SqlInvokedFunctionNamespaceManagerConfig config = buildConfigObject(SqlInvokedFunctionNamespaceManagerConfig.class);
ImmutableMap.Builder<Language, SimpleAddressSelectorConfig> thriftConfigs = ImmutableMap.builder();
ImmutableMap.Builder<Language, FunctionImplementationType> languageImplementationTypeMap = ImmutableMap.builder();
ImmutableMap.Builder<Language, ThriftSqlFunctionExecutionConfig> thriftExecutionConfigs = ImmutableMap.builder();
for (String languageName : config.getSupportedFunctionLanguages()) {
Language language = new Language(languageName);
FunctionImplementationType implementationType = buildConfigObject(SqlFunctionLanguageConfig.class, languageName).getFunctionImplementationType();
languageImplementationTypeMap.put(language, implementationType);
if (implementationType.equals(THRIFT)) {
thriftConfigs.put(language, buildConfigObject(SimpleAddressSelectorConfig.class, languageName));
thriftExecutionConfigs.put(language, buildConfigObject(ThriftSqlFunctionExecutionConfig.class, languageName));
}
}
binder.bind(new TypeLiteral<Map<Language, FunctionImplementationType>>() {}).toInstance(languageImplementationTypeMap.build());
binder.install(new SimpleAddressThriftSqlFunctionExecutionModule(thriftConfigs.build()));
binder.bind(new TypeLiteral<Map<Language, ThriftSqlFunctionExecutionConfig>>() {}).toInstance(thriftExecutionConfigs.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package com.facebook.presto.functionNamespace.execution.thrift;

import com.facebook.drift.client.address.SimpleAddressSelectorConfig;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.spi.function.RoutineCharacteristics.Language;
import com.facebook.presto.thrift.api.udf.ThriftUdfService;
import com.google.inject.Binder;
Expand Down Expand Up @@ -46,6 +48,8 @@ public void configure(Binder binder)
return;
}
binder.bind(ThriftSqlFunctionExecutor.class).in(SINGLETON);
binder.bind(BlockEncodingManager.class).in(SINGLETON);
binder.bind(BlockEncodingSerde.class).to(BlockEncodingManager.class).in(SINGLETON);

driftClientBinder(binder)
.bindDriftClient(ThriftUdfService.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.thrift;

import com.facebook.airlift.configuration.Config;
import com.facebook.presto.thrift.api.udf.ThriftUdfPageFormat;

import static com.facebook.presto.thrift.api.udf.ThriftUdfPageFormat.PRESTO_THRIFT;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

/**
 * Configuration holder for the page format used when pages are exchanged with a
 * Thrift UDF service. The format defaults to {@code PRESTO_THRIFT}.
 */
public class ThriftSqlFunctionExecutionConfig
{
    private ThriftUdfPageFormat thriftPageFormat = PRESTO_THRIFT;

    /**
     * Sets the page format from its textual name (property {@code thrift-page-format}).
     * The name is matched case-insensitively against the {@link ThriftUdfPageFormat} constants.
     *
     * @param pageFormat name of a {@link ThriftUdfPageFormat} constant, in any letter case
     * @return this config object, to allow chained setter calls
     * @throws NullPointerException if {@code pageFormat} is null
     * @throws IllegalArgumentException if {@code pageFormat} does not name a known format
     */
    @Config("thrift-page-format")
    public ThriftSqlFunctionExecutionConfig setThriftPageFormat(String pageFormat)
    {
        requireNonNull(pageFormat, "pageFormat is null");
        // Upper-case with a fixed locale: with the JVM default locale (e.g. Turkish) 'i' maps to a
        // dotted capital 'İ', which would make a valid name such as "presto_serialized" fail valueOf().
        this.thriftPageFormat = ThriftUdfPageFormat.valueOf(pageFormat.toUpperCase(ENGLISH));
        return this;
    }

    /**
     * @return the currently configured page format
     */
    public ThriftUdfPageFormat getThriftPageFormat()
    {
        return thriftPageFormat;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,57 @@
import com.facebook.drift.client.DriftClient;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.function.RoutineCharacteristics.Language;
import com.facebook.presto.spi.function.SqlFunctionHandle;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.ThriftScalarFunctionImplementation;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.thrift.api.datatypes.PrestoThriftBlock;
import com.facebook.presto.thrift.api.udf.ThriftFunctionHandle;
import com.facebook.presto.thrift.api.udf.ThriftUdfPage;
import com.facebook.presto.thrift.api.udf.ThriftUdfPageFormat;
import com.facebook.presto.thrift.api.udf.ThriftUdfResult;
import com.facebook.presto.thrift.api.udf.ThriftUdfService;
import com.facebook.presto.thrift.api.udf.ThriftUdfServiceException;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture;
import static com.facebook.presto.common.Page.wrapBlocksWithoutCopy;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.thrift.api.udf.ThriftUdfPage.prestoPage;
import static com.facebook.presto.thrift.api.udf.ThriftUdfPage.thriftPage;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class ThriftSqlFunctionExecutor
{
private final DriftClient<ThriftUdfService> thriftUdfClient;
private final BlockEncodingSerde blockEncodingSerde;
private final Map<Language, ThriftSqlFunctionExecutionConfig> executionConfigs;

@Inject
public ThriftSqlFunctionExecutor(DriftClient<ThriftUdfService> thriftUdfClient)
public ThriftSqlFunctionExecutor(DriftClient<ThriftUdfService> thriftUdfClient, BlockEncodingSerde blockEncodingSerde, Map<Language, ThriftSqlFunctionExecutionConfig> executionConfigs)
{
this.thriftUdfClient = requireNonNull(thriftUdfClient, "thriftUdfClient is null");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.executionConfigs = requireNonNull(executionConfigs, "executionConfigs is null");
}

public CompletableFuture<Block> executeFunction(ThriftScalarFunctionImplementation functionImplementation, Page input, List<Integer> channels, List<Type> argumentTypes, Type returnType)
{
ImmutableList.Builder<PrestoThriftBlock> blocks = ImmutableList.builder();
for (int i = 0; i < channels.size(); i++) {
Block block = input.getBlock(channels.get(i));
blocks.add(PrestoThriftBlock.fromBlock(block, argumentTypes.get(i)));
}
ThriftUdfPage page = buildThriftPage(functionImplementation, input, channels, argumentTypes);
SqlFunctionHandle functionHandle = functionImplementation.getFunctionHandle();
SqlFunctionId functionId = functionHandle.getFunctionId();
try {
Expand All @@ -69,11 +79,49 @@ public CompletableFuture<Block> executeFunction(ThriftScalarFunctionImplementati
.collect(toImmutableList()),
returnType.toString(),
functionHandle.getVersion()),
thriftPage(blocks.build())))
.thenApply(result -> getOnlyElement(result.getResult().getThriftBlocks()).toBlock(returnType));
page))
.thenApply(result -> getResultBlock(result, returnType));
}
catch (ThriftUdfServiceException | TException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, e);
}
}

/**
 * Packs the argument blocks of {@code input} into a {@link ThriftUdfPage}, encoded in the
 * page format configured for the language of the function being invoked.
 *
 * @param functionImplementation the function being invoked; its language selects the execution config
 * @param input page holding the argument blocks
 * @param channels indexes into {@code input} of the argument blocks, in argument order
 * @param argumentTypes types of the arguments, positionally aligned with {@code channels}
 * @return the argument blocks wrapped as a Thrift UDF page
 * @throws PrestoException if no execution config is registered for the function's language
 * @throws IllegalArgumentException if the configured page format is not handled here
 */
private ThriftUdfPage buildThriftPage(ThriftScalarFunctionImplementation functionImplementation, Page input, List<Integer> channels, List<Type> argumentTypes)
{
    ThriftSqlFunctionExecutionConfig executionConfig = executionConfigs.get(functionImplementation.getLanguage());
    if (executionConfig == null) {
        // Fail with a descriptive error rather than a message-less NullPointerException
        throw new PrestoException(GENERIC_INTERNAL_ERROR, format("No thrift execution config for function language: %s", functionImplementation.getLanguage()));
    }
    ThriftUdfPageFormat pageFormat = executionConfig.getThriftPageFormat();

    // Pick out only the argument channels, in argument order
    Block[] blocks = new Block[channels.size()];
    for (int i = 0; i < channels.size(); i++) {
        blocks[i] = input.getBlock(channels.get(i));
    }

    switch (pageFormat) {
        case PRESTO_THRIFT:
            ImmutableList.Builder<PrestoThriftBlock> thriftBlocks = ImmutableList.builder();
            for (int i = 0; i < blocks.length; i++) {
                thriftBlocks.add(PrestoThriftBlock.fromBlock(blocks[i], argumentTypes.get(i)));
            }
            return thriftPage(thriftBlocks.build());
        case PRESTO_SERIALIZED:
            // Serialize the selected blocks as a single page; the Optional.empty() arguments are passed through unchanged
            PagesSerde pagesSerde = new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), Optional.empty());
            return prestoPage(pagesSerde.serialize(wrapBlocksWithoutCopy(input.getPositionCount(), blocks)));
        default:
            throw new IllegalArgumentException(format("Unknown page format: %s", pageFormat));
    }
}

/**
 * Extracts the result block from a Thrift UDF response, decoding it according to the
 * page format reported by the returned page.
 *
 * @param result response received from the Thrift UDF service
 * @param returnType declared return type, used to convert a Thrift block back into a {@link Block}
 * @return the result block
 * @throws IllegalArgumentException if the returned page uses a format not handled here
 */
private Block getResultBlock(ThriftUdfResult result, Type returnType)
{
    ThriftUdfPage resultPage = result.getResult();
    switch (resultPage.getPageFormat()) {
        case PRESTO_THRIFT: {
            // getOnlyElement requires the response to carry exactly one Thrift block
            PrestoThriftBlock onlyBlock = getOnlyElement(resultPage.getThriftBlocks());
            return onlyBlock.toBlock(returnType);
        }
        case PRESTO_SERIALIZED: {
            PagesSerde serde = new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), Optional.empty());
            Page deserialized = serde.deserialize(resultPage.getPrestoPage().toSerializedPage());
            // The result is read from the first channel of the deserialized page
            return deserialized.getBlock(0);
        }
        default:
            throw new IllegalArgumentException(format("Unknown page format: %s", resultPage.getPageFormat()));
    }
}
}
6 changes: 6 additions & 0 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-thrift-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down
Loading

0 comments on commit 4d5afce

Please sign in to comment.