Error with extracted type from custom partitioner key


Ken Krugler
Hi all,

I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, with a DataSet) to do a better job of distributing data to tasks. The classes look like:

public class MyPartitioner implements Partitioner<MyGroupingKey> 
{
    ...
}

public class MyGroupingKey implements Comparable<MyGroupingKey> 
{
    ...
}

This worked fine, but I noticed a warning logged by Flink about MyGroupingKey not having an empty constructor, and thus not being treated as a POJO.
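For readers unfamiliar with the warning: one of the rules Flink applies before treating a class as a POJO is that the class is public and has a public no-argument constructor. The sketch below approximates only that one rule with plain reflection. It is not Flink's own type extraction, which also checks field visibility and getters/setters, and the class names `PojoCheckSketch`, `WithDefault` and `WithoutDefault` are hypothetical.

```java
import java.lang.reflect.Modifier;

class PojoCheckSketch {
    // True if the class is public and declares a public no-argument constructor.
    // This mirrors only one POJO rule; it is an illustration, not Flink code.
    static boolean hasPublicNoArgConstructor(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // only finds public constructors
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Hypothetical key class with an empty constructor.
    public static class WithDefault {
        public String key;
        public WithDefault() {}
    }

    // Hypothetical key class without an empty constructor.
    public static class WithoutDefault {
        public String key;
        public WithoutDefault(String key) { this.key = key; }
    }

    public static void main(String[] args) {
        System.out.println(hasPublicNoArgConstructor(WithDefault.class));    // true
        System.out.println(hasPublicNoArgConstructor(WithoutDefault.class)); // false
    }
}
```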

I added that empty constructor, and then I got an error because partitionCustom() only works on a single field key.

So I changed MyGroupingKey to have a single field (a string), with transient cached values for the pieces of the key that I need while partitioning. Now I get an odd error:

java.lang.RuntimeException: Error while calling custom partitioner

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to MyGroupingKey
        at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
        at org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
        ... 19 more

So I've got two questions…

• Should I just get rid of the empty constructor, and have Flink treat it as a non-POJO? This seemed to be working fine.
• Is it a bug in Flink that the extracted field from the key is being used as the expected type for partitioning?

Thanks!

— Ken

--------------------------
Ken Krugler
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
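
The stack trace is consistent with the partitioner being handed the extracted String field instead of the key object, as the second question suggests. The sketch below reproduces that kind of failure without Flink. The `Partitioner` interface here is a local stand-in with the same method shape as Flink's, the bodies of `MyGroupingKey` and `MyPartitioner` are guesses at the elided code, and `CastSketch` is a hypothetical stand-in for a runtime that calls the partitioner through erased generics.

```java
// Local stand-in for Flink's Partitioner interface (assumption: declared here
// only so the sketch compiles without Flink on the classpath).
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical single-field key, following the description in the post.
class MyGroupingKey {
    public String key;
    public MyGroupingKey() {}
    public MyGroupingKey(String key) { this.key = key; }
}

// Hypothetical partitioner body; the original is elided in the post.
class MyPartitioner implements Partitioner<MyGroupingKey> {
    @Override
    public int partition(MyGroupingKey k, int numPartitions) {
        return Math.floorMod(k.key.hashCode(), numPartitions);
    }
}

class CastSketch {
    // Calls the partitioner through its raw type, passing whatever object
    // the key extraction produced. The compiler-generated bridge method in
    // MyPartitioner casts the argument to MyGroupingKey.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int callErased(Partitioner partitioner, Object extractedKey, int numPartitions) {
        return partitioner.partition(extractedKey, numPartitions);
    }

    // Reports whether passing the given object triggers a ClassCastException.
    static boolean failsWithClassCast(Object extractedKey) {
        try {
            callErased(new MyPartitioner(), extractedKey, 4);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast(new MyGroupingKey("a|b"))); // false
        System.out.println(failsWithClassCast("a|b"));                    // true
    }
}
```

If that is what happens, the partitioner's type parameter would need to match the type of the extracted field rather than the enclosing key class. Whether Flink 1.12 should behave this way is the open question in this thread.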




Re: Error with extracted type from custom partitioner key

Timo Walther
Hi Ken,

Non-POJOs are serialized with Kryo. This might not give you optimal
performance. You can register a custom Kryo serializer in
ExecutionConfig to speed up the serialization.

Alternatively, you can implement `ResultTypeQueryable` to provide custom
type information with a custom serializer.

I hope this helps. Otherwise, can you share a small example of how you
would like to call partitionCustom()?

Regards,
Timo

On 04.06.21 15:38, Ken Krugler wrote:

> So I've got two questions…
>
> • Should I just get rid of the empty constructor, and have Flink treat
> it as a non-POJO? This seemed to be working fine.
> • Is it a bug in Flink that the extracted field from the key is being
> used as the expected type for partitioning?
>
> Thanks!
>
> — Ken
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com>
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch

Reply | Threaded
Open this post in threaded view
|

Re: Error with extracted type from custom partitioner key

Ken Krugler
Hi Timo,

Thanks, I’ll give the ResultTypeQueryable interface a try - my previous experience registering custom Kryo serializers wasn’t so positive.

Though I’m still curious as to whether the java.lang.ClassCastException I got indicates a bug in Flink or a mistake on my part.

But with the ongoing deprecation of DataSet support, I imagine that’s a low priority issue in any case.

Regards,

— Ken


On Jun 4, 2021, at 7:05 AM, Timo Walther <[hidden email]> wrote:

Hi Ken,

non-POJOs are serialized with Kryo. This might not give you optimal performance. You can register a custom Kryo serializer in ExecutionConfig to speed up the serialization.

Alternatively, you can implement `ResultTypeQueryable` to provide custom type information with a custom serializer.

I hope this helps. Otherwise, can you share a small example of how you would like to call partitionCustom()?

Regards,
Timo





Re: Error with extracted type from custom partitioner key

Arvid Heise-4
This could be a bug but I'd need to see more of the DataStream code to be sure. Could you share that code?

On Sat, Jun 12, 2021 at 9:56 PM Ken Krugler <[hidden email]> wrote:
Hi Timo,

Thanks, I’ll give the ResultTypeQueryable interface a try - my previous experience registering custom Kryo serializers wasn’t so positive.

Though I’m still curious as to whether the java.lang.ClassCastException I got indicates a bug in Flink or a mistake on my part.

But with the ongoing deprecation of DataSet support, I imagine that’s a low priority issue in any case.

Regards,

— Ken

