Flink+avro integration


Flink+avro integration

aawhitaker
The author has deleted this message.

Re: Flink+avro integration

Márton Balassi
Hi Andrew,

1a,
Flink can read and write Avro data through the AvroInputFormat and AvroOutputFormat in both the batch and the streaming API. In general you can write the following:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream dataStream = env.createInput(new AvroInputFormat(...));

// do something with dataStream

dataStream.write(new AvroOutputFormat(...));

1b, 
When reading from Kafka you are expected to define a DeserializationSchema [1]; in your case that is the only thing you need to implement to get your topology running.

2,
The Scala API uses the same function names presented in 1a and accepts the Java input and output format implementations.


I hope this clarifies the situation.

Best,

Marton

On Mon, Oct 19, 2015 at 4:21 PM, Andrew Whitaker <[hidden email]> wrote:
I'm doing some research on Flink + Avro integration, and I've come across "org.apache.flink.api.java.io.AvroInputFormat" as a way to create a stream of Avro objects from a file. I had the following questions:

1. Is this the extent of Flink's integration with Avro? If I wanted to read Avro-serialized objects from a Kafka stream, would I have to write something to do this or is this functionality already built somewhere?

2. Is there an analogous InputFormat in Flink's Scala API? If not, what's the recommended way to work with Avro objects in Scala using Flink?

Thanks,

--
Andrew Whitaker
[hidden email] | 540-521-5299 | @andrewwhitaker


Re: Flink+avro integration

Stephan Ewen
Hi Andrew!

TL;DR: There is no out-of-the-box (de)serializer for Flink with Kafka, but it should not be very hard to add.

Here is a gist that basically does it. Let me know if that works for you, and I'll add it to the Flink source then:


Greetings,
Stephan


On Mon, Oct 19, 2015 at 7:14 PM, Márton Balassi <[hidden email]> wrote:



Re: Flink+avro integration

aawhitaker
The author has deleted this message.

Re: Flink+avro integration

aawhitaker
The author has deleted this message.

Re: Flink+avro integration

Till Rohrmann
What was your problem with using Java POJOs with the Scala API? According to https://issues.apache.org/jira/browse/AVRO-1105, the progress on adding a Scala API to Avro is kind of stalling.

Cheers,
Till

On Tue, Oct 20, 2015 at 9:06 PM, aawhitaker <[hidden email]> wrote:
One more follow up:

There doesn't appear to be an official Avro library for Scala. How would you recommend integrating Avro in a Scala project? The most straightforward option seems to be to just use the Java library, but Scala Flink operations don't work on Java POJOs, as far as I could tell. Does that sound correct? What would you recommend?





Re: Flink+avro integration

Stephan Ewen
@Andrew Flink should work with Scala classes that follow the POJO style (public fields), so you should be able to use the Java Avro library just like that.

If that does not work in your case, please file a bug report!

On Wed, Oct 21, 2015 at 9:41 AM, Till Rohrmann <[hidden email]> wrote:



Re: Flink+avro integration

aawhitaker
The author has deleted this message.

Re: Flink+avro integration

Stephan Ewen
This is actually not a bug, or a POJO or Avro problem. It is simply a limitation in the functionality, as the exception message says: "Specifying fields by name is only supported on Case Classes (for now)."

Try this with a regular reduce function that selects the max and it should work fine...

Greetings,
Stephan


On Wed, Oct 21, 2015 at 3:46 PM, aawhitaker <[hidden email]> wrote:
Till Rohrmann wrote
> What was your problem with using Java POJOs with the Scala API?

Here's a quick example <https://gist.github.com/AndrewWhitaker/e51308bb4b43f7ddefc3> that demonstrates some of the problems I'm having. I used `max` in the example, but actually I get an exception for most of the operations I try directly on Java POJOs.

The "User" class referenced here is just the Avro example schema hydrated
into a Java POJO. I can post that or the entire project if it'd be helpful.

I included the stack trace of the exception in the gist, but I'll post it
here too:

Exception in thread "main" java.lang.UnsupportedOperationException: Specifying fields by name is only supported on Case Classes (for now).
        at org.apache.flink.api.scala.package$.fieldNames2Indices(package.scala:62)
        at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:466)
        at org.apache.flink.api.scala.DataSet.max(DataSet.scala:503)
        at SampleAvroJob$.main(SampleAvroJob.scala:12)
        at SampleAvroJob.main(SampleAvroJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

And if I use field position instead of field name, I get this exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Aggregating on field positions is only possible on tuple data types.
        at org.apache.flink.api.scala.operators.ScalaAggregateOperator.<init>(ScalaAggregateOperator.java:71)
        at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:455)
        at org.apache.flink.api.scala.DataSet.max(DataSet.scala:482)
        at SampleAvroJob$.main(SampleAvroJob.scala:12)
        at SampleAvroJob.main(SampleAvroJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

A workaround is to use `.map` to map to tuples first (see the sketch below), but this seems a little clunky.

Thanks!





Re: Flink+avro integration

aawhitaker
The author has deleted this message.

Re: Flink+avro integration

Till Rohrmann
In the Java API, we only support the `max` operation for tuple types where you reference the fields via indices.

Cheers,
Till

On Thu, Oct 22, 2015 at 4:04 PM, aawhitaker <[hidden email]> wrote:
Stephan Ewen wrote
> This is actually not a bug, or a POJO or Avro problem. It is simply a limitation in the functionality, as the exception message says: "Specifying fields by name is only supported on Case Classes (for now)."
>
> Try this with a regular reduce function that selects the max and it should work fine...

Thanks. Just to be sure, this is also a limitation of the Java implementation of Flink, right?




