Applying a void function to DataStream

Soheil Pourbafrani
Hi, I have a void function that takes a String, parses it, and writes it to Cassandra (using plain Java, not the Flink Cassandra connector). Using the Apache Flink Kafka connector, I've read some data into a DataStream<String>. Now I want to apply the parse function to each message in the DataStream<String>, but because parse returns nothing (it is void), I get the error:
no instance of type variable R exists so that void conforms to R

Is there any way to do this with Apache Flink?
Re: Applying a void function to DataStream

Timo Walther
Hi Soheil,

Flink supports the type "java.lang.Void" which you can use in this case.

Regards,
Timo
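To illustrate the suggestion above without a Flink cluster, here is a minimal sketch in plain Java using java.util.stream, whose map has the same generic shape (one input, one output of some type R). The class name VoidMapSketch, the in-memory "written" list, and the parseAsVoid adapter are all hypothetical stand-ins for the poster's Cassandra-writing parse function. It is only meant to show why a void-returning lambda cannot be used where a result type R is expected, and how returning java.lang.Void satisfies the signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class VoidMapSketch {
    // Hypothetical stand-in for the Cassandra write: records what was "written".
    static final List<String> written = new ArrayList<>();

    // Stand-in for the poster's parse function: side effect only, returns nothing.
    static void parse(String value) {
        written.add(value.trim());
    }

    // Adapter: run the void function, then return null, the only value of type Void.
    static Void parseAsVoid(String value) {
        parse(value);
        return null;
    }

    // Applies the adapter to every element and returns a copy of what was written.
    static List<String> applyAll(List<String> input) {
        written.clear();

        // Does not compile: parse returns void, so no result type R can be inferred.
        // input.stream().map(x -> parse(x));

        Function<String, Void> f = VoidMapSketch::parseAsVoid;
        // collect(...) forces the pipeline to run; the result is a list of nulls.
        input.stream().map(f).collect(Collectors.toList());
        return new ArrayList<>(written);
    }

    public static void main(String[] args) {
        System.out.println(applyAll(List.of(" a ", " b "))); // prints [a, b]
    }
}
```

Note that this sketch only demonstrates the typing. Whether a Flink map operator tolerates null records emitted this way is not something this example shows.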




Re: Applying a void function to DataStream

Timo Walther
In your case, a FlatMapFunction is better suited because it allows zero, one, or more outputs per input.

It would look like this:

text.flatMap((FlatMapFunction<String, Void>) (value, out) -> parse(value));

Or with an anonymous class:

text.flatMap(new FlatMapFunction<String, Void>() {
   @Override
   public void flatMap(String value, Collector<Void> out) throws Exception {
      parse(value);
   }
});

Regards,
Timo
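The point about "zero, one, or more outputs" can be sketched in plain Java. The Collector and FlatMapFunction interfaces below are local, hypothetical stand-ins that only mirror the shape of the Flink interfaces used above (Flink's flatMap also declares "throws Exception", omitted here for brevity). The names FlatMapVoidSketch, runFlatMap, and applyParse, and the in-memory "written" list, are assumptions for illustration. The idea is that a flatMap body which never calls out.collect(...) emits nothing, so a void side-effecting function fits without needing a return value.

```java
import java.util.ArrayList;
import java.util.List;

class FlatMapVoidSketch {
    // Local stand-ins modeled on the shape of Flink's interfaces; NOT the Flink classes.
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<T, O> { void flatMap(T value, Collector<O> out); }

    // Hypothetical stand-in for the Cassandra write.
    static final List<String> written = new ArrayList<>();

    // Stand-in for the poster's static parse function: side effect only.
    static void parse(String value) {
        written.add(value.toUpperCase());
    }

    // Applies fn to each input and returns every record the function emitted.
    static <T, O> List<O> runFlatMap(List<T> input, FlatMapFunction<T, O> fn) {
        List<O> emitted = new ArrayList<>();
        for (T value : input) {
            fn.flatMap(value, emitted::add);
        }
        return emitted;
    }

    // Runs parse over the input via flatMap; nothing is ever collected.
    static List<Void> applyParse(List<String> input) {
        written.clear();
        FlatMapFunction<String, Void> fn = (value, out) -> parse(value);
        return runFlatMap(input, fn);
    }

    public static void main(String[] args) {
        List<Void> emitted = applyParse(List.of("a", "b"));
        System.out.println(written);        // prints [A, B]
        System.out.println(emitted.size()); // prints 0
    }
}
```

Because the function emits zero records, no null values travel downstream, which is one reason the flatMap form may be the more comfortable fit here.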

On 19.04.18 at 15:26, Soheil Pourbafrani wrote:
The parse function is a static method of another class that I've imported into the project.

On Thu, Apr 19, 2018 at 5:55 PM, Soheil Pourbafrani <[hidden email]> wrote:
Thanks,
my map code looks like this:
stream.map(x -> parse(x));
I don't understand what you mean.
Something like the line below?
DataStream<java.lang.void> t = stream.map(x -> parse(x));
?
