
Provide a java-friendly API #59

Open
BenFradet opened this issue Jun 11, 2017 · 18 comments

Comments

@BenFradet
Owner

BenFradet commented Jun 11, 2017

  • work on JavaDStream
  • not (?) rely on implicits
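One possible ingredient (a hypothetical sketch; none of these names exist in the library): an adapter from Spark's serializable org.apache.spark.api.java.function.Function to scala.Function1, so Java users never have to subclass AbstractFunction1 or deal with implicits themselves:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.function.Function;
import scala.Function1;
import scala.runtime.AbstractFunction1;

import java.io.Serializable;

// Hypothetical adapter, not part of the library: wraps Spark's Java Function
// (which already extends Serializable) in a serializable scala.Function1 so
// it can be handed to writeToKafka from plain Java.
public final class JavaFunctions {
    private JavaFunctions() {}

    private abstract static class SerializableFunction1<T, R>
            extends AbstractFunction1<T, R> implements Serializable {}

    public static <T, K, V> Function1<T, ProducerRecord<K, V>> toScala(
            final Function<T, ProducerRecord<K, V>> f) {
        return new SerializableFunction1<T, ProducerRecord<K, V>>() {
            @Override
            public ProducerRecord<K, V> apply(final T t) {
                try {
                    return f.call(t);
                } catch (final Exception e) {
                    // Function.call is declared to throw Exception; Function1.apply isn't
                    throw new RuntimeException(e);
                }
            }
        };
    }
}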
@BenFradet BenFradet added this to the 0.4.0 milestone Jun 11, 2017
@huylv
Contributor

huylv commented Jun 15, 2017

Hi Ben. Can I use this lib in Java at the moment?

@BenFradet
Owner Author

It should be possible by instantiating your own Function1, like:

Function1<String, ProducerRecord<String, String>> f =
    new AbstractFunction1<String, ProducerRecord<String, String>>() {
        public ProducerRecord<String, String> apply(String s) {
            return new ProducerRecord<String, String>(topic, s);
        }
    };

but I think it's a bit clumsy, which is why I want to provide a more Java-friendly API.

@huylv
Contributor

huylv commented Jun 15, 2017

How do I call the writeToKafka() function? I need to write a JavaDStream to Kafka.

@BenFradet
Owner Author

My bad, I didn't take into account the fact that you have a JavaDStream.

Can't you call dstream() on your JavaDStream, and then writeToKafka()?

@BenFradet BenFradet changed the title Make the API more Java friendly Provide a java-friendly API Jun 15, 2017
@huylv
Contributor

huylv commented Jun 15, 2017

I can call dstream() on the JavaDStream, but the result doesn't provide a writeToKafka() function.

@BenFradet
Owner Author

True, I forgot that the implicits aren't usable from Java.

You might want to try constructing the writer directly (untested):

import com.github.benfradet.spark.kafka010.writer.DStreamKafkaWriter;
import com.github.benfradet.spark.kafka010.writer.KafkaWriter;

KafkaWriter<String> w = new DStreamKafkaWriter<>(javaDStream.dstream(),
    scala.reflect.ClassTag$.MODULE$.apply(String.class));
w.writeToKafka(...);

@huylv
Contributor

huylv commented Jun 15, 2017

This is the code I wrote. However, it complains that the Function1 is not serializable:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("value.serializer", StringSerializer.class);

    KafkaWriter<String> kafkaWriter = new DStreamKafkaWriter<>(myStream.dstream(), scala.reflect.ClassTag$.MODULE$.apply(String.class));
    Function1<String, ProducerRecord<String, String>> f = new AbstractFunction1<String, ProducerRecord<String, String>>() {
        @Override
        public ProducerRecord<String, String> apply(final String s) {
            return new ProducerRecord<>("my-topic", s);
        }
    };
    kafkaWriter.writeToKafka(props, f, Option.empty());

Here's the serialization stack trace:

Serialization stack:
    - object not serializable (class: Main$1, value: <function1>)
    - field (class: com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter$$anonfun$writeToKafka$1, name: transformFunc$1, type: interface scala.Function1)
    - object (class com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter$$anonfun$writeToKafka$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 31 more

@BenFradet
Owner Author

AbstractFunction1 doesn't implement Serializable, which is why the closure cleaner rejects your anonymous subclass. Have you tried introducing a serializable subclass of AbstractFunction1, something like:

abstract class SerializableFunction1<T, R> extends AbstractFunction1<T, R> implements Serializable {}

Function1<String, ProducerRecord<String, String>> f =
    new SerializableFunction1<String, ProducerRecord<String, String>>() {
        @Override
        public ProducerRecord<String, String> apply(final String s) {
            return new ProducerRecord<>("my-topic", s);
        }
    };

?

@huylv
Contributor

huylv commented Jun 15, 2017

Hey, thanks. It works now. I'll leave my complete code here for anyone who's interested:

    import com.github.benfradet.spark.kafka010.writer.DStreamKafkaWriter;
    import com.github.benfradet.spark.kafka010.writer.KafkaWriter;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import scala.Function1;
    import scala.Option;
    import scala.runtime.AbstractFunction1;

    import java.io.Serializable;
    import java.util.Properties;

    abstract class MyFunc<T, U> extends AbstractFunction1<T, U> implements Serializable {}

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        // myStream is a JavaDStream<String> built elsewhere
        KafkaWriter<String> kafkaWriter = new DStreamKafkaWriter<>(myStream.dstream(),
            scala.reflect.ClassTag$.MODULE$.apply(String.class));
        Function1<String, ProducerRecord<String, String>> f = new MyFunc<String, ProducerRecord<String, String>>() {
            @Override
            public ProducerRecord<String, String> apply(final String s) {
                return new ProducerRecord<>("my-topic", s);
            }
        };
        kafkaWriter.writeToKafka(props, f, Option.empty());
    }

@BenFradet
Owner Author

Great stuff 👍

Feel free to open a PR adding a section in the readme with your code

@huylv
Contributor

huylv commented Jun 16, 2017

I've created pull request #60. Please have a look. Thanks.

@vipin-kumar-tomar

Does it take care that ProducerRecord is created only once and used across all executors?

@BenFradet
Owner Author

What do you mean? The ProducerRecord is created in the function f, so a new producer record is created every time a message is sent.

@vipin-kumar-tomar

Most write-ups suggest using a single Producer across all executors, e.g. this one:
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

@BenFradet
Owner Author

Yeah, you wrote:

Does it take care that ProducerRecord is created only once and used across all executors?

that's what confused me.

To answer your question, only one Producer is created per executor since you can't share them across executors.
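The gist of that pattern (a sketch of the general technique, not this library's actual source) is a JVM-wide cache keyed by the producer config, so all tasks running in an executor JVM reuse the same producer:

import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the one-producer-per-executor pattern: producers live in a static
// map keyed by their config, so each executor JVM creates at most one producer
// per distinct config and every task on that executor shares it.
public final class ProducerCache {
    private static final ConcurrentHashMap<Map<String, Object>, KafkaProducer<String, String>> CACHE =
        new ConcurrentHashMap<>();

    public static KafkaProducer<String, String> getOrCreate(final Map<String, Object> config) {
        // computeIfAbsent guarantees a single producer per distinct config per JVM
        return CACHE.computeIfAbsent(config, cfg -> new KafkaProducer<>(cfg));
    }
}

Since each executor is a single JVM, a static field gives you exactly one producer per executor per config.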

@BenFradet BenFradet modified the milestones: 0.5.0, 0.4.0 Jul 17, 2017
@sunone5

sunone5 commented Oct 2, 2017

@huylv & @BenFradet

I have tried this Java code with the following latest Maven artifact:
https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer_2.10/0.4.0
as well as the previous one: https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer_2.10/0.1.0
But the following code doesn't compile with Java 1.8 and the latest Eclipse Oxygen IDE.

----------------IMPORT SECTION--------------------

import com.github.benfradet.spark.kafka.writer.DStreamKafkaWriter;
import com.github.benfradet.spark.kafka.writer.KafkaWriter;
import org.apache.kafka.common.serialization.StringSerializer;
import scala.Function1;
import scala.Option;
import scala.Serializable;


        Map<String, Object> producerConfig = new HashMap<String, Object>();
        producerConfig.put("bootstrap.servers", "localhost:9092");
        producerConfig.put("key.serializer", StringSerializer.class);
        producerConfig.put("value.serializer", StringSerializer.class);
        
        KafkaWriter<String> kafkaWriter = new DStreamKafkaWriter<>(lines.dstream(), scala.reflect.ClassTag$.MODULE$.apply(String.class));

        Function1<String, ProducerRecord<String, String>> f = new MyFunc<String, ProducerRecord<String, String>>() {
            @Override
            public ProducerRecord<String, String> apply(final String s) {
                return new ProducerRecord<>("my-topic", s);
            }
        };
        
        kafkaWriter.writeToKafka(producerConfig, f,Option.empty());

The kafkaWriter.writeToKafka(producerConfig, f, Option.empty()) line gives me an error in the Eclipse IDE (error screenshot not included).

Any help appreciated.
https://stackoverflow.com/questions/46519554/how-to-write-spark-stream-dstream-javadstream-into-kafka

@BenFradet
Owner Author

BenFradet commented Oct 2, 2017

You need to turn your Java map into a Scala one, e.g.:

import scala.collection.JavaConversions;
// ...
kafkaWriter.writeToKafka(JavaConversions.mapAsScalaMap(producerConfig), f, Option.empty());

@rfmvlc

rfmvlc commented Dec 19, 2017

Hello Ben,

Doesn't it work on Java RDDs? The issue checklist above only mentions "work on JavaDStream".

@sunone5 @huylv

Do you have any working Java sample, please?

I've got a type mismatch error on the call:

 required: scala.collection.immutable.Map<String,Object>,Function1<String,ProducerRecord<K,V>>,Option<Callback>
  found: scala.collection.mutable.Map<String,Object>,<anonymous SerializableFunc<String,ProducerRecord<String,String>>>,Option<Object>
  reason: no instance(s) of type variable(s) A,B exist so that scala.collection.mutable.Map<A,B> conforms to scala.collection.immutable.Map<String,Object>
  where K,V,T,A,B are type-variables:
    K extends Object declared in method <K,V>writeToKafka(scala.collection.immutable.Map<String,Object>,Function1<T,ProducerRecord<K,V>>,Option<Callback>)
    V extends Object declared in method <K,V>writeToKafka(scala.collection.immutable.Map<String,Object>,Function1<T,ProducerRecord<K,V>>,Option<Callback>)
    T extends Object declared in class KafkaWriter
    A extends Object declared in method <A,B>mapAsScalaMap(java.util.Map<A,B>)
    B extends Object declared in method <A,B>mapAsScalaMap(java.util.Map<A,B>)

with this sample code:
import com.github.benfradet.spark.kafka.writer.DStreamKafkaWriter;
import com.github.benfradet.spark.kafka.writer.KafkaWriter;
import com.github.benfradet.spark.kafka.writer.RDDKafkaWriter;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Function1;
import scala.Option;
import scala.collection.JavaConversions;

import java.util.Calendar;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;


 KafkaWriter<String> kafkaWriter = new RDDKafkaWriter<>(lines.rdd(), scala.reflect.ClassTag$.MODULE$.apply(String.class));

        kafkaWriter.writeToKafka(JavaConversions.mapAsScalaMap(producerConfig),
                new SerializableFunc<String, ProducerRecord<String,String>>() {
                    @Override
                    public ProducerRecord<String, String> apply(final String s) {
                        return new ProducerRecord<>("sometopic", s);
                    }
                }
                ,
                Option.empty()
        );

and this helper class:

import scala.Serializable;
import scala.runtime.AbstractFunction1;

abstract class SerializableFunc<T, R> extends AbstractFunction1<T, R> implements Serializable {}


Cheers!
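For reference, an untested sketch of a possible fix (reusing the SerializableFunc helper, producerConfig, and kafkaWriter from the code above): writeToKafka wants a scala.collection.immutable.Map, which JavaConversions.mapAsScalaMap doesn't produce, but one can be built entry by entry from the Java map; an explicit type witness on Option.empty() also avoids inferring Option<Object>:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.Option;
import scala.Tuple2;

import java.util.Map;

// Build the scala.collection.immutable.Map that writeToKafka expects,
// entry by entry ($plus is the JVM name of Scala's Map.+ method).
scala.collection.immutable.Map<String, Object> scalaConfig =
    new scala.collection.immutable.HashMap<String, Object>();
for (final Map.Entry<String, Object> entry : producerConfig.entrySet()) {
    scalaConfig = scalaConfig.$plus(new Tuple2<String, Object>(entry.getKey(), entry.getValue()));
}

kafkaWriter.writeToKafka(
    scalaConfig,
    new SerializableFunc<String, ProducerRecord<String, String>>() {
        @Override
        public ProducerRecord<String, String> apply(final String s) {
            return new ProducerRecord<>("sometopic", s);
        }
    },
    Option.<Callback>empty() // explicit witness so javac doesn't infer Option<Object>
);

Building the map by hand sidesteps the JavaConverters implicit machinery entirely, at the cost of a few lines.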
