DisableGenericTypes is not compatible with Kafka

Oleksandr Nitavskyi

Hi guys,

We have run into an issue with the ability to call ‘disableGenericTypes’ (which disables Kryo for the job). It seems like a very good way to ensure that nobody introduces a random change that penalizes the performance of the job.

The issue is that Flink’s KafkaSource stores KafkaTopicPartition in its state for offset recovery, and that state is serialized with Kryo.

This by itself certainly does not penalize performance, but it reduces the usefulness of ‘disableGenericTypes’. Also, a Flink user has no good way to supply non-Kryo type information for KafkaTopicPartition.

One of the related tickets I have found: https://issues.apache.org/jira/browse/FLINK-12031

Do you know of any workaround that allows ‘disableGenericTypes’ with KafkaSources, or what do you think about doing some development to address this issue?

Kind Regards
Oleksandr

Re: DisableGenericTypes is not compatible with Kafka

Guowei Ma
Hi,
I think there are two possible workarounds for using 'disableGenericTypes' with the KafkaSource:
1. Adding the TypeInfo annotation [1] to KafkaTopicPartition.
2. Using reflection to call the private method. :)
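
The first option can be pictured with a small, dependency-free sketch. It is a toy model and not Flink code: `ToyTypeInfo`, `ToyTypeInfoFactory`, `AnnotatedPartition` and `PlainPartition` are hypothetical names. It assumes only the general idea that a type extractor looks for a factory annotation on a class before falling back to a generic (Kryo) serializer.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Toy model (not Flink code) of an annotation-driven type-info factory lookup. */
class TypeInfoFactorySketch {

    /** Hypothetical factory contract: names the serializer to use for a type. */
    interface ToyTypeInfoFactory {
        String serializerName();
    }

    /** Hypothetical annotation that points a class at its factory. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ToyTypeInfo {
        Class<? extends ToyTypeInfoFactory> value();
    }

    /** Factory for the annotated partition type. */
    static final class PartitionFactory implements ToyTypeInfoFactory {
        @Override
        public String serializerName() {
            return "custom-partition-serializer";
        }
    }

    /** A partition class whose owner added the annotation. */
    @ToyTypeInfo(PartitionFactory.class)
    static final class AnnotatedPartition {
        String topic;
        int partition;
    }

    /** The same shape without the annotation. */
    static final class PlainPartition {
        String topic;
        int partition;
    }

    /** Looks for the annotation first; falls back to the generic path otherwise. */
    static String resolveSerializer(Class<?> type) throws ReflectiveOperationException {
        ToyTypeInfo annotation = type.getAnnotation(ToyTypeInfo.class);
        if (annotation != null) {
            ToyTypeInfoFactory factory =
                    annotation.value().getDeclaredConstructor().newInstance();
            return factory.serializerName();
        }
        return "generic-kryo";
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println("annotated: " + resolveSerializer(AnnotatedPartition.class));
        System.out.println("plain:     " + resolveSerializer(PlainPartition.class));
    }
}
```

Note that the annotation sits on the class itself, so in this model only whoever owns the class can add it.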

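Maybe we could add this TypeInfo annotation to the Kafka connector.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#defining-type-information-using-a-factory

Best,
Guowei
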
Re: DisableGenericTypes is not compatible with Kafka

Aljoscha Krettek
Unfortunately, the fact that the Kafka Sources use Kryo for state
serialization is a very early design misstep that we cannot get rid of
for now. We will get rid of that when the new source interface lands
([1]) and when we have a new Kafka Source based on that.

As a workaround, we should change the Kafka Consumer to go through a
different constructor of ListStateDescriptor which directly takes a
TypeSerializer instead of a TypeInformation here: [2]. This should
sidestep the "no generic types" check.
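
A minimal, dependency-free sketch of why handing over a serializer directly would avoid the check. It is a toy model and not Flink code: `ToyConfig`, `ToySerializer`, `ToyGenericTypeInfo` and `ToyListStateDescriptor` are hypothetical stand-ins for `ExecutionConfig`, `TypeSerializer`, generic `TypeInformation` and `ListStateDescriptor`. The assumption being illustrated is that the "no generic types" check runs when type information is converted into a serializer, so a descriptor that already holds a serializer never reaches it.

```java
/** Toy model (not Flink code) of where the "no generic types" check sits. */
class SerializerBypassSketch {

    /** Hypothetical stand-in for ExecutionConfig: only tracks the switch. */
    static final class ToyConfig {
        private boolean genericTypesDisabled;

        void disableGenericTypes() {
            genericTypesDisabled = true;
        }

        boolean hasGenericTypesDisabled() {
            return genericTypesDisabled;
        }
    }

    /** Hypothetical stand-in for TypeSerializer; records the backend it uses. */
    static final class ToySerializer {
        final String backend;

        ToySerializer(String backend) {
            this.backend = backend;
        }
    }

    /** Hypothetical stand-in for generic (Kryo-backed) type information. */
    static final class ToyGenericTypeInfo {
        ToySerializer createSerializer(ToyConfig config) {
            // Assumption: the check happens here, during conversion to a serializer.
            if (config.hasGenericTypesDisabled()) {
                throw new UnsupportedOperationException("generic types are disabled");
            }
            return new ToySerializer("kryo");
        }
    }

    /** Hypothetical stand-in for ListStateDescriptor with two constructors. */
    static final class ToyListStateDescriptor {
        private final ToyGenericTypeInfo typeInfo; // null if a serializer was given
        private ToySerializer serializer;          // null until resolved

        ToyListStateDescriptor(ToyGenericTypeInfo typeInfo) {
            this.typeInfo = typeInfo;
        }

        ToyListStateDescriptor(ToySerializer serializer) {
            this.typeInfo = null;
            this.serializer = serializer;
        }

        ToySerializer resolveSerializer(ToyConfig config) {
            if (serializer == null) {
                serializer = typeInfo.createSerializer(config);
            }
            return serializer;
        }
    }

    /** Descriptor built from type information: goes through the check. */
    static String viaTypeInfo(ToyConfig config) {
        try {
            return new ToyListStateDescriptor(new ToyGenericTypeInfo())
                    .resolveSerializer(config).backend;
        } catch (UnsupportedOperationException e) {
            return "rejected";
        }
    }

    /** Descriptor built from a ready-made serializer: never reaches the check. */
    static String viaSerializer(ToyConfig config) {
        return new ToyListStateDescriptor(new ToySerializer("kryo"))
                .resolveSerializer(config).backend;
    }

    public static void main(String[] args) {
        ToyConfig config = new ToyConfig();
        config.disableGenericTypes();
        System.out.println("via type information: " + viaTypeInfo(config));
        System.out.println("via serializer:       " + viaSerializer(config));
    }
}
```

In both paths the serializer that ends up in the descriptor is Kryo-backed; the sketch only shows the check being skipped. Whether restored state stays compatible in a real job depends on the explicit serializer matching what the type-information path produced before, which this toy does not verify.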

I created a Jira Issue for this:
https://issues.apache.org/jira/browse/FLINK-15904

Best,
Aljoscha

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2]
https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860

On 01.02.20 09:44, Guowei Ma wrote:

> Hi,
> I think there are two possible workarounds for using 'disableGenericTypes' with
> the KafkaSource:
> 1. Adding the TypeInfo annotation [1] to KafkaTopicPartition.
> 2. Using reflection to call the private method. :)
>
> Maybe we could add this TypeInfo annotation to the Kafka connector.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#defining-type-information-using-a-factory
>
> Best,
> Guowei

Re: [BULK]Re: DisableGenericTypes is not compatible with Kafka

Oleksandr Nitavskyi
Thanks for the answers, guys.

Aljoscha, I have a question to make sure I get it right.
Do I understand correctly that this newly created TypeSerializer should use Kryo under the hood, so that we keep the state backward compatible and do not get an exception when generic types are disabled?

Thanks
Kind Regards
Oleksandr

Re: [BULK]Re: DisableGenericTypes is not compatible with Kafka

Aljoscha Krettek
We can just directly create a KryoSerializer instead of going through  
TypeInformation, I think.

Best,
Aljoscha

On 05.02.20 18:29, Oleksandr Nitavskyi wrote:

> Thanks for the answers, guys.
>
> Aljoscha, I have a question to make sure I get it right.
> Do I understand correctly that this newly created TypeSerializer should use Kryo under the hood, so that we keep the state backward compatible and do not get an exception when generic types are disabled?
>
> Thanks
> Kind Regards
> Oleksandr