Hi all,

I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, with a DataSet) to do a better job of distributing data to tasks. The classes look like:

    public class MyPartitioner implements Partitioner<MyGroupingKey>
    {
    ...
    }

    public class MyGroupingKey implements Comparable<MyGroupingKey>
    {
    ...
    }

This worked fine, but I noticed a warning logged by Flink about MyGroupingKey not having an empty constructor, and thus not being treated as a POJO.

I added that empty constructor, and then I got an error because partitionCustom() only works on a single-field key.

So I changed MyGroupingKey to have a single field (a string), with transient cached values for the pieces of the key that I need while partitioning. Now I get an odd error:

    java.lang.RuntimeException: Error while calling custom partitioner
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to MyGroupingKey
        at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
        at org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
        ... 19 more

So I've got two questions…

• Should I just get rid of the empty constructor, and have Flink treat it as a non-POJO? This seemed to be working fine.
• Is it a bug in Flink that the extracted field from the key is being used as the expected type for partitioning?

Thanks!

Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch

Hi Ken,

Non-POJOs are serialized with Kryo, which might not give you optimal performance. You can register a custom Kryo serializer in the ExecutionConfig to speed up serialization. Alternatively, you can implement `ResultTypeQueryable` to provide a custom TypeInformation with a custom serializer.

I hope this helps. Otherwise, can you share a small example of how you would like to call partitionCustom()?

Regards,
Timo

On 04.06.21 15:38, Ken Krugler wrote:
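For reference, here is a minimal plain-Java sketch of the kind of single-field key Ken describes: a public no-argument constructor plus one public String field (the shape Flink's POJO analysis looks for), with the parsed pieces held in transient fields and rebuilt lazily. The `tenant|day` key layout, the names `tenant`/`day`, and the tenant-hash partitioning are hypothetical. So that it compiles without Flink on the classpath, `Partitioner` is declared locally with the same method shape as `org.apache.flink.api.common.functions.Partitioner` (`int partition(K key, int numPartitions)`); it is a stand-in, not Flink's interface.

```java
import java.io.Serializable;
import java.util.Objects;

public class GroupingKeySketch {

    // Local stand-in with the same method shape as Flink's
    // org.apache.flink.api.common.functions.Partitioner<K>, so this compiles without Flink.
    public interface Partitioner<K> extends Serializable {
        int partition(K key, int numPartitions);
    }

    // Hypothetical single-field key: the only serialized state is `key`
    // (e.g. "tenant|day"); the parsed pieces are transient caches.
    public static class MyGroupingKey implements Comparable<MyGroupingKey> {
        public String key;                  // the single public (POJO) field

        private transient String tenant;    // cached, rebuilt on demand
        private transient String day;       // cached, rebuilt on demand

        public MyGroupingKey() {}           // empty constructor needed for POJO treatment

        public MyGroupingKey(String key) {
            this.key = key;
        }

        // Parse lazily: transient fields are null again after deserialization.
        private void ensureParsed() {
            if (tenant == null) {
                int sep = key.indexOf('|');
                tenant = sep < 0 ? key : key.substring(0, sep);
                day = sep < 0 ? "" : key.substring(sep + 1);
            }
        }

        public String getTenant() {
            ensureParsed();
            return tenant;
        }

        public String getDay() {
            ensureParsed();
            return day;
        }

        @Override
        public int compareTo(MyGroupingKey other) {
            return key.compareTo(other.key);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MyGroupingKey && Objects.equals(key, ((MyGroupingKey) o).key);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(key);
        }
    }

    // Hypothetical partitioner: spreads by tenant only, so all days of a tenant go together.
    public static class MyPartitioner implements Partitioner<MyGroupingKey> {
        @Override
        public int partition(MyGroupingKey key, int numPartitions) {
            // floorMod keeps the result in [0, numPartitions) even for negative hash codes
            return Math.floorMod(key.getTenant().hashCode(), numPartitions);
        }
    }

    public static void main(String[] args) {
        MyPartitioner p = new MyPartitioner();
        MyGroupingKey a = new MyGroupingKey("acme|2021-06-01");
        MyGroupingKey b = new MyGroupingKey("acme|2021-06-02");
        System.out.println(p.partition(a, 8) == p.partition(b, 8)); // prints "true": same tenant
    }
}
```

The lazy `ensureParsed()` is the important design choice here: since the cached pieces are transient, they are not shipped with the record, so any code that reads them after the key has crossed a network or serialization boundary has to rebuild them from `key`.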
Hi Timo,
Thanks, I’ll give the ResultTypeQueryable interface a try; my previous experience registering custom Kryo serializers wasn’t so positive.

Though I’m still curious whether the java.lang.ClassCastException I got is a sign of a bug in Flink, or of my doing something wrong. But with the ongoing deprecation of DataSet support, I imagine that’s a low-priority issue in any case.

Regards,

Ken
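A plain-Java illustration of why the exception surfaces inside `MyPartitioner.partition` rather than in Flink's own code: because of type erasure, code that holds the partitioner only through its erased type can pass any object, and the cast to `MyGroupingKey` happens in the compiler-generated bridge method `partition(Object, int)` of the partitioner. The sketch assumes, as the exception message suggests but without confirming it against Flink's source, that Flink passes the extracted `String` field of the single-field POJO key. It also shows one possible workaround under that assumption: a partitioner typed on `String`. `callErased`, `MyStringPartitioner`, and the local `Partitioner` stand-in are hypothetical names for illustration only.

```java
import java.io.Serializable;

public class ErasureCastDemo {

    // Local stand-in with the same method shape as Flink's Partitioner<K>.
    public interface Partitioner<K> extends Serializable {
        int partition(K key, int numPartitions);
    }

    // Minimal hypothetical single-field key type.
    public static class MyGroupingKey {
        public String key;
        public MyGroupingKey() {}
        public MyGroupingKey(String key) { this.key = key; }
    }

    // Expects the whole key object; its bridge method casts Object -> MyGroupingKey.
    public static class MyPartitioner implements Partitioner<MyGroupingKey> {
        @Override
        public int partition(MyGroupingKey key, int numPartitions) {
            return Math.floorMod(key.key.hashCode(), numPartitions);
        }
    }

    // Hypothetical workaround: accept the String field that is actually passed in.
    public static class MyStringPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            return Math.floorMod(key.hashCode(), numPartitions);
        }
    }

    // Mimics generic framework code that only knows the erased (raw) type.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static int callErased(Partitioner partitioner, Object extractedKey, int numPartitions) {
        return partitioner.partition(extractedKey, numPartitions);
    }

    public static void main(String[] args) {
        try {
            callErased(new MyPartitioner(), "acme|2021-06-01", 4);
        } catch (ClassCastException e) {
            // Thrown by the bridge method MyPartitioner.partition(Object, int)
            System.out.println("ClassCastException: " + e.getMessage());
        }
        System.out.println(callErased(new MyStringPartitioner(), "acme|2021-06-01", 4));
    }
}
```

With the String-typed variant, the transient cached pieces on MyGroupingKey are no longer available inside the partitioner, so it would have to re-parse the key string on each call.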
This could be a bug, but I'd need to see more of the DataSet code to be sure. Could you share that code?

On Sat, Jun 12, 2021 at 9:56 PM Ken Krugler wrote: