Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema registry 4/N #1381

Merged
merged 138 commits into from
Apr 6, 2018
Merged
Show file tree
Hide file tree
Changes from 125 commits
Commits
Show all changes
138 commits
Select commit Hold shift + click to select a range
fe86173
Schema Registry proto changes
Feb 13, 2018
6db4333
Infrastructure to store schemas
Feb 13, 2018
a40e96d
A default schema registry implementation
Feb 13, 2018
3e5a517
Add admin api for the schema registry
Feb 13, 2018
c12b69c
Merge branch 'master' into schema-registry-1
Feb 13, 2018
b5fa7a6
Renumber schema fields
Feb 13, 2018
3d1b465
Merge branch 'schema-registry-1' into schema-registry-2
Feb 13, 2018
815d518
Merge branch 'schema-registry-2' into schema-registry-3
Feb 13, 2018
5ac94dc
Merge branch 'schema-registry-3' into schema-registry-4
Feb 13, 2018
4525dce
Update Pulsar API with schema changes
Feb 14, 2018
685b2df
Merge branch 'schema-registry-1' into schema-registry-2
Feb 14, 2018
0895f12
Merge branch 'schema-registry-2' into schema-registry-3
Feb 14, 2018
e4b8a16
Merge branch 'schema-registry-3' into schema-registry-4
Feb 14, 2018
33a4363
Revert field number change
Feb 14, 2018
46cb957
Merge branch 'master' into schema-registry-1
Feb 23, 2018
e5e650b
Merge branch 'schema-registry-1' into schema-registry-2
Feb 23, 2018
9268478
Merge branch 'schema-registry-2' into schema-registry-3
Feb 23, 2018
979dbb9
Merge branch 'schema-registry-3' into schema-registry-4
Feb 23, 2018
74aa08c
Fix merge conflict
Feb 23, 2018
cd2ec8d
Merge branch 'schema-registry-2' into schema-registry-3
Feb 23, 2018
268511b
Merge branch 'schema-registry-3' into schema-registry-4
Feb 23, 2018
f9bd5e5
Merge branch 'master' into schema-registry-1
Feb 26, 2018
61897b3
Merge branch 'schema-registry-1' into schema-registry-2
Feb 26, 2018
3383efd
Merge branch 'schema-registry-2' into schema-registry-3
Feb 26, 2018
853774e
Merge branch 'schema-registry-3' into schema-registry-4
Feb 26, 2018
f20b920
Merge branch 'master' into schema-registry-1
Feb 28, 2018
184beb9
Merge branch 'schema-registry-1' into schema-registry-2
Feb 28, 2018
6530f44
Merge branch 'schema-registry-2' into schema-registry-3
Feb 28, 2018
b68678b
Merge branch 'schema-registry-3' into schema-registry-4
Feb 28, 2018
64df492
Fix broken merge
Feb 28, 2018
2152a9f
Merge branch 'schema-registry-2' into schema-registry-3
Feb 28, 2018
ca25ead
Merge branch 'schema-registry-3' into schema-registry-4
Feb 28, 2018
1159119
DestinationName has been renamed to TopicName
Feb 28, 2018
0405750
Merge branch 'master' into schema-registry-1
Mar 1, 2018
c0d68e5
Address issues in review
Mar 1, 2018
a2de9ad
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
d68696d
Merge branch 'schema-registry-2' into schema-registry-3
Mar 1, 2018
fb550a2
Merge branch 'schema-registry-3' into schema-registry-4
Mar 1, 2018
3bad767
Merge branch 'schema-registry-4' into schema-registry-5
Mar 1, 2018
d9147a6
Add schema type back to proto definition
Mar 1, 2018
9b11edf
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
3e8e31e
Address comments regarding lombok usage
Mar 1, 2018
cf8dd9a
Remove reserved future enum fields
Mar 1, 2018
077ec3c
regenerate code from protobuf
Mar 1, 2018
d2010e2
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
a2b09b4
Remove unused code
Mar 1, 2018
e7e72f4
Add schema version to producer success message
Mar 1, 2018
9753933
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
4b788ea
plumb schema through to producer
Mar 1, 2018
7b902f6
Revert "Add schema version to producer success message"
Mar 1, 2018
755ac8c
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
dda21bb
Revert "Revert "Add schema version to producer success message""
Mar 1, 2018
14f18bb
Merge branch 'master' into schema-registry-1
Mar 1, 2018
efb09de
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
d29d87c
Persist schema on producer connect
Mar 1, 2018
a13f1fc
Merge branch 'schema-registry-2' into schema-registry-3
Mar 1, 2018
1b3bf24
Merge branch 'schema-registry-3' into schema-registry-4
Mar 1, 2018
942b028
Merge branch 'schema-registry-4' into schema-registry-5
Mar 1, 2018
fdec73a
Merge branch 'master' into schema-registry-1
Mar 2, 2018
23f8475
Merge branch 'schema-registry-1' into schema-registry-2
Mar 2, 2018
1b10dd2
Add principal to schema on publish
Mar 2, 2018
92acf7b
Merge branch 'schema-registry-2' into schema-registry-3
Mar 2, 2018
d1205f0
Merge branch 'schema-registry-3' into schema-registry-4
Mar 2, 2018
b995f6e
Merge branch 'schema-registry-4' into schema-registry-5
Mar 2, 2018
3145bbe
Reformat function for readability
Mar 2, 2018
6026e52
Remove unused protoc profile
Mar 2, 2018
661c757
Rename put on schema registry to putIfAbsent
Mar 2, 2018
2706f23
fix lombok tomfoolery on builder
Mar 2, 2018
da59c5b
Reformat function for readability
Mar 2, 2018
96562d7
Remove unused protoc profile
Mar 2, 2018
3243249
Rename put on schema registry to putIfAbsent
Mar 2, 2018
9484f2b
fix compile errors from parent branch changes
Mar 2, 2018
2b6a179
plumb hash through and allow lookup by data
Mar 2, 2018
7d4089d
wip
Mar 2, 2018
989441f
run tests
Mar 2, 2018
03da686
wip: address review comments
Mar 2, 2018
b578ff9
switch underscore to slash in schema name
Mar 2, 2018
b9a2596
blah
Mar 2, 2018
de01cb0
Merge remote-tracking branch 'origin/master' into schema-registry-2
Mar 5, 2018
9103f52
Merge branch 'schema-registry-2' into schema-registry-3
Mar 5, 2018
9800acf
Get duplicate schema detection to work
Mar 5, 2018
9133378
Merge branch 'master' into schema-registry-2
Mar 5, 2018
9947dd6
Fix protobuf version incompatibility
Mar 5, 2018
a8765e0
Merge branch 'master' into schema-registry-2
Mar 5, 2018
3edaa1f
Merge branch 'schema-registry-2' into schema-registry-3
Mar 5, 2018
ccce142
Merge branch 'schema-registry-3' into schema-registry-4
Mar 5, 2018
1beeb31
Merge branch 'schema-registry-4' into schema-registry-5
Mar 5, 2018
3d489a3
Merge branch 'master' into schema-registry-2
Mar 6, 2018
b36a016
Merge branch 'schema-registry-2' into schema-registry-3
Mar 6, 2018
e36934e
Merge branch 'schema-registry-3' into schema-registry-4
Mar 6, 2018
ebb5545
Merge branch 'schema-registry-4' into schema-registry-5
Mar 6, 2018
d756cb0
Merge branch 'master' into schema-registry-3
Mar 9, 2018
daf1161
fix merge issues
Mar 9, 2018
fcbbb21
Merge branch 'master' into schema-registry-3
Mar 12, 2018
eb66272
Merge branch 'schema-registry-3' into schema-registry-4
Mar 12, 2018
a942ce7
Merge branch 'schema-registry-4' into schema-registry-5
Mar 12, 2018
e17516a
Fix license headers
Mar 12, 2018
c74bbcd
Fix license headers
Mar 12, 2018
2d4b75a
Address review
Mar 12, 2018
44a2add
Fix webservice
Mar 12, 2018
ae09b75
Merge branch 'schema-registry-3' into schema-registry-4
Mar 12, 2018
edebff6
Merge branch 'schema-registry-4' into schema-registry-5
Mar 12, 2018
29a2ab7
plumb schema from producer to server and back
Mar 12, 2018
3cc37de
Plumb schema through subscriber
Mar 12, 2018
9f5e5fc
Create and return schema via rest endpoint
Mar 13, 2018
517bdff
Merge branch 'master' into schema-registry-3
Mar 13, 2018
4ef6e14
Merge branch 'schema-registry-3' into schema-registry-4
Mar 13, 2018
7ec0b61
Merge branch 'schema-registry-4' into schema-registry-5
Mar 13, 2018
979b5d8
Make DELETE great again
Mar 13, 2018
ec78536
Clean up imports
Mar 13, 2018
5a1d53d
Merge branch 'master' into schema-registry-3
Mar 13, 2018
85f1b87
Merge branch 'schema-registry-3' into schema-registry-4
Mar 13, 2018
d06ad1f
Merge branch 'schema-registry-4' into schema-registry-5
Mar 13, 2018
9f42830
Merge branch 'master' into schema-registry-4
Mar 14, 2018
4063895
Merge branch 'schema-registry-4' into schema-registry-5
Mar 14, 2018
b709ebd
Move resource objects to common package
Mar 14, 2018
3e36d56
Merge branch 'master' into schema-registry-4
Mar 15, 2018
f9ceea9
Merge branch 'schema-registry-4' into schema-registry-5
Mar 15, 2018
588d910
Merge branch 'master' into schema-registry-4
Mar 19, 2018
9893ce9
Fix licenses
Mar 19, 2018
f4bbb20
Update error message for schema registry service
Mar 19, 2018
95901ad
Remove cruft
Mar 19, 2018
96dcbf9
Merge branch 'master' into schema-registry-4
Mar 19, 2018
5f8c322
Merge branch 'master' into schema-registry-4
Mar 20, 2018
945fa72
Address review comments
Mar 20, 2018
20bc160
Merge branch 'master' into schema-registry-4
Apr 2, 2018
c809d95
Address review comments
Apr 2, 2018
83ad166
Merge branch 'master' into schema-registry-4
Apr 2, 2018
efa4db5
Fix license headers
Apr 3, 2018
7d46959
Merge branch 'master' into schema-registry-4
Apr 3, 2018
04d5ce0
Merge branch 'master' into schema-registry-4
Apr 4, 2018
2603cb3
deal with lombock stuff causing issues
Apr 4, 2018
b4aa948
Merge branch 'master' into schema-registry-4
Apr 4, 2018
70ecc05
Merge branch 'master' into schema-registry-4
Apr 5, 2018
b795df0
Merge branch 'master' into schema-registry-4
Apr 6, 2018
e307464
Resolve conflict
Apr 6, 2018
53c5b15
Merge branch 'master' into schema-registry-4
merlimat Apr 6, 2018
50e37fa
Merge branch 'master' into schema-registry-4
Apr 6, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.v2;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.isNull;
import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
import static org.apache.pulsar.common.util.Codec.decode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import io.swagger.annotations.ApiOperation;
import java.time.Clock;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.schema.GetSchemaResponse;
import org.apache.pulsar.common.schema.PostSchemaPayload;
import org.apache.pulsar.common.schema.PostSchemaResponse;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;

@Path("/schemas")
public class SchemasResource extends AdminResource {

private final Clock clock;

public SchemasResource() {
this(Clock.systemUTC());
}

@VisibleForTesting
public SchemasResource(Clock clock) {
super();
this.clock = clock;
}

@GET
@Path("/{property}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get topic schema", response = GetSchemaResponse.class)
public void getSchema(
@PathParam("property") String property,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@Suspended final AsyncResponse response
) {
validateDestinationAndAdminOperation(property, namespace, topic);

String schemaId = buildSchemaId(property, namespace, topic);
pulsar().getSchemaRegistryService().getSchema(schemaId)
.handle((schema, error) -> {
if (isNull(error)) {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder()
.version(schema.version)
.type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData()))
.properties(schema.schema.props)
.build()
)
.build()
);
} else {
response.resume(error);
}
return null;
});
}

@GET
@Path("/{property}/{namespace}/{topic}/schema/{version}")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get topic schema")
public void getSchema(
@PathParam("property") String property,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@PathParam("version") @Encoded String version,
@Suspended final AsyncResponse response
) {
validateDestinationAndAdminOperation(property, namespace, topic);

String schemaId = buildSchemaId(property, namespace, topic);
SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(version.getBytes());
pulsar().getSchemaRegistryService().getSchema(schemaId, v)
.handle((schema, error) -> {
if (isNull(error)) {
if (schema.schema.isDeleted()) {
response.resume(Response.noContent());
} else {
response.resume(
Response.ok()
.encoding(MediaType.APPLICATION_JSON)
.entity(GetSchemaResponse.builder()
.version(schema.version)
.type(schema.schema.getType())
.timestamp(schema.schema.getTimestamp())
.data(new String(schema.schema.getData()))
.properties(schema.schema.props)
.build()
).build()
);
}
} else {
response.resume(error);
}
return null;
});
}

@DELETE
@Path("/{property}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Delete topic schema")
public void deleteSchema(
@PathParam("property") String property,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
@Suspended final AsyncResponse response
) {
validateDestinationAndAdminOperation(property, namespace, topic);

String schemaId = buildSchemaId(property, namespace, topic);
pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), ""))
.handle((version, error) -> {
if (isNull(error)) {
response.resume(
Response.ok().entity(
DeleteSchemaResponse.builder()
.version(version)
.build()
).build()
);
} else {
response.resume(error);
}
return null;
});
}

@POST
@Path("/{property}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Post topic schema")
public void postSchema(
@PathParam("property") String property,
@PathParam("namespace") String namespace,
@PathParam("topic") String topic,
PostSchemaPayload payload,
@Suspended final AsyncResponse response
) {
validateDestinationAndAdminOperation(property, namespace, topic);

pulsar().getSchemaRegistryService().putSchemaIfAbsent(
buildSchemaId(property, namespace, topic),
SchemaData.builder()
.data(payload.getSchema().getBytes(Charsets.UTF_8))
.isDeleted(false)
.timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
.build()
).thenAccept(version ->
response.resume(
Response.accepted().entity(
PostSchemaResponse.builder()
.version(version)
.build()
).build()
)
);
}

private String buildSchemaId(String property, String namespace, String topic) {
return TopicName.get("persistent", property, namespace, topic).getSchemaName();
}

private void validateDestinationAndAdminOperation(String property, String namespace, String topic) {
TopicName destinationName = TopicName.get(
"persistent", property, namespace, decode(topic)
);

try {
validateAdminAccessOnProperty(destinationName.getProperty());
validateTopicOwnership(destinationName, false);
} catch (RestException e) {
if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) {
throw new RestException(Response.Status.NOT_FOUND, "Not Found");
} else {
throw e;
}
}
}

private void validateDestinationExists(TopicName dn) {
try {
Topic topic = pulsar().getBrokerService().getTopicReference(dn.toString());
checkNotNull(topic);
} catch (Exception e) {
throw new RestException(Response.Status.NOT_FOUND, "Topic not found");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -21,28 +21,29 @@
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.protobuf.ByteString.copyFrom;
import static java.util.Collections.emptyMap;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode;
Expand All @@ -55,17 +56,20 @@
public class BookkeeperSchemaStorage implements SchemaStorage {
private static final String SchemaPath = "/schemas";
private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private static final byte[] LedgerPassword = "".getBytes();

private final PulsarService pulsar;
private final ZooKeeper zooKeeper;
private final ZooKeeperCache localZkCache;
private final ServiceConfiguration config;
private BookKeeper bookKeeper;

@VisibleForTesting
BookkeeperSchemaStorage(PulsarService pulsar) {
this.pulsar = pulsar;
this.localZkCache = pulsar.getLocalZkCache();
this.zooKeeper = localZkCache.getZooKeeper();
this.config = pulsar.getConfiguration();
}

@VisibleForTesting
Expand All @@ -79,6 +83,7 @@ public void init() throws KeeperException, InterruptedException {
}
}

@Override
public void start() throws IOException {
this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
pulsar.getConfiguration(),
Expand Down Expand Up @@ -119,8 +124,7 @@ private CompletableFuture<StoredSchema> getSchema(String schemaId) {
.thenApply(entry ->
new StoredSchema(
entry.getSchemaData().toByteArray(),
new LongSchemaVersion(schemaLocator.getInfo().getVersion()),
emptyMap()
new LongSchemaVersion(schemaLocator.getInfo().getVersion())
)
);
});
Expand Down Expand Up @@ -156,8 +160,7 @@ private CompletableFuture<StoredSchema> getSchema(String schemaId, long version)
.thenApply(entry ->
new StoredSchema(
entry.getSchemaData().toByteArray(),
new LongSchemaVersion(version),
emptyMap()
new LongSchemaVersion(version)
)
);
});
Expand Down Expand Up @@ -377,22 +380,30 @@ private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, SchemaStorag
@NotNull
private CompletableFuture<LedgerHandle> createLedger() {
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
bookKeeper.asyncCreateLedger(0, 0, DigestType.MAC, new byte[]{},
bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
config.getManagedLedgerDefaultWriteQuorum(),
config.getManagedLedgerDefaultAckQuorum(),
config.getManagedLedgerDigestType(),
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(BKException.create(rc));
} else {
future.complete(handle);
}
}, null
}, null, Collections.emptyMap()
);
return future;
}

@NotNull
private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
bookKeeper.asyncOpenLedger(ledgerId, DigestType.MAC, new byte[]{},
bookKeeper.asyncOpenLedger(
ledgerId,
config.getManagedLedgerDigestType(),
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(BKException.create(rc));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ static SchemaRegistryService create(PulsarService pulsar) {
Object factoryInstance = storageClass.newInstance();
Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class);
SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar);
schemaStorage.start();
return new SchemaRegistryServiceImpl(schemaStorage);
} catch (Exception e) {
log.warn("Error when trying to create scehema registry storage: {}", e);
log.warn("Unable to create schema registry storage, defaulting to empty storage: {}", e);
}
return new DefaultSchemaRegistryService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface SchemaStorage {

SchemaVersion versionFromBytes(byte[] version);

void start() throws Exception;

void close() throws Exception;

}
Loading