map JSON to scala case class & off-heap optimization

map JSON to scala case class & off-heap optimization

Georg Heiler
Hi,

I want to map a stream of JSON documents from Kafka to a Scala case class. How can this be accomplished using the JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes required?

I have a Spark background. There, such manual mappings are usually discouraged; instead, Spark offers a nice API (the Dataset API) for this kind of mapping.
1) it is concise
2) it operates on Spark's off-heap memory representation (Tungsten), which makes it faster

In Flink, by contrast, such off-heap optimizations do not seem to be talked about much (sorry if I am missing something, I am a Flink newbie). Is there a reason why these optimizations are not necessary in Flink?


How could I get from the following example:
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
    new FlinkKafkaConsumer(
      "tweets-raw-json",
      serializer,
      properties
    ).setStartFromEarliest() // TODO experiment with different start values
  )

to this Tweet class concisely, i.e. without manually iterating over all the attribute fields and parsing the keys from the object node tree?

final case class Tweet(
    tweet_id: Option[String],
    text: Option[String],
    source: Option[String],
    geo: Option[String],
    place: Option[String],
    lang: Option[String],
    created_at: Option[String],
    timestamp_ms: Option[String],
    coordinates: Option[String],
    user_id: Option[Long],
    user_name: Option[String],
    screen_name: Option[String],
    user_created_at: Option[String],
    followers_count: Option[Long],
    friends_count: Option[Long],
    user_lang: Option[String],
    user_location: Option[String],
    hashtags: Option[Seq[String]])

Best,
Georg
Re: map JSON to scala case class & off-heap optimization

taher koitawala-2
You can try the Jackson ObjectMapper library; that will get you from JSON to objects.
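
A minimal sketch of that idea (assuming the jackson-module-scala dependency is on the classpath; the Tweet case class is abbreviated here):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

final case class Tweet(tweet_id: Option[String], text: Option[String]) // abbreviated

// registering the Scala module lets Jackson handle case classes and Option fields
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

val json = """{"tweet_id": "1", "text": "hello"}"""
val tweet: Tweet = mapper.readValue(json, classOf[Tweet])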

Regards,
Taher Koitawala

Re: map JSON to scala case class & off-heap optimization

Georg Heiler
How can I use it with a Scala case class?
If I understand it correctly, for better performance the ObjectMapper is already initialized in each KafkaConsumer and returns ObjectNodes. So I should probably rephrase the question: how can I then map these ObjectNodes to case classes without hand-coding it? Both https://github.com/json4s/json4s and https://github.com/FasterXML/jackson-module-scala only seem to consume strings.

Best,
Georg

Re: map JSON to scala case class & off-heap optimization

taher koitawala-2
The performant way would be to apply a map function over the stream and then use the Jackson ObjectMapper to convert to Scala objects. In Flink there is no API like Spark's that automatically maps all fields.
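
A sketch of what that could look like on the ObjectNode stream produced by the JSONKeyValueDeserializationSchema (assuming jackson-module-scala is available and the `stream` / `Tweet` definitions from the first mail; the Kafka record payload sits under the "value" field of each node):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.streaming.api.scala._

// built lazily once per JVM so it is not recreated for every record
object Json {
  lazy val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
}

val tweets: DataStream[Tweet] = stream.map { node: ObjectNode =>
  // JSONKeyValueDeserializationSchema nests the record payload under "value"
  Json.mapper.treeToValue(node.get("value"), classOf[Tweet])
}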

Re: map JSON to scala case class & off-heap optimization

Georg Heiler
Great, thanks.
But would it be possible to automate this, i.e. to have it work automatically for the case class / product?

Re: map JSON to scala case class & off-heap optimization

Aaron Levin
Hi Georg, you can try using the circe library for this, which has a way to automatically generate JSON decoders for Scala case classes.

As mentioned earlier, Flink does not come packaged with JSON-decoding generators for Scala like Spark does.
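
For illustration, a small sketch with circe (assuming circe-parser and circe-generic are on the classpath; the Tweet case class is abbreviated), where the decoder for the case class is derived automatically:

import io.circe.generic.auto._ // automatic Decoder derivation for case classes
import io.circe.parser.decode

final case class Tweet(tweet_id: Option[String], text: Option[String]) // abbreviated

val json = """{"tweet_id": "1", "text": "hello"}"""

// decode returns Either, so malformed records surface as a Left instead of throwing
val parsed: Either[io.circe.Error, Tweet] = decode[Tweet](json)

Over a DataStream[String] (e.g. coming from a SimpleStringSchema source) this decoding step would typically sit inside a map or flatMap.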

Re: map JSON to scala case class & off-heap optimization

Aljoscha Krettek
Hi Georg,

I'm afraid the other suggestions are missing the point a bit. From your
other emails it seems you want to use Kafka with JSON records together
with the Table API/SQL. For that, take a look at [1] which describes how
to define data sources for the Table API. Especially the Kafka and JSON
sections should be relevant.

That first link I mentioned is for the legacy connector API. There is a
newer API with slightly different properties which will allow the kinds
of optimizations like working on binary data throughout the stack: [2].
Unfortunately, there is no programmatic API for it yet; you would have
to use `TableEnvironment.executeSql()` to execute SQL DDL that defines
your sources. There is a FLIP for adding the programmatic API: [3]
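
For illustration, a rough sketch of what [2] looks like in practice via
`executeSql` (the column list is abbreviated and the bootstrap server
address is a placeholder):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// DDL for the newer connector API: Kafka source with the JSON format
tEnv.executeSql(
  """CREATE TABLE tweets (
    |  tweet_id STRING,
    |  lang STRING,
    |  user_id BIGINT
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'tweets-raw-json',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'scan.startup.mode' = 'earliest-offset',
    |  'format' = 'json'
    |)""".stripMargin)

val result = tEnv.sqlQuery("SELECT tweet_id, user_id FROM tweets WHERE lang = 'en'")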

Best,
Aljoscha

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/

[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API

Re: map JSON to scala case class & off-heap optimization

Georg Heiler
Hi,


Many thanks.
So do I understand correctly that:

1) similarly to Spark, the Table API works on some optimized binary representation
2) this is only available in the SQL way of interaction - there is no programmatic API

This leads me then to some questions:

q1) I have read somewhere (I think in some Flink Forward presentations) that the SQL API is not necessarily stable with regards to state - even with small changes to the DAG (due to optimization). So does this also/still apply to the Table API? (I assume yes)
q2) When I use the DataSet/Stream (classical Scala/Java) API, it looks like I must create a custom serializer if I want to handle one/all of:

  - side-output failing records and not simply crash the job
  - as asked before automatic serialization to a scala (case) class

q3)
So as asked before:
>>> But I also read that creating the ObjectMapper (i.e. in Jackson terms) inside the map function is not recommended. From Spark I know that there is a map-partitions function, i.e. something where a database connection can be created and then reused for the individual elements. Is a similar construct available in Flink as well?
>>> Also, I have read a lot of articles and it looks like a lot of people are using the String serializer and then manually parse the JSON, which also seems inefficient.
Where would I find an example of a serializer with side outputs for failed records, as well as efficient initialization using some construct similar to map-partitions?

Best,
Georg

Re: map JSON to scala case class & off-heap optimization

Aljoscha Krettek
On 11.07.20 10:31, Georg Heiler wrote:
> 1) similarly to spark the Table API works on some optimized binary
> representation
> 2) this is only available in the SQL way of interaction - there is no
> programmatic API

Yes, it's available from SQL, but also from the Table API, which is a
programmatic declarative API, similar to Spark's Structured Streaming.
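
As a small illustration of that programmatic flavour (assuming a
`tweets` table has already been registered, e.g. via SQL DDL, and a
StreamTableEnvironment `tEnv`; the expression DSL comes from
org.apache.flink.table.api._):

import org.apache.flink.table.api._

// declarative, but expressed through method calls rather than a SQL string
val english = tEnv
  .from("tweets")
  .filter($"lang" === "en")
  .select($"tweet_id", $"user_id")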


> q1) I have read somewhere (I think in some Flink Forward presentations)
> that the SQL API is not necessarily stable with regards to state - even
> with small changes to the DAG (due to optimization). So does this also
> /still apply to the table API? (I assume yes)

Yes, unfortunately this is correct. Because the Table API/SQL is
declarative, users don't have control over the DAG and the state that
the operators have. Some work will happen on at least making sure that
the optimizer stays stable between Flink versions, or on letting users
pin a certain physical graph of a query so that it can be re-used across
versions.

> q2) When I use the DataSet/Stream (classical scala/java) API it looks like
> I must create a custom serializer if I want to handle one/all of:
>
>    - side-output failing records and not simply crash the job
>    - as asked before automatic serialization to a scala (case) class

This is true, yes.
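
Not an official example, but a rough sketch of the side-output part
(names are made up; it assumes the ObjectNode stream and the Tweet case
class from earlier in the thread):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// records that fail to map are routed here instead of failing the job
val failedTag = OutputTag[ObjectNode]("failed-records")

val parsed = stream.process(new ProcessFunction[ObjectNode, Tweet] {
  @transient private lazy val mapper =
    new ObjectMapper().registerModule(DefaultScalaModule)

  override def processElement(node: ObjectNode,
                              ctx: ProcessFunction[ObjectNode, Tweet]#Context,
                              out: Collector[Tweet]): Unit =
    try out.collect(mapper.treeToValue(node.get("value"), classOf[Tweet]))
    catch { case _: Exception => ctx.output(failedTag, node) }
})

val failed = parsed.getSideOutput(failedTag)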

> But I also read that creating the ObjectMapper (i.e. in Jackson terms)
> inside the map function is not recommended. From Spark I know that there is
> a map-partitions function, i.e. something where a database connection can
> be created and then reused for the individual elements. Is a similar
> construct available in Flink as well?

Yes, for this you can use "rich functions", which have open()/close()
methods that allow initializing and re-using resources across
invocations:
https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#rich-functions
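
For instance, a minimal sketch of that for the ObjectNode-to-case-class
mapping (the class name is made up; it assumes the Tweet case class and
jackson-module-scala as above):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class ObjectNodeToTweet extends RichMapFunction[ObjectNode, Tweet] {
  @transient private var mapper: ObjectMapper = _

  // open() runs once per parallel task, so the mapper is built once and reused
  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  }

  override def map(node: ObjectNode): Tweet =
    mapper.treeToValue(node.get("value"), classOf[Tweet])
}

// usage: stream.map(new ObjectNodeToTweet())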

> Also, I have read a lot of articles and it looks like a lot of people
> are using the String serializer and then manually parse the JSON which also
> seems inefficient.
> Where would I find an example for some Serializer with side outputs for
> failed records as well as efficient initialization using some similar
> construct to map-partitions?

I'm not aware of such examples, unfortunately.

I hope that at least some answers will be helpful!

Best,
Aljoscha
Re: map JSON to scala case class & off-heap optimization

Georg Heiler
Many thanks!
