Hi Andrew,

1a. In general, Flink can read and write Avro data through the AvroInputFormat and AvroOutputFormat in both the batch and the streaming API. In general you can write the following:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream dataStream = env.createInput(new AvroInputFormat(...));
    // do something with dataStream
    dataStream.write(new AvroOutputFormat(...));

1b. When reading from Kafka you are expected to define a DeserializationSchema [1]; in your case that is the only thing you need to implement to get your topology running.

2. The Scala API uses the same function names presented in 1a and accepts the Java input and output format implementations.

I hope this clarifies the situation.

Best,
Marton

On Mon, Oct 19, 2015 at 4:21 PM, Andrew Whitaker <[hidden email]> wrote:
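To make 1a concrete, here is a hedged sketch of the same pattern with the type parameters filled in. "User" is a hypothetical Avro-generated class and the paths are placeholders; note that the Avro formats moved packages between releases (org.apache.flink.api.java.io in the 0.x line this thread dates from, org.apache.flink.formats.avro later), and writeUsingOutputFormat stands in for the elided write(...) call above.

    // A minimal sketch, not a definitive implementation. "User" is a
    // hypothetical Avro-generated class; paths are placeholders.
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.api.java.io.AvroOutputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AvroReadWriteSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Read Avro records as instances of the generated User class.
            DataStream<User> users = env.createInput(
                    new AvroInputFormat<>(new Path("hdfs:///input/users.avro"), User.class));

            // ... transform the stream here ...

            // Write the (possibly transformed) stream back out as Avro.
            AvroOutputFormat<User> out = new AvroOutputFormat<>(User.class);
            out.setOutputFilePath(new Path("hdfs:///output/users.avro"));
            users.writeUsingOutputFormat(out);

            env.execute("Avro read/write sketch");
        }
    }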
Hi Andrew!

TL;DR: There is no out-of-the-box (de)serializer for Flink with Kafka, but it should not be very hard to add. Here is a gist that basically does it. Let me know if that works for you; I'll add it to the Flink source then:

Greetings,
Stephan

On Mon, Oct 19, 2015 at 7:14 PM, Márton Balassi <[hidden email]> wrote:
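The gist link did not survive the archive, so here is a hedged sketch of what such a schema might look like; it is not Stephan's code, just one possible implementation built on Avro's SpecificDatumReader against the 0.10-era DeserializationSchema interface (org.apache.flink.streaming.util.serialization).

    // A sketch under stated assumptions, not Stephan's gist. Works for
    // Avro specific records (classes generated from a schema).
    import java.io.IOException;

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificRecordBase;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class AvroDeserializationSchema<T extends SpecificRecordBase>
            implements DeserializationSchema<T> {

        private final Class<T> avroType;

        // Reader and decoder are not Serializable, so they are created
        // lazily on the task managers rather than shipped with the schema.
        private transient SpecificDatumReader<T> reader;
        private transient BinaryDecoder decoder;

        public AvroDeserializationSchema(Class<T> avroType) {
            this.avroType = avroType;
        }

        @Override
        public T deserialize(byte[] message) {
            if (reader == null) {
                reader = new SpecificDatumReader<>(avroType);
            }
            try {
                // Reuse the decoder across records to avoid re-allocation.
                decoder = DecoderFactory.get().binaryDecoder(message, decoder);
                return reader.read(null, decoder);
            } catch (IOException e) {
                throw new RuntimeException("Failed to deserialize Avro record", e);
            }
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return false;
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return TypeExtractor.getForClass(avroType);
        }
    }

You would then hand an instance to the Kafka consumer, e.g. new FlinkKafkaConsumer082<>("topic", new AvroDeserializationSchema<>(User.class), props) in the connector of that era (the consumer class name varies by Kafka version, and "User" is again the hypothetical generated class).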
What was your problem with using Java POJOs with the Scala API? According to https://issues.apache.org/jira/browse/AVRO-1105, progress on adding a Scala API to Avro has more or less stalled.

Cheers,
Till

On Tue, Oct 20, 2015 at 9:06 PM, aawhitaker <[hidden email]> wrote:
> One more follow up:
@Andrew: Flink should work with Scala classes that follow the POJO style (public fields), so you should be able to use the Java Avro library just like that. If that does not work in your case, please file a bug report!

On Wed, Oct 21, 2015 at 9:41 AM, Till Rohrmann <[hidden email]> wrote:
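For reference, a minimal sketch (in Java) of the POJO shape Flink's type system recognizes: a public class with a public no-argument constructor and public fields. A Scala class with public vars compiles down to essentially this; "SensorReading" is a hypothetical name.

    // Hypothetical example of a Flink-compatible POJO.
    public class SensorReading {
        public String sensorId;
        public double temperature;

        // Flink requires a public no-argument constructor.
        public SensorReading() {}

        public SensorReading(String sensorId, double temperature) {
            this.sensorId = sensorId;
            this.temperature = temperature;
        }
    }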
This is actually not a bug, or a POJO or Avro problem. It is simply a limitation in the functionality, as the exception message says: "Specifying fields by name is only supported on Case Classes (for now)."

Try this with a regular reduce function that selects the max and it should work fine...

Greetings,
Stephan

On Wed, Oct 21, 2015 at 3:46 PM, aawhitaker <[hidden email]> wrote:
> Till Rohrmann wrote:
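A hedged sketch of the workaround Stephan suggests: instead of max("someField"), use a plain ReduceFunction that keeps the record with the larger field. "Event", its fields, and the events stream are hypothetical stand-ins for the types in the (deleted) original message.

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Assume: DataStream<Event> events, where Event is a POJO with
    // public fields "id" and "count".
    DataStream<Event> runningMax = events
            .keyBy("id") // keying a POJO by field name is supported
            .reduce(new ReduceFunction<Event>() {
                @Override
                public Event reduce(Event a, Event b) {
                    // Keep whichever record has the larger count so far.
                    return a.count >= b.count ? a : b;
                }
            });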
In the Java API, we only support the `max` operation for tuple types, where you reference the fields via indices.

Cheers,
Till

On Thu, Oct 22, 2015 at 4:04 PM, aawhitaker <[hidden email]> wrote:
> Stephan Ewen wrote:
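A short sketch of that tuple-based form, assuming the usual job boilerplate around it: key by field 0 (the String) and track the running maximum of field 1 (the Integer).

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Tuple fields are addressed by position rather than by name.
    DataStream<Tuple2<String, Integer>> counts = env.fromElements(
            Tuple2.of("a", 1), Tuple2.of("b", 5), Tuple2.of("a", 3));

    counts.keyBy(0).max(1).print();

    env.execute("tuple max sketch");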