Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Zookeeper dependency entirely #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

cbsmith
Copy link

@cbsmith cbsmith commented Nov 24, 2019

This pull request updates to the newest KafkaClients, which provide full 1.x and 2.x compatibility, and also removes a dependency on Zookeeper (which is planned for deprecation).

Christopher Smith added 5 commits November 23, 2019 21:49
@arsiesys
Copy link
Owner

Hi @cbsmith,
Thanks! I am going to try it.
I missed the notification.. sorry for the delay.

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
Copy link
Owner

@arsiesys arsiesys Jan 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this lib is not used. the org.graylog2 parent depency also include the artifact "org.apache.kafka/kafka_2.11" and it seems that it is the one used instead of the one added.

I tried the following:

        <dependency>
            <groupId>org.graylog2</groupId>
            <artifactId>graylog2-server</artifactId>
            <version>${graylog.version}</version>
            <scope>provided</scope>
            <exclusions>
              <exclusion>
                 <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka_2.11</artifactId>
              </exclusion>
            </exclusions>
        </dependency>

In this case, we use the new library but the build fail with some errors:

/home/xxx/git/graylog-kafka-plugin-test/src/main/java/org/graylog/inputs/kafkanew/KafkaNewTransport.java:31: error: package kafka.consumer does not exist
import kafka.consumer.ConsumerTimeoutException;
                     ^
/home/xxx/git/graylog-kafka-plugin-test/src/main/java/org/graylog/inputs/kafkanew/KafkaNewTransport.java:58: error: package scala.collection does not exist
import scala.collection.JavaConversions;

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made these test and was able to build it:

diff --git a/pom.xml b/pom.xml
index 67ef80d..611023e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,12 @@
             <artifactId>graylog2-server</artifactId>
             <version>${graylog.version}</version>
             <scope>provided</scope>
+            <exclusions>
+              <exclusion>
+                 <groupId>org.apache.kafka</groupId>
+                 <artifactId>kafka_2.11</artifactId>
:...skipping...
diff --git a/pom.xml b/pom.xml
index 67ef80d..611023e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,12 @@
             <artifactId>graylog2-server</artifactId>
             <version>${graylog.version}</version>
             <scope>provided</scope>
+            <exclusions>
+              <exclusion>
+                 <groupId>org.apache.kafka</groupId>
+                 <artifactId>kafka_2.11</artifactId>
+              </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.google.auto.value</groupId>
@@ -52,7 +58,7 @@
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
-         <version>2.3.1</version>
+         <version>2.4.0</version>
        </dependency>
     </dependencies>
 
diff --git a/src/main/java/org/graylog/inputs/kafkanew/KafkaNewTransport.java b/src/main/java/org/graylog/inputs/kafkanew/KafkaNewTransport.java
index 3e6ef8d..5628ee0 100644
--- a/src/main/java/org/graylog/inputs/kafkanew/KafkaNewTransport.java
+++ b/src/main/java/org/graylog/inputs/kafkanew/KafkaNewTransport.java
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
-import kafka.consumer.ConsumerTimeoutException;
+//import kafka.consumer.ConsumerTimeoutException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -55,7 +55,7 @@ import org.graylog2.plugin.lifecycles.Lifecycle;
 import org.graylog2.plugin.system.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+//import scala.collection.JavaConversions;
 
 import javax.inject.Named;
 import java.util.Iterator;
@@ -289,9 +289,9 @@ public class KafkaNewTransport extends ThrottleableTransport {
                                 input.processRawMessage(rawMessage);
                             }
 
-                        } catch (ConsumerTimeoutException e) {
+//                        } catch (ConsumerTimeoutException e) {
                             // Happens when there is nothing to consume, retry to check again.
-                            retry = true;
+//                            retry = true;
                         } catch (Exception e) {
                             LOG.error("Kafka consumer error, stopping consumer thread.", e);
                         }

However, it still use the old version:

2020-01-10 20:42:59,147 INFO : org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
2020-01-10 20:42:59,147 INFO : org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06

I assume it used the one compiled with graylog :(.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also tried to create a uber shade with maven shade to include the lib and rename it but an issue with the Deseriazlizer:

2020-01-10 23:03:35,964 ERROR: org.graylog2.shared.inputs.InputLauncher - The [org.graylog2.inputs.kafkanew.SyslogKafkaInputNew] input with ID <5d0d1a6d578c75002855240c> misfired. Reason: Invalid value org.shaded.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.shaded.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
org.graylog2.plugin.inputs.MisfireException: org.shaded.apache.kafka.common.config.ConfigException: Invalid value org.shaded.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.shaded.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
	at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:158) ~[graylog.jar:?]
	at org.graylog2.shared.inputs.InputLauncher$1.run(InputLauncher.java:84) [graylog.jar:?]
	at com.codahale.metrics.InstrumentedExecutorService$InstrumentedRunnable.run(InstrumentedExecutorService.java:181) [graylog.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
Caused by: org.shaded.apache.kafka.common.config.ConfigException: Invalid value org.shaded.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.shaded.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
	at org.shaded.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[?:?]
	at org.shaded.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[?:?]
	at org.shaded.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[?:?]
	at org.shaded.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[?:?]
	at org.shaded.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[?:?]
	at org.shaded.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:539) ~[?:?]
	at org.shaded.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[?:?]
	at org.shaded.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[?:?]
	at org.graylog2.inputs.kafkanew.KafkaNewTransport.doLaunch(KafkaNewTransport.java:232) ~[?:?]
	at org.graylog2.plugin.inputs.transports.ThrottleableTransport.launch(ThrottleableTransport.java:76) ~[graylog.jar:?]
	at org.graylog2.plugin.inputs.MessageInput.launch(MessageInput.java:155) ~[graylog.jar:?]
	... 7 more
2020-01-10 23:03:35,986 INFO : org.graylog2.inputs.InputStateListener - Input [Syslog Kafka New/5d0d1a6d578c75002855240c] is now FAILED

I assume that the created thread is "out" of the context and don't access the class.

So does the last solution would be to compile graylog with the last kafka library ? :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants