Hi,

I want to map a stream of JSON documents from Kafka to a Scala case class. How can this be accomplished using the JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes required?

I have a Spark background. There, such manual mappings are usually discouraged. Instead, Spark offers a nice API (the Dataset API) to perform this kind of assignment:
1) it is concise
2) it operates on Spark's off-heap memory representation (Tungsten), which makes it faster

In Flink, such off-heap optimizations do not seem to be talked about much (sorry if I am missing something, I am a Flink newbie). Is there a reason why these optimizations are not necessary in Flink?

How could I get the following example:

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
  new FlinkKafkaConsumer(
    "tweets-raw-json",
    serializer,
    properties
  ).setStartFromEarliest() // TODO experiment with different start values
)

to map to this Tweet class concisely, i.e. without manually iterating through all the attribute fields and parsing the keys from the object node tree?

final case class Tweet(tweet_id: Option[String], text: Option[String], source: Option[String],
  geo: Option[String], place: Option[String], lang: Option[String], created_at: Option[String],
  timestamp_ms: Option[String], coordinates: Option[String], user_id: Option[Long],
  user_name: Option[String], screen_name: Option[String], user_created_at: Option[String],
  followers_count: Option[Long], friends_count: Option[Long], user_lang: Option[String],
  user_location: Option[String], hashtags: Option[Seq[String]])

Best,
Georg
You can try the Jackson ObjectMapper library; it will get you from JSON to an object.

Regards,
Taher Koitawala

On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <[hidden email]> wrote:
How can I use it with a Scala case class? If I understand it correctly, for better performance the ObjectMapper is already initialized in each KafkaConsumer and returns ObjectNodes. So I should probably rephrase: how can I then map these ObjectNodes to case classes without hand-coding it? https://github.com/json4s/json4s and https://github.com/FasterXML/jackson-module-scala both only seem to consume strings.

Best,
Georg

On Thu, Jul 9, 2020 at 7:17 PM Taher Koitawala <[hidden email]> wrote:
The performant way would be to apply a map function over the stream and then use the Jackson ObjectMapper to convert to Scala objects. In Flink there is no API like Spark's to automatically get all fields.

On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <[hidden email]> wrote:
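For reference, a minimal sketch of this map-plus-ObjectMapper approach, assuming the jackson-module-scala dependency is on the classpath and that `stream` is the DataStream[ObjectNode] produced by the FlinkKafkaConsumer snippet in the first message (the mapper is created per element here only for brevity; reusing it is discussed later in the thread):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.streaming.api.scala._

val tweets: DataStream[Tweet] = stream.map { node: ObjectNode =>
  // JSONKeyValueDeserializationSchema nests the record payload under "value"
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  mapper.treeToValue(node.get("value"), classOf[Tweet])
}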
Great, thanks. But would it be possible to automate this, i.e. to have this work automatically for the case class / product?

On Thu, Jul 9, 2020 at 8:21 PM Taher Koitawala <[hidden email]> wrote:
Hi Georg, you can try using the circe library for this, which has a way to automatically generate JSON decoders for Scala case classes.

As was mentioned earlier, Flink does not come packaged with JSON-decoding generators for Scala like Spark does.

On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler <[hidden email]> wrote:
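A sketch of the circe route, assuming the circe-core, circe-generic and circe-parser artifacts are available; it reads the topic as plain strings via SimpleStringSchema, derives the Tweet decoder automatically, and simply drops records that fail to decode:

import io.circe.generic.auto._
import io.circe.parser.decode
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val raw: DataStream[String] = senv.addSource(
  new FlinkKafkaConsumer("tweets-raw-json", new SimpleStringSchema(), properties)
    .setStartFromEarliest())

val tweets: DataStream[Tweet] = raw.flatMap { json: String =>
  // decode returns Either[io.circe.Error, Tweet]; failures are discarded here
  decode[Tweet](json).toOption.toList
}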
Hi Georg,
I'm afraid the other suggestions are missing the point a bit. From your other emails it seems you want to use Kafka with JSON records together with the Table API/SQL. For that, take a look at [1], which describes how to define data sources for the Table API. The Kafka and JSON sections should be especially relevant.

That first link is for the legacy connector API. There is a newer API with slightly different properties which will allow us to do the kinds of optimization like working on binary data throughout the stack: [2]. Unfortunately, there is no programmatic API yet; you would have to use `TableEnvironment.executeSql()` to execute SQL DDL that defines your sources. There is a FLIP for adding the programmatic API: [3]

Best,
Aljoscha

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API

On 10.07.20 05:01, Aaron Levin wrote:
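A hedged sketch of what the `TableEnvironment.executeSql()` route could look like with the newer connector [2]; the option names follow the Flink 1.11 documentation, the column list is abridged to a few Tweet fields, and the bootstrap servers are a placeholder:

import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val tEnv = StreamTableEnvironment.create(senv)

tEnv.executeSql(
  """CREATE TABLE tweets (
    |  tweet_id STRING,
    |  `text` STRING,
    |  user_id BIGINT
    |  -- remaining Tweet columns omitted for brevity
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'tweets-raw-json',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'scan.startup.mode' = 'earliest-offset',
    |  'format' = 'json'
    |)""".stripMargin)

val tweets = tEnv.sqlQuery("SELECT tweet_id, `text`, user_id FROM tweets")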
Hi,

Many thanks. So do I understand correctly that:
1) similarly to Spark, the Table API works on some optimized binary representation
2) this is only available in the SQL way of interaction - there is no programmatic API

This leads me to some questions:

q1) I have read somewhere (I think in some Flink Forward presentations) that the SQL API is not necessarily stable with regards to state - even with small changes to the DAG (due to optimization). Does this also/still apply to the Table API? (I assume yes.)

q2) When I use the DataSet/DataStream (classical Scala/Java) API it looks like I must create a custom serializer if I want to handle one/all of:
- side-outputting failing records instead of simply crashing the job
- as asked before, automatic deserialization to a Scala (case) class

q3) As asked before: I also read that creating the ObjectMapper (i.e. in Jackson terms) inside the map function is not recommended. From Spark I know that there is a mapPartitions function, i.e. something where a database connection can be created and then reused for the individual elements. Is a similar construct available in Flink as well?

Also, I have read a lot of articles and it looks like a lot of people are using the String serializer and then manually parse the JSON, which also seems inefficient. Where would I find an example of a serializer with side outputs for failed records as well as efficient initialization using some construct similar to mapPartitions?

Best,
Georg

On Fri, Jul 10, 2020 at 4:22 PM Aljoscha Krettek <[hidden email]> wrote:
On 11.07.20 10:31, Georg Heiler wrote:
> 1) similarly to Spark, the Table API works on some optimized binary
> representation
> 2) this is only available in the SQL way of interaction - there is no
> programmatic API

Yes, it's available from SQL, but also from the Table API, which is a programmatic declarative API, similar to Spark's Structured Streaming.

> q1) I have read somewhere (I think in some Flink Forward presentations)
> that the SQL API is not necessarily stable with regards to state - even
> with small changes to the DAG (due to optimization). Does this also/still
> apply to the Table API? (I assume yes.)

Yes, unfortunately this is correct. Because the Table API/SQL is declarative, users don't have control over the DAG and the state that the operators have. Some work will happen on at least making sure that the optimizer stays stable between Flink versions, or on letting users pin a certain physical graph of a query so that it can be re-used across versions.

> q2) When I use the DataSet/DataStream (classical Scala/Java) API it looks
> like I must create a custom serializer if I want to handle one/all of:
> - side-outputting failing records instead of simply crashing the job
> - as asked before, automatic deserialization to a Scala (case) class

This is true, yes.

> But I also read that creating the ObjectMapper (i.e. in Jackson terms)
> inside the map function is not recommended. From Spark I know that there is
> a mapPartitions function, i.e. something where a database connection can
> be created and then reused for the individual elements. Is a similar
> construct available in Flink as well?

Yes, for this you can use "rich functions", which have an open()/close() method that allows initializing and re-using resources across invocations: https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#rich-functions

> Also, I have read a lot of articles and it looks like a lot of people
> are using the String serializer and then manually parse the JSON, which also
> seems inefficient. Where would I find an example of a serializer with side
> outputs for failed records as well as efficient initialization using some
> construct similar to mapPartitions?

I'm not aware of such examples, unfortunately.

I hope that at least some of the answers will be helpful!

Best,
Aljoscha
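To make the rich-function suggestion concrete, here is a hedged sketch (the class name, tag name and field mapping are made up for illustration) that creates the Jackson ObjectMapper once per task in open() - roughly the role mapPartitions plays in Spark - and routes unparseable records to a side output instead of failing the job; `stream` is again the DataStream[ObjectNode] from the first message and jackson-module-scala is assumed to be on the classpath:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.util.{Failure, Success, Try}

// records that cannot be parsed go here instead of crashing the job
val failedRecords = OutputTag[String]("failed-records")

class ParseTweets extends ProcessFunction[ObjectNode, Tweet] {
  @transient private var mapper: ObjectMapper = _

  // called once per task instance, so the mapper is reused across elements
  override def open(parameters: Configuration): Unit =
    mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  override def processElement(node: ObjectNode,
                              ctx: ProcessFunction[ObjectNode, Tweet]#Context,
                              out: Collector[Tweet]): Unit =
    Try(mapper.treeToValue(node.get("value"), classOf[Tweet])) match {
      case Success(tweet) => out.collect(tweet)
      case Failure(_)     => ctx.output(failedRecords, node.toString)
    }
}

val parsed = stream.process(new ParseTweets)
val failures = parsed.getSideOutput(failedRecords) // DataStream[String] of bad records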
Many thanks!

On Wed, Jul 15, 2020 at 3:58 PM Aljoscha Krettek <[hidden email]> wrote: