keyArgument type sets the key serializer (#150)
Closes #120
raminqaf authored Jan 13, 2023
1 parent 67455ef commit 6399340
Showing 17 changed files with 192 additions and 69 deletions.
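
In short, the range fetcher now derives the key serde from the GraphQL type of the field's keyArgument (for example Int or Long) instead of falling back to the topic's registered key type. A minimal sketch of that scalar-name-to-serde lookup, assuming Kafka's built-in serdes; the helper class and its name are illustrative and not part of this commit:

import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Illustrative only: maps a GraphQL scalar name to a Kafka serde for the key,
// mirroring the scalar-to-QuickTopicType map this commit adds to FetcherFactory.
final class KeySerdeLookup {
    private static final Map<String, Serde<?>> SCALAR_TO_SERDE = Map.of(
            "Int", Serdes.Integer(),
            "Long", Serdes.Long(),
            "String", Serdes.String(),
            "ID", Serdes.String(),
            "Float", Serdes.Double());

    static Serde<?> forScalarName(final String graphQlTypeName) {
        final Serde<?> serde = SCALAR_TO_SERDE.get(graphQlTypeName);
        if (serde == null) {
            throw new IllegalArgumentException("Unsupported keyArgument type: " + graphQlTypeName);
        }
        return serde;
    }
}

In the commit itself the scalar name is first mapped to a QuickTopicType, and the ConversionProvider then supplies the configured serde, as the FetcherFactory changes below show.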

This file was deleted.

@@ -17,14 +17,35 @@
package com.bakdata.quick.common.api.client.mirror;

import com.bakdata.quick.common.api.client.HttpClient;
import com.bakdata.quick.common.resolver.TypeResolver;
import com.bakdata.quick.common.type.QuickTopicData;
import com.bakdata.quick.common.util.Lazy;
import org.apache.kafka.common.serialization.Serde;

/**
* Factory for creating {@link MirrorClient}.
*/
public interface MirrorClientFactory {
<K, V> MirrorClient<K, V> createMirrorClient(final HttpClient client,
final String topic,
final Lazy<QuickTopicData<K, V>> quickTopicData);
/**
* Creates a non-partition aware {@link DefaultMirrorClient}.
*
* @param client An HTTP client
* @param topic The topic name
* @param quickTopicData The quick topic data
* @param <K> Type of the key
* @param <V> Type of the value
* @return A {@link MirrorClient}
*/
default <K, V> MirrorClient<K, V> createMirrorClient(final HttpClient client, final String topic,
final Lazy<QuickTopicData<K, V>> quickTopicData) {
final MirrorHost mirrorHost = MirrorHost.createWithPrefix(topic);
final MirrorRequestManager requestManager = new DefaultMirrorRequestManager(client);
final TypeResolver<V> valueTypeResolver = quickTopicData.get().getValueData().getResolver();
final MirrorValueParser<V> mirrorValueParser =
new MirrorValueParser<>(valueTypeResolver, client.objectMapper());
return new DefaultMirrorClient<>(mirrorHost, mirrorValueParser, requestManager);
}

<K, V> MirrorClient<K, V> createMirrorClient(final HttpClient client, final String topic, final Serde<K> keySerde,
final TypeResolver<V> typeResolver);
}
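
A sketch of how a caller might use the new overload, assuming an HttpClient and a value TypeResolver are already available; the topic name and the wrapper class are made up for illustration and are not part of this commit:

package com.bakdata.quick.common.api.client.mirror;

import com.bakdata.quick.common.api.client.HttpClient;
import com.bakdata.quick.common.resolver.TypeResolver;
import org.apache.kafka.common.serialization.Serdes;

// Sketch only: demonstrates the new Serde-based factory method.
final class MirrorClientUsageSketch {
    static <V> MirrorClient<Integer, V> integerKeyedClient(final HttpClient client,
            final TypeResolver<V> valueResolver) {
        final MirrorClientFactory factory = new PartitionedMirrorClientFactory();
        // The key serde is chosen by the caller (an Integer serde for an Int keyArgument)
        // rather than taken from the topic's registered key data.
        return factory.createMirrorClient(client, "purchase-topic", Serdes.Integer(), valueResolver);
    }
}
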
@@ -31,8 +31,7 @@
public class PartitionedMirrorClientFactory implements MirrorClientFactory {
@Override
public <K, V> MirrorClient<K, V> createMirrorClient(final HttpClient client,
final String topic,
final Lazy<QuickTopicData<K, V>> quickTopicData) {
final String topic, final Lazy<QuickTopicData<K, V>> quickTopicData) {
final MirrorHost mirrorHost = MirrorHost.createWithPrefix(topic);
final MirrorRequestManager requestManager = new MirrorRequestManagerWithFallback(client, mirrorHost);
final StreamsStateHost streamsStateHost = StreamsStateHost.createFromMirrorHost(mirrorHost);
@@ -43,4 +42,16 @@ public <K, V> MirrorClient<K, V> createMirrorClient(final HttpClient client,
final TypeResolver<V> valueTypeResolver = quickTopicData.get().getValueData().getResolver();
return new PartitionedMirrorClient<>(client, valueTypeResolver, requestManager, partitionRouter);
}

@Override
public <K, V> MirrorClient<K, V> createMirrorClient(final HttpClient client, final String topic,
final Serde<K> keySerde, final TypeResolver<V> valueTypeResolver) {
final MirrorHost mirrorHost = MirrorHost.createWithPrefix(topic);
final MirrorRequestManager requestManager = new MirrorRequestManagerWithFallback(client, mirrorHost);
final StreamsStateHost streamsStateHost = StreamsStateHost.createFromMirrorHost(mirrorHost);
final Router<K> partitionRouter =
new PartitionRouter<>(client, streamsStateHost, keySerde, new DefaultPartitionFinder(), requestManager,
topic);
return new PartitionedMirrorClient<>(client, valueTypeResolver, requestManager, partitionRouter);
}
}
@@ -137,7 +137,6 @@ private <K> Single<TypeResolverWithSchema<K>> createResolver(final QuickTopicTyp
// get schema and configure the resolver with it
return this.registryFetcher.getSchema(subject)
.doOnError(e -> log.error("No schema found for subject {}", subject, e))
.map(schema -> new TypeResolverWithSchema<>(this.conversionProvider.getTypeResolver(type, schema),
schema));
.map(schema -> new TypeResolverWithSchema<>(this.conversionProvider.getTypeResolver(type, schema), schema));
}
}
14 changes: 0 additions & 14 deletions e2e/functional/key-range/result-range.json

This file was deleted.

54 changes: 54 additions & 0 deletions e2e/functional/range-key/purchases.json
@@ -0,0 +1,54 @@
[
{
"key": "abc",
"value": {
"productId": 123,
"userId": 1,
"amount": 1,
"price": {
"total": 19.99,
"currency": "DOLLAR"
},
"timestamp": 1
}
},
{
"key": "def",
"value": {
"productId": 123,
"userId": 2,
"amount": 2,
"price": {
"total": 30.00,
"currency": "DOLLAR"
},
"timestamp": 2
}
},
{
"key": "ghi",
"value": {
"productId": 456,
"userId": 2,
"amount": 1,
"price": {
"total": 79.99,
"currency": "DOLLAR"
},
"timestamp": 3
}
},
{
"key": "jkl",
"value": {
"productId": 789,
"userId": 2,
"amount": 1,
"price": {
"total": 99.99,
"currency": "DOLLAR"
},
"timestamp": 4
}
}
]
File renamed without changes.
14 changes: 14 additions & 0 deletions e2e/functional/range-key/result-range.json
@@ -0,0 +1,14 @@
[
{
"productId": 123,
"price": {
"total": 30.0
}
},
{
"productId": 456,
"price": {
"total": 79.99
}
}
]
File renamed without changes.
File renamed without changes.
@@ -18,13 +18,18 @@

import com.bakdata.quick.common.graphql.GraphQLUtils;
import com.bakdata.quick.gateway.DataFetcherSpecification;
import com.bakdata.quick.gateway.directives.QuickDirectiveException;
import com.bakdata.quick.gateway.directives.topic.TopicDirectiveContext;
import graphql.language.TypeName;
import graphql.schema.DataFetcher;
import graphql.schema.FieldCoordinates;
import graphql.schema.GraphQLArgument;
import graphql.schema.GraphQLFieldDefinition;
import graphql.schema.GraphQLSchemaElement;
import graphql.schema.GraphQLTypeUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* Rule for range query fetcher.
@@ -60,12 +65,16 @@ public List<DataFetcherSpecification> extractDataFetchers(final TopicDirectiveCo
Objects.requireNonNull(context.getTopicDirective().getKeyArgument());
Objects.requireNonNull(context.getTopicDirective().getRangeFrom());
Objects.requireNonNull(context.getTopicDirective().getRangeTo());

final TypeName typeName = this.extractKeyArgumentType(context, context.getTopicDirective().getKeyArgument());

final DataFetcher<?> dataFetcher = context.getFetcherFactory().rangeFetcher(
context.getTopicDirective().getTopicName(),
context.getTopicDirective().getKeyArgument(),
context.getTopicDirective().getRangeFrom(),
context.getTopicDirective().getRangeTo(),
context.isNullable()
context.isNullable(),
typeName
);
final FieldCoordinates coordinates = this.currentCoordinates(context);
return List.of(DataFetcherSpecification.of(coordinates, dataFetcher));
@@ -80,4 +89,23 @@ public boolean isValid(final TopicDirectiveContext context) {
&& context.getParentContainerName().equals(GraphQLUtils.QUERY_TYPE)
&& GraphQLTypeUtil.isList(context.getEnvironment().getElement().getType());
}

private TypeName extractKeyArgumentType(final TopicDirectiveContext context, final String keyArgument) {
final GraphQLSchemaElement graphQLSchemaElement = context.getEnvironment()
.getElementParentTree()
.getElement();

final List<GraphQLArgument> arguments = ((GraphQLFieldDefinition) graphQLSchemaElement).getArguments();
final Optional<GraphQLArgument> graphQLKeyArgument = arguments.stream()
.filter(inputValueDefinition -> inputValueDefinition.getName().equals(keyArgument))
.findFirst();

if (graphQLKeyArgument.isEmpty()) {
final String errorMessage = String.format(
"Could not find the keyArgument %s in the parent type definition. Please check your schema.",
keyArgument);
throw new QuickDirectiveException(errorMessage);
}
return this.extractTypeName(graphQLKeyArgument.get().getDefinition().getType());
}
}
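
For context, the extractKeyArgumentType method above reads the declared type of the argument named by keyArgument on the annotated query field. Against a schema along these lines it would resolve userId to the TypeName Int, which FetcherFactory then turns into an integer key serde. The field, argument, and topic names here are hypothetical; the directive arguments keyArgument, rangeFrom, and rangeTo appear in this diff, while the topic-name argument is assumed to be name:

// Hypothetical schema, for illustration only; not part of this commit.
final class RangeSchemaExample {
    static final String SCHEMA = """
        type Query {
            userRequests(userId: Int, timestampFrom: Int, timestampTo: Int): [UserRequests]
                @topic(
                    name: "user-request-range",
                    keyArgument: "userId",
                    rangeFrom: "timestampFrom",
                    rangeTo: "timestampTo"
                )
        }
        """;
}
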
@@ -18,11 +18,14 @@

import com.bakdata.quick.common.type.QuickTopicData;
import com.bakdata.quick.common.util.Lazy;
import org.apache.kafka.common.serialization.Serde;

/**
* Supplier for creating a new data fetcher client for a topic.
*/
@FunctionalInterface
public interface ClientSupplier {
<K, V> DataFetcherClient<K, V> createClient(final String topic, final Lazy<QuickTopicData<K, V>> quickTopicData);

<K, V> DataFetcherClient<K, V> createClient(final String topic, final Serde<K> keySerde,
final Lazy<QuickTopicData<Object, V>> quickTopicData);
}
@@ -20,6 +20,7 @@
import com.bakdata.quick.common.api.client.mirror.MirrorClientFactory;
import com.bakdata.quick.common.type.QuickTopicData;
import com.bakdata.quick.common.util.Lazy;
import org.apache.kafka.common.serialization.Serde;

final class DefaultClientSupplier implements ClientSupplier {
private final HttpClient client;
@@ -37,4 +38,12 @@ public <K, V> DataFetcherClient<K, V> createClient(final String topic,
return new MirrorDataFetcherClient<>(new Lazy<>(() ->
this.mirrorClientFactory.createMirrorClient(this.client, topic, quickTopicData)));
}

@Override
public <K, V> DataFetcherClient<K, V> createClient(final String topic, final Serde<K> keySerde,
final Lazy<QuickTopicData<Object, V>> quickTopicData) {
return new MirrorDataFetcherClient<>(new Lazy<>(() ->
this.mirrorClientFactory.createMirrorClient(this.client, topic, keySerde,
quickTopicData.get().getValueData().getResolver())));
}
}
@@ -19,7 +19,9 @@
import com.bakdata.quick.common.api.client.HttpClient;
import com.bakdata.quick.common.api.client.mirror.PartitionedMirrorClientFactory;
import com.bakdata.quick.common.config.KafkaConfig;
import com.bakdata.quick.common.type.ConversionProvider;
import com.bakdata.quick.common.type.QuickTopicData;
import com.bakdata.quick.common.type.QuickTopicType;
import com.bakdata.quick.common.type.TopicTypeService;
import com.bakdata.quick.common.util.Lazy;
import com.bakdata.quick.gateway.fetcher.subscription.KafkaSubscriptionProvider;
@@ -29,14 +31,20 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.Nullable;
import graphql.Scalars;
import graphql.execution.DataFetcherResult;
import graphql.language.NamedNode;
import graphql.language.TypeName;
import graphql.scalars.ExtendedScalars;
import graphql.schema.DataFetcher;
import io.reactivex.Single;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.reactivestreams.Publisher;

/**
Expand All @@ -49,19 +57,27 @@ public class FetcherFactory {
private final ObjectMapper objectMapper;
private final ClientSupplier clientSupplier;
private final TopicTypeService topicTypeService;
private final ConversionProvider conversionProvider;

private static final Map<String, QuickTopicType> typeMap = Map.of(
Scalars.GraphQLInt.getName(), QuickTopicType.INTEGER,
ExtendedScalars.GraphQLLong.getName(), QuickTopicType.LONG,
Scalars.GraphQLString.getName(), QuickTopicType.STRING,
Scalars.GraphQLID.getName(), QuickTopicType.STRING,
Scalars.GraphQLFloat.getName(), QuickTopicType.DOUBLE);

/**
* Visible for testing.
*/
@VisibleForTesting
public FetcherFactory(final KafkaConfig kafkaConfig, final ObjectMapper objectMapper,
final TopicTypeService topicTypeService,
final ClientSupplier clientSupplier) {
final ClientSupplier clientSupplier, final ConversionProvider conversionProvider) {
this.kafkaConfig = kafkaConfig;
this.objectMapper = objectMapper;
this.topicTypeService = topicTypeService;
this.clientSupplier = clientSupplier;
this.conversionProvider = conversionProvider;
}

/**
@@ -71,9 +87,9 @@ public FetcherFactory(final KafkaConfig kafkaConfig, final ObjectMapper objectMa
*/
@Inject
public FetcherFactory(final KafkaConfig kafkaConfig, final HttpClient client,
final TopicTypeService topicTypeService) {
final TopicTypeService topicTypeService, final ConversionProvider conversionProvider) {
this(kafkaConfig, client.objectMapper(), topicTypeService,
new DefaultClientSupplier(client, new PartitionedMirrorClientFactory()));
new DefaultClientSupplier(client, new PartitionedMirrorClientFactory()), conversionProvider);
}

/**
@@ -107,8 +123,12 @@ public <K, V> DataFetcher<List<V>> listArgumentFetcher(final String topic, final
* Creates a {@link RangeQueryFetcher}.
*/
public <K, V> DataFetcher<List<V>> rangeFetcher(final String topic, final String argument, final String rangeFrom,
final String rangeTo, final boolean isNullable) {
final DataFetcherClient<K, V> client = this.clientSupplier.createClient(topic, this.getTopicData(topic));
final String rangeTo, final boolean isNullable, final NamedNode<TypeName> type) {
final QuickTopicType quickTopicType = Objects.requireNonNull(typeMap.get(type.getName()));

final Serde<K> keySerde = this.conversionProvider.getSerde(quickTopicType, true);
final Lazy<QuickTopicData<Object, V>> topicData = this.getTopicData(topic);
final DataFetcherClient<K, V> client = this.clientSupplier.createClient(topic, keySerde, topicData);
return new RangeQueryFetcher<>(argument, client, rangeFrom, rangeTo, isNullable);
}

