diff --git a/common/src/main/java/io/pravega/schemaregistry/common/NameUtil.java b/common/src/main/java/io/pravega/schemaregistry/common/NameUtil.java index 2c2bda322..61a5f6b0f 100644 --- a/common/src/main/java/io/pravega/schemaregistry/common/NameUtil.java +++ b/common/src/main/java/io/pravega/schemaregistry/common/NameUtil.java @@ -12,7 +12,10 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import java.util.UUID; + public class NameUtil { + private static final String DEFAULT_TYPE = "DEFAULT_NAMESPACE"; /** * Extracts the name from the fully qualified type name. Name represents the last token after ".". * If the qualified name does not contain "." then the name is same as qualified name. @@ -59,4 +62,19 @@ public static String qualifiedName(String qualifier, String name) { Preconditions.checkNotNull(name, "Name cannot be null"); return Strings.isNullOrEmpty(qualifier) ? name : String.format("%s.%s", qualifier, name); } + + /** + * Type value if the 'type' is not null of empty. + * If type is null or empty and the namespace is null or empty created name is 'default_namespace.randomUUID'. + * If type is null or empty and namespace is not null or empty then created name is 'namespace.randomUUID'. + * + * @param type the value provided with API call (schemaInfo.getType()). + * @param namespace the namespace for the schema + * @return Provided name or Created name for type in SchemaInfo + */ + public static String createTypeIfAbsent(String type, String namespace) { + String typeName = Strings.isNullOrEmpty(namespace) ? DEFAULT_TYPE : namespace; + String uuid = UUID.randomUUID().toString(); + return Strings.isNullOrEmpty(type) ? String.format("%s.%s", typeName, uuid) : type; + } } diff --git a/contract/src/main/swagger/SchemaRegistry.yaml b/contract/src/main/swagger/SchemaRegistry.yaml index d6f051ae6..bc5f12165 100644 --- a/contract/src/main/swagger/SchemaRegistry.yaml +++ b/contract/src/main/swagger/SchemaRegistry.yaml @@ -94,6 +94,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -131,6 +132,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -158,6 +160,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -201,6 +204,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: type type: string @@ -261,6 +265,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -293,6 +298,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -329,6 +335,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -376,6 +383,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -433,6 +441,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -476,6 +485,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -512,6 +522,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -559,6 +570,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string @@ -592,6 +604,7 @@ paths: description: Group name required: true type: string + minLength: 3 - in: query name: namespace type: string diff --git a/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java b/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java index 08b9b6ab9..781f86b84 100644 --- a/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java +++ b/server/src/main/java/io/pravega/schemaregistry/service/SchemaRegistryService.java @@ -264,7 +264,7 @@ public CompletableFuture addSchema(String namespace, String group, Preconditions.checkArgument(group != null); Preconditions.checkArgument(schemaInfo != null); log.debug("addSchema called for group {} {}. schema {}", namespace, group, schemaInfo.getType()); - SchemaInfo schema = normalizeSchemaBinary(schemaInfo); + SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace); // 1. get group policy // 2. get checker for serialization format. // validate schema against group compatibility policy on schema @@ -497,7 +497,7 @@ public CompletableFuture getSchemaVersion(String namespace, String Preconditions.checkArgument(group != null); Preconditions.checkArgument(schemaInfo != null); log.debug("Group {} {}, getSchemaVersion for {}.", namespace, group, schemaInfo.getType()); - SchemaInfo schema = normalizeSchemaBinary(schemaInfo); + SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace); return store.getSchemaVersion(namespace, group, schema, getFingerprint(schema)) .whenComplete((r, e) -> { @@ -526,7 +526,7 @@ public CompletableFuture validateSchema(String namespace, String group, Preconditions.checkArgument(group != null); Preconditions.checkArgument(schemaInfo != null); log.debug("Group {} {}, validateSchema for {}.", namespace, group, schemaInfo.getType()); - SchemaInfo schema = normalizeSchemaBinary(schemaInfo); + SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace); return store.getGroupProperties(namespace, group) .thenCompose(prop -> { @@ -563,7 +563,7 @@ public CompletableFuture canRead(String namespace, String group, Schema Preconditions.checkArgument(schemaInfo != null); log.debug("Group {} {}, canRead for {}.", namespace, group, schemaInfo.getType()); - SchemaInfo schema = normalizeSchemaBinary(schemaInfo); + SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace); return store.getGroupProperties(namespace, group) .thenCompose(prop -> getSchemasForValidation(namespace, group, schema, prop) .thenApply(schemasWithVersion -> canReadChecker(schema, prop, schemasWithVersion))) @@ -812,7 +812,7 @@ private boolean checkCompatibility(SchemaInfo schema, GroupProperties groupPrope } } - private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo) { + private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo, String namespace) { // validates and the schema binary. ByteBuffer schemaBinary = schemaInfo.getSchemaData(); boolean isValid = true; @@ -896,11 +896,14 @@ private SchemaInfo normalizeSchemaBinary(SchemaInfo schemaInfo) { // treated to be equal. JsonNode jsonNode = OBJECT_MAPPER.readTree(schemaString); Object obj = OBJECT_MAPPER.treeToValue(jsonNode, Object.class); + + type = NameUtil.createTypeIfAbsent(type, namespace); schemaBinary = ByteBuffer.wrap(OBJECT_MAPPER.writeValueAsString(obj).getBytes(Charsets.UTF_8)); break; case Any: break; case Custom: + type = NameUtil.createTypeIfAbsent(type, namespace); break; default: break; @@ -1032,7 +1035,7 @@ private Boolean canReadChecker(SchemaInfo schema, GroupProperties prop, List> getSchemaReferences(String namespace, SchemaInfo schemaInfo) { - SchemaInfo schema = normalizeSchemaBinary(schemaInfo); + SchemaInfo schema = normalizeSchemaBinary(schemaInfo, namespace); return store.getGroupsUsing(namespace, schema) .thenCompose(groups -> Futures.allOfWithResults( diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/client/TableStore.java b/server/src/main/java/io/pravega/schemaregistry/storage/client/TableStore.java index dfb43328e..57fddae1a 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/client/TableStore.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/client/TableStore.java @@ -183,6 +183,7 @@ public CompletableFuture updateEntry(String tableName, byte[] key, byte } public CompletableFuture> updateEntries(String tableName, Map> batch) { + log.debug("Update entries called for table {}", tableName); Preconditions.checkNotNull(batch); List entries = batch.entrySet().stream().map(x -> { return x.getValue().getVersion() == null ? diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/client/WireCommandClient.java b/server/src/main/java/io/pravega/schemaregistry/storage/client/WireCommandClient.java index 0080bfa0a..6e9e9a059 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/client/WireCommandClient.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/client/WireCommandClient.java @@ -150,6 +150,7 @@ CompletableFuture deleteTableSegment(final String tableName, CompletableFuture> updateTableEntries(final String tableName, final List entries, String delegationToken) { + log.trace("Update table entries wirecommand is called for tableName {}", tableName); return getTableUri(tableName).thenCompose(uri -> { final WireCommandType type = WireCommandType.UPDATE_TABLE_ENTRIES; List> wireCommandEntries = entries.stream().map(te -> { diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java b/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java index fb2f10e03..bbef8ef04 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/impl/SchemaStoreImpl.java @@ -27,12 +27,14 @@ import io.pravega.schemaregistry.storage.impl.group.Group; import io.pravega.schemaregistry.storage.impl.groups.Groups; import io.pravega.schemaregistry.storage.impl.schemas.Schemas; +import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; import java.math.BigInteger; import java.util.List; import java.util.concurrent.CompletableFuture; +@Slf4j public class SchemaStoreImpl implements SchemaStore { private final Groups groups; private final Schemas schemas; @@ -133,7 +135,8 @@ public CompletableFuture getLatestSchemaVersion(String namesp @Override public CompletableFuture addSchema(String namespace, String groupId, SchemaInfo schemaInfo, SchemaInfo normalized, BigInteger fingerprint, GroupProperties prop, Etag etag) { - // Store normalized form of schema with the global schemas while the original form is stored within the group. + // Store normalized form of schema with the global schemas while the original form is stored within the group. + log.info("Add schema called to add a new schema in namespace {} and groupId {}", namespace, groupId); return schemas.addSchema(normalized, namespace, groupId) .thenCompose(v -> getGroup(namespace, groupId).thenCompose(grp -> grp.addSchema(schemaInfo, fingerprint, prop, etag))); } diff --git a/server/src/main/java/io/pravega/schemaregistry/storage/impl/schemas/PravegaKeyValueSchemas.java b/server/src/main/java/io/pravega/schemaregistry/storage/impl/schemas/PravegaKeyValueSchemas.java index d85528e1f..65ac4d4d9 100644 --- a/server/src/main/java/io/pravega/schemaregistry/storage/impl/schemas/PravegaKeyValueSchemas.java +++ b/server/src/main/java/io/pravega/schemaregistry/storage/impl/schemas/PravegaKeyValueSchemas.java @@ -22,6 +22,7 @@ import io.pravega.schemaregistry.storage.client.VersionedRecord; import io.pravega.schemaregistry.common.ChunkUtil; import io.pravega.schemaregistry.storage.impl.group.records.NamespaceAndGroup; +import lombok.extern.slf4j.Slf4j; import lombok.val; import java.nio.ByteBuffer; @@ -49,6 +50,7 @@ import static io.pravega.schemaregistry.storage.impl.schemas.SchemaRecords.KeySerializer; import static io.pravega.schemaregistry.storage.impl.schemas.SchemaRecords.fromBytes; +@Slf4j public class PravegaKeyValueSchemas implements Schemas { private static final String SCHEMAS = TableStore.SCHEMA_REGISTRY_SCOPE + "/schemas/0"; private static final KeySerializer KEY_SERIALIZER = new KeySerializer(); @@ -132,6 +134,7 @@ private CompletionStage addNewSchemaRecord(SchemaInfo schemaInfo, Schema entries.put(KEY_SERIALIZER.toBytes(new SchemaIdChunkKey(id, i)), new VersionedRecord<>(bytes, null)); } + log.trace("Call for update entries for new schema addition with schema {}", schemaInfo); return tableStore.updateEntries(SCHEMAS, entries) .thenApply(v -> id); } diff --git a/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java b/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java index 098e3849e..248ba8f6e 100644 --- a/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java +++ b/server/src/test/java/io/pravega/schemaregistry/service/SchemaRegistryServiceTest.java @@ -720,7 +720,68 @@ public void testDeleteUsingTypeAndVersion() { () -> service.deleteSchema(null, groupName, SerializationFormat.Avro.getFullTypeName(), schemaName, version).join(), e -> e instanceof RuntimeException); } - + + @Test + public void testAddCustomSchema() { + SchemaStore schemaStore = SchemaStoreFactory.createInMemoryStore(executor); + SchemaRegistryService service = new SchemaRegistryService(schemaStore, executor); + String namespace = "namespace"; + String group = "group"; + service.createGroup(namespace, group, + GroupProperties.builder().allowMultipleTypes(true).properties(ImmutableMap.builder().build()) + .serializationFormat(SerializationFormat.Any) + .compatibility(Compatibility.allowAny()).build()).join(); + String jsonSchemaString = "{" + + "\"title\": \"Person\", " + + "\"type\": \"object\", " + + "\"properties\": { " + + "\"name\": {" + + "\"type\": \"string\"" + + "}," + + "\"age\": {" + + "\"type\": \"integer\", \"minimum\": 0" + + "}" + + "}" + + "}"; + String jsonSchemaString2 = "{" + + "\"title\": \"Person\", " + + "\"type\": \"object\", " + + "\"properties\": { " + + "\"age\": {" + + "\"type\": \"integer\", \"minimum\": 0" + + "}," + + "\"name\": {" + + "\"type\": \"string\"" + + "}" + + "}" + + "}"; + SchemaInfo original = SchemaInfo.builder().type("").serializationFormat(SerializationFormat.Json) + .schemaData(ByteBuffer.wrap(jsonSchemaString.getBytes(Charsets.UTF_8))) + .properties(ImmutableMap.of()).build(); + VersionInfo v = service.addSchema(namespace, group, original).join(); + SchemaInfo schema = service.getSchema(namespace, group, v.getId()).join(); + assertTrue(schema.getType().contains("namespace.")); + + SchemaInfo custom = SchemaInfo.builder().type("").serializationFormat(SerializationFormat.custom("cust")) + .schemaData(ByteBuffer.wrap(jsonSchemaString2.getBytes(Charsets.UTF_8))) + .properties(ImmutableMap.of()).build(); + VersionInfo v1 = service.addSchema(namespace, group, custom).join(); + SchemaInfo schema1 = service.getSchema(namespace, group, v1.getId()).join(); + assertTrue(schema1.getType().contains("namespace.")); + + // Check if namespace is null or empty + service.createGroup(null, group, + GroupProperties.builder().allowMultipleTypes(true).properties(ImmutableMap.builder().build()) + .serializationFormat(SerializationFormat.Any) + .compatibility(Compatibility.allowAny()).build()).join(); + SchemaInfo original2 = SchemaInfo.builder().type("").serializationFormat(SerializationFormat.Json) + .schemaData(ByteBuffer.wrap(jsonSchemaString.getBytes(Charsets.UTF_8))) + .properties(ImmutableMap.of()).build(); + VersionInfo v2 = service.addSchema(null, group, original2).join(); + SchemaInfo schema2 = service.getSchema(null, group, v2.getId()).join(); + assertTrue(schema2.getType().contains("DEFAULT_NAMESPACE.")); + } + @Test public void testSchemaNormalization() { SchemaStore schemaStore = SchemaStoreFactory.createInMemoryStore(executor);