Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 227 add type in case it's not provided #228

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

import java.util.UUID;

public class NameUtil {
private static final String DEFAULT_TYPE = "DEFAULT_NAMESPACE";
/**
* Extracts the name from the fully qualified type name. Name represents the last token after ".".
* If the qualified name does not contain "." then the name is same as qualified name.
Expand Down Expand Up @@ -59,4 +62,19 @@ public static String qualifiedName(String qualifier, String name) {
Preconditions.checkNotNull(name, "Name cannot be null");
return Strings.isNullOrEmpty(qualifier) ? name : String.format("%s.%s", qualifier, name);
}

/**
* Type value if the 'type' is not null of empty.
* If type is null or empty and the namespace is null or empty created name is 'default_namespace.randomUUID'.
* If type is null or empty and namespace is not null or empty then created name is 'namespace.randomUUID'.
*
* @param type the value provided with API call (schemaInfo.getType()).
* @param namespace the namespace for the schema
* @return Provided name or Created name for type in SchemaInfo
*/
public static String createTypeIfAbsent(String type, String namespace) {
String typeName = Strings.isNullOrEmpty(namespace) ? DEFAULT_TYPE : namespace;
String uuid = UUID.randomUUID().toString();
return Strings.isNullOrEmpty(type) ? String.format("%s.%s", typeName, uuid) : type;
}
}
13 changes: 13 additions & 0 deletions contract/src/main/swagger/SchemaRegistry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -131,6 +132,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -158,6 +160,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -201,6 +204,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: type
type: string
Expand Down Expand Up @@ -261,6 +265,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -293,6 +298,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -329,6 +335,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -376,6 +383,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -433,6 +441,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -476,6 +485,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -512,6 +522,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -559,6 +570,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down Expand Up @@ -592,6 +604,7 @@ paths:
description: Group name
required: true
type: string
minLength: 3
- in: query
name: namespace
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public CompletableFuture<VersionInfo> addSchema(String namespace, String group,
Preconditions.checkArgument(group != null);
shshashwat marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkArgument(schemaInfo != null);
log.debug("addSchema called for group {} {}. schema {}", namespace, group, schemaInfo.getType());
SchemaInfo schema = normalizeSchemaBinary(schemaInfo);
SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace);
// 1. get group policy
// 2. get checker for serialization format.
// validate schema against group compatibility policy on schema
Expand Down Expand Up @@ -497,7 +497,7 @@ public CompletableFuture<VersionInfo> getSchemaVersion(String namespace, String
Preconditions.checkArgument(group != null);
Preconditions.checkArgument(schemaInfo != null);
log.debug("Group {} {}, getSchemaVersion for {}.", namespace, group, schemaInfo.getType());
SchemaInfo schema = normalizeSchemaBinary(schemaInfo);
SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace);

return store.getSchemaVersion(namespace, group, schema, getFingerprint(schema))
.whenComplete((r, e) -> {
Expand Down Expand Up @@ -526,7 +526,7 @@ public CompletableFuture<Boolean> validateSchema(String namespace, String group,
Preconditions.checkArgument(group != null);
Preconditions.checkArgument(schemaInfo != null);
log.debug("Group {} {}, validateSchema for {}.", namespace, group, schemaInfo.getType());
SchemaInfo schema = normalizeSchemaBinary(schemaInfo);
SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace);

return store.getGroupProperties(namespace, group)
.thenCompose(prop -> {
Expand Down Expand Up @@ -563,7 +563,7 @@ public CompletableFuture<Boolean> canRead(String namespace, String group, Schema
Preconditions.checkArgument(schemaInfo != null);
log.debug("Group {} {}, canRead for {}.", namespace, group, schemaInfo.getType());

SchemaInfo schema = normalizeSchemaBinary(schemaInfo);
SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace);
return store.getGroupProperties(namespace, group)
.thenCompose(prop -> getSchemasForValidation(namespace, group, schema, prop)
.thenApply(schemasWithVersion -> canReadChecker(schema, prop, schemasWithVersion)))
Expand Down Expand Up @@ -812,7 +812,7 @@ private boolean checkCompatibility(SchemaInfo schema, GroupProperties groupPrope
}
}

private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo) {
private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo, String namespace) {
// validates and the schema binary.
ByteBuffer schemaBinary = schemaInfo.getSchemaData();
boolean isValid = true;
Expand Down Expand Up @@ -896,11 +896,14 @@ private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo) {
// treated to be equal.
JsonNode jsonNode = OBJECT_MAPPER.readTree(schemaString);
Object obj = OBJECT_MAPPER.treeToValue(jsonNode, Object.class);

shrids marked this conversation as resolved.
Show resolved Hide resolved
type = NameUtil.createTypeIfAbsent(type, namespace);
schemaBinary = ByteBuffer.wrap(OBJECT_MAPPER.writeValueAsString(obj).getBytes(Charsets.UTF_8));
break;
case Any:
break;
case Custom:
type = NameUtil.createTypeIfAbsent(type, namespace);
break;
default:
break;
Expand Down Expand Up @@ -1032,7 +1035,7 @@ private Boolean canReadChecker(SchemaInfo schema, GroupProperties prop, List<Sch
* @return Map of group id to version that identifies the schema in the group.
*/
public CompletableFuture<Map<String, VersionInfo>> getSchemaReferences(String namespace, SchemaInfo schemaInfo) {
SchemaInfo schema = normalizeSchemaBinary(schemaInfo);
SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace);

return store.getGroupsUsing(namespace, schema)
.thenCompose(groups -> Futures.allOfWithResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public CompletableFuture<Version> updateEntry(String tableName, byte[] key, byte
}

public CompletableFuture<List<Version>> updateEntries(String tableName, Map<byte[], VersionedRecord<byte[]>> batch) {
log.debug("Update entries called for table {}", tableName);
Preconditions.checkNotNull(batch);
List<TableSegmentEntry> entries = batch.entrySet().stream().map(x -> {
return x.getValue().getVersion() == null ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ CompletableFuture<Void> deleteTableSegment(final String tableName,
CompletableFuture<List<TableSegmentKeyVersion>> updateTableEntries(final String tableName,
final List<TableSegmentEntry> entries,
String delegationToken) {
log.trace("Update table entries wirecommand is called for tableName {}", tableName);
return getTableUri(tableName).thenCompose(uri -> {
final WireCommandType type = WireCommandType.UPDATE_TABLE_ENTRIES;
List<Map.Entry<WireCommands.TableKey, WireCommands.TableValue>> wireCommandEntries = entries.stream().map(te -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import io.pravega.schemaregistry.storage.impl.group.Group;
import io.pravega.schemaregistry.storage.impl.groups.Groups;
import io.pravega.schemaregistry.storage.impl.schemas.Schemas;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class SchemaStoreImpl<T> implements SchemaStore {
private final Groups<T> groups;
private final Schemas<T> schemas;
Expand Down Expand Up @@ -133,7 +135,8 @@ public CompletableFuture<SchemaWithVersion> getLatestSchemaVersion(String namesp
@Override
public CompletableFuture<VersionInfo> addSchema(String namespace, String groupId, SchemaInfo schemaInfo, SchemaInfo normalized,
BigInteger fingerprint, GroupProperties prop, Etag etag) {
// Store normalized form of schema with the global schemas while the original form is stored within the group.
// Store normalized form of schema with the global schemas while the original form is stored within the group.
log.info("Add schema called to add a new schema in namespace {} and groupId {}", namespace, groupId);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here namespace passed by user may be an empty string, but we're adding the schema to default namespace in this case. So the log message should indicate that. Infact this is the best place to set the namespace string to its default value if the user has provided an empty String.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not allowing empty String as value for any parameter can be enforced by specifying the minLength parameter in swagger .yaml file.
See: https://swagger.io/docs/specification/data-models/data-types/

return schemas.addSchema(normalized, namespace, groupId)
.thenCompose(v -> getGroup(namespace, groupId).thenCompose(grp -> grp.addSchema(schemaInfo, fingerprint, prop, etag)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.pravega.schemaregistry.storage.client.VersionedRecord;
import io.pravega.schemaregistry.common.ChunkUtil;
import io.pravega.schemaregistry.storage.impl.group.records.NamespaceAndGroup;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -49,6 +50,7 @@
import static io.pravega.schemaregistry.storage.impl.schemas.SchemaRecords.KeySerializer;
import static io.pravega.schemaregistry.storage.impl.schemas.SchemaRecords.fromBytes;

@Slf4j
public class PravegaKeyValueSchemas implements Schemas<Version> {
private static final String SCHEMAS = TableStore.SCHEMA_REGISTRY_SCOPE + "/schemas/0";
private static final KeySerializer KEY_SERIALIZER = new KeySerializer();
Expand Down Expand Up @@ -132,6 +134,7 @@ private CompletionStage<String> addNewSchemaRecord(SchemaInfo schemaInfo, Schema
entries.put(KEY_SERIALIZER.toBytes(new SchemaIdChunkKey(id, i)),
new VersionedRecord<>(bytes, null));
}
log.trace("Call for update entries for new schema addition with schema {}", schemaInfo);
return tableStore.updateEntries(SCHEMAS, entries)
.thenApply(v -> id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,68 @@ public void testDeleteUsingTypeAndVersion() {
() -> service.deleteSchema(null, groupName, SerializationFormat.Avro.getFullTypeName(), schemaName, version).join(),
e -> e instanceof RuntimeException);
}


@Test
public void testAddCustomSchema() {
shshashwat marked this conversation as resolved.
Show resolved Hide resolved
SchemaStore schemaStore = SchemaStoreFactory.createInMemoryStore(executor);
SchemaRegistryService service = new SchemaRegistryService(schemaStore, executor);
String namespace = "namespace";
String group = "group";
service.createGroup(namespace, group,
GroupProperties.builder().allowMultipleTypes(true).properties(ImmutableMap.<String, String>builder().build())
.serializationFormat(SerializationFormat.Any)
.compatibility(Compatibility.allowAny()).build()).join();
String jsonSchemaString = "{" +
"\"title\": \"Person\", " +
"\"type\": \"object\", " +
"\"properties\": { " +
"\"name\": {" +
"\"type\": \"string\"" +
"}," +
"\"age\": {" +
"\"type\": \"integer\", \"minimum\": 0" +
"}" +
"}" +
"}";
String jsonSchemaString2 = "{" +
"\"title\": \"Person\", " +
"\"type\": \"object\", " +
"\"properties\": { " +
"\"age\": {" +
"\"type\": \"integer\", \"minimum\": 0" +
"}," +
"\"name\": {" +
"\"type\": \"string\"" +
"}" +
"}" +
"}";
SchemaInfo original = SchemaInfo.builder().type("").serializationFormat(SerializationFormat.Json)
.schemaData(ByteBuffer.wrap(jsonSchemaString.getBytes(Charsets.UTF_8)))
.properties(ImmutableMap.of()).build();
VersionInfo v = service.addSchema(namespace, group, original).join();
SchemaInfo schema = service.getSchema(namespace, group, v.getId()).join();
assertTrue(schema.getType().contains("namespace."));

SchemaInfo custom = SchemaInfo.builder().type("").serializationFormat(SerializationFormat.custom("cust"))
.schemaData(ByteBuffer.wrap(jsonSchemaString2.getBytes(Charsets.UTF_8)))
.properties(ImmutableMap.of()).build();
VersionInfo v1 = service.addSchema(namespace, group, custom).join();
SchemaInfo schema1 = service.getSchema(namespace, group, v1.getId()).join();
assertTrue(schema1.getType().contains("namespace."));

// Check if namespace is null or empty
service.createGroup(null, group,
GroupProperties.builder().allowMultipleTypes(true).properties(ImmutableMap.<String, String>builder().build())
.serializationFormat(SerializationFormat.Any)
.compatibility(Compatibility.allowAny()).build()).join();
SchemaInfo original2 = SchemaInfo.builder().type("").serializationFormat(SerializationFormat.Json)
.schemaData(ByteBuffer.wrap(jsonSchemaString.getBytes(Charsets.UTF_8)))
.properties(ImmutableMap.of()).build();
VersionInfo v2 = service.addSchema(null, group, original2).join();
SchemaInfo schema2 = service.getSchema(null, group, v2.getId()).join();
assertTrue(schema2.getType().contains("DEFAULT_NAMESPACE."));
}

@Test
public void testSchemaNormalization() {
SchemaStore schemaStore = SchemaStoreFactory.createInMemoryStore(executor);
Expand Down