Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow submission of kafka jobs #94

Merged
merged 1 commit into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions core/src/main/java/feast/core/validators/SpecValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package feast.core.validators;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static feast.core.validators.Matchers.checkLowerSnakeCase;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import feast.core.dao.EntityInfoRepository;
Expand All @@ -35,36 +39,28 @@
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static feast.core.validators.Matchers.checkLowerSnakeCase;
import org.springframework.beans.factory.annotation.Autowired;

public class SpecValidator {

private StorageInfoRepository storageInfoRepository;
private EntityInfoRepository entityInfoRepository;
private FeatureGroupInfoRepository featureGroupInfoRepository;
private FeatureInfoRepository featureInfoRepository;
private static final String FILE_ERROR_STORE_TYPE = "file.json";

private static final String NO_STORE = "";

private static String[] SUPPORTED_WAREHOUSE_STORES =
new String[] {
new String[]{
BigQueryStorageManager.TYPE, FILE_ERROR_STORE_TYPE,
};

private static String[] SUPPORTED_SERVING_STORES =
new String[] {
new String[]{
BigTableStorageManager.TYPE, PostgresStorageManager.TYPE, RedisStorageManager.TYPE,
};
private StorageInfoRepository storageInfoRepository;
private EntityInfoRepository entityInfoRepository;
private FeatureGroupInfoRepository featureGroupInfoRepository;
private FeatureInfoRepository featureInfoRepository;

@Autowired
public SpecValidator(
Expand Down Expand Up @@ -130,7 +126,8 @@ public void validateFeatureSpec(FeatureSpec spec) throws IllegalArgumentExceptio
servingStoreId =
servingStoreId.equals(NO_STORE) ? group.getServingStore().getId() : servingStoreId;
warehouseStoreId =
warehouseStoreId.equals(NO_STORE) ? group.getWarehouseStore().getId() : warehouseStoreId;
warehouseStoreId.equals(NO_STORE) ? group.getWarehouseStore().getId()
: warehouseStoreId;
}
Optional<StorageInfo> servingStore = storageInfoRepository.findById(servingStoreId);
Optional<StorageInfo> warehouseStore = storageInfoRepository.findById(warehouseStoreId);
Expand Down Expand Up @@ -221,6 +218,9 @@ public void validateStorageSpec(StorageSpec spec) throws IllegalArgumentExceptio
public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException {
try {
switch (spec.getType()) {
case "kafka":
checkKafkaImportSpecOption(spec);
break;
case "pubsub":
checkPubSubImportSpecOption(spec);
break;
Expand Down Expand Up @@ -261,6 +261,20 @@ public void validateImportSpec(ImportSpec spec) throws IllegalArgumentException
}
}

private void checkKafkaImportSpecOption(ImportSpec spec) {
try {
String topics = spec.getOptionsOrDefault("topics", "");
String server = spec.getOptionsOrDefault("server", "");
if (topics.equals("") && server.equals("")) {
throw new IllegalArgumentException(
"Kafka ingestion requires either topics or servers");
}
} catch (NullPointerException | IllegalArgumentException e) {
throw new IllegalArgumentException(
Strings.lenientFormat("Invalid options: %s", e.getMessage()));
}
}

private void checkFileImportSpecOption(ImportSpec spec) throws IllegalArgumentException {
try {
checkArgument(
Expand Down
75 changes: 66 additions & 9 deletions core/src/test/java/feast/core/validators/SpecValidatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@
import org.mockito.Mockito;

public class SpecValidatorTest {

@Rule
public final ExpectedException exception = ExpectedException.none();
private FeatureInfoRepository featureInfoRepository;
private FeatureGroupInfoRepository featureGroupInfoRepository;
private EntityInfoRepository entityInfoRepository;
private StorageInfoRepository storageInfoRepository;

@Rule public final ExpectedException exception = ExpectedException.none();

@Before
public void setUp() {
featureInfoRepository = Mockito.mock(FeatureInfoRepository.class);
Expand Down Expand Up @@ -369,7 +370,7 @@ public void featureSpecWithoutExistingWarehouseStoreShouldThrowIllegalArgumentEx
StorageInfo redis1 = new StorageInfo();
redis1.setId(servingStoreId);
redis1.setType("redis");
when(storageInfoRepository.findById( servingStoreId)).thenReturn(Optional.of(redis1));
when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(redis1));

SpecValidator validator =
new SpecValidator(
Expand All @@ -392,7 +393,8 @@ public void featureSpecWithoutExistingWarehouseStoreShouldThrowIllegalArgumentEx
.setDataStores(dataStores)
.build();
exception.expect(IllegalArgumentException.class);
exception.expectMessage(String.format("Warehouse store with id %s does not exist", warehouseStoreId));
exception.expectMessage(
String.format("Warehouse store with id %s does not exist", warehouseStoreId));
validator.validateFeatureSpec(input);
}

Expand All @@ -405,7 +407,7 @@ public void featureSpecWithoutWarehouseStoreShouldBeAllowed() {
StorageInfo redis1 = new StorageInfo();
redis1.setId(servingStoreId);
redis1.setType("redis");
when(storageInfoRepository.findById( servingStoreId)).thenReturn(Optional.of(redis1));
when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(redis1));

SpecValidator validator =
new SpecValidator(
Expand All @@ -432,18 +434,21 @@ public void featureSpecWithoutWarehouseStoreShouldBeAllowed() {
@Test
public void featureSpecWithUnsupportedWarehouseStoreShouldThrowIllegalArgumentException() {
String servingStoreId = "REDIS1";
StorageSpec servingStoreSpec = StorageSpec.newBuilder().setId(servingStoreId).setType("redis").build();
StorageSpec servingStoreSpec = StorageSpec.newBuilder().setId(servingStoreId).setType("redis")
.build();
StorageInfo servingStoreInfo = new StorageInfo(servingStoreSpec);

String warehouseStoreId = "REDIS2";
StorageSpec warehouseStoreSpec = StorageSpec.newBuilder().setId(warehouseStoreId).setType("redis").build();
StorageSpec warehouseStoreSpec = StorageSpec.newBuilder().setId(warehouseStoreId)
.setType("redis").build();
StorageInfo warehouseStoreInfo = new StorageInfo(warehouseStoreSpec);

when(entityInfoRepository.existsById("entity")).thenReturn(true);
when(storageInfoRepository.existsById(servingStoreId)).thenReturn(true);
when(storageInfoRepository.existsById(warehouseStoreId)).thenReturn(true);
when(storageInfoRepository.findById(servingStoreId)).thenReturn(Optional.of(servingStoreInfo));
when(storageInfoRepository.findById(warehouseStoreId)).thenReturn(Optional.of(warehouseStoreInfo));
when(storageInfoRepository.findById(warehouseStoreId))
.thenReturn(Optional.of(warehouseStoreInfo));
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
Expand Down Expand Up @@ -488,7 +493,8 @@ public void featureSpecWithUnsupportedServingStoreShouldThrowIllegalArgumentExce
when(entityInfoRepository.existsById("entity")).thenReturn(true);
when(storageInfoRepository.existsById(servingStoreName)).thenReturn(true);
when(storageInfoRepository.existsById(warehouseStorageName)).thenReturn(true);
when(storageInfoRepository.findById(servingStoreName)).thenReturn(Optional.of(redis1StorageInfo));
when(storageInfoRepository.findById(servingStoreName))
.thenReturn(Optional.of(redis1StorageInfo));
when(storageInfoRepository.findById(warehouseStorageName)).thenReturn(Optional.of(bqInfo));
SpecValidator validator =
new SpecValidator(
Expand Down Expand Up @@ -779,4 +785,55 @@ public void importSpecWithUnregisteredFeaturesShouldThrowIllegalArgumentExceptio
"Validation for import spec failed: Feature some_nonexistent_feature not registered");
validator.validateImportSpec(input);
}

@Test
public void importSpecWithKafkaSourceAndCorrectOptionsShouldPassValidation() {
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
entityInfoRepository,
featureGroupInfoRepository,
featureInfoRepository);
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
Schema schema =
Schema.newBuilder()
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
.build();
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.putOptions("topics", "my-kafka-topic")
.putOptions("server", "localhost:54321")
.setSchema(schema)
.addEntities("someEntity")
.build();
validator.validateImportSpec(input);
}

@Test
public void importSpecWithKafkaSourceWithoutOptionsShouldThrowIllegalArgumentException() {
SpecValidator validator =
new SpecValidator(
storageInfoRepository,
entityInfoRepository,
featureGroupInfoRepository,
featureInfoRepository);
when(featureInfoRepository.existsById("some_existing_feature")).thenReturn(true);
when(entityInfoRepository.existsById("someEntity")).thenReturn(true);
Schema schema =
Schema.newBuilder()
.addFields(Field.newBuilder().setFeatureId("some_existing_feature").build())
.build();
ImportSpec input =
ImportSpec.newBuilder()
.setType("kafka")
.setSchema(schema)
.addEntities("someEntity")
.build();
exception.expect(IllegalArgumentException.class);
exception.expectMessage(
"Validation for import spec failed: Invalid options: Kafka ingestion requires either topics or servers");
validator.validateImportSpec(input);
}
}