diff --git a/pom.xml b/pom.xml
index cb108e2d12b..17cfb5fabe9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,6 +17,7 @@
3.0.0
0.8.2-beta
2.10
+ <log4j.version>1.7.6</log4j.version>
3.0
3.5.1
0.1-SNAPSHOT
@@ -101,6 +102,11 @@
<artifactId>rest-utils</artifactId>
<version>${restutils.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java
index c6f302c0974..e3ee539b113 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java
@@ -1,30 +1,36 @@
package io.confluent.kafka.schemaregistry.rest;
-import io.confluent.rest.ConfigurationException;
import org.eclipse.jetty.server.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import io.confluent.rest.ConfigurationException;
+
public class Main {
- /**
- * Starts an embedded Jetty server running the REST server.
- */
- public static void main(String[] args) throws IOException {
- try {
- SchemaRegistryRestConfiguration config = new SchemaRegistryRestConfiguration((args.length > 0 ? args[0] : null));
- SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
- Server server = app.createServer();
- server.start();
- System.out.println("Server started, listening for requests...");
- server.join();
- } catch (ConfigurationException e) {
- System.out.println("Server configuration failed: " + e.getMessage());
- e.printStackTrace();
- System.exit(1);
- } catch (Exception e) {
- System.err.println("Server died unexpectedly: " + e.toString());
- e.printStackTrace();
- System.exit(1);
- }
+
+ private static final Logger log = LoggerFactory.getLogger(Main.class);
+
+ /**
+ * Starts an embedded Jetty server running the REST server.
+ */
+ public static void main(String[] args) throws IOException {
+
+ try {
+ SchemaRegistryRestConfiguration config =
+ new SchemaRegistryRestConfiguration((args.length > 0 ? args[0] : null));
+ SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
+ Server server = app.createServer();
+ server.start();
+ log.info("Server started, listening for requests...");
+ server.join();
+ } catch (ConfigurationException e) {
+ log.error("Server configuration failed: ", e);
+ System.exit(1);
+ } catch (Exception e) {
+ log.error("Server died unexpectedly: ", e);
+ System.exit(1);
}
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryApplicationConfig.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryApplicationConfig.java
index bdf7faeb9a6..08fa7f37d99 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryApplicationConfig.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryApplicationConfig.java
@@ -1,77 +1,76 @@
package io.confluent.kafka.schemaregistry.rest;
-import io.confluent.rest.Configuration;
-import io.confluent.rest.ConfigurationException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import java.util.Properties;
-public class SchemaRegistryApplicationConfig extends Configuration {
- public Time time;
-
- public boolean debug;
- public static final String DEFAULT_DEBUG = "false";
-
- public int port;
- public static final String DEFAULT_PORT = "8080";
-
- public String zookeeperConnect;
- public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
-
- public String bootstrapServers;
- public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
-
- public int producerThreads;
- public static final String DEFAULT_PRODUCER_THREADS = "5";
-
- /**
- * The consumer timeout used to limit consumer iterator operations. This is effectively the maximum error for the
- * entire request timeout. It should be small enough to get reasonably close to the timeout, but large enough to
- * not result in busy waiting.
- */
- public int consumerIteratorTimeoutMs;
- public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "50";
-
- /** The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached. */
- public int consumerRequestTimeoutMs;
- public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000";
-
- /** The maximum number of messages returned in a single request. */
- public int consumerRequestMaxMessages;
- public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100";
-
- public int consumerThreads;
- public static final String DEFAULT_CONSUMER_THREADS = "1";
-
- /** Amount of idle time before a consumer instance is automatically destroyed. */
- public int consumerInstanceTimeoutMs;
- public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000";
-
- public SchemaRegistryApplicationConfig() throws ConfigurationException {
- this(new Properties());
- }
-
- public SchemaRegistryApplicationConfig(Properties props) throws ConfigurationException {
- time = new SystemTime();
+import io.confluent.rest.Configuration;
+import io.confluent.rest.ConfigurationException;
- debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG));
+public class SchemaRegistryApplicationConfig extends Configuration {
- port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT));
- zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT);
- bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS);
- producerThreads = Integer.parseInt(props.getProperty("producer.threads",
- DEFAULT_PRODUCER_THREADS));
- consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty(
- "consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS));
- consumerRequestTimeoutMs = Integer.parseInt(props.getProperty(
- "consumer.request.timeout.ms", DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS));
- consumerRequestMaxMessages = Integer.parseInt(props.getProperty(
- "consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES));
- consumerThreads = Integer.parseInt(props.getProperty("consumer.threads",
- DEFAULT_CONSUMER_THREADS));
- consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty(
- "consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS));
- }
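+ // Defaults are kept as strings so they can be passed straight to Properties.getProperty.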
+ public static final String DEFAULT_DEBUG = "false";
+ public static final String DEFAULT_PORT = "8080";
+ public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
+ public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
+ public static final String DEFAULT_PRODUCER_THREADS = "5";
+ public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "50";
+ public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000";
+ public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100";
+ public static final String DEFAULT_CONSUMER_THREADS = "1";
+ public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000";
+ public Time time;
+ public boolean debug;
+ public int port;
+ public String zookeeperConnect;
+ public String bootstrapServers;
+ public int producerThreads;
+ /**
+ * The consumer timeout used to limit consumer iterator operations. This is effectively the
+ * maximum error for the entire request timeout. It should be small enough to get reasonably close
+ * to the timeout, but large enough to not result in busy waiting.
+ */
+ public int consumerIteratorTimeoutMs;
+ /**
+ * The maximum total time to wait for messages for a request if the maximum number of messages has
+ * not yet been reached.
+ */
+ public int consumerRequestTimeoutMs;
+ /**
+ * The maximum number of messages returned in a single request.
+ */
+ public int consumerRequestMaxMessages;
+ public int consumerThreads;
+ /**
+ * Amount of idle time before a consumer instance is automatically destroyed.
+ */
+ public int consumerInstanceTimeoutMs;
+
+ public SchemaRegistryApplicationConfig() throws ConfigurationException {
+ this(new Properties());
+ }
+
+ public SchemaRegistryApplicationConfig(Properties props) throws ConfigurationException {
+ time = new SystemTime();
+
+ debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG));
+
+ port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT));
+ zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT);
+ bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS);
+ producerThreads = Integer.parseInt(props.getProperty("producer.threads",
+ DEFAULT_PRODUCER_THREADS));
+ consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty(
+ "consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS));
+ consumerRequestTimeoutMs = Integer.parseInt(props.getProperty(
+ "consumer.request.timeout.ms", DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS));
+ consumerRequestMaxMessages = Integer.parseInt(props.getProperty(
+ "consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES));
+ consumerThreads = Integer.parseInt(props.getProperty("consumer.threads",
+ DEFAULT_CONSUMER_THREADS));
+ consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty(
+ "consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS));
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java
index 749512070da..4cb2f946fd2 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestApplication.java
@@ -1,21 +1,29 @@
package io.confluent.kafka.schemaregistry.rest;
-import io.confluent.kafka.schemaregistry.rest.entities.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+import javax.ws.rs.core.Configurable;
+
import io.confluent.kafka.schemaregistry.rest.resources.RootResource;
import io.confluent.kafka.schemaregistry.rest.resources.SchemasResource;
import io.confluent.kafka.schemaregistry.rest.resources.TopicsResource;
+import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreConfig;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryConfig;
+import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaSerializer;
import io.confluent.rest.Application;
import io.confluent.rest.ConfigurationException;
-import javax.ws.rs.core.Configurable;
-import java.util.Properties;
-
public class SchemaRegistryRestApplication extends Application {
- public SchemaRegistryRestApplication() throws ConfigurationException {
+
+ private static final Logger log = LoggerFactory.getLogger(SchemaRegistryRestApplication.class);
+
+ public SchemaRegistryRestApplication() throws ConfigurationException {
this(new Properties());
}
@@ -31,11 +39,17 @@ public SchemaRegistryRestApplication(SchemaRegistryRestConfiguration config) {
public void setupResources(Configurable<? extends Configurable<?>> config, SchemaRegistryRestConfiguration appConfig) {
Properties props = new Properties();
props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, "localhost:2181");
- props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, "schemaregistry");
- SchemaRegistryConfig schemaRegistryConfig = new SchemaRegistryConfig(props);
- SchemaRegistry schemaRegistry = new SchemaRegistry(schemaRegistryConfig,
- new SchemaSerializer());
- config.register(RootResource.class);
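+ // The leading underscore in "_schemas" follows the Kafka convention for internal topics.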
+ props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, "_schemas");
+ SchemaRegistryConfig schemaRegistryConfig = new SchemaRegistryConfig(props);
+ SchemaRegistry schemaRegistry = null;
+ try {
+ schemaRegistry = new KafkaSchemaRegistry(schemaRegistryConfig,
+ new SchemaSerializer());
+ } catch (SchemaRegistryException e) {
+ log.error("Error starting the schema registry", e);
+ System.exit(1);
+ }
+ config.register(RootResource.class);
config.register(new TopicsResource(schemaRegistry));
config.register(SchemasResource.class);
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestConfiguration.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestConfiguration.java
index 052b22dbd60..b996d72fe58 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestConfiguration.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryRestConfiguration.java
@@ -1,7 +1,5 @@
package io.confluent.kafka.schemaregistry.rest;
-import io.confluent.rest.Configuration;
-import io.confluent.rest.ConfigurationException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -9,124 +7,128 @@
import java.io.IOException;
import java.util.Properties;
-public class SchemaRegistryRestConfiguration extends Configuration {
- public Time time;
-
- /**
- * Unique ID for this REST server instance. This is used in generating unique IDs for consumers that do not specify
- * their ID. The ID is empty by default, which makes a single server setup easier to get up and running, but is not
- * safe for multi-server deployments where automatic consumer IDs are used.
- */
- public String id;
- public static final String DEFAULT_ID = "";
-
- public boolean debug;
- public static final String DEFAULT_DEBUG = "false";
-
- public int port;
- public static final String DEFAULT_PORT = "8080";
-
- public String zookeeperConnect;
- public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
-
- public String bootstrapServers;
- public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
-
- public int producerThreads;
- public static final String DEFAULT_PRODUCER_THREADS = "5";
-
- /**
- * The consumer timeout used to limit consumer iterator operations. This should be very small so we can effectively
- * peek() on the iterator.
- */
- public int consumerIteratorTimeoutMs;
- public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "1";
-
- /**
- * Amount of time to backoff when an iterator runs out of data. When a consumer has a dedicated worker thread, this
- * is effectively the maximum error for the entire request timeout. It should be small enough to get reasonably close
- * to the timeout, but large enough to avoid busy waiting.
- */
- public int consumerIteratorBackoffMs;
- public static final String DEFAULT_CONSUMER_ITERATOR_BACKOFF_MS = "50";
-
- /** The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached. */
- public int consumerRequestTimeoutMs;
- public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000";
-
- /** The maximum number of messages returned in a single request. */
- public int consumerRequestMaxMessages;
- public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100";
-
- public int consumerThreads;
- public static final String DEFAULT_CONSUMER_THREADS = "1";
-
- /** Amount of idle time before a consumer instance is automatically destroyed. */
- public int consumerInstanceTimeoutMs;
- public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000";
-
- public SchemaRegistryRestConfiguration() throws ConfigurationException {
- this(new Properties());
- }
-
- public SchemaRegistryRestConfiguration(String propsFile) throws ConfigurationException {
- this(getPropsFromFile(propsFile));
- }
-
- public SchemaRegistryRestConfiguration(Properties props) throws ConfigurationException {
- time = new SystemTime();
-
- id = props.getProperty("id", DEFAULT_ID);
-
- debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG));
-
- port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT));
- zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT);
- bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS);
- producerThreads = Integer.parseInt(props.getProperty("producer.threads",
- DEFAULT_PRODUCER_THREADS));
- consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty(
- "consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS));
- consumerIteratorBackoffMs = Integer.parseInt(props.getProperty(
- "consumer.iterator.backoff.ms", DEFAULT_CONSUMER_ITERATOR_BACKOFF_MS));
- consumerRequestTimeoutMs = Integer.parseInt(props.getProperty("consumer.request.timeout.ms",
- DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS));
- consumerRequestMaxMessages = Integer.parseInt(props.getProperty(
- "consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES));
- consumerThreads = Integer.parseInt(props.getProperty("consumer.threads",
- DEFAULT_CONSUMER_THREADS));
- consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty(
- "consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS));
- }
-
- @Override
- public boolean getDebug() {
- return debug;
- }
+import io.confluent.rest.Configuration;
+import io.confluent.rest.ConfigurationException;
- @Override
- public int getPort() {
- return port;
- }
+public class SchemaRegistryRestConfiguration extends Configuration {
- @Override
- public Iterable<String> getPreferredResponseMediaTypes() {
- return Versions.PREFERRED_RESPONSE_TYPES;
+ public static final String DEFAULT_ID = "";
+ public static final String DEFAULT_DEBUG = "false";
+ public static final String DEFAULT_PORT = "8080";
+ public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
+ public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
+ public static final String DEFAULT_PRODUCER_THREADS = "5";
+ public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "1";
+ public static final String DEFAULT_CONSUMER_ITERATOR_BACKOFF_MS = "50";
+ public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000";
+ public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100";
+ public static final String DEFAULT_CONSUMER_THREADS = "1";
+ public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000";
+ public Time time;
+ /**
+ * Unique ID for this REST server instance. This is used in generating unique IDs for consumers
+ * that do not specify their ID. The ID is empty by default, which makes a single server setup
+ * easier to get up and running, but is not safe for multi-server deployments where automatic
+ * consumer IDs are used.
+ */
+ public String id;
+ public boolean debug;
+ public int port;
+ public String zookeeperConnect;
+ public String bootstrapServers;
+ public int producerThreads;
+ /**
+ * The consumer timeout used to limit consumer iterator operations. This should be very small so
+ * we can effectively peek() on the iterator.
+ */
+ public int consumerIteratorTimeoutMs;
+ /**
+ * Amount of time to backoff when an iterator runs out of data. When a consumer has a dedicated
+ * worker thread, this is effectively the maximum error for the entire request timeout. It should
+ * be small enough to get reasonably close to the timeout, but large enough to avoid busy
+ * waiting.
+ */
+ public int consumerIteratorBackoffMs;
+ /**
+ * The maximum total time to wait for messages for a request if the maximum number of messages has
+ * not yet been reached.
+ */
+ public int consumerRequestTimeoutMs;
+ /**
+ * The maximum number of messages returned in a single request.
+ */
+ public int consumerRequestMaxMessages;
+ public int consumerThreads;
+ /**
+ * Amount of idle time before a consumer instance is automatically destroyed.
+ */
+ public int consumerInstanceTimeoutMs;
+
+ public SchemaRegistryRestConfiguration() throws ConfigurationException {
+ this(new Properties());
+ }
+
+ public SchemaRegistryRestConfiguration(String propsFile) throws ConfigurationException {
+ this(getPropsFromFile(propsFile));
+ }
+
+ public SchemaRegistryRestConfiguration(Properties props) throws ConfigurationException {
+ time = new SystemTime();
+
+ id = props.getProperty("id", DEFAULT_ID);
+
+ debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG));
+
+ port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT));
+ zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT);
+ bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS);
+ producerThreads = Integer.parseInt(props.getProperty("producer.threads",
+ DEFAULT_PRODUCER_THREADS));
+ consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty(
+ "consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS));
+ consumerIteratorBackoffMs = Integer.parseInt(props.getProperty(
+ "consumer.iterator.backoff.ms", DEFAULT_CONSUMER_ITERATOR_BACKOFF_MS));
+ consumerRequestTimeoutMs = Integer.parseInt(props.getProperty("consumer.request.timeout.ms",
+ DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS));
+ consumerRequestMaxMessages = Integer.parseInt(props.getProperty(
+ "consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES));
+ consumerThreads = Integer.parseInt(props.getProperty("consumer.threads",
+ DEFAULT_CONSUMER_THREADS));
+ consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty(
+ "consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS));
+ }
+
+ private static Properties getPropsFromFile(String propsFile) throws ConfigurationException {
+ Properties props = new Properties();
+ if (propsFile == null) {
+ return props;
}
-
- @Override
- public String getDefaultResponseMediaType() {
- return Versions.SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT;
+ try (FileInputStream propStream = new FileInputStream(propsFile)) {
+ props.load(propStream);
+ } catch (IOException e) {
+ throw new ConfigurationException("Couldn't load properties from " + propsFile, e);
}
-
- private static Properties getPropsFromFile(String propsFile) throws ConfigurationException {
- Properties props = new Properties();
- if (propsFile == null) return props;
- try {
- FileInputStream propStream = new FileInputStream(propsFile);
- props.load(propStream);
- } catch (IOException e) {
- throw new ConfigurationException("Couldn't load properties from " + propsFile, e);
- }
- return props;
- }}
+ return props;
+ }
+
+ @Override
+ public boolean getDebug() {
+ return debug;
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public Iterable<String> getPreferredResponseMediaTypes() {
+ return Versions.PREFERRED_RESPONSE_TYPES;
+ }
+
+ @Override
+ public String getDefaultResponseMediaType() {
+ return Versions.SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT;
+ }
+}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/Versions.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/Versions.java
index 0893bdb4759..005aaef2eb4 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/Versions.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/Versions.java
@@ -4,25 +4,27 @@
import java.util.List;
public class Versions {
- public static final String SCHEMA_REGISTRY_V1_JSON = "application/vnd.schemaregistry.v1+json";
- // Default weight = 1
- public static final String SCHEMA_REGISTRY_V1_JSON_WEIGHTED = SCHEMA_REGISTRY_V1_JSON;
- // These are defaults that track the most recent API version. These should always be specified
- // anywhere the latest version is produced/consumed.
- public static final String SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT = SCHEMA_REGISTRY_V1_JSON;
- public static final String SCHEMA_REGISTRY_DEFAULT_JSON = "application/vnd.schemaregistry+json";
- public static final String SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED = SCHEMA_REGISTRY_DEFAULT_JSON +
- "; qs=0.9";
- public static final String JSON = "application/json";
- public static final String JSON_WEIGHTED = JSON + "; qs=0.5";
- public static final List<String> PREFERRED_RESPONSE_TYPES = Arrays
- .asList(Versions.SCHEMA_REGISTRY_V1_JSON, Versions.SCHEMA_REGISTRY_DEFAULT_JSON,
- Versions.JSON);
+ public static final String SCHEMA_REGISTRY_V1_JSON = "application/vnd.schemaregistry.v1+json";
+ // Default weight = 1
+ public static final String SCHEMA_REGISTRY_V1_JSON_WEIGHTED = SCHEMA_REGISTRY_V1_JSON;
+ // These are defaults that track the most recent API version. These should always be specified
+ // anywhere the latest version is produced/consumed.
+ public static final String SCHEMA_REGISTRY_MOST_SPECIFIC_DEFAULT = SCHEMA_REGISTRY_V1_JSON;
+ public static final String SCHEMA_REGISTRY_DEFAULT_JSON = "application/vnd.schemaregistry+json";
+ public static final String SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED =
+ SCHEMA_REGISTRY_DEFAULT_JSON +
+ "; qs=0.9";
+ public static final String JSON = "application/json";
+ public static final String JSON_WEIGHTED = JSON + "; qs=0.5";
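+ // The qs values weight server-side content negotiation toward the most specific media type.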
- // This type is completely generic and carries no actual information about the type of data, but
- // it is the default for request entities if no content type is specified. Well-behaved users
- // of the API will always specify the content type, but ad hoc use may omit it. We treat this as
- // JSON since that's all we currently support.
- public static final String GENERIC_REQUEST = "application/octet-stream";
+ public static final List<String> PREFERRED_RESPONSE_TYPES = Arrays
+ .asList(Versions.SCHEMA_REGISTRY_V1_JSON, Versions.SCHEMA_REGISTRY_DEFAULT_JSON,
+ Versions.JSON);
+
+ // This type is completely generic and carries no actual information about the type of data, but
+ // it is the default for request entities if no content type is specified. Well behaving users
+ // of the API will always specify the content type, but ad hoc use may omit it. We treat this as
+ // JSON since that's all we currently support.
+ public static final String GENERIC_REQUEST = "application/octet-stream";
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Schema.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Schema.java
index 293663a6300..a0de9fb2a7b 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Schema.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Schema.java
@@ -1,134 +1,152 @@
package io.confluent.kafka.schemaregistry.rest.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
+
import org.hibernate.validator.constraints.NotEmpty;
import javax.validation.constraints.Min;
public class Schema {
- @NotEmpty
- private String name;
- @Min(1)
- private Integer version;
- @NotEmpty
- private String schema;
- private boolean compatible = true;
- private boolean deprecated = false;
- private boolean latest = true;
-
- public Schema(@JsonProperty("name") String name,
- @JsonProperty("version") Integer version,
- @JsonProperty("schema") String schema,
- @JsonProperty("compatible") boolean compatible,
- @JsonProperty("deprecated") boolean deprecated,
- @JsonProperty("latest") boolean latest) {
- this.name = name;
- this.version = version;
- this.schema = schema;
- this.compatible = compatible;
- this.deprecated = deprecated;
- this.latest = latest;
- }
-
- @JsonProperty("name")
- public String getName() {
- return name;
- }
-
- @JsonProperty("name")
- public void setName(String name) {
- this.name = name;
- }
-
- @JsonProperty("schema")
- public String getSchema() {
- return this.schema;
- }
-
- @JsonProperty("schema")
- public void setSchema(String schema) {
- this.schema = schema;
- }
-
- @JsonProperty("version")
- public Integer getVersion() {
- return this.version;
- }
-
- @JsonProperty("version")
- public void setVersion(Integer version) {
- this.version = version;
- }
-
- @JsonProperty("compatible")
- public boolean getCompatible() {
- return this.compatible;
- }
-
- @JsonProperty("compatible")
- public void setCompatible(boolean compatible) {
- this.compatible = compatible;
- }
-
- @JsonProperty("deprecated")
- public boolean getDeprecated() {
- return this.deprecated;
- }
-
- @JsonProperty("deprecated")
- public void setDeprecated(boolean deprecated) {
- this.deprecated = deprecated;
- }
-
- @JsonProperty("latest")
- public boolean getLatest() {
- return this.latest;
- }
-
- @JsonProperty("latest")
- public void setLatest(boolean latest) {
- this.latest = latest;
- }
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Schema that = (Schema) o;
-
- if (!name.equals(that.getName())) return false;
- if (!schema.equals(that.schema)) return false;
- if (this.version != that.getVersion()) return false;
- if (this.compatible && !that.compatible) return false;
- if (this.deprecated && !that.deprecated) return false;
- if (this.latest && !that.latest) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + schema.hashCode();
- result = 31 * result + version;
- result = 31 * result + new Boolean(compatible).hashCode();
- result = 31 * result + new Boolean(deprecated).hashCode();
- result = 31 * result + new Boolean(latest).hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("{name=" + this.name + ",");
- sb.append("schema=" + this.schema + ",");
- sb.append("version=" + this.version + ",");
- sb.append("compatible=" + this.compatible + ",");
- sb.append("deprecated=" + this.deprecated + ",");
- sb.append("latest=" + this.latest + "}");
- return sb.toString();
- }
+ @NotEmpty
+ private String name;
+ @Min(1)
+ private Integer version;
+ @NotEmpty
+ private String schema;
+ private boolean compatible = true;
+ private boolean deprecated = false;
+ private boolean latest = true;
+
+ public Schema(@JsonProperty("name") String name,
+ @JsonProperty("version") Integer version,
+ @JsonProperty("schema") String schema,
+ @JsonProperty("compatible") boolean compatible,
+ @JsonProperty("deprecated") boolean deprecated,
+ @JsonProperty("latest") boolean latest) {
+ this.name = name;
+ this.version = version;
+ this.schema = schema;
+ this.compatible = compatible;
+ this.deprecated = deprecated;
+ this.latest = latest;
+ }
+
+ @JsonProperty("name")
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty("name")
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @JsonProperty("schema")
+ public String getSchema() {
+ return this.schema;
+ }
+
+ @JsonProperty("schema")
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
+
+ @JsonProperty("version")
+ public Integer getVersion() {
+ return this.version;
+ }
+
+ @JsonProperty("version")
+ public void setVersion(Integer version) {
+ this.version = version;
+ }
+
+ @JsonProperty("compatible")
+ public boolean getCompatible() {
+ return this.compatible;
+ }
+
+ @JsonProperty("compatible")
+ public void setCompatible(boolean compatible) {
+ this.compatible = compatible;
+ }
+
+ @JsonProperty("deprecated")
+ public boolean getDeprecated() {
+ return this.deprecated;
+ }
+
+ @JsonProperty("deprecated")
+ public void setDeprecated(boolean deprecated) {
+ this.deprecated = deprecated;
+ }
+
+ @JsonProperty("latest")
+ public boolean getLatest() {
+ return this.latest;
+ }
+
+ @JsonProperty("latest")
+ public void setLatest(boolean latest) {
+ this.latest = latest;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Schema that = (Schema) o;
+
+ if (!name.equals(that.getName())) {
+ return false;
+ }
+ if (!schema.equals(that.schema)) {
+ return false;
+ }
+ if (!this.version.equals(that.getVersion())) {
+ return false;
+ }
+ if (this.compatible != that.compatible) {
+ return false;
+ }
+ if (this.deprecated != that.deprecated) {
+ return false;
+ }
+ if (this.latest != that.latest) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + schema.hashCode();
+ result = 31 * result + version;
+ result = 31 * result + Boolean.valueOf(compatible).hashCode();
+ result = 31 * result + Boolean.valueOf(deprecated).hashCode();
+ result = 31 * result + Boolean.valueOf(latest).hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{name=" + this.name + ",");
+ sb.append("schema=" + this.schema + ",");
+ sb.append("version=" + this.version + ",");
+ sb.append("compatible=" + this.compatible + ",");
+ sb.append("deprecated=" + this.deprecated + ",");
+ sb.append("latest=" + this.latest + "}");
+ return sb.toString();
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Topic.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Topic.java
index 267ab74e8da..000e6093a53 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Topic.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/Topic.java
@@ -1,106 +1,122 @@
package io.confluent.kafka.schemaregistry.rest.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
+
import org.hibernate.validator.constraints.NotEmpty;
public class Topic {
- @NotEmpty
- private String name;
-
- private String compatibility = "full";
- private String registration = "all";
- private String deprecation = "all";
- private String validators = null;
-
- public Topic(@JsonProperty("name") String name,
- @JsonProperty("compatibility") String compatibility,
- @JsonProperty("registration") String registration,
- @JsonProperty("deprecation") String deprecation,
- @JsonProperty("validators") String validators) {
- this.name = name;
- this.compatibility = compatibility;
- this.registration = registration;
- this.deprecation = deprecation;
- this.validators = validators;
- }
-
- public Topic(@JsonProperty("name") String name) {
- this.name = name;
- }
-
- @JsonProperty("name")
- public String getName() {
- return name;
- }
-
- @JsonProperty("name")
- public void setName(String name) {
- this.name = name;
- }
-
- @JsonProperty("compatibility")
- public String getCompatibility() {
- return this.compatibility;
- }
- @JsonProperty("compatibility")
- public void setCompatibility(String compatibility) {
- this.compatibility = compatibility;
+ @NotEmpty
+ private String name;
+
+ private String compatibility = "full";
+ private String registration = "all";
+ private String deprecation = "all";
+ private String validators = null;
+
+ public Topic(@JsonProperty("name") String name,
+ @JsonProperty("compatibility") String compatibility,
+ @JsonProperty("registration") String registration,
+ @JsonProperty("deprecation") String deprecation,
+ @JsonProperty("validators") String validators) {
+ this.name = name;
+ this.compatibility = compatibility;
+ this.registration = registration;
+ this.deprecation = deprecation;
+ this.validators = validators;
+ }
+
+ public Topic(@JsonProperty("name") String name) {
+ this.name = name;
+ }
+
+ @JsonProperty("name")
+ public String getName() {
+ return name;
+ }
+
+ @JsonProperty("name")
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @JsonProperty("compatibility")
+ public String getCompatibility() {
+ return this.compatibility;
+ }
+
+ @JsonProperty("compatibility")
+ public void setCompatibility(String compatibility) {
+ this.compatibility = compatibility;
+ }
+
+ @JsonProperty("registration")
+ public String getRegistration() {
+ return this.registration;
+ }
+
+ @JsonProperty("registration")
+ public void setRegistration(String registration) {
+ this.registration = registration;
+ }
+
+ @JsonProperty("deprecation")
+ public String getDeprecation() {
+ return this.deprecation;
+ }
+
+ @JsonProperty("deprecation")
+ public void setDeprecation(String deprecation) {
+ this.deprecation = deprecation;
+ }
+
+ @JsonProperty("validators")
+ public String getValidators() {
+ return this.validators;
+ }
+
+ @JsonProperty("validators")
+ public void setValidators(String validators) {
+ this.validators = validators;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
-
- @JsonProperty("registration")
- public String getRegistration() {
- return this.registration;
+ if (o == null || getClass() != o.getClass()) {
+ return false;
}
- @JsonProperty("registration")
- public void setRegistration(String registration) {
- this.registration = registration;
- }
+ Topic topic = (Topic) o;
- @JsonProperty("deprecation")
- public String getDeprecation() {
- return this.deprecation;
+ if (!name.equals(topic.name)) {
+ return false;
}
-
- @JsonProperty("deprecation")
- public void setDeprecation(String deprecation) {
- this.deprecation = deprecation;
+ if (!this.compatibility.equals(topic.compatibility)) {
+ return false;
}
-
- @JsonProperty("validators")
- public String getValidators() {
- return this.validators;
+ if (!this.registration.equals(topic.registration)) {
+ return false;
}
-
- @JsonProperty("validators")
- public void setValidators(String validators) {
- this.validators = validators;
+ if (!this.deprecation.equals(topic.deprecation)) {
+ return false;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Topic topic = (Topic) o;
-
- if (!name.equals(topic.name)) return false;
- if (!this.compatibility.equals(topic.compatibility)) return false;
- if (!this.registration.equals(topic.registration)) return false;
- if (!this.deprecation.equals(topic.deprecation)) return false;
- if (!this.validators.equals(topic.validators)) return false;
-
- return true;
+ if (this.validators != null ? !this.validators.equals(topic.validators)
+ : topic.validators != null) {
+ return false;
}
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + this.registration.hashCode();
- result = 31 * result + this.deprecation.hashCode();
- result = 31 * result + this.validators.hashCode();
- return result;
- }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + this.registration.hashCode();
+ result = 31 * result + this.deprecation.hashCode();
+ result = 31 * result + (this.validators == null ? 0 : this.validators.hashCode());
+ return result;
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaRequest.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaRequest.java
index 58953e36710..d4e2199dfc7 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaRequest.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaRequest.java
@@ -1,42 +1,49 @@
package io.confluent.kafka.schemaregistry.rest.entities.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
-import io.confluent.kafka.schemaregistry.rest.entities.Schema;
+
import org.hibernate.validator.constraints.NotEmpty;
public class RegisterSchemaRequest {
- @NotEmpty
- private String schema;
- @JsonProperty("schema")
- public String getSchema() {
- return this.schema;
- }
+ @NotEmpty
+ private String schema;
- @JsonProperty("schema")
- public void setSchema(String schema) {
- this.schema = schema;
- }
+ @JsonProperty("schema")
+ public String getSchema() {
+ return this.schema;
+ }
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ @JsonProperty("schema")
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
- RegisterSchemaRequest that = (RegisterSchemaRequest) o;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
- if (schema != null ? !schema.equals(that.schema) : that.schema != null) {
- return false;
- }
+ RegisterSchemaRequest that = (RegisterSchemaRequest) o;
- return true;
+ if (schema != null ? !schema.equals(that.schema) : that.schema != null) {
+ return false;
}
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (schema != null ? schema.hashCode() : 0);
- return result;
- }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = schema != null ? schema.hashCode() : 0;
+ return result;
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaResponse.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaResponse.java
index 07ac0fbe7aa..200fa782111 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaResponse.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/entities/requests/RegisterSchemaResponse.java
@@ -1,20 +1,22 @@
package io.confluent.kafka.schemaregistry.rest.entities.requests;
import com.fasterxml.jackson.annotation.JsonProperty;
+
import org.hibernate.validator.constraints.NotEmpty;
public class RegisterSchemaResponse {
- @NotEmpty
- private int version;
- @JsonProperty("version")
- public int getVersion() {
- return version;
- }
+ private int version;
+
+ @JsonProperty("version")
+ public int getVersion() {
+ return version;
+ }
- @JsonProperty("version")
- public void setVersion(int version) {
- this.version = version;
- }
+ @JsonProperty("version")
+ public void setVersion(int version) {
+ this.version = version;
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/RootResource.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/RootResource.java
index 615e3275ea9..c12f03bac47 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/RootResource.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/RootResource.java
@@ -1,32 +1,38 @@
package io.confluent.kafka.schemaregistry.rest.resources;
-import io.confluent.kafka.schemaregistry.rest.Versions;
-
-import javax.validation.Valid;
-import javax.ws.rs.*;
import java.util.HashMap;
import java.util.Map;
+import javax.validation.Valid;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import io.confluent.kafka.schemaregistry.rest.Versions;
+
@Path("/")
-@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, Versions.SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED,
- Versions.JSON_WEIGHTED})
+@Produces(
+ {Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED, Versions.SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED,
+ Versions.JSON_WEIGHTED})
@Consumes({Versions.SCHEMA_REGISTRY_V1_JSON, Versions.SCHEMA_REGISTRY_DEFAULT_JSON,
- Versions.JSON, Versions.GENERIC_REQUEST})
+ Versions.JSON, Versions.GENERIC_REQUEST})
public class RootResource {
- @GET
- public Map<String, String> get() {
- // Currently this just provides an endpoint that's a nop and can be used to check for
- // liveness and can be used for tests that need to test the server setup rather than the
- // functionality of a specific resource. Some APIs provide a listing of endpoints as their
- // root resource; it might be nice to provide that.
- return new HashMap<String, String>();
- }
+ @GET
+ public Map<String, String> get() {
+ // Currently this just provides an endpoint that's a nop and can be used to check for
+ // liveness and can be used for tests that need to test the server setup rather than the
+ // functionality of a specific resource. Some APIs provide a listing of endpoints as their
+ // root resource; it might be nice to provide that.
+ return new HashMap<String, String>();
+ }
- @POST
- public Map<String, String> post(@Valid Map<String, String> request) {
- // This version allows testing with posted entities
- return new HashMap<String, String>();
- }
+ @POST
+ public Map<String, String> post(@Valid Map<String, String> request) {
+ // This version allows testing with posted entities
+ return new HashMap<String, String>();
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java
index fce0c4add36..d582e466bda 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SchemasResource.java
@@ -1,19 +1,32 @@
package io.confluent.kafka.schemaregistry.rest.resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.ServerErrorException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Response;
+
import io.confluent.kafka.schemaregistry.rest.Versions;
import io.confluent.kafka.schemaregistry.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
+import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
-import javax.ws.rs.*;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Versions.SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED,
Versions.JSON_WEIGHTED})
@@ -22,8 +35,9 @@
Versions.JSON, Versions.GENERIC_REQUEST})
public class SchemasResource {
public final static String MESSAGE_SCHEMA_NOT_FOUND = "Schema not found.";
+ private static final Logger log = LoggerFactory.getLogger(SchemasResource.class);
- private final String topic;
+ private final String topic;
private final boolean isKey;
private final SchemaRegistry schemaRegistry;
@@ -36,8 +50,15 @@ public SchemasResource(SchemaRegistry registry, String topic, boolean isKey) {
@GET
@Path("/{id}")
public Schema getSchema(@PathParam("id") Integer id) {
- Schema schema = schemaRegistry.get(this.topic, id);
- if (schema == null)
+ Schema schema = null;
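+ // A failed registry lookup is surfaced as a 404; the cause is logged at debug level and chained.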
+ try {
+ schema = schemaRegistry.get(this.topic, id);
+ } catch (SchemaRegistryException e) {
+ log.debug("Error while retrieving schema with id " + id + " from the schema registry",
+ e);
+ throw new NotFoundException(MESSAGE_SCHEMA_NOT_FOUND, e);
+ }
+ if (schema == null)
throw new NotFoundException(MESSAGE_SCHEMA_NOT_FOUND);
return schema;
}
@@ -49,8 +70,7 @@ public List list() {
try {
allSchemasForThisTopic = schemaRegistry.getAllVersions(this.topic);
} catch (StoreException e) {
- // TODO: throw meaningful exception
- e.printStackTrace();
+ throw new ServerErrorException(Response.Status.INTERNAL_SERVER_ERROR, e);
}
while (allSchemasForThisTopic.hasNext()) {
Schema schema = allSchemasForThisTopic.next();
@@ -63,8 +83,13 @@ public List list() {
public void register(final @Suspended AsyncResponse asyncResponse,
@PathParam("topic") String topicName, RegisterSchemaRequest request) {
Schema schema = new Schema(topicName, 0, request.getSchema(), true, false, true);
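+ // Version 0 is a placeholder; schemaRegistry.register() assigns the actual version below.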
- int version = schemaRegistry.register(topicName, schema);
- RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse();
+ int version = 0;
+ try {
+ version = schemaRegistry.register(topicName, schema);
+ } catch (SchemaRegistryException e) {
+ throw new ServerErrorException(Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse();
registerSchemaResponse.setVersion(version);
asyncResponse.resume(registerSchemaResponse);
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/TopicsResource.java b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/TopicsResource.java
index 40a77b83cd5..2e92c3a4690 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/TopicsResource.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/TopicsResource.java
@@ -1,13 +1,21 @@
package io.confluent.kafka.schemaregistry.rest.resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+
import io.confluent.kafka.schemaregistry.rest.Versions;
-import io.confluent.kafka.schemaregistry.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.rest.entities.Topic;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
-import javax.ws.rs.*;
-import java.util.Set;
-
@Path("/topics")
@Produces({Versions.SCHEMA_REGISTRY_V1_JSON_WEIGHTED,
Versions.SCHEMA_REGISTRY_DEFAULT_JSON_WEIGHTED,
@@ -17,7 +25,8 @@
Versions.JSON, Versions.GENERIC_REQUEST})
public class TopicsResource {
public final static String MESSAGE_TOPIC_NOT_FOUND = "Topic not found.";
- private final SchemaRegistry schemaRegistry;
+ private static final Logger log = LoggerFactory.getLogger(TopicsResource.class);
+ private final SchemaRegistry schemaRegistry;
public TopicsResource(SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
@@ -26,11 +35,10 @@ public TopicsResource(SchemaRegistry schemaRegistry) {
@GET
@Path("/{topic}")
public Topic getTopic(@PathParam("topic") String topicName) {
- System.out.println("Received get topic request for " + topicName);
if (!schemaRegistry.listTopics().contains(topicName))
throw new NotFoundException(MESSAGE_TOPIC_NOT_FOUND);
- // TODO: Implement topic/schema metadata
- return new Topic(topicName);
+ // TODO: https://github.com/confluentinc/schema-registry/issues/3 Implement metadata
+ return new Topic(topicName);
}
@Path("/{topic}/key/versions")
@@ -45,7 +53,6 @@ public SchemasResource getValueSchemas(@PathParam("topic") String topicName) {
@GET
public Set<String> list() {
- System.out.println("Received list topics request");
return schemaRegistry.listTopics();
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryStore.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryStore.java
index bcd972c2176..c9c087e52a7 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryStore.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/InMemoryStore.java
@@ -1,56 +1,63 @@
/**
- *
+ *
*/
package io.confluent.kafka.schemaregistry.storage;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
-
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
+
/**
* In-memory store based on maps
- * @author nnarkhed
*
+ * @author nnarkhed
*/
-public class InMemoryStore<K, V> implements Store<K, V> {
- private final ConcurrentSkipListMap<K, V> store;
-
- public InMemoryStore() {
- store = new ConcurrentSkipListMap<K, V>();
- }
-
- public void init() throws StoreInitializationException {
- // do nothing
- }
-
- @Override public V get(K key) {
- return store.get(key);
- }
-
- @Override public void put(K key, V value) throws StoreException {
- store.put(key, value);
- }
-
- @Override public Iterator<V> getAll(K key1, K key2) {
- ConcurrentNavigableMap<K, V> subMap = (key1 == null && key2 == null) ?
- store : store.subMap(key1, key2);
- return subMap.values().iterator();
- }
-
- @Override public void putAll(Map<K, V> entries) {
- store.putAll(entries);
- }
-
- @Override public void delete(K key) throws StoreException {
- store.remove(key);
- }
-
- @Override public void close() {
- store.clear();
- }
+public class InMemoryStore<K, V> implements Store<K, V> {
+
+ private final ConcurrentSkipListMap<K, V> store;
+
+ public InMemoryStore() {
+ store = new ConcurrentSkipListMap<K, V>();
+ }
+
+ public void init() throws StoreInitializationException {
+ // do nothing
+ }
+
+ @Override
+ public V get(K key) {
+ return store.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) throws StoreException {
+ store.put(key, value);
+ }
+
+ @Override
+ public Iterator<V> getAll(K key1, K key2) {
+ ConcurrentNavigableMap<K, V> subMap = (key1 == null && key2 == null) ?
+ store : store.subMap(key1, key2);
+ return subMap.values().iterator();
+ }
+
+ @Override
+ public void putAll(Map<K, V> entries) {
+ store.putAll(entries);
+ }
+
+ @Override
+ public void delete(K key) throws StoreException {
+ store.remove(key);
+ }
+
+ @Override
+ public void close() {
+ store.clear();
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
new file mode 100644
index 00000000000..ce1c4d9f680
--- /dev/null
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
@@ -0,0 +1,140 @@
+package io.confluent.kafka.schemaregistry.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import io.confluent.kafka.schemaregistry.rest.entities.Schema;
+import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
+import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
+import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer;
+
+public class KafkaSchemaRegistry implements SchemaRegistry {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);
+
+ private final Map<String, Integer> schemaVersions;
+ private final Store<String, Schema> kafkaStore;
+ private final Serializer<Schema> serializer;
+
+ public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<Schema> serializer)
+ throws SchemaRegistryException {
+ this.serializer = serializer;
+ schemaVersions = new HashMap<String, Integer>();
+ StringSerializer stringSerializer = new StringSerializer();
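+ // The InMemoryStore passed here presumably serves as the KafkaStore's local read cache.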
+ kafkaStore = new KafkaStore<String, Schema>(config,
+ stringSerializer, this.serializer,
+ new InMemoryStore<String, Schema>());
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ throw new SchemaRegistryException("Error while initializing the backend Kafka store", e);
+ }
+ try {
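+ // Bootstrap: replay every schema in the backing store to rebuild the topic -> latest-version cache.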
+ Iterator<Schema> allSchemas = kafkaStore.getAll(null, null);
+ while (allSchemas.hasNext()) {
+ Schema schema = allSchemas.next();
+ log.debug("Applying schema " + schema.toString() + " to the schema version " +
+ "cache");
+ schemaVersions.put(schema.getName(), schema.getVersion());
+ }
+ } catch (StoreException e) {
+ throw new SchemaRegistryException("Error while bootstrapping the schema registry " +
+ "from the backend Kafka store", e);
+ }
+ log.trace("Contents of version cache after bootstrap is complete" +
+ schemaVersions.toString());
+ }
+
+ @Override
+ public int register(String topic, Schema schema) throws SchemaRegistryException {
+ int latestVersion = 0;
+ if (schemaVersions.containsKey(topic)) {
+ latestVersion = schemaVersions.get(topic);
+ }
+ int version = latestVersion + 1;
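+ // Store keys are "<topic>,<version>" strings; note they sort lexicographically, not numerically.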
+ String newKeyForLatestSchema = topic + "," + version;
+ String keyForLatestSchema = topic + "," + latestVersion;
+ Schema latestSchema = null;
+ try {
+ latestSchema = kafkaStore.get(keyForLatestSchema);
+ } catch (StoreException e) {
+ throw new SchemaRegistryException("Error while retrieving the latest schema from the" +
+ " backend Kafka store", e);
+ }
+ if (isCompatible(topic, schema, latestSchema)) {
+ try {
+ schema.setVersion(version);
+ log.trace("Adding schema to the Kafka store: " + schema.toString());
+ kafkaStore.put(newKeyForLatestSchema, schema);
+ } catch (StoreException e) {
+ throw new SchemaRegistryException("Error while registering the schema in the" +
+ " backend Kafka store", e);
+ }
+ }
+ schemaVersions.put(topic, version);
+ return version;
+ }
+
+ @Override
+ public Schema get(String topic, int version) throws SchemaRegistryException {
+ String key = topic + "," + version;
+ Schema schema = null;
+ try {
+ schema = kafkaStore.get(key);
+ } catch (StoreException e) {
+ throw new SchemaRegistryException(
+ "Error while retrieving schema from the backend Kafka" +
+ " store", e);
+ }
+ return schema;
+ }
+
+ @Override
+ public Set<String> listTopics() {
+ return schemaVersions.keySet();
+ }
+
+ @Override
+ public Iterator<Schema> getAll(String topic) throws StoreException {
+ int earliestVersion = 1;
+ int latestVersion = 1;
+ if (schemaVersions.containsKey(topic)) {
+ latestVersion = schemaVersions.get(topic) + 1;
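+ // The upper key passed to getAll() is exclusive (cf. subMap in InMemoryStore), hence latest + 1.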
+ }
+ String keyEarliestVersion = topic + "," + earliestVersion;
+ String keyLatestVersion = topic + "," + latestVersion;
+ return kafkaStore.getAll(keyEarliestVersion, keyLatestVersion);
+ }
+
+ @Override
+  public Iterator<Schema> getAllVersions(String topic) throws StoreException {
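+    // The version cache tracks the latest registered version per topic; unknown topics
+    // default to version 1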
+ int earliestVersion = 1;
+ int latestVersion = 1;
+ if (schemaVersions.containsKey(topic)) {
+ latestVersion = schemaVersions.get(topic) + 1;
+ } else {
+ log.trace("Schema for " + topic + " does not exist in version cache. " +
+ "Defaulting to version 1 as latest version");
+ }
+ String keyEarliestVersion = topic + "," + earliestVersion;
+ String keyLatestVersion = topic + "," + latestVersion;
+ log.trace("Getting schemas between versions: " + earliestVersion + "," + latestVersion);
+ return kafkaStore.getAll(keyEarliestVersion, keyLatestVersion);
+ }
+
+ @Override
+ public boolean isCompatible(String topic, Schema schema1, Schema schema2) {
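+    // Compatibility checking is not implemented yet; every schema is currently accepted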
+ return true;
+ }
+
+ @Override
+ public void close() {
+ kafkaStore.close();
+ }
+}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java
index 48fb2525a0d..7253a8ed484 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java
@@ -1,5 +1,26 @@
package io.confluent.kafka.schemaregistry.storage;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
@@ -13,22 +34,13 @@
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import scala.collection.JavaConversions;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class KafkaStore<K, V> implements Store<K, V> {
- private final String kafkaClusterZkUrl;
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaStore.class);
+
+ private final String kafkaClusterZkUrl;
private final int zkSessionTimeoutMs;
private final String topic;
private final String groupId;
@@ -105,11 +117,21 @@ public KafkaStore(KafkaStoreConfig storeConfig, Serializer keySerializer,
while (iterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
byte[] messageBytes = messageAndMetadata.message();
- V message = messageBytes == null ? null : valueSerializer.fromBytes(messageBytes);
- K messageKey = keySerializer.fromBytes(messageAndMetadata.key());
- try {
- System.out.println("Applying update (" + messageKey + "," + message + ") to the " +
- "local store during bootstrap");
+ V message = null;
+ try {
+ message = messageBytes == null ? null : valueSerializer.fromBytes(messageBytes);
+ } catch (SerializationException e) {
+ throw new StoreInitializationException("Failed to deserialize the schema", e);
+ }
+ K messageKey = null;
+ try {
+ messageKey = keySerializer.fromBytes(messageAndMetadata.key());
+ } catch (SerializationException e) {
+ throw new StoreInitializationException("Failed to deserialize the schema key", e);
+ }
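+      // A null value is a tombstone: it deletes the key from the local store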
+ try {
+ log.trace("Applying update (" + messageKey + "," + message + ") to the " +
+ "local store during bootstrap");
if (message == null) {
localStore.delete(messageKey);
} else {
@@ -126,7 +148,8 @@ public KafkaStore(KafkaStoreConfig storeConfig, Serializer keySerializer,
// the consumer will checkpoint its offset in zookeeper, so the background thread will
// continue from where the bootstrap consumer left off
consumer.shutdown();
- System.out.println("Bootstrap is complete. Now switching to live update");
+ log.info("Kafka store bootstrap from the log " + this.topic + " is complete. Now " +
+ "switching to live update");
}
// start the background thread that subscribes to the Kafka topic and applies updates
kafkaTopicReader.start();
@@ -148,9 +171,15 @@ public KafkaStore(KafkaStoreConfig storeConfig, Serializer keySerializer,
throw new StoreException("Key should not be null");
}
// write to the Kafka topic
- ProducerRecord producerRecord = new ProducerRecord(topic, 0, keySerializer.toBytes(key),
+ ProducerRecord producerRecord = null;
+ try {
+ producerRecord = new ProducerRecord(topic, 0, keySerializer.toBytes(key),
value == null ? null : valueSerializer.toBytes(value));
- Future ack = producer.send(producerRecord);
+ } catch (SerializationException e) {
+ throw new StoreException("Error serializing schema while creating the Kafka produce "
+ + "record", e);
+ }
+    Future<RecordMetadata> ack = producer.send(producerRecord);
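+    // put() blocks on the producer ack below, so the write is durable in the Kafka log
+    // before this method returns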
try {
ack.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -186,8 +215,11 @@ public KafkaStore(KafkaStoreConfig storeConfig, Serializer keySerializer,
@Override public void close() {
kafkaTopicReader.shutdown();
- producer.close();
- localStore.close();
+ log.debug("Kafka store reader thread shut down");
+ producer.close();
+ log.debug("Kafka store producer shut down");
+ localStore.close();
+ log.debug("Kafka store shut down complete");
}
private void assertInitialized() throws StoreException {
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreConfig.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreConfig.java
index d61edd7fa62..fa598849ccd 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreConfig.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreConfig.java
@@ -11,44 +11,54 @@
public class KafkaStoreConfig extends AbstractConfig {
- /** kafkastore.connection.url
*/
- public static final String KAFKASTORE_CONNECTION_URL_CONFIG = "kafkastore.connection.url";
- protected static final String KAFKASTORE_CONNECTION_URL_DOC = "Zookeeper url for the Kafka cluster";
-
- /** kafkastore.zk.session.timeout.ms
*/
- public static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG
- = "kafkastore.zk.session.timeout.ms";
- protected static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout";
-
- /** kafkastore.topic
*/
- public static final String KAFKASTORE_TOPIC_CONFIG = "kafkastore.topic";
- protected static final String KAFKASTORE_TOPIC_DOC = "The durable single partition topic that acts" +
- "as the durable log for the data";
-
- /** kafkastore.timeout.ms
*/
- public static final String KAFKASTORE_TIMEOUT_CONFIG = "kafkastore.timeout.ms";
- protected static final String KAFKASTORE_TIMEOUT_DOC = "The timeout for an operation on the Kafka" +
- " store";
-
- private static final ConfigDef config = new ConfigDef()
- .define(KAFKASTORE_CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH,
- KAFKASTORE_CONNECTION_URL_DOC)
- .define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, Type.INT, 10000, atLeast(0), Importance.LOW,
- KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC)
- .define(KAFKASTORE_TOPIC_CONFIG, Type.STRING, Importance.HIGH, KAFKASTORE_TOPIC_DOC)
- .define(KAFKASTORE_TIMEOUT_CONFIG, Type.INT, 500, atLeast(0), Importance.MEDIUM,
- KAFKASTORE_TIMEOUT_DOC);
-
- public KafkaStoreConfig(ConfigDef arg0, Map, ?> arg1) {
- super(arg0, arg1);
- }
-
- KafkaStoreConfig(Map extends Object, ? extends Object> props) {
- super(config, props);
- }
-
- public static void main(String[] args) {
- System.out.println(config.toHtmlTable());
- }
+ /**
+ * kafkastore.connection.url
+ */
+ public static final String KAFKASTORE_CONNECTION_URL_CONFIG = "kafkastore.connection.url";
+ /**
+ * kafkastore.zk.session.timeout.ms
+ */
+ public static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG
+ = "kafkastore.zk.session.timeout.ms";
+ /**
+ * kafkastore.topic
+ */
+ public static final String KAFKASTORE_TOPIC_CONFIG = "kafkastore.topic";
+ /**
+ * kafkastore.timeout.ms
+ */
+ public static final String KAFKASTORE_TIMEOUT_CONFIG = "kafkastore.timeout.ms";
+ protected static final String KAFKASTORE_CONNECTION_URL_DOC =
+ "Zookeeper url for the Kafka cluster";
+ protected static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC =
+ "Zookeeper session timeout";
+  protected static final String KAFKASTORE_TOPIC_DOC =
+      "The durable single partition topic that acts" +
+      " as the durable log for the data";
+ protected static final String KAFKASTORE_TIMEOUT_DOC =
+ "The timeout for an operation on the Kafka" +
+ " store";
+
+ private static final ConfigDef config = new ConfigDef()
+ .define(KAFKASTORE_CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH,
+ KAFKASTORE_CONNECTION_URL_DOC)
+ .define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, Type.INT, 10000, atLeast(0),
+ Importance.LOW,
+ KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC)
+ .define(KAFKASTORE_TOPIC_CONFIG, Type.STRING, Importance.HIGH, KAFKASTORE_TOPIC_DOC)
+ .define(KAFKASTORE_TIMEOUT_CONFIG, Type.INT, 500, atLeast(0), Importance.MEDIUM,
+ KAFKASTORE_TIMEOUT_DOC);
+
+  public KafkaStoreConfig(ConfigDef arg0, Map<?, ?> arg1) {
+ super(arg0, arg1);
+ }
+
+  KafkaStoreConfig(Map<? extends Object, ? extends Object> props) {
+ super(config, props);
+ }
+
+ public static void main(String[] args) {
+ System.out.println(config.toHtmlTable());
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java
index f6443dca17e..57afadfaccf 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreReaderThread.java
@@ -1,5 +1,15 @@
package io.confluent.kafka.schemaregistry.storage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import kafka.consumer.ConsumerConfig;
@@ -10,10 +20,11 @@
import kafka.message.MessageAndMetadata;
import kafka.utils.ShutdownableThread;
-import java.util.*;
-
public class KafkaStoreReaderThread<K, V> extends ShutdownableThread {
- private final String kafkaClusterZkUrl;
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaStoreReaderThread.class);
+
+ private final String kafkaClusterZkUrl;
private final String topic;
private final String groupId;
private final Serializer keySerializer;
@@ -63,7 +74,8 @@ public void start() {
}
    KafkaStream<byte[], byte[]> stream = streamsForTheLogTopic.get(0);
consumerIterator = stream.iterator();
- System.out.println("Thread started with consumer properties " + consumerProps.toString());
+ log.debug("Kafka store reader thread started with consumer properties " +
+ consumerProps.toString());
}
@Override
@@ -72,12 +84,23 @@ public void doWork() {
while (consumerIterator != null && consumerIterator.hasNext()) {
      MessageAndMetadata<byte[], byte[]> messageAndMetadata = consumerIterator.next();
byte[] messageBytes = messageAndMetadata.message();
- V message = messageBytes == null ? null : valueSerializer.fromBytes(messageBytes);
- K messageKey = keySerializer.fromBytes(messageAndMetadata.key());
- try {
- System.out.println("Applying update (" + messageKey + "," + message + ") to the " +
- "local store");
- if (message == null) {
+ V message = null;
+ try {
+ message = messageBytes == null ? null : valueSerializer.fromBytes(messageBytes);
+ } catch (SerializationException e) {
+ // TODO: fail just this operation or all subsequent operations?
+ log.error("Failed to deserialize the schema", e);
+ }
+ K messageKey = null;
+ try {
+ messageKey = keySerializer.fromBytes(messageAndMetadata.key());
+ } catch (SerializationException e) {
+ log.error("Failed to deserialize the schema key", e);
+ }
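+      // As during bootstrap, a null value is a tombstone that deletes the key from the
+      // local store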
+ try {
+ log.trace("Applying update (" + messageKey + "," + message + ") to the local " +
+ "store");
+ if (message == null) {
localStore.delete(messageKey);
} else {
localStore.put(messageKey, message);
@@ -93,8 +116,7 @@ public void doWork() {
* 1. Restart the store hoping that it works subsequently
* 2. Look into the issue manually
*/
- System.err.println("Failed to add record from the Kafka topic" +
- topic + " the local store");
+ log.error("Failed to add record from the Kafka topic" + topic + " the local store");
}
}
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbConfig.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbConfig.java
index 90e7fe0e129..4b1ffdcf705 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbConfig.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbConfig.java
@@ -15,49 +15,62 @@
public class RocksDbConfig extends AbstractConfig {
- /** rocksdb.data.dir
*/
+ /**
+ * rocksdb.data.dir
+ */
public static final String ROCKSDB_DATADIR_CONFIG = "rocksdb.data.dir";
- private static final String ROCKSDB_DATADIR_DOC = "Location of the rocksdb data";
-
- /** rocksdb.compression
*/
+ /**
+ * rocksdb.compression
+ */
public static final String ROCKSDB_COMPRESSION_CONFIG = "rocksdb.compression";
- private static final String ROCKSDB_COMPRESSION_DOC = "The compression setting and choice for data stored in RocksDB. Could be one of" +
- "snappy, bzip2, zlib, lz4, lz4hc, none";
- private static final List compressionOptions = Arrays.asList("snappy", "bzip2", "zlib", "lz4", "lz4hc", "none");
-
- /** rocksdb.block.size.bytes
*/
+ /**
+ * rocksdb.block.size.bytes
+ */
public static final String ROCKSDB_BLOCK_SIZE_BYTES_CONFIG = "rocksdb.block.size.bytes";
- private static final String ROCKSDB_BLOCK_SIZE_DOC = "Block size in bytes for data in RocksDB";
-
- /** rocksdb.write.buffer.size.bytes
*/
- public static final String ROCKSDB_WRITE_BUFFER_SIZE_BYTES_CONFIG = "rocksdb.write.buffer.size.bytes";
- private static final String ROCKSDB_WRITE_BUFFER_SIZE_BYTES_DOC = "Write buffer size in bytes for data in RocksDB";
-
- /** rocksdb.bloomfilter.bits
*/
+ /**
+ * rocksdb.write.buffer.size.bytes
+ */
+ public static final String ROCKSDB_WRITE_BUFFER_SIZE_BYTES_CONFIG =
+ "rocksdb.write.buffer.size.bytes";
+ /**
+ * rocksdb.bloomfilter.bits
+ */
public static final String ROCKSDB_BLOOMFILTER_BITS_CONFIG = "rocksdb.bloomfilter.bits";
-
public static final String ROCKSDB_COMPACTION_STYLE_CONFIG = "rocksdb.compaction.style";
- private static final String ROCKSDB_COMPACTION_STYLE_DOC = "The compaction style for RocksDB. Could be one of universal, fifo, level";
- private static final List compactionOptions = Arrays.asList("universal", "level", "fifo");
-
public static final String ROCKSDB_WRITE_BUFFERS_CONFIG = "rocksdb.write.buffers";
-
- private static final ConfigDef config = new ConfigDef().define(ROCKSDB_COMPRESSION_CONFIG, Type.STRING, CompressionType.NO_COMPRESSION.toString(),
- Importance.MEDIUM, ROCKSDB_COMPRESSION_DOC)
+ private static final String ROCKSDB_DATADIR_DOC = "Location of the rocksdb data";
+  private static final String ROCKSDB_COMPRESSION_DOC =
+      "The compression setting and choice for data stored in RocksDB. Could be one of " +
+      "snappy, bzip2, zlib, lz4, lz4hc, none";
+  private static final List<String> compressionOptions =
+      Arrays.asList("snappy", "bzip2", "zlib", "lz4", "lz4hc", "none");
+ private static final String ROCKSDB_BLOCK_SIZE_DOC = "Block size in bytes for data in RocksDB";
+ private static final String ROCKSDB_WRITE_BUFFER_SIZE_BYTES_DOC =
+ "Write buffer size in bytes for data in RocksDB";
+ private static final String ROCKSDB_COMPACTION_STYLE_DOC =
+ "The compaction style for RocksDB. Could be one of universal, fifo, level";
+  private static final List<String> compactionOptions =
+      Arrays.asList("universal", "level", "fifo");
+ private static final ConfigDef config = new ConfigDef()
+ .define(ROCKSDB_COMPRESSION_CONFIG, Type.STRING, CompressionType.NO_COMPRESSION.toString(),
+ Importance.MEDIUM, ROCKSDB_COMPRESSION_DOC)
.define(ROCKSDB_DATADIR_CONFIG, Type.STRING, Importance.HIGH, ROCKSDB_DATADIR_DOC)
- .define(ROCKSDB_COMPACTION_STYLE_CONFIG, Type.STRING, CompactionStyle.UNIVERSAL.toString(), Importance.MEDIUM,
- ROCKSDB_COMPACTION_STYLE_DOC)
- .define(ROCKSDB_BLOCK_SIZE_BYTES_CONFIG, Type.INT, 4096, Importance.MEDIUM, ROCKSDB_BLOCK_SIZE_DOC)
+ .define(ROCKSDB_COMPACTION_STYLE_CONFIG, Type.STRING, CompactionStyle.UNIVERSAL.toString(),
+ Importance.MEDIUM,
+ ROCKSDB_COMPACTION_STYLE_DOC)
+ .define(ROCKSDB_BLOCK_SIZE_BYTES_CONFIG, Type.INT, 4096, Importance.MEDIUM,
+ ROCKSDB_BLOCK_SIZE_DOC)
.define(ROCKSDB_BLOOMFILTER_BITS_CONFIG, Type.INT, 10, atLeast(0), Importance.LOW, null)
.define(ROCKSDB_WRITE_BUFFERS_CONFIG, Type.INT, 1, Importance.LOW, null)
- .define(ROCKSDB_WRITE_BUFFER_SIZE_BYTES_CONFIG, Type.LONG, 4096, Importance.MEDIUM, ROCKSDB_WRITE_BUFFER_SIZE_BYTES_DOC);
+ .define(ROCKSDB_WRITE_BUFFER_SIZE_BYTES_CONFIG, Type.LONG, 4096, Importance.MEDIUM,
+ ROCKSDB_WRITE_BUFFER_SIZE_BYTES_DOC);
  public RocksDbConfig(ConfigDef arg0, Map<?, ?> arg1) {
super(arg0, arg1);
}
  RocksDbConfig(Map<? extends Object, ? extends Object> props) {
- super(config, props);
+ super(config, props);
}
public static void main(String[] args) {
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbStore.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbStore.java
index 83445c2bd15..69b048cba8f 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbStore.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/RocksDbStore.java
@@ -3,27 +3,32 @@
*/
package io.confluent.kafka.schemaregistry.storage;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
-import org.rocksdb.*;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
+
/**
* @author nnarkhed
*
*/
public class RocksDbStore implements Store<byte[], byte[]> {
- private RocksDB db;
private final Options options;
private final File rocksDbDir;
private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private RocksDB db;
- public RocksDbStore(RocksDbConfig config) {
+ public RocksDbStore(RocksDbConfig config) {
rocksDbDir = new File(config.getString(RocksDbConfig.ROCKSDB_DATADIR_CONFIG));
options = new Options();
String compression = config.getString(RocksDbConfig.ROCKSDB_COMPRESSION_CONFIG);
@@ -61,8 +66,7 @@ public byte[] get(byte[] key) throws StoreException {
try {
value = db.get(key);
} catch (RocksDBException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ throw new StoreException(e);
}
return value;
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java
index 4e666a7703f..ed58f77ae1e 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java
@@ -1,123 +1,25 @@
package io.confluent.kafka.schemaregistry.storage;
-import io.confluent.kafka.schemaregistry.rest.entities.Schema;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
-import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
-import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer;
-
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
import java.util.Set;
-public class SchemaRegistry {
- private final Map schemaVersions;
- private final Store kafkaStore;
- private final Serializer serializer;
+import io.confluent.kafka.schemaregistry.rest.entities.Schema;
+import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
- public SchemaRegistry(SchemaRegistryConfig config, Serializer serializer) {
- this.serializer = serializer;
- schemaVersions = new HashMap();
- StringSerializer stringSerializer = new StringSerializer();
- kafkaStore = new KafkaStore(config,
- stringSerializer, this.serializer, new InMemoryStore());
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- }
- try {
- Iterator allSchemas = kafkaStore.getAll(null, null);
- while(allSchemas.hasNext()) {
- Schema schema = allSchemas.next();
- System.out.println("Applying schema " + schema.toString() + " to the schema version " +
- "cache");
- schemaVersions.put(schema.getName(), schema.getVersion());
- }
- } catch (StoreException e) {
- // TODO: throw meaningful exception
- e.printStackTrace();
- }
- System.out.println("Current contents of versionc cache" + schemaVersions.toString());
- }
+public interface SchemaRegistry {
- public int register(String topic, Schema schema) {
- int latestVersion = 0;
- if (schemaVersions.containsKey(topic)) {
- latestVersion = schemaVersions.get(topic);
- }
- int version = latestVersion + 1;
- String newKeyForLatestSchema = topic + "," + version;
- String keyForLatestSchema = topic + "," + latestVersion;
- Schema latestSchema = null;
- try {
- latestSchema = kafkaStore.get(keyForLatestSchema);
- } catch (StoreException e) {
- // TODO: Throw a meaningful exception
- e.printStackTrace();
- return -1;
- }
- if (isCompatible(topic, schema, latestSchema)) {
- try {
- schema.setVersion(version);
- System.out.println("Adding schema to the Kafka store: " + schema.toString());
- kafkaStore.put(newKeyForLatestSchema, schema);
- } catch (StoreException e) {
- e.printStackTrace();
- return -1;
- }
- }
- schemaVersions.put(topic, version);
- return version;
- }
+ int register(String topic, Schema schema) throws SchemaRegistryException;
- public Schema get(String topic, int version) {
- String key = topic + "," + version;
- Schema schema = null;
- try {
- schema = kafkaStore.get(key);
- } catch (StoreException e) {
- // TODO: Throw a meaningful exception
- e.printStackTrace();
- }
- return schema;
- }
+ Schema get(String topic, int version) throws SchemaRegistryException;
- public Set listTopics() {
- return schemaVersions.keySet();
- }
+  Set<String> listTopics();
- public Iterator getAll(String topic) throws StoreException {
- int earliestVersion = 1;
- int latestVersion = 1;
- if (schemaVersions.containsKey(topic)) {
- latestVersion = schemaVersions.get(topic) + 1;
- }
- String keyEarliestVersion = topic + "," + earliestVersion;
- String keyLatestVersion = topic + "," + latestVersion;
- return kafkaStore.getAll(keyEarliestVersion, keyLatestVersion);
- }
+  Iterator<Schema> getAll(String topic) throws StoreException;
- public Iterator getAllVersions(String topic) throws StoreException {
- int earliestVersion = 1;
- int latestVersion = 1;
- if (schemaVersions.containsKey(topic)) {
- latestVersion = schemaVersions.get(topic) + 1;
- } else {
- System.err.println("Schema for " + topic + " does not exist in version cache. " +
- "Defaulting to version 1 as latest version");
- }
- String keyEarliestVersion = topic + "," + earliestVersion;
- String keyLatestVersion = topic + "," + latestVersion;
- System.out.println("Getting schemas between versions: " + earliestVersion + "," + latestVersion);
- return kafkaStore.getAll(keyEarliestVersion, keyLatestVersion);
- }
+  Iterator<Schema> getAllVersions(String topic) throws StoreException;
- public boolean isCompatible(String topic, Schema schema1, Schema schema2) {
- return true;
- }
+ boolean isCompatible(String topic, Schema schema1, Schema schema2);
- public void close() {
- kafkaStore.close();
- }
+ void close();
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistryConfig.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistryConfig.java
index e7b4479a15c..16b16e8ad17 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistryConfig.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistryConfig.java
@@ -7,26 +7,27 @@
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
public class SchemaRegistryConfig extends KafkaStoreConfig {
- private static final ConfigDef config = new ConfigDef()
- .define(KAFKASTORE_CONNECTION_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
- KAFKASTORE_CONNECTION_URL_DOC)
- .define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, 10000, atLeast(0),
- ConfigDef.Importance.LOW, KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC)
- .define(KAFKASTORE_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
- KAFKASTORE_TOPIC_DOC)
- .define(KAFKASTORE_TIMEOUT_CONFIG, ConfigDef.Type.INT, 500, atLeast(0),
- ConfigDef.Importance.MEDIUM, KAFKASTORE_TIMEOUT_DOC);
-
- public SchemaRegistryConfig(ConfigDef arg0, Map, ?> arg1) {
- super(arg0, arg1);
- }
-
- public SchemaRegistryConfig(Map extends Object, ? extends Object> props) {
- super(config, props);
- }
-
- public static void main(String[] args) {
- System.out.println(config.toHtmlTable());
- }
+
+ private static final ConfigDef config = new ConfigDef()
+ .define(KAFKASTORE_CONNECTION_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+ KAFKASTORE_CONNECTION_URL_DOC)
+ .define(KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, 10000, atLeast(0),
+ ConfigDef.Importance.LOW, KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC)
+ .define(KAFKASTORE_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+ KAFKASTORE_TOPIC_DOC)
+ .define(KAFKASTORE_TIMEOUT_CONFIG, ConfigDef.Type.INT, 500, atLeast(0),
+ ConfigDef.Importance.MEDIUM, KAFKASTORE_TIMEOUT_DOC);
+
+  public SchemaRegistryConfig(ConfigDef arg0, Map<?, ?> arg1) {
+ super(arg0, arg1);
+ }
+
+  public SchemaRegistryConfig(Map<? extends Object, ? extends Object> props) {
+ super(config, props);
+ }
+
+ public static void main(String[] args) {
+ System.out.println(config.toHtmlTable());
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/Store.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/Store.java
index 4b0a502f7b5..381770662f1 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/Store.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/Store.java
@@ -1,32 +1,38 @@
/**
- *
+ *
*/
package io.confluent.kafka.schemaregistry.storage;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
-
import java.util.Iterator;
import java.util.Map;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
+
/**
* @author nnarkhed
- *
*/
-public interface Store {
- public void init() throws StoreInitializationException;
- public V get(K key) throws StoreException;
- public void put(K key, V value) throws StoreException;
- /**
- * Iterator over keys in the specified range
- * @param key1 If key1 is null, start from the first key in sorted order
- * @param key2 If key2 is null, end at the last key
- * @return Iterator over keys in range (key1, key2]. If both keys are null, return an iterator
- * over all keys in the database
- * @throws StoreException
- */
- public Iterator getAll(K key1, K key2) throws StoreException;
- public void putAll(Map entries) throws StoreException;
- public void delete(K key) throws StoreException;
- public void close();
+public interface Store<K, V> {
+
+ public void init() throws StoreInitializationException;
+
+ public V get(K key) throws StoreException;
+
+ public void put(K key, V value) throws StoreException;
+
+  /**
+   * Iterator over the values whose keys fall in the specified range
+   *
+   * @param key1 If key1 is null, start from the first key in sorted order
+   * @param key2 If key2 is null, end at the last key
+   * @return Iterator over values with keys in range (key1, key2]. If both keys are null, return
+   *         an iterator over all values in the database
+   */
+  public Iterator<V> getAll(K key1, K key2) throws StoreException;
+
+  public void putAll(Map<K, V> entries) throws StoreException;
+
+ public void delete(K key) throws StoreException;
+
+ public void close();
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SchemaRegistryException.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SchemaRegistryException.java
new file mode 100644
index 00000000000..a9c51b23c54
--- /dev/null
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SchemaRegistryException.java
@@ -0,0 +1,20 @@
+package io.confluent.kafka.schemaregistry.storage.exceptions;
+
+public class SchemaRegistryException extends Exception {
+
+ public SchemaRegistryException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SchemaRegistryException(String message) {
+ super(message);
+ }
+
+ public SchemaRegistryException(Throwable cause) {
+ super(cause);
+ }
+
+ public SchemaRegistryException() {
+ super();
+ }
+}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SerializationException.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SerializationException.java
new file mode 100644
index 00000000000..de664f72943
--- /dev/null
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/SerializationException.java
@@ -0,0 +1,20 @@
+package io.confluent.kafka.schemaregistry.storage.exceptions;
+
+public class SerializationException extends Exception {
+
+ public SerializationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SerializationException(String message) {
+ super(message);
+ }
+
+ public SerializationException(Throwable cause) {
+ super(cause);
+ }
+
+ public SerializationException() {
+ super();
+ }
+}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/StoreInitializationException.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/StoreInitializationException.java
index 924466d325a..e97d5c0312e 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/StoreInitializationException.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/exceptions/StoreInitializationException.java
@@ -2,8 +2,6 @@
/**
* Error while initializing a io.confluent.kafka.schemaregistry.storage.Store
- * @author nnarkhed
- *
*/
public class StoreInitializationException extends Exception {
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/SchemaSerializer.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/SchemaSerializer.java
index 9faac455c3a..7eaeac07dc5 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/SchemaSerializer.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/SchemaSerializer.java
@@ -1,43 +1,47 @@
package io.confluent.kafka.schemaregistry.storage.serialization;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.confluent.kafka.schemaregistry.rest.entities.Schema;
import java.io.IOException;
import java.util.Map;
+import io.confluent.kafka.schemaregistry.rest.entities.Schema;
+import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
+
public class SchemaSerializer implements Serializer<Schema> {
- public SchemaSerializer() {
+ public SchemaSerializer() {
- }
+ }
- @Override public byte[] toBytes(Schema data) {
- try {
- return new ObjectMapper().writeValueAsBytes(data);
- } catch (IOException e) {
- // TODO: throw a SerializationException
- e.printStackTrace();
- return null;
- }
+ @Override
+ public byte[] toBytes(Schema data) throws SerializationException {
+ try {
+ return new ObjectMapper().writeValueAsBytes(data);
+ } catch (IOException e) {
+ throw new SerializationException("Error while serializing schema " + data.toString(),
+ e);
}
-
- @Override public Schema fromBytes(byte[] data) {
- Schema schema = null;
- try {
- schema = new ObjectMapper().readValue(data, Schema.class);
- } catch (IOException e) {
- // TODO: throw a SerializationException
- e.printStackTrace();
- }
- return schema;
+ }
+
+ @Override
+ public Schema fromBytes(byte[] data) throws SerializationException {
+ Schema schema = null;
+ try {
+ schema = new ObjectMapper().readValue(data, Schema.class);
+ } catch (IOException e) {
+ throw new SerializationException("Error while deserializing schema", e);
}
+ return schema;
+ }
- @Override public void close() {
+ @Override
+ public void close() {
- }
+ }
- @Override public void configure(Map stringMap) {
+ @Override
+  public void configure(Map<String, ?> stringMap) {
- }
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/Serializer.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/Serializer.java
index d025fc1f3a4..2b47e478c4d 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/Serializer.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/Serializer.java
@@ -15,27 +15,28 @@
import org.apache.kafka.common.Configurable;
+import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
+
/**
- *
- * @param Type to be serialized from.
- *
- * A class that implements this interface is expected to have a constructor with no parameter.
+ * @param <T> Type to be serialized from. A class that implements this interface is expected to
+ *        have a constructor with no parameter.
*/
public interface Serializer<T> extends Configurable {
- /**
- * @param data Typed data
- * @return bytes of the serialized data
- */
- public byte[] toBytes(T data);
- /**
- * @param data Bytes
- * @return Typed deserialized data
- */
- public T fromBytes(byte[] data);
+ /**
+ * @param data Typed data
+ * @return bytes of the serialized data
+ */
+ public byte[] toBytes(T data) throws SerializationException;
+
+ /**
+ * @param data Bytes
+ * @return Typed deserialized data
+ */
+ public T fromBytes(byte[] data) throws SerializationException;
- /**
- * Close this serializer
- */
- public void close();
+ /**
+ * Close this serializer
+ */
+ public void close();
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/StringSerializer.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/StringSerializer.java
index a790b4c6dda..1b7cb09acd2 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/StringSerializer.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/StringSerializer.java
@@ -4,19 +4,23 @@
public class StringSerializer implements Serializer<String> {
- @Override public byte[] toBytes(String data) {
- return data.getBytes();
- }
+ @Override
+ public byte[] toBytes(String data) {
+ return data.getBytes();
+ }
- @Override public String fromBytes(byte[] data) {
- return new String(data);
- }
+ @Override
+ public String fromBytes(byte[] data) {
+ return new String(data);
+ }
- @Override public void close() {
- // do nothing
- }
+ @Override
+ public void close() {
+ // do nothing
+ }
- @Override public void configure(Map stringMap) {
- // do nothing
- }
+ @Override
+  public void configure(Map<String, ?> stringMap) {
+ // do nothing
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/ZkStringSerializer.java b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/ZkStringSerializer.java
index f4afc510adb..9b81b24d1da 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/ZkStringSerializer.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/storage/serialization/ZkStringSerializer.java
@@ -5,13 +5,16 @@
public class ZkStringSerializer implements ZkSerializer {
- @Override public byte[] serialize(Object o) throws ZkMarshallingError {
- return ((String) o).getBytes();
- }
+ @Override
+ public byte[] serialize(Object o) throws ZkMarshallingError {
+ return ((String) o).getBytes();
+ }
- @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError {
- if (bytes == null)
- return null;
- return new String(bytes);
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+ if (bytes == null) {
+ return null;
}
+ return new String(bytes);
+ }
}
diff --git a/src/main/java/io/confluent/kafka/schemaregistry/utils/Pair.java b/src/main/java/io/confluent/kafka/schemaregistry/utils/Pair.java
index 898c4bbd6d7..4b46f8be85d 100644
--- a/src/main/java/io/confluent/kafka/schemaregistry/utils/Pair.java
+++ b/src/main/java/io/confluent/kafka/schemaregistry/utils/Pair.java
@@ -1,6 +1,6 @@
package io.confluent.kafka.schemaregistry.utils;
-public class Pair<K,V> {
+public class Pair<K, V> {
private K item1;
private V item2;
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
index 988f0f0ede7..08b69eca8cc 100644
--- a/src/main/resources/log4j.properties
+++ b/src/main/resources/log4j.properties
@@ -4,7 +4,9 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
-log4j.logger.kafka.server=INFO, stdout
-log4j.logger.kafka.consumer.ZookeeperConsumerConnector=INFO, stdout
+log4j.logger.kafka=OFF, stdout
+log4j.logger.org.apache.zookeeper=WARN, stdout
+log4j.logger.org.I0Itec.zkclient=WARN, stdout
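+# Kafka broker logging is turned off and ZooKeeper client logging capped at WARN to keep
+# test output readable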
+#log4j.logger.kafka.consumer.ZookeeperConsumerConnector=INFO, stdout
log4j.additivity.kafka.server=false
log4j.additivity.kafka.consumer.ZookeeperConsumerConnector=false
diff --git a/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java b/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java
index 7badf57cc55..32dccc717e8 100644
--- a/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java
+++ b/src/test/java/io/confluent/kafka/schemaregistry/ClusterTestHarness.java
@@ -15,6 +15,16 @@
*/
package io.confluent.kafka.schemaregistry;
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Vector;
+
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
@@ -22,106 +32,104 @@
import kafka.utils.Utils;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
-import org.I0Itec.zkclient.ZkClient;
-import org.eclipse.jetty.server.Server;
-import org.junit.After;
-import org.junit.Before;
import scala.collection.JavaConversions;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-
/**
- * Test harness to run against a real, local Kafka cluster and REST proxy. This is essentially Kafka's
- * ZookeeperTestHarness and KafkaServerTestHarness traits combined and ported to Java with the addition
- * of the REST proxy. Defaults to a 1-ZK, 3-broker, 1 REST proxy cluster.
+ * Test harness to run against a real, local Kafka cluster and REST proxy. This is essentially
+ * Kafka's ZookeeperTestHarness and KafkaServerTestHarness traits combined and ported to Java with
+ * the addition of the REST proxy. Defaults to a 1-ZK, 3-broker, 1 REST proxy cluster.
*/
public abstract class ClusterTestHarness {
- public static final int DEFAULT_NUM_BROKERS = 3;
- // Shared config
- protected Queue ports;
+ public static final int DEFAULT_NUM_BROKERS = 3;
- // ZK Config
- protected int zkPort;
- protected String zkConnect;
- protected EmbeddedZookeeper zookeeper;
- protected ZkClient zkClient;
- protected int zkConnectionTimeout = 6000;
- protected int zkSessionTimeout = 6000;
+ // Shared config
+  protected Queue<Integer> ports;
- // Kafka Config
- protected List configs = null;
- protected List servers = null;
- protected String brokerList = null;
+ // ZK Config
+ protected int zkPort;
+ protected String zkConnect;
+ protected EmbeddedZookeeper zookeeper;
+ protected ZkClient zkClient;
+ protected int zkConnectionTimeout = 6000;
+ protected int zkSessionTimeout = 6000;
- protected String bootstrapServers = null;
+ // Kafka Config
+  protected List<KafkaConfig> configs = null;
+  protected List<KafkaServer> servers = null;
+ protected String brokerList = null;
- public ClusterTestHarness() {
- this(DEFAULT_NUM_BROKERS);
- }
+ protected String bootstrapServers = null;
- public ClusterTestHarness(int numBrokers) {
- // 1 port per broker + ZK + REST server
- this(numBrokers, numBrokers + 2);
- }
+ public ClusterTestHarness() {
+ this(DEFAULT_NUM_BROKERS);
+ }
- public ClusterTestHarness(int numBrokers, int numPorts) {
- ports = new ArrayDeque();
- for(Object portObj : JavaConversions.asJavaList(TestUtils.choosePorts(numPorts)))
- ports.add((Integer)portObj);
- zkPort = ports.remove();
- zkConnect = String.format("localhost:%d", zkPort);
-
- configs = new Vector();
- bootstrapServers = "";
- for(int i = 0; i < numBrokers; i++) {
- int port = ports.remove();
- Properties props = TestUtils.createBrokerConfig(i, port, false);
- // Turn auto creation *off*, unlike the default. This lets us test errors that should be generated when
- // brokers are configured that way.
- props.setProperty("auto.create.topics.enable", "true");
- props.setProperty("num.partitions", "1");
- // We *must* override this to use the port we allocated (Kafka currently allocates one port that it always
- // uses for ZK
- props.setProperty("zookeeper.connect", this.zkConnect);
- configs.add(new KafkaConfig(props));
-
- if (bootstrapServers.length() > 0)
- bootstrapServers += ",";
- bootstrapServers = bootstrapServers + "localhost:" + ((Integer)port).toString();
- }
- }
+ public ClusterTestHarness(int numBrokers) {
+ // 1 port per broker + ZK + REST server
+ this(numBrokers, numBrokers + 2);
+ }
- @Before
- public void setUp() throws Exception {
- zookeeper = new EmbeddedZookeeper(zkConnect);
- zkClient = new ZkClient(zookeeper.connectString(), zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer$.MODULE$);
-
- if(configs == null || configs.size() <= 0)
- throw new RuntimeException("Must supply at least one server config.");
- brokerList = TestUtils.getBrokerListStrFromConfigs(JavaConversions.asScalaIterable(configs).toSeq());
- servers = new Vector(configs.size());
- for(KafkaConfig config : configs) {
- KafkaServer server = TestUtils.createServer(config, SystemTime$.MODULE$);
- servers.add(server);
- }
+ public ClusterTestHarness(int numBrokers, int numPorts) {
+    ports = new ArrayDeque<Integer>();
+ for (Object portObj : JavaConversions.asJavaList(TestUtils.choosePorts(numPorts))) {
+ ports.add((Integer) portObj);
}
+ zkPort = ports.remove();
+ zkConnect = String.format("localhost:%d", zkPort);
+
+    configs = new Vector<KafkaConfig>();
+ bootstrapServers = "";
+ for (int i = 0; i < numBrokers; i++) {
+ int port = ports.remove();
+ Properties props = TestUtils.createBrokerConfig(i, port, false);
+      // Enable topic auto creation so the store's log topic is created on first use, and use a
+      // single partition per topic so updates to the store stay totally ordered.
+ props.setProperty("auto.create.topics.enable", "true");
+ props.setProperty("num.partitions", "1");
+ // We *must* override this to use the port we allocated (Kafka currently allocates one port that it always
+      // uses for ZK).
+ props.setProperty("zookeeper.connect", this.zkConnect);
+ configs.add(new KafkaConfig(props));
+
+ if (bootstrapServers.length() > 0) {
+ bootstrapServers += ",";
+ }
+ bootstrapServers = bootstrapServers + "localhost:" + ((Integer) port).toString();
+ }
+ }
- @After
- public void tearDown() throws Exception {
- for(KafkaServer server: servers)
- server.shutdown();
- for(KafkaServer server: servers)
- for (String logDir : JavaConversions.asJavaCollection(server.config().logDirs()))
- Utils.rm(logDir);
+ @Before
+ public void setUp() throws Exception {
+ zookeeper = new EmbeddedZookeeper(zkConnect);
+ zkClient =
+ new ZkClient(zookeeper.connectString(), zkSessionTimeout, zkConnectionTimeout,
+ ZKStringSerializer$.MODULE$);
- zkClient.close();
- zookeeper.shutdown();
+ if (configs == null || configs.size() <= 0) {
+ throw new RuntimeException("Must supply at least one server config.");
+ }
+ brokerList =
+ TestUtils.getBrokerListStrFromConfigs(JavaConversions.asScalaIterable(configs).toSeq());
+    servers = new Vector<KafkaServer>(configs.size());
+ for (KafkaConfig config : configs) {
+ KafkaServer server = TestUtils.createServer(config, SystemTime$.MODULE$);
+ servers.add(server);
}
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ for (KafkaServer server : servers) {
+ server.shutdown();
+ }
+ for (KafkaServer server : servers) {
+ for (String logDir : JavaConversions.asJavaCollection(server.config().logDirs())) {
+ Utils.rm(logDir);
+ }
+ }
+
+ zkClient.close();
+ zookeeper.shutdown();
+ }
}
diff --git a/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.java b/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.java
index 88e5fecf70b..10b41eee4f0 100644
--- a/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.java
+++ b/src/test/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.java
@@ -1,267 +1,283 @@
package io.confluent.kafka.schemaregistry.storage;
-import io.confluent.kafka.schemaregistry.ClusterTestHarness;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
-import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Properties;
+import io.confluent.kafka.schemaregistry.ClusterTestHarness;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
+import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
+import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class KafkaStoreTest extends ClusterTestHarness {
- private String topic = "log";
- @Before
- public void setup() {
- System.out.println("Zk conn url = " + zkConnect);
- }
- @After
- public void teardown() {
- System.out.println("Shutting down");
- }
+ private static final Logger log = LoggerFactory.getLogger(KafkaStoreTest.class);
+
+ private String topic = "log";
- @Test
- public void testInitialization() {
- Properties props = new Properties();
- props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
- props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
- KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
- StringSerializer stringSerializer = new StringSerializer();
- KafkaStore kafkaStore = new KafkaStore(storeConfig,
- stringSerializer, stringSerializer, new InMemoryStore());
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- fail("Kafka store failed to initialize");
- }
+ @Before
+ public void setup() {
+ log.debug("Zk conn url = " + zkConnect);
+ }
+
+ @After
+ public void teardown() {
+ log.debug("Shutting down");
+ }
+
+ @Test
+ public void testInitialization() {
+ Properties props = new Properties();
+ props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
+ props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
+ KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
+ StringSerializer stringSerializer = new StringSerializer();
+    KafkaStore<String, String> kafkaStore = new KafkaStore<String, String>(
+        storeConfig, stringSerializer, stringSerializer, new InMemoryStore<String, String>());
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ fail("Kafka store failed to initialize");
}
+ }
- @Test
- public void testIncorrectInitialization() {
- Properties props = new Properties();
- props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
- props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
- KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
- StringSerializer stringSerializer = new StringSerializer();
- KafkaStore kafkaStore = new KafkaStore(storeConfig,
- stringSerializer, stringSerializer, new InMemoryStore());
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- fail("Kafka store failed to initialize");
- }
- try {
- kafkaStore.init();
- fail("Kafka store repeated initialization should fail");
- } catch (StoreInitializationException e) {
- // this is expected
- }
+ @Test
+ public void testIncorrectInitialization() {
+ Properties props = new Properties();
+ props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
+ props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
+ KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
+ StringSerializer stringSerializer = new StringSerializer();
+    KafkaStore<String, String> kafkaStore = new KafkaStore<String, String>(
+        storeConfig, stringSerializer, stringSerializer, new InMemoryStore<String, String>());
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ fail("Kafka store failed to initialize");
}
+ try {
+ kafkaStore.init();
+ fail("Kafka store repeated initialization should fail");
+ } catch (StoreInitializationException e) {
+ // this is expected
+ }
+ }
- @Test
- public void testSimplePut() throws InterruptedException {
- Properties props = new Properties();
- props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
- props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
- KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
- StringSerializer stringSerializer = new StringSerializer();
- Store inMemoryStore = new InMemoryStore();
- KafkaStore kafkaStore = new KafkaStore(storeConfig,
- stringSerializer, stringSerializer, inMemoryStore);
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- fail("Kafka store failed to initialize");
- }
- String key = "Kafka";
- String value = "Rocks";
- try {
- kafkaStore.put(key, value);
- } catch (StoreException e) {
- fail("Kafka store put(Kafka, Rocks) operation failed");
- }
- Thread.sleep(500);
- String retrievedValue = null;
- try {
- retrievedValue = kafkaStore.get(key);
- } catch (StoreException e) {
- fail("Kafka store get(Kafka) operation failed");
- }
- assertEquals("Retrieved value should match entered value", value, retrievedValue);
- kafkaStore.close();
+ @Test
+ public void testSimplePut() throws InterruptedException {
+ Properties props = new Properties();
+ props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
+ props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
+ KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
+ StringSerializer stringSerializer = new StringSerializer();
+    Store<String, String> inMemoryStore = new InMemoryStore<String, String>();
+    KafkaStore<String, String> kafkaStore = new KafkaStore<String, String>(
+        storeConfig, stringSerializer, stringSerializer, inMemoryStore);
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ fail("Kafka store failed to initialize");
+ }
+ String key = "Kafka";
+ String value = "Rocks";
+ try {
+ kafkaStore.put(key, value);
+ } catch (StoreException e) {
+ fail("Kafka store put(Kafka, Rocks) operation failed");
}
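+    // Give the background reader thread time to apply the update from the Kafka log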
+ Thread.sleep(500);
+ String retrievedValue = null;
+ try {
+ retrievedValue = kafkaStore.get(key);
+ } catch (StoreException e) {
+ fail("Kafka store get(Kafka) operation failed");
+ }
+ assertEquals("Retrieved value should match entered value", value, retrievedValue);
+ kafkaStore.close();
+ }
- @Test
- public void testSimpleGetAfterFailure() throws InterruptedException {
- Properties props = new Properties();
- props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
- props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
- KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
- StringSerializer stringSerializer = new StringSerializer();
- Store inMemoryStore = new InMemoryStore();
- KafkaStore kafkaStore = new KafkaStore(storeConfig,
- stringSerializer, stringSerializer, inMemoryStore);
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- fail("Kafka store failed to initialize");
- }
- String key = "Kafka";
- String value = "Rocks";
- try {
- kafkaStore.put(key, value);
- } catch (StoreException e) {
- fail("Kafka store put(Kafka, Rocks) operation failed");
- }
- Thread.sleep(1000);
- String retrievedValue = null;
- try {
- System.out.println("Reading written value from the kafka store");
- retrievedValue = kafkaStore.get(key);
- } catch (StoreException e) {
- fail("Kafka store get(Kafka) operation failed");
- }
- assertEquals("Retrieved value should match entered value", value, retrievedValue);
- kafkaStore.close();
- inMemoryStore.close();
- // recreate kafka store
- kafkaStore = new KafkaStore(storeConfig, stringSerializer, stringSerializer,
- inMemoryStore);
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- fail("Kafka store failed to initialize");
- }
- retrievedValue = null;
- try {
- System.out.println("Reading written value from the kafka store");
- retrievedValue = kafkaStore.get(key);
- } catch (StoreException e) {
- fail("Kafka store get(Kafka) operation failed");
- }
- assertEquals("Retrieved value should match entered value", value, retrievedValue);
- kafkaStore.close();
- inMemoryStore.close();
+ @Test
+ public void testSimpleGetAfterFailure() throws InterruptedException {
+ Properties props = new Properties();
+ props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
+ props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
+ KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
+ StringSerializer stringSerializer = new StringSerializer();
+    Store<String, String> inMemoryStore = new InMemoryStore<String, String>();
+    KafkaStore<String, String> kafkaStore = new KafkaStore<String, String>(
+        storeConfig, stringSerializer, stringSerializer, inMemoryStore);
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ fail("Kafka store failed to initialize");
+ }
+ String key = "Kafka";
+ String value = "Rocks";
+ try {
+ kafkaStore.put(key, value);
+ } catch (StoreException e) {
+ fail("Kafka store put(Kafka, Rocks) operation failed");
+ }
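+    // Wait for the background reader thread to apply the update before reading it back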
+ Thread.sleep(1000);
+ String retrievedValue = null;
+ try {
+ retrievedValue = kafkaStore.get(key);
+ } catch (StoreException e) {
+ fail("Kafka store get(Kafka) operation failed");
+ }
+ assertEquals("Retrieved value should match entered value", value, retrievedValue);
+ kafkaStore.close();
+ inMemoryStore.close();
+ // recreate kafka store
+    kafkaStore = new KafkaStore<String, String>(storeConfig, stringSerializer, stringSerializer,
+                                                inMemoryStore);
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ fail("Kafka store failed to initialize");
}
+ retrievedValue = null;
+ try {
+ retrievedValue = kafkaStore.get(key);
+ } catch (StoreException e) {
+ fail("Kafka store get(Kafka) operation failed");
+ }
+ assertEquals("Retrieved value should match entered value", value, retrievedValue);
+ kafkaStore.close();
+ inMemoryStore.close();
+ }
- @Test
- public void testSimpleDelete() throws InterruptedException {
- Properties props = new Properties();
- props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
- props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
- KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
- StringSerializer stringSerializer = new StringSerializer();
- Store inMemoryStore = new InMemoryStore();
- KafkaStore kafkaStore = new KafkaStore(storeConfig,
- stringSerializer, stringSerializer, inMemoryStore);
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- fail("Kafka store failed to initialize");
- }
- String key = "Kafka";
- String value = "Rocks";
- try {
- kafkaStore.put(key, value);
- } catch (StoreException e) {
- fail("Kafka store put(Kafka, Rocks) operation failed");
- }
- Thread.sleep(500);
- String retrievedValue = null;
- try {
- retrievedValue = kafkaStore.get(key);
- } catch (StoreException e) {
- fail("Kafka store get(Kafka) operation failed");
- }
- assertEquals("Retrieved value should match entered value", value, retrievedValue);
- try {
- kafkaStore.delete(key);
- } catch (StoreException e) {
- fail("Kafka store delete(Kafka) operation failed");
- }
- Thread.sleep(500);
- // verify that value is deleted
- retrievedValue = value;
- try {
- retrievedValue = kafkaStore.get(key);
- } catch (StoreException e) {
- fail("Kafka store get(Kafka) operation failed");
- }
- assertNull("Value should have been deleted", retrievedValue);
- kafkaStore.close();
+ @Test
+ public void testSimpleDelete() throws InterruptedException {
+ Properties props = new Properties();
+ props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
+ props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
+ KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
+ StringSerializer stringSerializer = new StringSerializer();
+ Store inMemoryStore = new InMemoryStore();
+ KafkaStore kafkaStore = new KafkaStore(storeConfig,
+ stringSerializer,
+ stringSerializer,
+ inMemoryStore);
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ fail("Kafka store failed to initialize");
+ }
+ String key = "Kafka";
+ String value = "Rocks";
+ try {
+ kafkaStore.put(key, value);
+ } catch (StoreException e) {
+ fail("Kafka store put(Kafka, Rocks) operation failed");
+ }
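+    // allow the write to propagate before reading it back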
+ Thread.sleep(500);
+ String retrievedValue = null;
+ try {
+ retrievedValue = kafkaStore.get(key);
+ } catch (StoreException e) {
+ fail("Kafka store get(Kafka) operation failed");
+ }
+ assertEquals("Retrieved value should match entered value", value, retrievedValue);
+ try {
+ kafkaStore.delete(key);
+ } catch (StoreException e) {
+ fail("Kafka store delete(Kafka) operation failed");
}
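+    // allow the delete to propagate before checking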
+ Thread.sleep(500);
+    // verify that the value is deleted; retrievedValue is seeded non-null so
+    // the assertNull below only passes if get() actually returns null
+    retrievedValue = value;
+ try {
+ retrievedValue = kafkaStore.get(key);
+ } catch (StoreException e) {
+ fail("Kafka store get(Kafka) operation failed");
+ }
+ assertNull("Value should have been deleted", retrievedValue);
+    kafkaStore.close();
+    inMemoryStore.close();
+  }
- @Test
- public void testDeleteAfterRestart() throws InterruptedException {
- Properties props = new Properties();
- props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
- props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
- KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
- StringSerializer stringSerializer = new StringSerializer();
- Store inMemoryStore = new InMemoryStore();
- KafkaStore kafkaStore = new KafkaStore(storeConfig,
- stringSerializer, stringSerializer, inMemoryStore);
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- fail("Kafka store failed to initialize");
- }
- String key = "Kafka";
- String value = "Rocks";
- try {
- kafkaStore.put(key, value);
- } catch (StoreException e) {
- fail("Kafka store put(Kafka, Rocks) operation failed");
- }
- Thread.sleep(1000);
- String retrievedValue = null;
- try {
- System.out.println("Reading written value from the kafka store");
- retrievedValue = kafkaStore.get(key);
- } catch (StoreException e) {
- fail("Kafka store get(Kafka) operation failed");
- }
- assertEquals("Retrieved value should match entered value", value, retrievedValue);
- // delete the key
- try {
- kafkaStore.delete(key);
- } catch (StoreException e) {
- fail("Kafka store delete(Kafka) operation failed");
- }
- Thread.sleep(500);
- // verify that key is deleted
- retrievedValue = value;
- try {
- retrievedValue = kafkaStore.get(key);
- } catch (StoreException e) {
- fail("Kafka store get(Kafka) operation failed");
- }
- assertNull("Value should have been deleted", retrievedValue);
- kafkaStore.close();
- inMemoryStore.close();
- // recreate kafka store
- kafkaStore = new KafkaStore(storeConfig, stringSerializer, stringSerializer,
- inMemoryStore);
- try {
- kafkaStore.init();
- } catch (StoreInitializationException e) {
- fail("Kafka store failed to initialize");
- }
- // verify that key still doesn't exist in the store
- retrievedValue = value;
- try {
- retrievedValue = kafkaStore.get(key);
- } catch (StoreException e) {
- fail("Kafka store get(Kafka) operation failed");
- }
- assertNull("Value should have been deleted", retrievedValue);
- kafkaStore.close();
- inMemoryStore.close();
+ @Test
+ public void testDeleteAfterRestart() throws InterruptedException {
+ Properties props = new Properties();
+ props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
+ props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, topic);
+ KafkaStoreConfig storeConfig = new KafkaStoreConfig(props);
+ StringSerializer stringSerializer = new StringSerializer();
+ Store inMemoryStore = new InMemoryStore();
+ KafkaStore kafkaStore = new KafkaStore(storeConfig,
+ stringSerializer,
+ stringSerializer,
+ inMemoryStore);
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ fail("Kafka store failed to initialize");
+ }
+ String key = "Kafka";
+ String value = "Rocks";
+ try {
+ kafkaStore.put(key, value);
+ } catch (StoreException e) {
+ fail("Kafka store put(Kafka, Rocks) operation failed");
+ }
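+    // wait for the put to be consumed back into the local store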
+ Thread.sleep(1000);
+ String retrievedValue = null;
+ try {
+ retrievedValue = kafkaStore.get(key);
+ } catch (StoreException e) {
+ fail("Kafka store get(Kafka) operation failed");
+ }
+ assertEquals("Retrieved value should match entered value", value, retrievedValue);
+ // delete the key
+ try {
+ kafkaStore.delete(key);
+ } catch (StoreException e) {
+ fail("Kafka store delete(Kafka) operation failed");
+ }
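+    // give the delete time to propagate through Kafka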
+ Thread.sleep(500);
+    // verify that the key is deleted; seed retrievedValue with a non-null
+    // sentinel so the assertNull below can only pass if get() returns null
+    retrievedValue = value;
+ try {
+ retrievedValue = kafkaStore.get(key);
+ } catch (StoreException e) {
+ fail("Kafka store get(Kafka) operation failed");
+ }
+ assertNull("Value should have been deleted", retrievedValue);
+ kafkaStore.close();
+ inMemoryStore.close();
+    // recreate the kafka store to simulate a restart
+ kafkaStore = new KafkaStore(storeConfig, stringSerializer, stringSerializer,
+ inMemoryStore);
+ try {
+ kafkaStore.init();
+ } catch (StoreInitializationException e) {
+ fail("Kafka store failed to initialize");
+ }
+    // verify that the key still doesn't exist in the store after the restart
+    retrievedValue = value;
+ try {
+ retrievedValue = kafkaStore.get(key);
+ } catch (StoreException e) {
+ fail("Kafka store get(Kafka) operation failed");
}
+ assertNull("Value should have been deleted", retrievedValue);
+ kafkaStore.close();
+ inMemoryStore.close();
+ }
}
diff --git a/src/test/java/io/confluent/kafka/schemaregistry/storage/RocksDbTest.java b/src/test/java/io/confluent/kafka/schemaregistry/storage/RocksDbTest.java
index 38c8912024a..9cdbc65fa78 100644
--- a/src/test/java/io/confluent/kafka/schemaregistry/storage/RocksDbTest.java
+++ b/src/test/java/io/confluent/kafka/schemaregistry/storage/RocksDbTest.java
@@ -1,15 +1,5 @@
package io.confluent.kafka.schemaregistry.storage;
-import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
-import io.confluent.kafka.schemaregistry.utils.TestUtils;
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
public class RocksDbTest {
// @Test
@@ -60,7 +50,6 @@ public class RocksDbTest {
// try {
// db = new RocksDbStore(config);
// } catch (StoreInitializationException e) {
-// // TODO Auto-generated catch block
// e.printStackTrace();
// fail("RocksDB store initialization failed");
// } finally {
@@ -73,7 +62,6 @@ public class RocksDbTest {
// db.put("Kafka".getBytes(), "rocks".getBytes());
// value = db.get("Kafka".getBytes());
// } catch (StoreException e) {
-// // TODO Auto-generated catch block
// e.printStackTrace();
// fail("RocksDB store put failed");
// }
diff --git a/src/test/java/io/confluent/kafka/schemaregistry/utils/TestUtils.java b/src/test/java/io/confluent/kafka/schemaregistry/utils/TestUtils.java
index 9aa094c3a93..90890ec2953 100644
--- a/src/test/java/io/confluent/kafka/schemaregistry/utils/TestUtils.java
+++ b/src/test/java/io/confluent/kafka/schemaregistry/utils/TestUtils.java
@@ -4,9 +4,10 @@
import java.util.Random;
public class TestUtils {
+
static String IoTmpDir = System.getProperty("java.io.tmpdir");
static Random random = new Random();
-
+
/**
* Create a temporary directory
*/
@@ -14,7 +15,7 @@ public static File tempDir(String namePrefix) {
final File f = new File(IoTmpDir, namePrefix + "-" + random.nextInt(1000000));
f.mkdirs();
f.deleteOnExit();
-
+
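+    // deleteOnExit() cannot remove a non-empty directory, so also install a
+    // shutdown hook that deletes the tree recursively via rm()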
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@@ -23,18 +24,19 @@ public void run() {
});
return f;
}
-
+
/**
* Recursively delete the given file/directory and any subfiles (if any exist)
+ *
* @param file The root file at which to begin deleting
*/
public static void rm(File file) {
- if(file == null) {
+ if (file == null) {
return;
- } else if(file.isDirectory()) {
+ } else if (file.isDirectory()) {
File[] files = file.listFiles();
- if(files != null) {
- for(File f : files) {
+ if (files != null) {
+ for (File f : files) {
rm(f);
}
}