From 2146f46fb5b0cdde1a91f3eec00775cadc2f5b4e Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Tue, 16 Dec 2014 17:45:26 -0800 Subject: [PATCH] Fixed formatting to match the coding style. Added log4j and proper exception handling --- pom.xml | 6 + .../kafka/schemaregistry/rest/Main.java | 48 +- .../rest/SchemaRegistryApplicationConfig.java | 131 +++-- .../rest/SchemaRegistryRestApplication.java | 34 +- .../rest/SchemaRegistryRestConfiguration.java | 240 ++++----- .../kafka/schemaregistry/rest/Versions.java | 40 +- .../schemaregistry/rest/entities/Schema.java | 262 ++++----- .../schemaregistry/rest/entities/Topic.java | 190 ++++--- .../requests/RegisterSchemaRequest.java | 61 ++- .../requests/RegisterSchemaResponse.java | 22 +- .../rest/resources/RootResource.java | 46 +- .../rest/resources/SchemasResource.java | 53 +- .../rest/resources/TopicsResource.java | 25 +- .../schemaregistry/storage/InMemoryStore.java | 89 ++-- .../storage/KafkaSchemaRegistry.java | 140 +++++ .../schemaregistry/storage/KafkaStore.java | 78 ++- .../storage/KafkaStoreConfig.java | 88 ++-- .../storage/KafkaStoreReaderThread.java | 46 +- .../schemaregistry/storage/RocksDbConfig.java | 69 ++- .../schemaregistry/storage/RocksDbStore.java | 18 +- .../storage/SchemaRegistry.java | 120 +---- .../storage/SchemaRegistryConfig.java | 43 +- .../kafka/schemaregistry/storage/Store.java | 48 +- .../exceptions/SchemaRegistryException.java | 20 + .../exceptions/SerializationException.java | 20 + .../StoreInitializationException.java | 2 - .../serialization/SchemaSerializer.java | 54 +- .../storage/serialization/Serializer.java | 37 +- .../serialization/StringSerializer.java | 28 +- .../serialization/ZkStringSerializer.java | 17 +- .../kafka/schemaregistry/utils/Pair.java | 2 +- src/main/resources/log4j.properties | 6 +- .../schemaregistry/ClusterTestHarness.java | 178 ++++--- .../storage/KafkaStoreTest.java | 496 +++++++++--------- .../schemaregistry/storage/RocksDbTest.java | 12 - .../kafka/schemaregistry/utils/TestUtils.java | 16 +- 36 files changed, 1549 insertions(+), 1236 deletions(-) create mode 100644 src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java create mode 100644 src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SchemaRegistryException.java create mode 100644 src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SerializationException.java diff --git a/pom.xml b/pom.xml index cb108e2d12b..17cfb5fabe9 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,7 @@ 3.0.0 0.8.2-beta 2.10 + 1.7.6 3.0 3.5.1 0.1-SNAPSHOT @@ -101,6 +102,11 @@ rest-utils ${restutils.version} + + org.slf4j + slf4j-log4j12 + ${log4j.version} + junit diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java index c6f302c0974..e3ee539b113 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java @@ -1,30 +1,36 @@ package io.confluent.kafka.schemaregistry.rest; -import io.confluent.rest.ConfigurationException; import org.eclipse.jetty.server.Server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import io.confluent.rest.ConfigurationException; + public class Main { - /** - * Starts an embedded Jetty server running the REST server. - */ - public static void main(String[] args) throws IOException { - try { - SchemaRegistryRestConfiguration config = new SchemaRegistryRestConfiguration((args.length > 0 ? args[0] : null)); - SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config); - Server server = app.createServer(); - server.start(); - System.out.println("Server started, listening for requests..."); - server.join(); - } catch (ConfigurationException e) { - System.out.println("Server configuration failed: " + e.getMessage()); - e.printStackTrace(); - System.exit(1); - } catch (Exception e) { - System.err.println("Server died unexpectedly: " + e.toString()); - e.printStackTrace(); - System.exit(1); - } + + private static final Logger log = LoggerFactory.getLogger(Main.class); + + /** + * Starts an embedded Jetty server running the REST server. + */ + public static void main(String[] args) throws IOException { + + try { + SchemaRegistryRestConfiguration config = + new SchemaRegistryRestConfiguration((args.length > 0 ? args[0] : null)); + SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config); + Server server = app.createServer(); + server.start(); + log.info("Server started, listening for requests..."); + server.join(); + } catch (ConfigurationException e) { + log.error("Server configuration failed: ", e); + System.exit(1); + } catch (Exception e) { + log.error("Server died unexpectedly: ", e); + System.exit(1); } + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryApplicationConfig.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryApplicationConfig.java index bdf7faeb9a6..08fa7f37d99 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryApplicationConfig.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryApplicationConfig.java @@ -1,77 +1,76 @@ package io.confluent.kafka.schemaregistry.rest; -import io.confluent.rest.Configuration; -import io.confluent.rest.ConfigurationException; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import java.util.Properties; -public class SchemaRegistryApplicationConfig extends Configuration { - public Time time; - - public boolean debug; - public static final String DEFAULT_DEBUG = "false"; - - public int port; - public static final String DEFAULT_PORT = "8080"; - - public String zookeeperConnect; - public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181"; - - public String bootstrapServers; - public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; - - public int producerThreads; - public static final String DEFAULT_PRODUCER_THREADS = "5"; - - /** - * The consumer timeout used to limit consumer iterator operations. This is effectively the maximum error for the - * entire request timeout. It should be small enough to get reasonably close to the timeout, but large enough to - * not result in busy waiting. - */ - public int consumerIteratorTimeoutMs; - public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "50"; - - /** The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached. */ - public int consumerRequestTimeoutMs; - public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000"; - - /** The maximum number of messages returned in a single request. */ - public int consumerRequestMaxMessages; - public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100"; - - public int consumerThreads; - public static final String DEFAULT_CONSUMER_THREADS = "1"; - - /** Amount of idle time before a consumer instance is automatically destroyed. */ - public int consumerInstanceTimeoutMs; - public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000"; - - public SchemaRegistryApplicationConfig() throws ConfigurationException { - this(new Properties()); - } - - public SchemaRegistryApplicationConfig(Properties props) throws ConfigurationException { - time = new SystemTime(); +import io.confluent.rest.Configuration; +import io.confluent.rest.ConfigurationException; - debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG)); +public class SchemaRegistryApplicationConfig extends Configuration { - port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT)); - zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT); - bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS); - producerThreads = Integer.parseInt(props.getProperty("producer.threads", - DEFAULT_PRODUCER_THREADS)); - consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty( - "consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS)); - consumerRequestTimeoutMs = Integer.parseInt(props.getProperty( - "consumer.request.timeout.ms", DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS)); - consumerRequestMaxMessages = Integer.parseInt(props.getProperty( - "consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES)); - consumerThreads = Integer.parseInt(props.getProperty("consumer.threads", - DEFAULT_CONSUMER_THREADS)); - consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty( - "consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS)); - } + public static final String DEFAULT_DEBUG = "false"; + public static final String DEFAULT_PORT = "8080"; + public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181"; + public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; + public static final String DEFAULT_PRODUCER_THREADS = "5"; + public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "50"; + public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000"; + public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100"; + public static final String DEFAULT_CONSUMER_THREADS = "1"; + public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000"; + public Time time; + public boolean debug; + public int port; + public String zookeeperConnect; + public String bootstrapServers; + public int producerThreads; + /** + * The consumer timeout used to limit consumer iterator operations. This is effectively the + * maximum error for the entire request timeout. It should be small enough to get reasonably close + * to the timeout, but large enough to not result in busy waiting. + */ + public int consumerIteratorTimeoutMs; + /** + * The maximum total time to wait for messages for a request if the maximum number of messages has + * not yet been reached. + */ + public int consumerRequestTimeoutMs; + /** + * The maximum number of messages returned in a single request. + */ + public int consumerRequestMaxMessages; + public int consumerThreads; + /** + * Amount of idle time before a consumer instance is automatically destroyed. + */ + public int consumerInstanceTimeoutMs; + + public SchemaRegistryApplicationConfig() throws ConfigurationException { + this(new Properties()); + } + + public SchemaRegistryApplicationConfig(Properties props) throws ConfigurationException { + time = new SystemTime(); + + debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG)); + + port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT)); + zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT); + bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS); + producerThreads = Integer.parseInt(props.getProperty("producer.threads", + DEFAULT_PRODUCER_THREADS)); + consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty( + "consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS)); + consumerRequestTimeoutMs = Integer.parseInt(props.getProperty( + "consumer.request.timeout.ms", DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS)); + consumerRequestMaxMessages = Integer.parseInt(props.getProperty( + "consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES)); + consumerThreads = Integer.parseInt(props.getProperty("consumer.threads", + DEFAULT_CONSUMER_THREADS)); + consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty( + "consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS)); + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java index 749512070da..4cb2f946fd2 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java @@ -1,21 +1,29 @@ package io.confluent.kafka.schemaregistry.rest; -import io.confluent.kafka.schemaregistry.rest.entities.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +import javax.ws.rs.core.Configurable; + import io.confluent.kafka.schemaregistry.rest.resources.RootResource; import io.confluent.kafka.schemaregistry.rest.resources.SchemasResource; import io.confluent.kafka.schemaregistry.rest.resources.TopicsResource; +import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry; import io.confluent.kafka.schemaregistry.storage.KafkaStoreConfig; import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; import io.confluent.kafka.schemaregistry.storage.SchemaRegistryConfig; +import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException; import io.confluent.kafka.schemaregistry.storage.serialization.SchemaSerializer; import io.confluent.rest.Application; import io.confluent.rest.ConfigurationException; -import javax.ws.rs.core.Configurable; -import java.util.Properties; - public class SchemaRegistryRestApplication extends Application { - public SchemaRegistryRestApplication() throws ConfigurationException { + + private static final Logger log = LoggerFactory.getLogger(SchemaRegistryRestApplication.class); + + public SchemaRegistryRestApplication() throws ConfigurationException { this(new Properties()); } @@ -31,11 +39,17 @@ public SchemaRegistryRestApplication(SchemaRegistryRestConfiguration config) { public void setupResources(Configurable config, SchemaRegistryRestConfiguration appConfig) { Properties props = new Properties(); props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, "localhost:2181"); - props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, "schemaregistry"); - SchemaRegistryConfig schemaRegistryConfig = new SchemaRegistryConfig(props); - SchemaRegistry schemaRegistry = new SchemaRegistry(schemaRegistryConfig, - new SchemaSerializer()); - config.register(RootResource.class); + props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, "_schemas"); + SchemaRegistryConfig schemaRegistryConfig = new SchemaRegistryConfig(props); + SchemaRegistry schemaRegistry = null; + try { + schemaRegistry = new KafkaSchemaRegistry(schemaRegistryConfig, + new SchemaSerializer()); + } catch (SchemaRegistryException e) { + log.error("Error starting the schema registry", e); + System.exit(1); + } + config.register(RootResource.class); config.register(new TopicsResource(schemaRegistry)); config.register(SchemasResource.class); } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestConfiguration.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestConfiguration.java index 052b22dbd60..b996d72fe58 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestConfiguration.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestConfiguration.java @@ -1,7 +1,5 @@ package io.confluent.kafka.schemaregistry.rest; -import io.confluent.rest.Configuration; -import io.confluent.rest.ConfigurationException; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; @@ -9,124 +7,128 @@ import java.io.IOException; import java.util.Properties; -public class SchemaRegistryRestConfiguration extends Configuration { - public Time time; - - /** - * Unique ID for this REST server instance. This is used in generating unique IDs for consumers that do not specify - * their ID. The ID is empty by default, which makes a single server setup easier to get up and running, but is not - * safe for multi-server deployments where automatic consumer IDs are used. - */ - public String id; - public static final String DEFAULT_ID = ""; - - public boolean debug; - public static final String DEFAULT_DEBUG = "false"; - - public int port; - public static final String DEFAULT_PORT = "8080"; - - public String zookeeperConnect; - public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181"; - - public String bootstrapServers; - public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; - - public int producerThreads; - public static final String DEFAULT_PRODUCER_THREADS = "5"; - - /** - * The consumer timeout used to limit consumer iterator operations. This should be very small so we can effectively - * peek() on the iterator. - */ - public int consumerIteratorTimeoutMs; - public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "1"; - - /** - * Amount of time to backoff when an iterator runs out of data. When a consumer has a dedicated worker thread, this - * is effectively the maximum error for the entire request timeout. It should be small enough to get reasonably close - * to the timeout, but large enough to avoid busy waiting. - */ - public int consumerIteratorBackoffMs; - public static final String DEFAULT_CONSUMER_ITERATOR_BACKOFF_MS = "50"; - - /** The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached. */ - public int consumerRequestTimeoutMs; - public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000"; - - /** The maximum number of messages returned in a single request. */ - public int consumerRequestMaxMessages; - public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100"; - - public int consumerThreads; - public static final String DEFAULT_CONSUMER_THREADS = "1"; - - /** Amount of idle time before a consumer instance is automatically destroyed. */ - public int consumerInstanceTimeoutMs; - public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000"; - - public SchemaRegistryRestConfiguration() throws ConfigurationException { - this(new Properties()); - } - - public SchemaRegistryRestConfiguration(String propsFile) throws ConfigurationException { - this(getPropsFromFile(propsFile)); - } - - public SchemaRegistryRestConfiguration(Properties props) throws ConfigurationException { - time = new SystemTime(); - - id = props.getProperty("id", DEFAULT_ID); - - debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG)); - - port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT)); - zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT); - bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS); - producerThreads = Integer.parseInt(props.getProperty("producer.threads", - DEFAULT_PRODUCER_THREADS)); - consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty( - "consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS)); - consumerIteratorBackoffMs = Integer.parseInt(props.getProperty( - "consumer.iterator.backoff.ms", DEFAULT_CONSUMER_ITERATOR_BACKOFF_MS)); - consumerRequestTimeoutMs = Integer.parseInt(props.getProperty("consumer.request.timeout.ms", - DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS)); - consumerRequestMaxMessages = Integer.parseInt(props.getProperty( - "consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES)); - consumerThreads = Integer.parseInt(props.getProperty("consumer.threads", - DEFAULT_CONSUMER_THREADS)); - consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty( - "consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS)); - } - - @Override - public boolean getDebug() { - return debug; - } +import io.confluent.rest.Configuration; +import io.confluent.rest.ConfigurationException; - @Override - public int getPort() { - return port; - } +public class SchemaRegistryRestConfiguration extends Configuration { - @Override - public Iterable getPreferredResponseMediaTypes() { - return Versions.PREFERRED_RESPONSE_TYPES; + public static final String DEFAULT_ID = ""; + public static final String DEFAULT_DEBUG = "false"; + public static final String DEFAULT_PORT = "8080"; + public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181"; + public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; + public static final String DEFAULT_PRODUCER_THREADS = "5"; + public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "1"; + public static final String DEFAULT_CONSUMER_ITERATOR_BACKOFF_MS = "50"; + public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000"; + public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100"; + public static final String DEFAULT_CONSUMER_THREADS = "1"; + public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000"; + public Time time; + /** + * Unique ID for this REST server instance. This is used in generating unique IDs for consumers + * that do not specify their ID. The ID is empty by default, which makes a single server setup + * easier to get up and running, but is not safe for multi-server deployments where automatic + * consumer IDs are used. + */ + public String id; + public boolean debug; + public int port; + public String zookeeperConnect; + public String bootstrapServers; + public int producerThreads; + /** + * The consumer timeout used to limit consumer iterator operations. This should be very small so + * we can effectively peek() on the iterator. + */ + public int consumerIteratorTimeoutMs; + /** + * Amount of time to backoff when an iterator runs out of data. When a consumer has a dedicated + * worker thread, this is effectively the maximum error for the entire request timeout. It should + * be small enough to get reasonably close to the timeout, but large enough to avoid busy + * waiting. + */ + public int consumerIteratorBackoffMs; + /** + * The maximum total time to wait for messages for a request if the maximum number of messages has + * not yet been reached. + */ + public int consumerRequestTimeoutMs; + /** + * The maximum number of messages returned in a single request. + */ + public int consumerRequestMaxMessages; + public int consumerThreads; + /** + * Amount of idle time before a consumer instance is automatically destroyed. + */ + public int consumerInstanceTimeoutMs; + + public SchemaRegistryRestConfiguration() throws ConfigurationException { + this(new Properties()); + } + + public SchemaRegistryRestConfiguration(String propsFile) throws ConfigurationException { + this(getPropsFromFile(propsFile)); + } + + public SchemaRegistryRestConfiguration(Properties props) throws ConfigurationException { + time = new SystemTime(); + + id = props.getProperty("id", DEFAULT_ID); + + debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG)); + + port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT)); + zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT); + bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS); + producerThreads = Integer.parseInt(props.getProperty("producer.threads", + DEFAULT_PRODUCER_THREADS)); + consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty( + "consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS)); + consumerIteratorBackoffMs = Integer.parseInt(props.getProperty( + "consumer.iterator.backoff.ms", DEFAULT_CONSUMER_ITERATOR_BACKOFF_MS)); + consumerRequestTimeoutMs = Integer.parseInt(props.getProperty("consumer.request.timeout.ms", + DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS)); + consumerRequestMaxMessages = Integer.parseInt(props.getProperty( + "consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES)); + consumerThreads = Integer.parseInt(props.getProperty("consumer.threads", + DEFAULT_CONSUMER_THREADS)); + consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty( + "consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS)); + } + + private static Properties getPropsFromFile(String propsFile) throws ConfigurationException { + Properties props = new Properties(); + if (propsFile == null) { + return props; } - - @Override - public String getDefaultResponseMediaType() { - return Versions.SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT; + try { + FileInputStream propStream = new FileInputStream(propsFile); + props.load(propStream); + } catch (IOException e) { + throw new ConfigurationException("Couldn't load properties from " + propsFile, e); } - - private static Properties getPropsFromFile(String propsFile) throws ConfigurationException { - Properties props = new Properties(); - if (propsFile == null) return props; - try { - FileInputStream propStream = new FileInputStream(propsFile); - props.load(propStream); - } catch (IOException e) { - throw new ConfigurationException("Couldn't load properties from " + propsFile, e); - } - return props; - }} + return props; + } + + @Override + public boolean getDebug() { + return debug; + } + + @Override + public int getPort() { + return port; + } + + @Override + public Iterable getPreferredResponseMediaTypes() { + return Versions.PREFERRED_RESPONSE_TYPES; + } + + @Override + public String getDefaultResponseMediaType() { + return Versions.SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT; + } +} diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/Versions.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/Versions.java index 0893bdb4759..005aaef2eb4 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/Versions.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/Versions.java @@ -4,25 +4,27 @@ import java.util.List; public class Versions { - public static final String SCHEMA_REGISTRY_V1_JSON = "application/vnd.schemaregistry.v1+json"; - // Default weight = 1 - public static final String SCHEMA_REGISTRY_V1_JSON_WEIGHTED = SCHEMA_REGISTRY_V1_JSON; - // These are defaults that track the most recent API version. These should always be specified - // anywhere the latest version is produced/consumed. - public static final String SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT = SCHEMA_REGISTRY_V1_JSON; - public static final String SCHEMA_REGISTRY_DEFAULT_JSON = "application/vnd.schemaregistry+json"; - public static final String SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED = SCHEMA_REGISTRY_DEFAULT_JSON + - "; qs=0.9"; - public static final String JSON = "application/json"; - public static final String JSON_WEIGHTED = JSON + "; qs=0.5"; - public static final List PREFERRED_RESPONSE_TYPES = Arrays - .asList(Versions.SCHEMA_REGISTRY_V1_JSON, Versions.SCHEMA_REGISTRY_DEFAULT_JSON, - Versions.JSON); + public static final String SCHEMA_REGISTRY_V1_JSON = "application/vnd.schemaregistry.v1+json"; + // Default weight = 1 + public static final String SCHEMA_REGISTRY_V1_JSON_WEIGHTED = SCHEMA_REGISTRY_V1_JSON; + // These are defaults that track the most recent API version. These should always be specified + // anywhere the latest version is produced/consumed. + public static final String SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT = SCHEMA_REGISTRY_V1_JSON; + public static final String SCHEMA_REGISTRY_DEFAULT_JSON = "application/vnd.schemaregistry+json"; + public static final String SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED = + SCHEMA_REGISTRY_DEFAULT_JSON + + "; qs=0.9"; + public static final String JSON = "application/json"; + public static final String JSON_WEIGHTED = JSON + "; qs=0.5"; - // This type is completely generic and carries no actual information about the type of data, but - // it is the default for request entities if no content type is specified. Well behaving users - // of the API will always specify the content type, but ad hoc use may omit it. We treat this as - // JSON since that's all we currently support. - public static final String GENERIC_REQUEST = "application/octet-stream"; + public static final List PREFERRED_RESPONSE_TYPES = Arrays + .asList(Versions.SCHEMA_REGISTRY_V1_JSON, Versions.SCHEMA_REGISTRY_DEFAULT_JSON, + Versions.JSON); + + // This type is completely generic and carries no actual information about the type of data, but + // it is the default for request entities if no content type is specified. Well behaving users + // of the API will always specify the content type, but ad hoc use may omit it. We treat this as + // JSON since that's all we currently support. + public static final String GENERIC_REQUEST = "application/octet-stream"; } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Schema.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Schema.java index 293663a6300..a0de9fb2a7b 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Schema.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Schema.java @@ -1,134 +1,152 @@ package io.confluent.kafka.schemaregistry.rest.entities; import com.fasterxml.jackson.annotation.JsonProperty; + import org.hibernate.validator.constraints.NotEmpty; import javax.validation.constraints.Min; public class Schema { - @NotEmpty - private String name; - @Min(1) - private Integer version; - @NotEmpty - private String schema; - private boolean compatible = true; - private boolean deprecated = false; - private boolean latest = true; - - public Schema(@JsonProperty("name") String name, - @JsonProperty("version") Integer version, - @JsonProperty("schema") String schema, - @JsonProperty("compatible") boolean compatible, - @JsonProperty("deprecated") boolean deprecated, - @JsonProperty("latest") boolean latest) { - this.name = name; - this.version = version; - this.schema = schema; - this.compatible = compatible; - this.deprecated = deprecated; - this.latest = latest; - } - - @JsonProperty("name") - public String getName() { - return name; - } - - @JsonProperty("name") - public void setName(String name) { - this.name = name; - } - - @JsonProperty("schema") - public String getSchema() { - return this.schema; - } - - @JsonProperty("schema") - public void setSchema(String schema) { - this.schema = schema; - } - - @JsonProperty("version") - public Integer getVersion() { - return this.version; - } - - @JsonProperty("version") - public void setVersion(Integer version) { - this.version = version; - } - - @JsonProperty("compatible") - public boolean getCompatible() { - return this.compatible; - } - - @JsonProperty("compatible") - public void setCompatible(boolean compatible) { - this.compatible = compatible; - } - - @JsonProperty("deprecated") - public boolean getDeprecated() { - return this.deprecated; - } - - @JsonProperty("deprecated") - public void setDeprecated(boolean deprecated) { - this.deprecated = deprecated; - } - - @JsonProperty("latest") - public boolean getLatest() { - return this.latest; - } - - @JsonProperty("latest") - public void setLatest(boolean latest) { - this.latest = latest; - } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Schema that = (Schema) o; - - if (!name.equals(that.getName())) return false; - if (!schema.equals(that.schema)) return false; - if (this.version != that.getVersion()) return false; - if (this.compatible && !that.compatible) return false; - if (this.deprecated && !that.deprecated) return false; - if (this.latest && !that.latest) return false; - - return true; - } - - @Override - public int hashCode() { - int result = name.hashCode(); - result = 31 * result + schema.hashCode(); - result = 31 * result + version; - result = 31 * result + new Boolean(compatible).hashCode(); - result = 31 * result + new Boolean(deprecated).hashCode(); - result = 31 * result + new Boolean(latest).hashCode(); - return result; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("{name=" + this.name + ","); - sb.append("schema=" + this.schema + ","); - sb.append("version=" + this.version + ","); - sb.append("compatible=" + this.compatible + ","); - sb.append("deprecated=" + this.deprecated + ","); - sb.append("latest=" + this.latest + "}"); - return sb.toString(); - } + @NotEmpty + private String name; + @Min(1) + private Integer version; + @NotEmpty + private String schema; + private boolean compatible = true; + private boolean deprecated = false; + private boolean latest = true; + + public Schema(@JsonProperty("name") String name, + @JsonProperty("version") Integer version, + @JsonProperty("schema") String schema, + @JsonProperty("compatible") boolean compatible, + @JsonProperty("deprecated") boolean deprecated, + @JsonProperty("latest") boolean latest) { + this.name = name; + this.version = version; + this.schema = schema; + this.compatible = compatible; + this.deprecated = deprecated; + this.latest = latest; + } + + @JsonProperty("name") + public String getName() { + return name; + } + + @JsonProperty("name") + public void setName(String name) { + this.name = name; + } + + @JsonProperty("schema") + public String getSchema() { + return this.schema; + } + + @JsonProperty("schema") + public void setSchema(String schema) { + this.schema = schema; + } + + @JsonProperty("version") + public Integer getVersion() { + return this.version; + } + + @JsonProperty("version") + public void setVersion(Integer version) { + this.version = version; + } + + @JsonProperty("compatible") + public boolean getCompatible() { + return this.compatible; + } + + @JsonProperty("compatible") + public void setCompatible(boolean compatible) { + this.compatible = compatible; + } + + @JsonProperty("deprecated") + public boolean getDeprecated() { + return this.deprecated; + } + + @JsonProperty("deprecated") + public void setDeprecated(boolean deprecated) { + this.deprecated = deprecated; + } + + @JsonProperty("latest") + public boolean getLatest() { + return this.latest; + } + + @JsonProperty("latest") + public void setLatest(boolean latest) { + this.latest = latest; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Schema that = (Schema) o; + + if (!name.equals(that.getName())) { + return false; + } + if (!schema.equals(that.schema)) { + return false; + } + if (this.version != that.getVersion()) { + return false; + } + if (this.compatible && !that.compatible) { + return false; + } + if (this.deprecated && !that.deprecated) { + return false; + } + if (this.latest && !that.latest) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + schema.hashCode(); + result = 31 * result + version; + result = 31 * result + new Boolean(compatible).hashCode(); + result = 31 * result + new Boolean(deprecated).hashCode(); + result = 31 * result + new Boolean(latest).hashCode(); + return result; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{name=" + this.name + ","); + sb.append("schema=" + this.schema + ","); + sb.append("version=" + this.version + ","); + sb.append("compatible=" + this.compatible + ","); + sb.append("deprecated=" + this.deprecated + ","); + sb.append("latest=" + this.latest + "}"); + return sb.toString(); + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Topic.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Topic.java index 267ab74e8da..000e6093a53 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Topic.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Topic.java @@ -1,106 +1,122 @@ package io.confluent.kafka.schemaregistry.rest.entities; import com.fasterxml.jackson.annotation.JsonProperty; + import org.hibernate.validator.constraints.NotEmpty; public class Topic { - @NotEmpty - private String name; - - private String compatibility = "full"; - private String registration = "all"; - private String deprecation = "all"; - private String validators = null; - - public Topic(@JsonProperty("name") String name, - @JsonProperty("compatibility") String compatibility, - @JsonProperty("registration") String registration, - @JsonProperty("deprecation") String deprecation, - @JsonProperty("validators") String validators) { - this.name = name; - this.compatibility = compatibility; - this.registration = registration; - this.deprecation = deprecation; - this.validators = validators; - } - - public Topic(@JsonProperty("name") String name) { - this.name = name; - } - - @JsonProperty("name") - public String getName() { - return name; - } - - @JsonProperty("name") - public void setName(String name) { - this.name = name; - } - - @JsonProperty("compatibility") - public String getCompatibility() { - return this.compatibility; - } - @JsonProperty("compatibility") - public void setCompatibility(String compatibility) { - this.compatibility = compatibility; + @NotEmpty + private String name; + + private String compatibility = "full"; + private String registration = "all"; + private String deprecation = "all"; + private String validators = null; + + public Topic(@JsonProperty("name") String name, + @JsonProperty("compatibility") String compatibility, + @JsonProperty("registration") String registration, + @JsonProperty("deprecation") String deprecation, + @JsonProperty("validators") String validators) { + this.name = name; + this.compatibility = compatibility; + this.registration = registration; + this.deprecation = deprecation; + this.validators = validators; + } + + public Topic(@JsonProperty("name") String name) { + this.name = name; + } + + @JsonProperty("name") + public String getName() { + return name; + } + + @JsonProperty("name") + public void setName(String name) { + this.name = name; + } + + @JsonProperty("compatibility") + public String getCompatibility() { + return this.compatibility; + } + + @JsonProperty("compatibility") + public void setCompatibility(String compatibility) { + this.compatibility = compatibility; + } + + @JsonProperty("registration") + public String getRegistration() { + return this.registration; + } + + @JsonProperty("registration") + public void setRegistration(String registration) { + this.registration = registration; + } + + @JsonProperty("deprecation") + public String getDeprecation() { + return this.deprecation; + } + + @JsonProperty("deprecation") + public void setDeprecation(String deprecation) { + this.deprecation = deprecation; + } + + @JsonProperty("validators") + public String getValidators() { + return this.validators; + } + + @JsonProperty("validators") + public void setValidators(String validators) { + this.validators = validators; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; } - - @JsonProperty("registration") - public String getRegistration() { - return this.registration; + if (o == null || getClass() != o.getClass()) { + return false; } - @JsonProperty("registration") - public void setRegistration(String registration) { - this.registration = registration; - } + Topic topic = (Topic) o; - @JsonProperty("deprecation") - public String getDeprecation() { - return this.deprecation; + if (!name.equals(topic.name)) { + return false; } - - @JsonProperty("deprecation") - public void setDeprecation(String deprecation) { - this.deprecation = deprecation; + if (!this.compatibility.equals(topic.compatibility)) { + return false; } - - @JsonProperty("validators") - public String getValidators() { - return this.validators; + if (!this.registration.equals(topic.registration)) { + return false; } - - @JsonProperty("validators") - public void setValidators(String validators) { - this.validators = validators; + if (!this.deprecation.equals(topic.deprecation)) { + return false; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Topic topic = (Topic) o; - - if (!name.equals(topic.name)) return false; - if (!this.compatibility.equals(topic.compatibility)) return false; - if (!this.registration.equals(topic.registration)) return false; - if (!this.deprecation.equals(topic.deprecation)) return false; - if (!this.validators.equals(topic.validators)) return false; - - return true; + if (!this.validators.equals(topic.validators)) { + return false; } - @Override - public int hashCode() { - int result = name.hashCode(); - result = 31 * result + this.registration.hashCode(); - result = 31 * result + this.deprecation.hashCode(); - result = 31 * result + this.validators.hashCode(); - return result; - } + return true; + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + this.registration.hashCode(); + result = 31 * result + this.deprecation.hashCode(); + result = 31 * result + this.validators.hashCode(); + return result; + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaRequest.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaRequest.java index 58953e36710..d4e2199dfc7 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaRequest.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaRequest.java @@ -1,42 +1,49 @@ package io.confluent.kafka.schemaregistry.rest.entities.requests; import com.fasterxml.jackson.annotation.JsonProperty; -import io.confluent.kafka.schemaregistry.rest.entities.Schema; + import org.hibernate.validator.constraints.NotEmpty; public class RegisterSchemaRequest { - @NotEmpty - private String schema; - @JsonProperty("schema") - public String getSchema() { - return this.schema; - } + @NotEmpty + private String schema; - @JsonProperty("schema") - public void setSchema(String schema) { - this.schema = schema; - } + @JsonProperty("schema") + public String getSchema() { + return this.schema; + } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + @JsonProperty("schema") + public void setSchema(String schema) { + this.schema = schema; + } - RegisterSchemaRequest that = (RegisterSchemaRequest) o; + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } - if (schema != null ? !schema.equals(that.schema) : that.schema != null) { - return false; - } + RegisterSchemaRequest that = (RegisterSchemaRequest) o; - return true; + if (schema != null ? !schema.equals(that.schema) : that.schema != null) { + return false; } - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (schema != null ? schema.hashCode() : 0); - return result; - } + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (schema != null ? schema.hashCode() : 0); + return result; + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaResponse.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaResponse.java index 07ac0fbe7aa..200fa782111 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaResponse.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaResponse.java @@ -1,20 +1,22 @@ package io.confluent.kafka.schemaregistry.rest.entities.requests; import com.fasterxml.jackson.annotation.JsonProperty; + import org.hibernate.validator.constraints.NotEmpty; public class RegisterSchemaResponse { - @NotEmpty - private int version; - @JsonProperty("version") - public int getVersion() { - return version; - } + @NotEmpty + private int version; + + @JsonProperty("version") + public int getVersion() { + return version; + } - @JsonProperty("version") - public void setVersion(int version) { - this.version = version; - } + @JsonProperty("version") + public void setVersion(int version) { + this.version = version; + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/RootResource.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/RootResource.java index 615e3275ea9..c12f03bac47 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/RootResource.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/RootResource.java @@ -1,32 +1,38 @@ package io.confluent.kafka.schemaregistry.rest.resources; -import io.confluent.kafka.schemaregistry.rest.Versions; - -import javax.validation.Valid; -import javax.ws.rs.*; import java.util.HashMap; import java.util.Map; +import javax.validation.Valid; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; + +import io.confluent.kafka.schemaregistry.rest.Versions; + @Path("/") -@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, Versions.SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED, - Versions.JSON_WEIGHTED}) +@Produces( + {Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, Versions.SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED, + Versions.JSON_WEIGHTED}) @Consumes({Versions.SCHEMA_REGISTRY_V1_JSON, Versions.SCHEMA_REGISTRY_DEFAULT_JSON, - Versions.JSON, Versions.GENERIC_REQUEST}) + Versions.JSON, Versions.GENERIC_REQUEST}) public class RootResource { - @GET - public Map get() { - // Currently this just provides an endpoint that's a nop and can be used to check for - // liveness and can be used for tests that need to test the server setup rather than the - // functionality of a specific resource. Some APIs provide a listing of endpoints as their - // root resource; it might be nice to provide that. - return new HashMap(); - } + @GET + public Map get() { + // Currently this just provides an endpoint that's a nop and can be used to check for + // liveness and can be used for tests that need to test the server setup rather than the + // functionality of a specific resource. Some APIs provide a listing of endpoints as their + // root resource; it might be nice to provide that. + return new HashMap(); + } - @POST - public Map post(@Valid Map request) { - // This version allows testing with posted entities - return new HashMap(); - } + @POST + public Map post(@Valid Map request) { + // This version allows testing with posted entities + return new HashMap(); + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java index fce0c4add36..d582e466bda 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java @@ -1,19 +1,32 @@ package io.confluent.kafka.schemaregistry.rest.resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.NotFoundException; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Response; + import io.confluent.kafka.schemaregistry.rest.Versions; import io.confluent.kafka.schemaregistry.rest.entities.Schema; import io.confluent.kafka.schemaregistry.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.rest.entities.requests.RegisterSchemaResponse; import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; +import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; -import javax.ws.rs.*; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.Suspended; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - @Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, Versions.SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED, Versions.JSON_WEIGHTED}) @@ -22,8 +35,9 @@ Versions.JSON, Versions.GENERIC_REQUEST}) public class SchemasResource { public final static String MESSAGE_SCHEMA_NOT_FOUND = "Schema not found."; + private static final Logger log = LoggerFactory.getLogger(SchemasResource.class); - private final String topic; + private final String topic; private final boolean isKey; private final SchemaRegistry schemaRegistry; @@ -36,8 +50,15 @@ public SchemasResource(SchemaRegistry registry, String topic, boolean isKey) { @GET @Path("/{id}") public Schema getSchema(@PathParam("id") Integer id) { - Schema schema = schemaRegistry.get(this.topic, id); - if (schema == null) + Schema schema = null; + try { + schema = schemaRegistry.get(this.topic, id); + } catch (SchemaRegistryException e) { + log.debug("Error while retrieving schema with id " + id + " from the schema registry", + e); + throw new NotFoundException(MESSAGE_SCHEMA_NOT_FOUND, e); + } + if (schema == null) throw new NotFoundException(MESSAGE_SCHEMA_NOT_FOUND); return schema; } @@ -49,8 +70,7 @@ public List list() { try { allSchemasForThisTopic = schemaRegistry.getAllVersions(this.topic); } catch (StoreException e) { - // TODO: throw meaningful exception - e.printStackTrace(); + throw new ClientErrorException(Response.Status.INTERNAL_SERVER_ERROR, e); } while (allSchemasForThisTopic.hasNext()) { Schema schema = allSchemasForThisTopic.next(); @@ -63,8 +83,13 @@ public List list() { public void register(final @Suspended AsyncResponse asyncResponse, @PathParam("topic") String topicName, RegisterSchemaRequest request) { Schema schema = new Schema(topicName, 0, request.getSchema(), true, false, true); - int version = schemaRegistry.register(topicName, schema); - RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse(); + int version = 0; + try { + version = schemaRegistry.register(topicName, schema); + } catch (SchemaRegistryException e) { + throw new ClientErrorException(Response.Status.INTERNAL_SERVER_ERROR, e); + } + RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse(); registerSchemaResponse.setVersion(version); asyncResponse.resume(registerSchemaResponse); } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/TopicsResource.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/TopicsResource.java index 40a77b83cd5..2e92c3a4690 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/TopicsResource.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/TopicsResource.java @@ -1,13 +1,21 @@ package io.confluent.kafka.schemaregistry.rest.resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.NotFoundException; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; + import io.confluent.kafka.schemaregistry.rest.Versions; -import io.confluent.kafka.schemaregistry.rest.entities.Schema; import io.confluent.kafka.schemaregistry.rest.entities.Topic; import io.confluent.kafka.schemaregistry.storage.SchemaRegistry; -import javax.ws.rs.*; -import java.util.Set; - @Path("/topics") @Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, Versions.SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED, @@ -17,7 +25,8 @@ Versions.JSON, Versions.GENERIC_REQUEST}) public class TopicsResource { public final static String MESSAGE_TOPIC_NOT_FOUND = "Topic not found."; - private final SchemaRegistry schemaRegistry; + private static final Logger log = LoggerFactory.getLogger(TopicsResource.class); + private final SchemaRegistry schemaRegistry; public TopicsResource(SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; @@ -26,11 +35,10 @@ public TopicsResource(SchemaRegistry schemaRegistry) { @GET @Path("/{topic}") public Topic getTopic(@PathParam("topic") String topicName) { - System.out.println("Received get topic request for " + topicName); if (!schemaRegistry.listTopics().contains(topicName)) throw new NotFoundException(MESSAGE_TOPIC_NOT_FOUND); - // TODO: Implement topic/schema metadata - return new Topic(topicName); + // TODO: https://github.com/confluentinc/schema-registry/issues/3 Implement metadata + return new Topic(topicName); } @Path("/{topic}/key/versions") @@ -45,7 +53,6 @@ public SchemasResource getValueSchemas(@PathParam("topic") String topicName) { @GET public Set list() { - System.out.println("Received list topics request"); return schemaRegistry.listTopics(); } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryStore.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryStore.java index bcd972c2176..c9c087e52a7 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryStore.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryStore.java @@ -1,56 +1,63 @@ /** - * + * */ package io.confluent.kafka.schemaregistry.storage; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; - import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; + /** * In-memory store based on maps - * @author nnarkhed * + * @author nnarkhed */ -public class InMemoryStore implements Store { - private final ConcurrentSkipListMap store; - - public InMemoryStore() { - store = new ConcurrentSkipListMap(); - } - - public void init() throws StoreInitializationException { - // do nothing - } - - @Override public V get(K key) { - return store.get(key); - } - - @Override public void put(K key, V value) throws StoreException { - store.put(key, value); - } - - @Override public Iterator getAll(K key1, K key2) { - ConcurrentNavigableMap subMap = (key1 == null && key2 == null) ? - store : store.subMap(key1, key2); - return subMap.values().iterator(); - } - - @Override public void putAll(Map entries) { - store.putAll(entries); - } - - @Override public void delete(K key) throws StoreException { - store.remove(key); - } - - @Override public void close() { - store.clear(); - } +public class InMemoryStore implements Store { + + private final ConcurrentSkipListMap store; + + public InMemoryStore() { + store = new ConcurrentSkipListMap(); + } + + public void init() throws StoreInitializationException { + // do nothing + } + + @Override + public V get(K key) { + return store.get(key); + } + + @Override + public void put(K key, V value) throws StoreException { + store.put(key, value); + } + + @Override + public Iterator getAll(K key1, K key2) { + ConcurrentNavigableMap subMap = (key1 == null && key2 == null) ? + store : store.subMap(key1, key2); + return subMap.values().iterator(); + } + + @Override + public void putAll(Map entries) { + store.putAll(entries); + } + + @Override + public void delete(K key) throws StoreException { + store.remove(key); + } + + @Override + public void close() { + store.clear(); + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java new file mode 100644 index 00000000000..ce1c4d9f680 --- /dev/null +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -0,0 +1,140 @@ +package io.confluent.kafka.schemaregistry.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import io.confluent.kafka.schemaregistry.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; +import io.confluent.kafka.schemaregistry.storage.serialization.Serializer; +import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer; + +public class KafkaSchemaRegistry implements SchemaRegistry { + + private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class); + + private final Map schemaVersions; + private final Store kafkaStore; + private final Serializer serializer; + + public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer serializer) + throws SchemaRegistryException { + this.serializer = serializer; + schemaVersions = new HashMap(); + StringSerializer stringSerializer = new StringSerializer(); + kafkaStore = new KafkaStore(config, + stringSerializer, this.serializer, + new InMemoryStore()); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + } + try { + Iterator allSchemas = kafkaStore.getAll(null, null); + while (allSchemas.hasNext()) { + Schema schema = allSchemas.next(); + log.debug("Applying schema " + schema.toString() + " to the schema version " + + "cache"); + schemaVersions.put(schema.getName(), schema.getVersion()); + } + } catch (StoreException e) { + throw new SchemaRegistryException("Error while bootstrapping the schema registry " + + "from the backend Kafka store", e); + } + log.trace("Contents of version cache after bootstrap is complete" + + schemaVersions.toString()); + } + + @Override + public int register(String topic, Schema schema) throws SchemaRegistryException { + int latestVersion = 0; + if (schemaVersions.containsKey(topic)) { + latestVersion = schemaVersions.get(topic); + } + int version = latestVersion + 1; + String newKeyForLatestSchema = topic + "," + version; + String keyForLatestSchema = topic + "," + latestVersion; + Schema latestSchema = null; + try { + latestSchema = kafkaStore.get(keyForLatestSchema); + } catch (StoreException e) { + throw new SchemaRegistryException("Error while retrieving the latest schema from the" + + " backend Kafka store", e); + } + if (isCompatible(topic, schema, latestSchema)) { + try { + schema.setVersion(version); + log.trace("Adding schema to the Kafka store: " + schema.toString()); + kafkaStore.put(newKeyForLatestSchema, schema); + } catch (StoreException e) { + throw new SchemaRegistryException("Error while registering the schema in the" + + " backend Kafka store", e); + } + } + schemaVersions.put(topic, version); + return version; + } + + @Override + public Schema get(String topic, int version) throws SchemaRegistryException { + String key = topic + "," + version; + Schema schema = null; + try { + schema = kafkaStore.get(key); + } catch (StoreException e) { + throw new SchemaRegistryException( + "Error while retrieving schema from the backend Kafka" + + " store", e); + } + return schema; + } + + @Override + public Set listTopics() { + return schemaVersions.keySet(); + } + + @Override + public Iterator getAll(String topic) throws StoreException { + int earliestVersion = 1; + int latestVersion = 1; + if (schemaVersions.containsKey(topic)) { + latestVersion = schemaVersions.get(topic) + 1; + } + String keyEarliestVersion = topic + "," + earliestVersion; + String keyLatestVersion = topic + "," + latestVersion; + return kafkaStore.getAll(keyEarliestVersion, keyLatestVersion); + } + + @Override + public Iterator getAllVersions(String topic) throws StoreException { + int earliestVersion = 1; + int latestVersion = 1; + if (schemaVersions.containsKey(topic)) { + latestVersion = schemaVersions.get(topic) + 1; + } else { + log.trace("Schema for " + topic + " does not exist in version cache. " + + "Defaulting to version 1 as latest version"); + } + String keyEarliestVersion = topic + "," + earliestVersion; + String keyLatestVersion = topic + "," + latestVersion; + log.trace("Getting schemas between versions: " + earliestVersion + "," + latestVersion); + return kafkaStore.getAll(keyEarliestVersion, keyLatestVersion); + } + + @Override + public boolean isCompatible(String topic, Schema schema1, Schema schema2) { + return true; + } + + @Override + public void close() { + kafkaStore.close(); + } +} diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java index 48fb2525a0d..7253a8ed484 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java @@ -1,5 +1,26 @@ package io.confluent.kafka.schemaregistry.storage; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; import io.confluent.kafka.schemaregistry.storage.serialization.Serializer; @@ -13,22 +34,13 @@ import kafka.javaapi.consumer.ZookeeperConsumerConnector; import kafka.message.MessageAndMetadata; import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import scala.collection.JavaConversions; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - public class KafkaStore implements Store { - private final String kafkaClusterZkUrl; + + private static final Logger log = LoggerFactory.getLogger(KafkaStore.class); + + private final String kafkaClusterZkUrl; private final int zkSessionTimeoutMs; private final String topic; private final String groupId; @@ -105,11 +117,21 @@ public KafkaStore(KafkaStoreConfig storeConfig, Serializer keySerializer, while (iterator.hasNext()) { MessageAndMetadata messageAndMetadata = iterator.next(); byte[] messageBytes = messageAndMetadata.message(); - V message = messageBytes == null ? null : valueSerializer.fromBytes(messageBytes); - K messageKey = keySerializer.fromBytes(messageAndMetadata.key()); - try { - System.out.println("Applying update (" + messageKey + "," + message + ") to the " + - "local store during bootstrap"); + V message = null; + try { + message = messageBytes == null ? null : valueSerializer.fromBytes(messageBytes); + } catch (SerializationException e) { + throw new StoreInitializationException("Failed to deserialize the schema", e); + } + K messageKey = null; + try { + messageKey = keySerializer.fromBytes(messageAndMetadata.key()); + } catch (SerializationException e) { + throw new StoreInitializationException("Failed to deserialize the schema key", e); + } + try { + log.trace("Applying update (" + messageKey + "," + message + ") to the " + + "local store during bootstrap"); if (message == null) { localStore.delete(messageKey); } else { @@ -126,7 +148,8 @@ public KafkaStore(KafkaStoreConfig storeConfig, Serializer keySerializer, // the consumer will checkpoint it's offset in zookeeper, so the background thread will // continue from where the bootstrap consumer left off consumer.shutdown(); - System.out.println("Bootstrap is complete. Now switching to live update"); + log.info("Kafka store bootstrap from the log " + this.topic + " is complete. Now " + + "switching to live update"); } // start the background thread that subscribes to the Kafka topic and applies updates kafkaTopicReader.start(); @@ -148,9 +171,15 @@ public KafkaStore(KafkaStoreConfig storeConfig, Serializer keySerializer, throw new StoreException("Key should not be null"); } // write to the Kafka topic - ProducerRecord producerRecord = new ProducerRecord(topic, 0, keySerializer.toBytes(key), + ProducerRecord producerRecord = null; + try { + producerRecord = new ProducerRecord(topic, 0, keySerializer.toBytes(key), value == null ? null : valueSerializer.toBytes(value)); - Future ack = producer.send(producerRecord); + } catch (SerializationException e) { + throw new StoreException("Error serializing schema while creating the Kafka produce " + + "record", e); + } + Future ack = producer.send(producerRecord); try { ack.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -186,8 +215,11 @@ public KafkaStore(KafkaStoreConfig storeConfig, Serializer keySerializer, @Override public void close() { kafkaTopicReader.shutdown(); - producer.close(); - localStore.close(); + log.debug("Kafka store reader thread shut down"); + producer.close(); + log.debug("Kafka store producer shut down"); + localStore.close(); + log.debug("Kafka store shut down complete"); } private void assertInitialized() throws StoreException { diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreConfig.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreConfig.java index d61edd7fa62..fa598849ccd 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreConfig.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreConfig.java @@ -11,44 +11,54 @@ public class KafkaStoreConfig extends AbstractConfig { - /** kafkastore.connection.url */ - public static final String KAFKASTORE_CONNECTION_URL_CONFIG = "kafkastore.connection.url"; - protected static final String KAFKASTORE_CONNECTION_URL_DOC = "Zookeeper url for the Kafka cluster"; - - /** kafkastore.zk.session.timeout.ms */ - public static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG - = "kafkastore.zk.session.timeout.ms"; - protected static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout"; - - /** kafkastore.topic */ - public static final String KAFKASTORE_TOPIC_CONFIG = "kafkastore.topic"; - protected static final String KAFKASTORE_TOPIC_DOC = "The durable single partition topic that acts" + - "as the durable log for the data"; - - /** kafkastore.timeout.ms */ - public static final String KAFKASTORE_TIMEOUT_CONFIG = "kafkastore.timeout.ms"; - protected static final String KAFKASTORE_TIMEOUT_DOC = "The timeout for an operation on the Kafka" + - " store"; - - private static final ConfigDef config = new ConfigDef() - .define(KAFKASTORE_CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH, - KAFKASTORE_CONNECTION_URL_DOC) - .define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, Type.INT, 10000, atLeast(0), Importance.LOW, - KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC) - .define(KAFKASTORE_TOPIC_CONFIG, Type.STRING, Importance.HIGH, KAFKASTORE_TOPIC_DOC) - .define(KAFKASTORE_TIMEOUT_CONFIG, Type.INT, 500, atLeast(0), Importance.MEDIUM, - KAFKASTORE_TIMEOUT_DOC); - - public KafkaStoreConfig(ConfigDef arg0, Map arg1) { - super(arg0, arg1); - } - - KafkaStoreConfig(Map props) { - super(config, props); - } - - public static void main(String[] args) { - System.out.println(config.toHtmlTable()); - } + /** + * kafkastore.connection.url + */ + public static final String KAFKASTORE_CONNECTION_URL_CONFIG = "kafkastore.connection.url"; + /** + * kafkastore.zk.session.timeout.ms + */ + public static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG + = "kafkastore.zk.session.timeout.ms"; + /** + * kafkastore.topic + */ + public static final String KAFKASTORE_TOPIC_CONFIG = "kafkastore.topic"; + /** + * kafkastore.timeout.ms + */ + public static final String KAFKASTORE_TIMEOUT_CONFIG = "kafkastore.timeout.ms"; + protected static final String KAFKASTORE_CONNECTION_URL_DOC = + "Zookeeper url for the Kafka cluster"; + protected static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC = + "Zookeeper session timeout"; + protected static final String KAFKASTORE_TOPIC_DOC = + "The durable single partition topic that acts" + + "as the durable log for the data"; + protected static final String KAFKASTORE_TIMEOUT_DOC = + "The timeout for an operation on the Kafka" + + " store"; + + private static final ConfigDef config = new ConfigDef() + .define(KAFKASTORE_CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH, + KAFKASTORE_CONNECTION_URL_DOC) + .define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, Type.INT, 10000, atLeast(0), + Importance.LOW, + KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC) + .define(KAFKASTORE_TOPIC_CONFIG, Type.STRING, Importance.HIGH, KAFKASTORE_TOPIC_DOC) + .define(KAFKASTORE_TIMEOUT_CONFIG, Type.INT, 500, atLeast(0), Importance.MEDIUM, + KAFKASTORE_TIMEOUT_DOC); + + public KafkaStoreConfig(ConfigDef arg0, Map arg1) { + super(arg0, arg1); + } + + KafkaStoreConfig(Map props) { + super(config, props); + } + + public static void main(String[] args) { + System.out.println(config.toHtmlTable()); + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java index f6443dca17e..57afadfaccf 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java @@ -1,5 +1,15 @@ package io.confluent.kafka.schemaregistry.storage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; import io.confluent.kafka.schemaregistry.storage.serialization.Serializer; import kafka.consumer.ConsumerConfig; @@ -10,10 +20,11 @@ import kafka.message.MessageAndMetadata; import kafka.utils.ShutdownableThread; -import java.util.*; - public class KafkaStoreReaderThread extends ShutdownableThread { - private final String kafkaClusterZkUrl; + + private static final Logger log = LoggerFactory.getLogger(KafkaStoreReaderThread.class); + + private final String kafkaClusterZkUrl; private final String topic; private final String groupId; private final Serializer keySerializer; @@ -63,7 +74,8 @@ public void start() { } KafkaStream stream = streamsForTheLogTopic.get(0); consumerIterator = stream.iterator(); - System.out.println("Thread started with consumer properties " + consumerProps.toString()); + log.debug("Kafka store reader thread started with consumer properties " + + consumerProps.toString()); } @Override @@ -72,12 +84,23 @@ public void doWork() { while (consumerIterator != null && consumerIterator.hasNext()) { MessageAndMetadata messageAndMetadata = consumerIterator.next(); byte[] messageBytes = messageAndMetadata.message(); - V message = messageBytes == null ? null : valueSerializer.fromBytes(messageBytes); - K messageKey = keySerializer.fromBytes(messageAndMetadata.key()); - try { - System.out.println("Applying update (" + messageKey + "," + message + ") to the " + - "local store"); - if (message == null) { + V message = null; + try { + message = messageBytes == null ? null : valueSerializer.fromBytes(messageBytes); + } catch (SerializationException e) { + // TODO: fail just this operation or all subsequent operations? + log.error("Failed to deserialize the schema", e); + } + K messageKey = null; + try { + messageKey = keySerializer.fromBytes(messageAndMetadata.key()); + } catch (SerializationException e) { + log.error("Failed to deserialize the schema key", e); + } + try { + log.trace("Applying update (" + messageKey + "," + message + ") to the local " + + "store"); + if (message == null) { localStore.delete(messageKey); } else { localStore.put(messageKey, message); @@ -93,8 +116,7 @@ public void doWork() { * 1. Restart the store hoping that it works subsequently * 2. Look into the issue manually */ - System.err.println("Failed to add record from the Kafka topic" + - topic + " the local store"); + log.error("Failed to add record from the Kafka topic" + topic + " the local store"); } } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbConfig.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbConfig.java index 90e7fe0e129..4b1ffdcf705 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbConfig.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbConfig.java @@ -15,49 +15,62 @@ public class RocksDbConfig extends AbstractConfig { - /** rocksdb.data.dir */ + /** + * rocksdb.data.dir + */ public static final String ROCKSDB_DATADIR_CONFIG = "rocksdb.data.dir"; - private static final String ROCKSDB_DATADIR_DOC = "Location of the rocksdb data"; - - /** rocksdb.compression */ + /** + * rocksdb.compression + */ public static final String ROCKSDB_COMPRESSION_CONFIG = "rocksdb.compression"; - private static final String ROCKSDB_COMPRESSION_DOC = "The compression setting and choice for data stored in RocksDB. Could be one of" + - "snappy, bzip2, zlib, lz4, lz4hc, none"; - private static final List compressionOptions = Arrays.asList("snappy", "bzip2", "zlib", "lz4", "lz4hc", "none"); - - /** rocksdb.block.size.bytes */ + /** + * rocksdb.block.size.bytes + */ public static final String ROCKSDB_BLOCK_SIZE_BYTES_CONFIG = "rocksdb.block.size.bytes"; - private static final String ROCKSDB_BLOCK_SIZE_DOC = "Block size in bytes for data in RocksDB"; - - /** rocksdb.write.buffer.size.bytes */ - public static final String ROCKSDB_WRITE_BUFFER_SIZE_BYTES_CONFIG = "rocksdb.write.buffer.size.bytes"; - private static final String ROCKSDB_WRITE_BUFFER_SIZE_BYTES_DOC = "Write buffer size in bytes for data in RocksDB"; - - /** rocksdb.bloomfilter.bits */ + /** + * rocksdb.write.buffer.size.bytes + */ + public static final String ROCKSDB_WRITE_BUFFER_SIZE_BYTES_CONFIG = + "rocksdb.write.buffer.size.bytes"; + /** + * rocksdb.bloomfilter.bits + */ public static final String ROCKSDB_BLOOMFILTER_BITS_CONFIG = "rocksdb.bloomfilter.bits"; - public static final String ROCKSDB_COMPACTION_STYLE_CONFIG = "rocksdb.compaction.style"; - private static final String ROCKSDB_COMPACTION_STYLE_DOC = "The compaction style for RocksDB. Could be one of universal, fifo, level"; - private static final List compactionOptions = Arrays.asList("universal", "level", "fifo"); - public static final String ROCKSDB_WRITE_BUFFERS_CONFIG = "rocksdb.write.buffers"; - - private static final ConfigDef config = new ConfigDef().define(ROCKSDB_COMPRESSION_CONFIG, Type.STRING, CompressionType.NO_COMPRESSION.toString(), - Importance.MEDIUM, ROCKSDB_COMPRESSION_DOC) + private static final String ROCKSDB_DATADIR_DOC = "Location of the rocksdb data"; + private static final String ROCKSDB_COMPRESSION_DOC = + "The compression setting and choice for data stored in RocksDB. Could be one of" + + "snappy, bzip2, zlib, lz4, lz4hc, none"; + private static final List compressionOptions = + Arrays.asList("snappy", "bzip2", "zlib", "lz4", "lz4hc", "none"); + private static final String ROCKSDB_BLOCK_SIZE_DOC = "Block size in bytes for data in RocksDB"; + private static final String ROCKSDB_WRITE_BUFFER_SIZE_BYTES_DOC = + "Write buffer size in bytes for data in RocksDB"; + private static final String ROCKSDB_COMPACTION_STYLE_DOC = + "The compaction style for RocksDB. Could be one of universal, fifo, level"; + private static final List compactionOptions = + Arrays.asList("universal", "level", "fifo"); + private static final ConfigDef config = new ConfigDef() + .define(ROCKSDB_COMPRESSION_CONFIG, Type.STRING, CompressionType.NO_COMPRESSION.toString(), + Importance.MEDIUM, ROCKSDB_COMPRESSION_DOC) .define(ROCKSDB_DATADIR_CONFIG, Type.STRING, Importance.HIGH, ROCKSDB_DATADIR_DOC) - .define(ROCKSDB_COMPACTION_STYLE_CONFIG, Type.STRING, CompactionStyle.UNIVERSAL.toString(), Importance.MEDIUM, - ROCKSDB_COMPACTION_STYLE_DOC) - .define(ROCKSDB_BLOCK_SIZE_BYTES_CONFIG, Type.INT, 4096, Importance.MEDIUM, ROCKSDB_BLOCK_SIZE_DOC) + .define(ROCKSDB_COMPACTION_STYLE_CONFIG, Type.STRING, CompactionStyle.UNIVERSAL.toString(), + Importance.MEDIUM, + ROCKSDB_COMPACTION_STYLE_DOC) + .define(ROCKSDB_BLOCK_SIZE_BYTES_CONFIG, Type.INT, 4096, Importance.MEDIUM, + ROCKSDB_BLOCK_SIZE_DOC) .define(ROCKSDB_BLOOMFILTER_BITS_CONFIG, Type.INT, 10, atLeast(0), Importance.LOW, null) .define(ROCKSDB_WRITE_BUFFERS_CONFIG, Type.INT, 1, Importance.LOW, null) - .define(ROCKSDB_WRITE_BUFFER_SIZE_BYTES_CONFIG, Type.LONG, 4096, Importance.MEDIUM, ROCKSDB_WRITE_BUFFER_SIZE_BYTES_DOC); + .define(ROCKSDB_WRITE_BUFFER_SIZE_BYTES_CONFIG, Type.LONG, 4096, Importance.MEDIUM, + ROCKSDB_WRITE_BUFFER_SIZE_BYTES_DOC); public RocksDbConfig(ConfigDef arg0, Map arg1) { super(arg0, arg1); } RocksDbConfig(Map props) { - super(config, props); + super(config, props); } public static void main(String[] args) { diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbStore.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbStore.java index 83445c2bd15..69b048cba8f 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbStore.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbStore.java @@ -3,27 +3,32 @@ */ package io.confluent.kafka.schemaregistry.storage; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; -import org.rocksdb.*; +import org.rocksdb.CompactionStyle; +import org.rocksdb.CompressionType; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; import java.io.File; import java.util.Iterator; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; + /** * @author nnarkhed * */ public class RocksDbStore implements Store { - private RocksDB db; private final Options options; private final File rocksDbDir; private final AtomicBoolean initialized = new AtomicBoolean(false); + private RocksDB db; - public RocksDbStore(RocksDbConfig config) { + public RocksDbStore(RocksDbConfig config) { rocksDbDir = new File(config.getString(RocksDbConfig.ROCKSDB_DATADIR_CONFIG)); options = new Options(); String compression = config.getString(RocksDbConfig.ROCKSDB_COMPRESSION_CONFIG); @@ -61,8 +66,7 @@ public byte[] get(byte[] key) throws StoreException { try { value = db.get(key); } catch (RocksDBException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + throw new StoreException(e); } return value; } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java index 4e666a7703f..ed58f77ae1e 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java @@ -1,123 +1,25 @@ package io.confluent.kafka.schemaregistry.storage; -import io.confluent.kafka.schemaregistry.rest.entities.Schema; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; -import io.confluent.kafka.schemaregistry.storage.serialization.Serializer; -import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer; - -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Set; -public class SchemaRegistry { - private final Map schemaVersions; - private final Store kafkaStore; - private final Serializer serializer; +import io.confluent.kafka.schemaregistry.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; - public SchemaRegistry(SchemaRegistryConfig config, Serializer serializer) { - this.serializer = serializer; - schemaVersions = new HashMap(); - StringSerializer stringSerializer = new StringSerializer(); - kafkaStore = new KafkaStore(config, - stringSerializer, this.serializer, new InMemoryStore()); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - } - try { - Iterator allSchemas = kafkaStore.getAll(null, null); - while(allSchemas.hasNext()) { - Schema schema = allSchemas.next(); - System.out.println("Applying schema " + schema.toString() + " to the schema version " + - "cache"); - schemaVersions.put(schema.getName(), schema.getVersion()); - } - } catch (StoreException e) { - // TODO: throw meaningful exception - e.printStackTrace(); - } - System.out.println("Current contents of versionc cache" + schemaVersions.toString()); - } +public interface SchemaRegistry { - public int register(String topic, Schema schema) { - int latestVersion = 0; - if (schemaVersions.containsKey(topic)) { - latestVersion = schemaVersions.get(topic); - } - int version = latestVersion + 1; - String newKeyForLatestSchema = topic + "," + version; - String keyForLatestSchema = topic + "," + latestVersion; - Schema latestSchema = null; - try { - latestSchema = kafkaStore.get(keyForLatestSchema); - } catch (StoreException e) { - // TODO: Throw a meaningful exception - e.printStackTrace(); - return -1; - } - if (isCompatible(topic, schema, latestSchema)) { - try { - schema.setVersion(version); - System.out.println("Adding schema to the Kafka store: " + schema.toString()); - kafkaStore.put(newKeyForLatestSchema, schema); - } catch (StoreException e) { - e.printStackTrace(); - return -1; - } - } - schemaVersions.put(topic, version); - return version; - } + int register(String topic, Schema schema) throws SchemaRegistryException; - public Schema get(String topic, int version) { - String key = topic + "," + version; - Schema schema = null; - try { - schema = kafkaStore.get(key); - } catch (StoreException e) { - // TODO: Throw a meaningful exception - e.printStackTrace(); - } - return schema; - } + Schema get(String topic, int version) throws SchemaRegistryException; - public Set listTopics() { - return schemaVersions.keySet(); - } + Set listTopics(); - public Iterator getAll(String topic) throws StoreException { - int earliestVersion = 1; - int latestVersion = 1; - if (schemaVersions.containsKey(topic)) { - latestVersion = schemaVersions.get(topic) + 1; - } - String keyEarliestVersion = topic + "," + earliestVersion; - String keyLatestVersion = topic + "," + latestVersion; - return kafkaStore.getAll(keyEarliestVersion, keyLatestVersion); - } + Iterator getAll(String topic) throws StoreException; - public Iterator getAllVersions(String topic) throws StoreException { - int earliestVersion = 1; - int latestVersion = 1; - if (schemaVersions.containsKey(topic)) { - latestVersion = schemaVersions.get(topic) + 1; - } else { - System.err.println("Schema for " + topic + " does not exist in version cache. " + - "Defaulting to version 1 as latest version"); - } - String keyEarliestVersion = topic + "," + earliestVersion; - String keyLatestVersion = topic + "," + latestVersion; - System.out.println("Getting schemas between versions: " + earliestVersion + "," + latestVersion); - return kafkaStore.getAll(keyEarliestVersion, keyLatestVersion); - } + Iterator getAllVersions(String topic) throws StoreException; - public boolean isCompatible(String topic, Schema schema1, Schema schema2) { - return true; - } + boolean isCompatible(String topic, Schema schema1, Schema schema2); - public void close() { - kafkaStore.close(); - } + void close(); } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistryConfig.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistryConfig.java index e7b4479a15c..16b16e8ad17 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistryConfig.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistryConfig.java @@ -7,26 +7,27 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; public class SchemaRegistryConfig extends KafkaStoreConfig { - private static final ConfigDef config = new ConfigDef() - .define(KAFKASTORE_CONNECTION_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, - KAFKASTORE_CONNECTION_URL_DOC) - .define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, 10000, atLeast(0), - ConfigDef.Importance.LOW, KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC) - .define(KAFKASTORE_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, - KAFKASTORE_TOPIC_DOC) - .define(KAFKASTORE_TIMEOUT_CONFIG, ConfigDef.Type.INT, 500, atLeast(0), - ConfigDef.Importance.MEDIUM, KAFKASTORE_TIMEOUT_DOC); - - public SchemaRegistryConfig(ConfigDef arg0, Map arg1) { - super(arg0, arg1); - } - - public SchemaRegistryConfig(Map props) { - super(config, props); - } - - public static void main(String[] args) { - System.out.println(config.toHtmlTable()); - } + + private static final ConfigDef config = new ConfigDef() + .define(KAFKASTORE_CONNECTION_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, + KAFKASTORE_CONNECTION_URL_DOC) + .define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, 10000, atLeast(0), + ConfigDef.Importance.LOW, KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC) + .define(KAFKASTORE_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, + KAFKASTORE_TOPIC_DOC) + .define(KAFKASTORE_TIMEOUT_CONFIG, ConfigDef.Type.INT, 500, atLeast(0), + ConfigDef.Importance.MEDIUM, KAFKASTORE_TIMEOUT_DOC); + + public SchemaRegistryConfig(ConfigDef arg0, Map arg1) { + super(arg0, arg1); + } + + public SchemaRegistryConfig(Map props) { + super(config, props); + } + + public static void main(String[] args) { + System.out.println(config.toHtmlTable()); + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/Store.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/Store.java index 4b0a502f7b5..381770662f1 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/Store.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/Store.java @@ -1,32 +1,38 @@ /** - * + * */ package io.confluent.kafka.schemaregistry.storage; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; - import java.util.Iterator; import java.util.Map; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; + /** * @author nnarkhed - * */ -public interface Store { - public void init() throws StoreInitializationException; - public V get(K key) throws StoreException; - public void put(K key, V value) throws StoreException; - /** - * Iterator over keys in the specified range - * @param key1 If key1 is null, start from the first key in sorted order - * @param key2 If key2 is null, end at the last key - * @return Iterator over keys in range (key1, key2]. If both keys are null, return an iterator - * over all keys in the database - * @throws StoreException - */ - public Iterator getAll(K key1, K key2) throws StoreException; - public void putAll(Map entries) throws StoreException; - public void delete(K key) throws StoreException; - public void close(); +public interface Store { + + public void init() throws StoreInitializationException; + + public V get(K key) throws StoreException; + + public void put(K key, V value) throws StoreException; + + /** + * Iterator over keys in the specified range + * + * @param key1 If key1 is null, start from the first key in sorted order + * @param key2 If key2 is null, end at the last key + * @return Iterator over keys in range (key1, key2]. If both keys are null, return an iterator + * over all keys in the database + */ + public Iterator getAll(K key1, K key2) throws StoreException; + + public void putAll(Map entries) throws StoreException; + + public void delete(K key) throws StoreException; + + public void close(); } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SchemaRegistryException.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SchemaRegistryException.java new file mode 100644 index 00000000000..a9c51b23c54 --- /dev/null +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SchemaRegistryException.java @@ -0,0 +1,20 @@ +package io.confluent.kafka.schemaregistry.storage.exceptions; + +public class SchemaRegistryException extends Exception { + + public SchemaRegistryException(String message, Throwable cause) { + super(message, cause); + } + + public SchemaRegistryException(String message) { + super(message); + } + + public SchemaRegistryException(Throwable cause) { + super(cause); + } + + public SchemaRegistryException() { + super(); + } +} diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SerializationException.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SerializationException.java new file mode 100644 index 00000000000..de664f72943 --- /dev/null +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SerializationException.java @@ -0,0 +1,20 @@ +package io.confluent.kafka.schemaregistry.storage.exceptions; + +public class SerializationException extends Exception { + + public SerializationException(String message, Throwable cause) { + super(message, cause); + } + + public SerializationException(String message) { + super(message); + } + + public SerializationException(Throwable cause) { + super(cause); + } + + public SerializationException() { + super(); + } +} diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/StoreInitializationException.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/StoreInitializationException.java index 924466d325a..e97d5c0312e 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/StoreInitializationException.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/StoreInitializationException.java @@ -2,8 +2,6 @@ /** * Error while initializing a io.confluent.kafka.schemaregistry.storage.Store - * @author nnarkhed - * */ public class StoreInitializationException extends Exception { diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/SchemaSerializer.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/SchemaSerializer.java index 9faac455c3a..7eaeac07dc5 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/SchemaSerializer.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/SchemaSerializer.java @@ -1,43 +1,47 @@ package io.confluent.kafka.schemaregistry.storage.serialization; import com.fasterxml.jackson.databind.ObjectMapper; -import io.confluent.kafka.schemaregistry.rest.entities.Schema; import java.io.IOException; import java.util.Map; +import io.confluent.kafka.schemaregistry.rest.entities.Schema; +import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException; + public class SchemaSerializer implements Serializer { - public SchemaSerializer() { + public SchemaSerializer() { - } + } - @Override public byte[] toBytes(Schema data) { - try { - return new ObjectMapper().writeValueAsBytes(data); - } catch (IOException e) { - // TODO: throw a SerializationException - e.printStackTrace(); - return null; - } + @Override + public byte[] toBytes(Schema data) throws SerializationException { + try { + return new ObjectMapper().writeValueAsBytes(data); + } catch (IOException e) { + throw new SerializationException("Error while serializing schema " + data.toString(), + e); } - - @Override public Schema fromBytes(byte[] data) { - Schema schema = null; - try { - schema = new ObjectMapper().readValue(data, Schema.class); - } catch (IOException e) { - // TODO: throw a SerializationException - e.printStackTrace(); - } - return schema; + } + + @Override + public Schema fromBytes(byte[] data) throws SerializationException { + Schema schema = null; + try { + schema = new ObjectMapper().readValue(data, Schema.class); + } catch (IOException e) { + throw new SerializationException("Error while deserializing schema", e); } + return schema; + } - @Override public void close() { + @Override + public void close() { - } + } - @Override public void configure(Map stringMap) { + @Override + public void configure(Map stringMap) { - } + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/Serializer.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/Serializer.java index d025fc1f3a4..2b47e478c4d 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/Serializer.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/Serializer.java @@ -15,27 +15,28 @@ import org.apache.kafka.common.Configurable; +import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException; + /** - * - * @param Type to be serialized from. - * - * A class that implements this interface is expected to have a constructor with no parameter. + * @param Type to be serialized from.

A class that implements this interface is expected to + * have a constructor with no parameter. */ public interface Serializer extends Configurable { - /** - * @param data Typed data - * @return bytes of the serialized data - */ - public byte[] toBytes(T data); - /** - * @param data Bytes - * @return Typed deserialized data - */ - public T fromBytes(byte[] data); + /** + * @param data Typed data + * @return bytes of the serialized data + */ + public byte[] toBytes(T data) throws SerializationException; + + /** + * @param data Bytes + * @return Typed deserialized data + */ + public T fromBytes(byte[] data) throws SerializationException; - /** - * Close this serializer - */ - public void close(); + /** + * Close this serializer + */ + public void close(); } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/StringSerializer.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/StringSerializer.java index a790b4c6dda..1b7cb09acd2 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/StringSerializer.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/StringSerializer.java @@ -4,19 +4,23 @@ public class StringSerializer implements Serializer { - @Override public byte[] toBytes(String data) { - return data.getBytes(); - } + @Override + public byte[] toBytes(String data) { + return data.getBytes(); + } - @Override public String fromBytes(byte[] data) { - return new String(data); - } + @Override + public String fromBytes(byte[] data) { + return new String(data); + } - @Override public void close() { - // do nothing - } + @Override + public void close() { + // do nothing + } - @Override public void configure(Map stringMap) { - // do nothing - } + @Override + public void configure(Map stringMap) { + // do nothing + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/ZkStringSerializer.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/ZkStringSerializer.java index f4afc510adb..9b81b24d1da 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/ZkStringSerializer.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/ZkStringSerializer.java @@ -5,13 +5,16 @@ public class ZkStringSerializer implements ZkSerializer { - @Override public byte[] serialize(Object o) throws ZkMarshallingError { - return ((String) o).getBytes(); - } + @Override + public byte[] serialize(Object o) throws ZkMarshallingError { + return ((String) o).getBytes(); + } - @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError { - if (bytes == null) - return null; - return new String(bytes); + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError { + if (bytes == null) { + return null; } + return new String(bytes); + } } diff --git a/src/main/java/io/confluent/kafka/schemaregistry/utils/Pair.java b/src/main/java/io/confluent/kafka/schemaregistry/utils/Pair.java index 898c4bbd6d7..4b46f8be85d 100644 --- a/src/main/java/io/confluent/kafka/schemaregistry/utils/Pair.java +++ b/src/main/java/io/confluent/kafka/schemaregistry/utils/Pair.java @@ -1,6 +1,6 @@ package io.confluent.kafka.schemaregistry.utils; -public class Pair { +public class Pair { private K item1; private V item2; diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index 988f0f0ede7..08b69eca8cc 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -4,7 +4,9 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka.server=INFO, stdout -log4j.logger.kafka.consumer.ZookeeperConsumerConnector=INFO, stdout +log4j.logger.kafka=OFF, stdout +log4j.logger.org.apache.zookeeper=WARN, stdout +log4j.logger.org.I0Itec.zkclient=WARN, stdout +#log4j.logger.kafka.consumer.ZookeeperConsumerConnector=INFO, stdout log4j.additivity.kafka.server=false log4j.additivity.kafka.consumer.ZookeeperConsumerConnector=false diff --git a/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java b/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java index 7badf57cc55..32dccc717e8 100644 --- a/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java +++ b/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java @@ -15,6 +15,16 @@ */ package io.confluent.kafka.schemaregistry; +import org.I0Itec.zkclient.ZkClient; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.Properties; +import java.util.Queue; +import java.util.Vector; + import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.SystemTime$; @@ -22,106 +32,104 @@ import kafka.utils.Utils; import kafka.utils.ZKStringSerializer$; import kafka.zk.EmbeddedZookeeper; -import org.I0Itec.zkclient.ZkClient; -import org.eclipse.jetty.server.Server; -import org.junit.After; -import org.junit.Before; import scala.collection.JavaConversions; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.client.WebTarget; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.*; - /** - * Test harness to run against a real, local Kafka cluster and REST proxy. This is essentially Kafka's - * ZookeeperTestHarness and KafkaServerTestHarness traits combined and ported to Java with the addition - * of the REST proxy. Defaults to a 1-ZK, 3-broker, 1 REST proxy cluster. + * Test harness to run against a real, local Kafka cluster and REST proxy. This is essentially + * Kafka's ZookeeperTestHarness and KafkaServerTestHarness traits combined and ported to Java with + * the addition of the REST proxy. Defaults to a 1-ZK, 3-broker, 1 REST proxy cluster. */ public abstract class ClusterTestHarness { - public static final int DEFAULT_NUM_BROKERS = 3; - // Shared config - protected Queue ports; + public static final int DEFAULT_NUM_BROKERS = 3; - // ZK Config - protected int zkPort; - protected String zkConnect; - protected EmbeddedZookeeper zookeeper; - protected ZkClient zkClient; - protected int zkConnectionTimeout = 6000; - protected int zkSessionTimeout = 6000; + // Shared config + protected Queue ports; - // Kafka Config - protected List configs = null; - protected List servers = null; - protected String brokerList = null; + // ZK Config + protected int zkPort; + protected String zkConnect; + protected EmbeddedZookeeper zookeeper; + protected ZkClient zkClient; + protected int zkConnectionTimeout = 6000; + protected int zkSessionTimeout = 6000; - protected String bootstrapServers = null; + // Kafka Config + protected List configs = null; + protected List servers = null; + protected String brokerList = null; - public ClusterTestHarness() { - this(DEFAULT_NUM_BROKERS); - } + protected String bootstrapServers = null; - public ClusterTestHarness(int numBrokers) { - // 1 port per broker + ZK + REST server - this(numBrokers, numBrokers + 2); - } + public ClusterTestHarness() { + this(DEFAULT_NUM_BROKERS); + } - public ClusterTestHarness(int numBrokers, int numPorts) { - ports = new ArrayDeque(); - for(Object portObj : JavaConversions.asJavaList(TestUtils.choosePorts(numPorts))) - ports.add((Integer)portObj); - zkPort = ports.remove(); - zkConnect = String.format("localhost:%d", zkPort); - - configs = new Vector(); - bootstrapServers = ""; - for(int i = 0; i < numBrokers; i++) { - int port = ports.remove(); - Properties props = TestUtils.createBrokerConfig(i, port, false); - // Turn auto creation *off*, unlike the default. This lets us test errors that should be generated when - // brokers are configured that way. - props.setProperty("auto.create.topics.enable", "true"); - props.setProperty("num.partitions", "1"); - // We *must* override this to use the port we allocated (Kafka currently allocates one port that it always - // uses for ZK - props.setProperty("zookeeper.connect", this.zkConnect); - configs.add(new KafkaConfig(props)); - - if (bootstrapServers.length() > 0) - bootstrapServers += ","; - bootstrapServers = bootstrapServers + "localhost:" + ((Integer)port).toString(); - } - } + public ClusterTestHarness(int numBrokers) { + // 1 port per broker + ZK + REST server + this(numBrokers, numBrokers + 2); + } - @Before - public void setUp() throws Exception { - zookeeper = new EmbeddedZookeeper(zkConnect); - zkClient = new ZkClient(zookeeper.connectString(), zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer$.MODULE$); - - if(configs == null || configs.size() <= 0) - throw new RuntimeException("Must supply at least one server config."); - brokerList = TestUtils.getBrokerListStrFromConfigs(JavaConversions.asScalaIterable(configs).toSeq()); - servers = new Vector(configs.size()); - for(KafkaConfig config : configs) { - KafkaServer server = TestUtils.createServer(config, SystemTime$.MODULE$); - servers.add(server); - } + public ClusterTestHarness(int numBrokers, int numPorts) { + ports = new ArrayDeque(); + for (Object portObj : JavaConversions.asJavaList(TestUtils.choosePorts(numPorts))) { + ports.add((Integer) portObj); } + zkPort = ports.remove(); + zkConnect = String.format("localhost:%d", zkPort); + + configs = new Vector(); + bootstrapServers = ""; + for (int i = 0; i < numBrokers; i++) { + int port = ports.remove(); + Properties props = TestUtils.createBrokerConfig(i, port, false); + // Turn auto creation *off*, unlike the default. This lets us test errors that should be generated when + // brokers are configured that way. + props.setProperty("auto.create.topics.enable", "true"); + props.setProperty("num.partitions", "1"); + // We *must* override this to use the port we allocated (Kafka currently allocates one port that it always + // uses for ZK + props.setProperty("zookeeper.connect", this.zkConnect); + configs.add(new KafkaConfig(props)); + + if (bootstrapServers.length() > 0) { + bootstrapServers += ","; + } + bootstrapServers = bootstrapServers + "localhost:" + ((Integer) port).toString(); + } + } - @After - public void tearDown() throws Exception { - for(KafkaServer server: servers) - server.shutdown(); - for(KafkaServer server: servers) - for (String logDir : JavaConversions.asJavaCollection(server.config().logDirs())) - Utils.rm(logDir); + @Before + public void setUp() throws Exception { + zookeeper = new EmbeddedZookeeper(zkConnect); + zkClient = + new ZkClient(zookeeper.connectString(), zkSessionTimeout, zkConnectionTimeout, + ZKStringSerializer$.MODULE$); - zkClient.close(); - zookeeper.shutdown(); + if (configs == null || configs.size() <= 0) { + throw new RuntimeException("Must supply at least one server config."); + } + brokerList = + TestUtils.getBrokerListStrFromConfigs(JavaConversions.asScalaIterable(configs).toSeq()); + servers = new Vector(configs.size()); + for (KafkaConfig config : configs) { + KafkaServer server = TestUtils.createServer(config, SystemTime$.MODULE$); + servers.add(server); } + } + + @After + public void tearDown() throws Exception { + for (KafkaServer server : servers) { + server.shutdown(); + } + for (KafkaServer server : servers) { + for (String logDir : JavaConversions.asJavaCollection(server.config().logDirs())) { + Utils.rm(logDir); + } + } + + zkClient.close(); + zookeeper.shutdown(); + } } diff --git a/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.java b/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.java index 88e5fecf70b..10b41eee4f0 100644 --- a/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.java +++ b/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.java @@ -1,267 +1,283 @@ package io.confluent.kafka.schemaregistry.storage; -import io.confluent.kafka.schemaregistry.ClusterTestHarness; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; -import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Properties; +import io.confluent.kafka.schemaregistry.ClusterTestHarness; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; +import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; +import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; public class KafkaStoreTest extends ClusterTestHarness { - private String topic = "log"; - @Before - public void setup() { - System.out.println("Zk conn url = " + zkConnect); - } - @After - public void teardown() { - System.out.println("Shutting down"); - } + private static final Logger log = LoggerFactory.getLogger(KafkaStoreTest.class); + + private String topic = "log"; - @Test - public void testInitialization() { - Properties props = new Properties(); - props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); - props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); - KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); - StringSerializer stringSerializer = new StringSerializer(); - KafkaStore kafkaStore = new KafkaStore(storeConfig, - stringSerializer, stringSerializer, new InMemoryStore()); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - fail("Kafka store failed to initialize"); - } + @Before + public void setup() { + log.debug("Zk conn url = " + zkConnect); + } + + @After + public void teardown() { + log.debug("Shutting down"); + } + + @Test + public void testInitialization() { + Properties props = new Properties(); + props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); + props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); + KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); + StringSerializer stringSerializer = new StringSerializer(); + KafkaStore kafkaStore = new KafkaStore(storeConfig, + stringSerializer, + stringSerializer, + new InMemoryStore()); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + fail("Kafka store failed to initialize"); } + } - @Test - public void testIncorrectInitialization() { - Properties props = new Properties(); - props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); - props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); - KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); - StringSerializer stringSerializer = new StringSerializer(); - KafkaStore kafkaStore = new KafkaStore(storeConfig, - stringSerializer, stringSerializer, new InMemoryStore()); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - fail("Kafka store failed to initialize"); - } - try { - kafkaStore.init(); - fail("Kafka store repeated initialization should fail"); - } catch (StoreInitializationException e) { - // this is expected - } + @Test + public void testIncorrectInitialization() { + Properties props = new Properties(); + props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); + props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); + KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); + StringSerializer stringSerializer = new StringSerializer(); + KafkaStore kafkaStore = new KafkaStore(storeConfig, + stringSerializer, + stringSerializer, + new InMemoryStore()); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + fail("Kafka store failed to initialize"); } + try { + kafkaStore.init(); + fail("Kafka store repeated initialization should fail"); + } catch (StoreInitializationException e) { + // this is expected + } + } - @Test - public void testSimplePut() throws InterruptedException { - Properties props = new Properties(); - props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); - props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); - KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); - StringSerializer stringSerializer = new StringSerializer(); - Store inMemoryStore = new InMemoryStore(); - KafkaStore kafkaStore = new KafkaStore(storeConfig, - stringSerializer, stringSerializer, inMemoryStore); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - fail("Kafka store failed to initialize"); - } - String key = "Kafka"; - String value = "Rocks"; - try { - kafkaStore.put(key, value); - } catch (StoreException e) { - fail("Kafka store put(Kafka, Rocks) operation failed"); - } - Thread.sleep(500); - String retrievedValue = null; - try { - retrievedValue = kafkaStore.get(key); - } catch (StoreException e) { - fail("Kafka store get(Kafka) operation failed"); - } - assertEquals("Retrieved value should match entered value", value, retrievedValue); - kafkaStore.close(); + @Test + public void testSimplePut() throws InterruptedException { + Properties props = new Properties(); + props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); + props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); + KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); + StringSerializer stringSerializer = new StringSerializer(); + Store inMemoryStore = new InMemoryStore(); + KafkaStore kafkaStore = new KafkaStore(storeConfig, + stringSerializer, + stringSerializer, + inMemoryStore); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + fail("Kafka store failed to initialize"); + } + String key = "Kafka"; + String value = "Rocks"; + try { + kafkaStore.put(key, value); + } catch (StoreException e) { + fail("Kafka store put(Kafka, Rocks) operation failed"); } + Thread.sleep(500); + String retrievedValue = null; + try { + retrievedValue = kafkaStore.get(key); + } catch (StoreException e) { + fail("Kafka store get(Kafka) operation failed"); + } + assertEquals("Retrieved value should match entered value", value, retrievedValue); + kafkaStore.close(); + } - @Test - public void testSimpleGetAfterFailure() throws InterruptedException { - Properties props = new Properties(); - props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); - props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); - KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); - StringSerializer stringSerializer = new StringSerializer(); - Store inMemoryStore = new InMemoryStore(); - KafkaStore kafkaStore = new KafkaStore(storeConfig, - stringSerializer, stringSerializer, inMemoryStore); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - fail("Kafka store failed to initialize"); - } - String key = "Kafka"; - String value = "Rocks"; - try { - kafkaStore.put(key, value); - } catch (StoreException e) { - fail("Kafka store put(Kafka, Rocks) operation failed"); - } - Thread.sleep(1000); - String retrievedValue = null; - try { - System.out.println("Reading written value from the kafka store"); - retrievedValue = kafkaStore.get(key); - } catch (StoreException e) { - fail("Kafka store get(Kafka) operation failed"); - } - assertEquals("Retrieved value should match entered value", value, retrievedValue); - kafkaStore.close(); - inMemoryStore.close(); - // recreate kafka store - kafkaStore = new KafkaStore(storeConfig, stringSerializer, stringSerializer, - inMemoryStore); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - fail("Kafka store failed to initialize"); - } - retrievedValue = null; - try { - System.out.println("Reading written value from the kafka store"); - retrievedValue = kafkaStore.get(key); - } catch (StoreException e) { - fail("Kafka store get(Kafka) operation failed"); - } - assertEquals("Retrieved value should match entered value", value, retrievedValue); - kafkaStore.close(); - inMemoryStore.close(); + @Test + public void testSimpleGetAfterFailure() throws InterruptedException { + Properties props = new Properties(); + props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); + props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); + KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); + StringSerializer stringSerializer = new StringSerializer(); + Store inMemoryStore = new InMemoryStore(); + KafkaStore kafkaStore = new KafkaStore(storeConfig, + stringSerializer, + stringSerializer, + inMemoryStore); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + fail("Kafka store failed to initialize"); + } + String key = "Kafka"; + String value = "Rocks"; + try { + kafkaStore.put(key, value); + } catch (StoreException e) { + fail("Kafka store put(Kafka, Rocks) operation failed"); + } + Thread.sleep(1000); + String retrievedValue = null; + try { + retrievedValue = kafkaStore.get(key); + } catch (StoreException e) { + fail("Kafka store get(Kafka) operation failed"); + } + assertEquals("Retrieved value should match entered value", value, retrievedValue); + kafkaStore.close(); + inMemoryStore.close(); + // recreate kafka store + kafkaStore = new KafkaStore(storeConfig, stringSerializer, stringSerializer, + inMemoryStore); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + fail("Kafka store failed to initialize"); } + retrievedValue = null; + try { + retrievedValue = kafkaStore.get(key); + } catch (StoreException e) { + fail("Kafka store get(Kafka) operation failed"); + } + assertEquals("Retrieved value should match entered value", value, retrievedValue); + kafkaStore.close(); + inMemoryStore.close(); + } - @Test - public void testSimpleDelete() throws InterruptedException { - Properties props = new Properties(); - props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); - props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); - KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); - StringSerializer stringSerializer = new StringSerializer(); - Store inMemoryStore = new InMemoryStore(); - KafkaStore kafkaStore = new KafkaStore(storeConfig, - stringSerializer, stringSerializer, inMemoryStore); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - fail("Kafka store failed to initialize"); - } - String key = "Kafka"; - String value = "Rocks"; - try { - kafkaStore.put(key, value); - } catch (StoreException e) { - fail("Kafka store put(Kafka, Rocks) operation failed"); - } - Thread.sleep(500); - String retrievedValue = null; - try { - retrievedValue = kafkaStore.get(key); - } catch (StoreException e) { - fail("Kafka store get(Kafka) operation failed"); - } - assertEquals("Retrieved value should match entered value", value, retrievedValue); - try { - kafkaStore.delete(key); - } catch (StoreException e) { - fail("Kafka store delete(Kafka) operation failed"); - } - Thread.sleep(500); - // verify that value is deleted - retrievedValue = value; - try { - retrievedValue = kafkaStore.get(key); - } catch (StoreException e) { - fail("Kafka store get(Kafka) operation failed"); - } - assertNull("Value should have been deleted", retrievedValue); - kafkaStore.close(); + @Test + public void testSimpleDelete() throws InterruptedException { + Properties props = new Properties(); + props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); + props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); + KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); + StringSerializer stringSerializer = new StringSerializer(); + Store inMemoryStore = new InMemoryStore(); + KafkaStore kafkaStore = new KafkaStore(storeConfig, + stringSerializer, + stringSerializer, + inMemoryStore); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + fail("Kafka store failed to initialize"); + } + String key = "Kafka"; + String value = "Rocks"; + try { + kafkaStore.put(key, value); + } catch (StoreException e) { + fail("Kafka store put(Kafka, Rocks) operation failed"); + } + Thread.sleep(500); + String retrievedValue = null; + try { + retrievedValue = kafkaStore.get(key); + } catch (StoreException e) { + fail("Kafka store get(Kafka) operation failed"); + } + assertEquals("Retrieved value should match entered value", value, retrievedValue); + try { + kafkaStore.delete(key); + } catch (StoreException e) { + fail("Kafka store delete(Kafka) operation failed"); } + Thread.sleep(500); + // verify that value is deleted + retrievedValue = value; + try { + retrievedValue = kafkaStore.get(key); + } catch (StoreException e) { + fail("Kafka store get(Kafka) operation failed"); + } + assertNull("Value should have been deleted", retrievedValue); + kafkaStore.close(); + } - @Test - public void testDeleteAfterRestart() throws InterruptedException { - Properties props = new Properties(); - props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); - props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); - KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); - StringSerializer stringSerializer = new StringSerializer(); - Store inMemoryStore = new InMemoryStore(); - KafkaStore kafkaStore = new KafkaStore(storeConfig, - stringSerializer, stringSerializer, inMemoryStore); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - fail("Kafka store failed to initialize"); - } - String key = "Kafka"; - String value = "Rocks"; - try { - kafkaStore.put(key, value); - } catch (StoreException e) { - fail("Kafka store put(Kafka, Rocks) operation failed"); - } - Thread.sleep(1000); - String retrievedValue = null; - try { - System.out.println("Reading written value from the kafka store"); - retrievedValue = kafkaStore.get(key); - } catch (StoreException e) { - fail("Kafka store get(Kafka) operation failed"); - } - assertEquals("Retrieved value should match entered value", value, retrievedValue); - // delete the key - try { - kafkaStore.delete(key); - } catch (StoreException e) { - fail("Kafka store delete(Kafka) operation failed"); - } - Thread.sleep(500); - // verify that key is deleted - retrievedValue = value; - try { - retrievedValue = kafkaStore.get(key); - } catch (StoreException e) { - fail("Kafka store get(Kafka) operation failed"); - } - assertNull("Value should have been deleted", retrievedValue); - kafkaStore.close(); - inMemoryStore.close(); - // recreate kafka store - kafkaStore = new KafkaStore(storeConfig, stringSerializer, stringSerializer, - inMemoryStore); - try { - kafkaStore.init(); - } catch (StoreInitializationException e) { - fail("Kafka store failed to initialize"); - } - // verify that key still doesn't exist in the store - retrievedValue = value; - try { - retrievedValue = kafkaStore.get(key); - } catch (StoreException e) { - fail("Kafka store get(Kafka) operation failed"); - } - assertNull("Value should have been deleted", retrievedValue); - kafkaStore.close(); - inMemoryStore.close(); + @Test + public void testDeleteAfterRestart() throws InterruptedException { + Properties props = new Properties(); + props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect); + props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic); + KafkaStoreConfig storeConfig = new KafkaStoreConfig(props); + StringSerializer stringSerializer = new StringSerializer(); + Store inMemoryStore = new InMemoryStore(); + KafkaStore kafkaStore = new KafkaStore(storeConfig, + stringSerializer, + stringSerializer, + inMemoryStore); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + fail("Kafka store failed to initialize"); + } + String key = "Kafka"; + String value = "Rocks"; + try { + kafkaStore.put(key, value); + } catch (StoreException e) { + fail("Kafka store put(Kafka, Rocks) operation failed"); + } + Thread.sleep(1000); + String retrievedValue = null; + try { + retrievedValue = kafkaStore.get(key); + } catch (StoreException e) { + fail("Kafka store get(Kafka) operation failed"); + } + assertEquals("Retrieved value should match entered value", value, retrievedValue); + // delete the key + try { + kafkaStore.delete(key); + } catch (StoreException e) { + fail("Kafka store delete(Kafka) operation failed"); + } + Thread.sleep(500); + // verify that key is deleted + retrievedValue = value; + try { + retrievedValue = kafkaStore.get(key); + } catch (StoreException e) { + fail("Kafka store get(Kafka) operation failed"); + } + assertNull("Value should have been deleted", retrievedValue); + kafkaStore.close(); + inMemoryStore.close(); + // recreate kafka store + kafkaStore = new KafkaStore(storeConfig, stringSerializer, stringSerializer, + inMemoryStore); + try { + kafkaStore.init(); + } catch (StoreInitializationException e) { + fail("Kafka store failed to initialize"); + } + // verify that key still doesn't exist in the store + retrievedValue = value; + try { + retrievedValue = kafkaStore.get(key); + } catch (StoreException e) { + fail("Kafka store get(Kafka) operation failed"); } + assertNull("Value should have been deleted", retrievedValue); + kafkaStore.close(); + inMemoryStore.close(); + } } diff --git a/src/test/java/io/confluent/kafka/schemaregistry/storage/RocksDbTest.java b/src/test/java/io/confluent/kafka/schemaregistry/storage/RocksDbTest.java index 38c8912024a..9cdbc65fa78 100644 --- a/src/test/java/io/confluent/kafka/schemaregistry/storage/RocksDbTest.java +++ b/src/test/java/io/confluent/kafka/schemaregistry/storage/RocksDbTest.java @@ -1,15 +1,5 @@ package io.confluent.kafka.schemaregistry.storage; -import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; -import io.confluent.kafka.schemaregistry.utils.TestUtils; -import org.apache.kafka.common.config.ConfigException; -import org.junit.Test; - -import java.util.Properties; - -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - public class RocksDbTest { // @Test @@ -60,7 +50,6 @@ public class RocksDbTest { // try { // db = new RocksDbStore(config); // } catch (StoreInitializationException e) { -// // TODO Auto-generated catch block // e.printStackTrace(); // fail("RocksDB store initialization failed"); // } finally { @@ -73,7 +62,6 @@ public class RocksDbTest { // db.put("Kafka".getBytes(), "rocks".getBytes()); // value = db.get("Kafka".getBytes()); // } catch (StoreException e) { -// // TODO Auto-generated catch block // e.printStackTrace(); // fail("RocksDB store put failed"); // } diff --git a/src/test/java/io/confluent/kafka/schemaregistry/utils/TestUtils.java b/src/test/java/io/confluent/kafka/schemaregistry/utils/TestUtils.java index 9aa094c3a93..90890ec2953 100644 --- a/src/test/java/io/confluent/kafka/schemaregistry/utils/TestUtils.java +++ b/src/test/java/io/confluent/kafka/schemaregistry/utils/TestUtils.java @@ -4,9 +4,10 @@ import java.util.Random; public class TestUtils { + static String IoTmpDir = System.getProperty("java.io.tmpdir"); static Random random = new Random(); - + /** * Create a temporary directory */ @@ -14,7 +15,7 @@ public static File tempDir(String namePrefix) { final File f = new File(IoTmpDir, namePrefix + "-" + random.nextInt(1000000)); f.mkdirs(); f.deleteOnExit(); - + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { @@ -23,18 +24,19 @@ public void run() { }); return f; } - + /** * Recursively delete the given file/directory and any subfiles (if any exist) + * * @param file The root file at which to begin deleting */ public static void rm(File file) { - if(file == null) { + if (file == null) { return; - } else if(file.isDirectory()) { + } else if (file.isDirectory()) { File[] files = file.listFiles(); - if(files != null) { - for(File f : files) { + if (files != null) { + for (File f : files) { rm(f); } }