Skip to content

Commit

Permalink
Fixed formatting to match the coding style. Added log4j and proper ex…
Browse files Browse the repository at this point in the history
…ception handling
  • Loading branch information
nehanarkhede committed Dec 17, 2014
1 parent 91908c9 commit 2146f46
Show file tree
Hide file tree
Showing 36 changed files with 1,549 additions and 1,236 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<javaxel.version>3.0.0</javaxel.version>
<kafka.version>0.8.2-beta</kafka.version>
<kafka.scala.version>2.10</kafka.scala.version>
<log4j.version>1.7.6</log4j.version>
<easymock.version>3.0</easymock.version>
<rocksdbjni.version>3.5.1</rocksdbjni.version>
<restutils.version>0.1-SNAPSHOT</restutils.version>
Expand Down Expand Up @@ -101,6 +102,11 @@
<artifactId>rest-utils</artifactId>
<version>${restutils.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
Expand Down
48 changes: 27 additions & 21 deletions src/main/java/io/confluent/kafka/schemaregistry/rest/Main.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
package io.confluent.kafka.schemaregistry.rest;

import io.confluent.rest.ConfigurationException;
import org.eclipse.jetty.server.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import io.confluent.rest.ConfigurationException;

public class Main {
/**
* Starts an embedded Jetty server running the REST server.
*/
public static void main(String[] args) throws IOException {
try {
SchemaRegistryRestConfiguration config = new SchemaRegistryRestConfiguration((args.length > 0 ? args[0] : null));
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
Server server = app.createServer();
server.start();
System.out.println("Server started, listening for requests...");
server.join();
} catch (ConfigurationException e) {
System.out.println("Server configuration failed: " + e.getMessage());
e.printStackTrace();
System.exit(1);
} catch (Exception e) {
System.err.println("Server died unexpectedly: " + e.toString());
e.printStackTrace();
System.exit(1);
}

private static final Logger log = LoggerFactory.getLogger(Main.class);

/**
* Starts an embedded Jetty server running the REST server.
*/
public static void main(String[] args) throws IOException {

try {
SchemaRegistryRestConfiguration config =
new SchemaRegistryRestConfiguration((args.length > 0 ? args[0] : null));
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
Server server = app.createServer();
server.start();
log.info("Server started, listening for requests...");
server.join();
} catch (ConfigurationException e) {
log.error("Server configuration failed: ", e);
System.exit(1);
} catch (Exception e) {
log.error("Server died unexpectedly: ", e);
System.exit(1);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,77 +1,76 @@
package io.confluent.kafka.schemaregistry.rest;

import io.confluent.rest.Configuration;
import io.confluent.rest.ConfigurationException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

import java.util.Properties;

public class SchemaRegistryApplicationConfig extends Configuration {
public Time time;

public boolean debug;
public static final String DEFAULT_DEBUG = "false";

public int port;
public static final String DEFAULT_PORT = "8080";

public String zookeeperConnect;
public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";

public String bootstrapServers;
public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

public int producerThreads;
public static final String DEFAULT_PRODUCER_THREADS = "5";

/**
* The consumer timeout used to limit consumer iterator operations. This is effectively the maximum error for the
* entire request timeout. It should be small enough to get reasonably close to the timeout, but large enough to
* not result in busy waiting.
*/
public int consumerIteratorTimeoutMs;
public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "50";

/** The maximum total time to wait for messages for a request if the maximum number of messages has not yet been reached. */
public int consumerRequestTimeoutMs;
public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000";

/** The maximum number of messages returned in a single request. */
public int consumerRequestMaxMessages;
public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100";

public int consumerThreads;
public static final String DEFAULT_CONSUMER_THREADS = "1";

/** Amount of idle time before a consumer instance is automatically destroyed. */
public int consumerInstanceTimeoutMs;
public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000";

public SchemaRegistryApplicationConfig() throws ConfigurationException {
this(new Properties());
}

public SchemaRegistryApplicationConfig(Properties props) throws ConfigurationException {
time = new SystemTime();
import io.confluent.rest.Configuration;
import io.confluent.rest.ConfigurationException;

debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG));
public class SchemaRegistryApplicationConfig extends Configuration {

port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT));
zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT);
bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS);
producerThreads = Integer.parseInt(props.getProperty("producer.threads",
DEFAULT_PRODUCER_THREADS));
consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty(
"consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS));
consumerRequestTimeoutMs = Integer.parseInt(props.getProperty(
"consumer.request.timeout.ms", DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS));
consumerRequestMaxMessages = Integer.parseInt(props.getProperty(
"consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES));
consumerThreads = Integer.parseInt(props.getProperty("consumer.threads",
DEFAULT_CONSUMER_THREADS));
consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty(
"consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS));
}
public static final String DEFAULT_DEBUG = "false";
public static final String DEFAULT_PORT = "8080";
public static final String DEFAULT_ZOOKEEPER_CONNECT = "localhost:2181";
public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
public static final String DEFAULT_PRODUCER_THREADS = "5";
public static final String DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS = "50";
public static final String DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS = "1000";
public static final String DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES = "100";
public static final String DEFAULT_CONSUMER_THREADS = "1";
public static final String DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS = "300000";
public Time time;
public boolean debug;
public int port;
public String zookeeperConnect;
public String bootstrapServers;
public int producerThreads;
/**
* The consumer timeout used to limit consumer iterator operations. This is effectively the
* maximum error for the entire request timeout. It should be small enough to get reasonably close
* to the timeout, but large enough to not result in busy waiting.
*/
public int consumerIteratorTimeoutMs;
/**
* The maximum total time to wait for messages for a request if the maximum number of messages has
* not yet been reached.
*/
public int consumerRequestTimeoutMs;
/**
* The maximum number of messages returned in a single request.
*/
public int consumerRequestMaxMessages;
public int consumerThreads;
/**
* Amount of idle time before a consumer instance is automatically destroyed.
*/
public int consumerInstanceTimeoutMs;

public SchemaRegistryApplicationConfig() throws ConfigurationException {
this(new Properties());
}

public SchemaRegistryApplicationConfig(Properties props) throws ConfigurationException {
time = new SystemTime();

debug = Boolean.parseBoolean(props.getProperty("debug", DEFAULT_DEBUG));

port = Integer.parseInt(props.getProperty("port", DEFAULT_PORT));
zookeeperConnect = props.getProperty("zookeeper.connect", DEFAULT_ZOOKEEPER_CONNECT);
bootstrapServers = props.getProperty("bootstrap.servers", DEFAULT_BOOTSTRAP_SERVERS);
producerThreads = Integer.parseInt(props.getProperty("producer.threads",
DEFAULT_PRODUCER_THREADS));
consumerIteratorTimeoutMs = Integer.parseInt(props.getProperty(
"consumer.iterator.timeout.ms", DEFAULT_CONSUMER_ITERATOR_TIMEOUT_MS));
consumerRequestTimeoutMs = Integer.parseInt(props.getProperty(
"consumer.request.timeout.ms", DEFAULT_CONSUMER_REQUEST_TIMEOUT_MS));
consumerRequestMaxMessages = Integer.parseInt(props.getProperty(
"consumer.request.max.messages", DEFAULT_CONSUMER_REQUEST_MAX_MESSAGES));
consumerThreads = Integer.parseInt(props.getProperty("consumer.threads",
DEFAULT_CONSUMER_THREADS));
consumerInstanceTimeoutMs = Integer.parseInt(props.getProperty(
"consumer.instance.timeout.ms", DEFAULT_CONSUMER_INSTANCE_TIMEOUT_MS));
}

}
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package io.confluent.kafka.schemaregistry.rest;

import io.confluent.kafka.schemaregistry.rest.entities.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

import javax.ws.rs.core.Configurable;

import io.confluent.kafka.schemaregistry.rest.resources.RootResource;
import io.confluent.kafka.schemaregistry.rest.resources.SchemasResource;
import io.confluent.kafka.schemaregistry.rest.resources.TopicsResource;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreConfig;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaSerializer;
import io.confluent.rest.Application;
import io.confluent.rest.ConfigurationException;

import javax.ws.rs.core.Configurable;
import java.util.Properties;

public class SchemaRegistryRestApplication extends Application<SchemaRegistryRestConfiguration> {
public SchemaRegistryRestApplication() throws ConfigurationException {

private static final Logger log = LoggerFactory.getLogger(SchemaRegistryRestApplication.class);

public SchemaRegistryRestApplication() throws ConfigurationException {
this(new Properties());
}

Expand All @@ -31,11 +39,17 @@ public SchemaRegistryRestApplication(SchemaRegistryRestConfiguration config) {
public void setupResources(Configurable<?> config, SchemaRegistryRestConfiguration appConfig) {
Properties props = new Properties();
props.put(KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, "localhost:2181");
props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, "schemaregistry");
SchemaRegistryConfig schemaRegistryConfig = new SchemaRegistryConfig(props);
SchemaRegistry schemaRegistry = new SchemaRegistry(schemaRegistryConfig,
new SchemaSerializer());
config.register(RootResource.class);
props.put(KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, "_schemas");
SchemaRegistryConfig schemaRegistryConfig = new SchemaRegistryConfig(props);
SchemaRegistry schemaRegistry = null;
try {
schemaRegistry = new KafkaSchemaRegistry(schemaRegistryConfig,
new SchemaSerializer());
} catch (SchemaRegistryException e) {
log.error("Error starting the schema registry", e);
System.exit(1);
}
config.register(RootResource.class);
config.register(new TopicsResource(schemaRegistry));
config.register(SchemasResource.class);
}
Expand Down
Loading

0 comments on commit 2146f46

Please sign in to comment.