
Rebase with master
zhilingc committed Jan 8, 2019
2 parents def65b0 + 9c0ef21 commit 578003f
Showing 15 changed files with 457 additions and 89 deletions.
21 changes: 21 additions & 0 deletions CONTRIBUTING.md
@@ -0,0 +1,21 @@
# Contributing Guide

## Code reviews

All submissions, including submissions by project members, require review. We use GitHub pull
requests for this purpose. Consult GitHub Help for more information on using pull requests.

## Code conventions

### Java

We conform to the [Google Java Style Guide](https://google.github.io/styleguide/javaguide.html).

If using IntelliJ, please import the code style settings:
https://github.com/google/styleguide/blob/gh-pages/intellij-java-google-style.xml

### Go

Make sure you apply `go fmt`.

### JavaScript
2 changes: 1 addition & 1 deletion Makefile
@@ -2,7 +2,7 @@ VERSION_FILE=VERSION
FEAST_VERSION=`cat $(VERSION_FILE)`

test:
echo testing not implemented
mvn test

build-deps:
$(MAKE) -C protos gen-go
13 changes: 7 additions & 6 deletions ingestion/pom.xml
@@ -30,9 +30,10 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<org.apache.beam.version>2.5.0</org.apache.beam.version>
<org.apache.beam.version>2.9.0</org.apache.beam.version>
<com.google.cloud.version>1.35.0</com.google.cloud.version>
<grpcVersion>1.2.0</grpcVersion>
<com.google.httpclient.version>1.27.0</com.google.httpclient.version>
<grpcVersion>1.13.1</grpcVersion>
<guice.version>4.1.0</guice.version>
<spring.kafka.version>2.2.2.RELEASE</spring.kafka.version>
</properties>
@@ -173,19 +174,19 @@
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
<version>1.23.0</version>
<version>${com.google.httpclient.version}</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-gson</artifactId>
<version>1.23.0</version>
<version>${com.google.httpclient.version}</version>
</dependency>


<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.4.0</version>
<version>${protobufVersion}</version>
</dependency>


@@ -204,7 +205,7 @@ <dependency>
<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-beam</artifactId>
<version>1.6.0</version>
<version>1.8.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -1,3 +1,20 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
@@ -1,3 +1,20 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java
@@ -19,6 +19,18 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import feast.ingestion.exceptions.ErrorsHandler;
import feast.ingestion.model.Specs;
import feast.ingestion.transform.FeatureIO.Write;
import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
import feast.ingestion.values.PFeatureRows;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import feast.storage.FeatureStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.Attempt;
import feast.types.FeatureRowExtendedProto.Error;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -35,29 +47,19 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import feast.ingestion.exceptions.ErrorsHandler;
import feast.ingestion.model.Specs;
import feast.ingestion.transform.FeatureIO.Write;
import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
import feast.ingestion.values.PFeatureRows;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import feast.storage.FeatureStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.Attempt;
import feast.types.FeatureRowExtendedProto.Error;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;

@AllArgsConstructor
@Slf4j
public class SplitOutputByStore extends PTransform<PFeatureRows, PFeatureRows> {

private Collection<? extends FeatureStore> stores;
private SerializableFunction<FeatureSpec, String> selector;
private Specs specs;

@Override
public PFeatureRows expand(PFeatureRows input) {
Map<String, Write> transforms = getFeatureStoreTransforms();
transforms.put("", new NoOpIO.Write());
Set<String> keys = transforms.keySet();
Preconditions.checkArgument(transforms.size() > 0, "no write transforms found");

@@ -102,6 +104,7 @@ private Map<String, Write> getFeatureStoreTransforms() {

@AllArgsConstructor
public static class WriteTags extends PTransform<PCollectionTuple, PFeatureRows> {

private Map<TupleTag<FeatureRowExtended>, Write> transforms;
private TupleTag<FeatureRowExtended> mainTag;

@@ -120,9 +123,7 @@ public PFeatureRows expand(PCollectionTuple input) {
}

String message =
"FeatureRow with output tag.no matching storage, these feature's "
+ "specs may be specifying a store which was unknown when "
+ "ingestion started as they somehow passed validation. ";
"FeatureRows have no matching write transform, these rows should not have passed validation.";
PCollection<FeatureRowExtended> errors =
input.get(mainTag).apply(ParDo.of(new WithErrors(getName(), message)));

@@ -131,8 +132,11 @@ public PFeatureRows expand(PCollectionTuple input) {
}
}

/** Sets the last attempt error for all rows with a given exception */
/**
* Sets the last attempt error for all rows with a given exception
*/
public static class WithErrors extends DoFn<FeatureRowExtended, FeatureRowExtended> {

private Error error;

public WithErrors(Error error) {
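The `WithErrors` helper above is a plain Beam `DoFn` that stamps an error onto every element passing through. As a reference for that pattern, here is a minimal, self-contained sketch of the same idea; `Row` and its `payload`/`lastError` fields are hypothetical stand-ins for Feast's `FeatureRowExtended` and `Error` protos, not types from this commit:

```java
import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical element type standing in for FeatureRowExtended.
class Row implements Serializable {
  final String payload;
  final String lastError;

  Row(String payload, String lastError) {
    this.payload = payload;
    this.lastError = lastError;
  }
}

// Re-emits each element with the error message recorded on it, which is
// essentially what WithErrors does when it sets the last attempt error.
class TagWithError extends DoFn<Row, Row> {
  private final String message;

  TagWithError(String message) {
    this.message = message;
  }

  @ProcessElement
  public void processElement(@Element Row in, OutputReceiver<Row> out) {
    out.output(new Row(in.payload, message));
  }
}
```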
8 changes: 5 additions & 3 deletions ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
@@ -67,9 +67,11 @@
@Slf4j
public class ImportJobCSVTest {

@Rule public TemporaryFolder folder = new TemporaryFolder();
@Rule
public TemporaryFolder folder = new TemporaryFolder();

@Rule public TestPipeline testPipeline = TestPipeline.create();
@Rule
public TestPipeline testPipeline = TestPipeline.create();

public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) {
return importSpec.toBuilder().putOptions("path", dataFile).build();
@@ -125,7 +127,7 @@ public void testImportCSV() throws IOException {

PCollection<FeatureRowExtended> writtenToWarehouse =
PCollectionList.of(
WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs())
WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs())
.apply("flatten warehouse input", Flatten.pCollections());

PCollection<FeatureRowExtended> writtenToErrors =
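For context, the two `@Rule` fields reformatted above are stock JUnit 4 fixtures for Beam tests. A minimal sketch of how they typically work together, with illustrative pipeline contents rather than anything from this commit:

```java
import java.io.File;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class RuleUsageSketch {

  // Scratch directory, created before each test and deleted after it.
  @Rule
  public TemporaryFolder folder = new TemporaryFolder();

  // Local Beam pipeline that is validated and run inside the test.
  @Rule
  public final transient TestPipeline testPipeline = TestPipeline.create();

  @Test
  public void runsPipelineAgainstTempFile() throws Exception {
    // Somewhere a test could stage input data, cleaned up automatically.
    File dataFile = folder.newFile("data.csv");

    PCollection<String> rows = testPipeline.apply(Create.of("a,1", "b,2"));
    PAssert.that(rows).containsInAnyOrder("a,1", "b,2");

    testPipeline.run().waitUntilFinish();
  }
}
```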
ingestion/src/test/java/feast/ingestion/deserializer/KafkaFeatureRowDeserializerTest.java
@@ -1,9 +1,25 @@
/*
* Copyright 2018 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package feast.ingestion.deserializer;

import com.google.protobuf.MessageLite;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
@@ -13,9 +29,11 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
@@ -28,31 +46,29 @@
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;

@RunWith(SpringRunner.class)
@EmbeddedKafka(controlledShutdown = true)
@SpringBootTest
@DirtiesContext
public class KafkaFeatureRowDeserializerTest {

@Autowired private EmbeddedKafkaBroker embeddedKafka;
private static final String topic = "TEST_TOPIC";

@ClassRule public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, topic);
@Autowired private KafkaTemplate<byte[], byte[]> template;

private <MessageType extends MessageLite> void deserialize(MessageType input) {
// generate a random UUID to create a unique topic and consumer group id for each test
String uuid = UUID.randomUUID().toString();
String topic = "topic-" + uuid;

embeddedKafka.addTopics(topic);

Deserializer deserializer = new FeatureRowDeserializer();

Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(uuid, Boolean.FALSE.toString(), embeddedKafka);
KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafka.getEmbeddedKafka());
ConsumerFactory<FeatureRow, FeatureRow> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerProps, deserializer, deserializer);

@@ -63,7 +79,8 @@ private <MessageType extends MessageLite> void deserialize(MessageType input) {
MessageListenerContainer container =
new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
ContainerTestUtils.waitForAssignment(
container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());

byte[] data = input.toByteArray();
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, data, data);
@@ -99,12 +116,10 @@ public void deserializeFeatureRowProto() {

@Configuration
static class ContextConfiguration {

@Autowired private EmbeddedKafkaBroker embeddedKafka;

@Bean
ProducerFactory<byte[], byte[]> producerFactory() {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());

return new DefaultKafkaProducerFactory<>(
producerProps, new ByteArraySerializer(), new ByteArraySerializer());
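The test above replaces the autowired `EmbeddedKafkaBroker` with a JUnit-managed `EmbeddedKafkaRule`. As a reference for that pattern, here is a minimal, self-contained sketch of a byte-level round trip against the rule-managed broker; the class, topic, and group names are illustrative, not from this commit:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public class EmbeddedKafkaRoundTripSketch {

  // One broker, controlled shutdown, topic created up front: the same
  // constructor shape as the EmbeddedKafkaRule in the diff above.
  @ClassRule
  public static EmbeddedKafkaRule kafka = new EmbeddedKafkaRule(1, true, "demo-topic");

  @Test
  public void roundTripsBytes() {
    // Producer wired against the embedded broker.
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka.getEmbeddedKafka());
    KafkaTemplate<byte[], byte[]> template =
        new KafkaTemplate<>(
            new DefaultKafkaProducerFactory<>(
                producerProps, new ByteArraySerializer(), new ByteArraySerializer()));
    template.send("demo-topic", "hello".getBytes());

    // Consumer group reading from the earliest offset, as consumerProps configures.
    Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps("demo-group", "false", kafka.getEmbeddedKafka());
    Consumer<byte[], byte[]> consumer =
        new DefaultKafkaConsumerFactory<>(
                consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())
            .createConsumer();
    kafka.getEmbeddedKafka().consumeFromAnEmbeddedTopic(consumer, "demo-topic");

    ConsumerRecord<byte[], byte[]> record = KafkaTestUtils.getSingleRecord(consumer, "demo-topic");
    Assert.assertArrayEquals("hello".getBytes(), record.value());
  }
}
```

The key difference from the annotation-driven setup is that the rule owns the broker lifecycle, so the test reaches the broker through `kafka.getEmbeddedKafka()` rather than field injection, matching the `getEmbeddedKafka()` calls introduced in the diff.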