InvalidProgramException when trying to sort a group within a dataset


Papadopoulos, Konstantinos

Hi all,

I am trying to sort a group within a dataset using a KeySelector as follows:

in
  .groupBy("productId", "timePeriodId", "geographyId")
  .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
    }
  }, Order.ASCENDING)
  .reduceGroup(/* do something */)

However, I get the following exception:

org.apache.flink.api.common.InvalidProgramException: KeySelector group-sorting keys can only be used with KeySelector grouping keys.
    at org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Then I tried using a KeySelector for both the 'groupBy' and the 'sortGroup' transformations, as follows:

in
  .groupBy(new KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>>() {
    @Override
    public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return new Tuple3<>(thresholdAcvFact.getProductId(), thresholdAcvFact.getTimePeriodId(), thresholdAcvFact.getGeographyId());
    }
  })
  .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
    }
  }, Order.ASCENDING)
  .reduceGroup(/* do something */)

The job execution still failed, this time with the following exception:

org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:429)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:116)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:203)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:221)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:897)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:374)
    ... 50 more
Caused by: java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:463)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)
    ... 52 more

Does anyone have an idea how I can get past these issues?

Thanks in advance
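
For readers less familiar with the DataSet API: groupBy(...).sortGroup(..., Order.ASCENDING).reduceGroup(...) hands each group to the reducer with its elements in ascending order of the sort key. A minimal plain-Java sketch of that per-group result (no Flink involved) might look as follows. Fact (a trimmed-down stand-in for ThresholdAcvFact) and groupAndSort are hypothetical names, and Flink itself partitions and sorts in a distributed way rather than through an in-memory map.

```java
import java.util.*;
import java.util.stream.*;

public class GroupSortSketch {

    // Hypothetical, trimmed-down stand-in for ThresholdAcvFact (only the fields used here).
    static final class Fact {
        final Long productId, timePeriodId, geographyId;
        final Double basePrice, promoPrice;

        Fact(Long productId, Long timePeriodId, Long geographyId, Double basePrice, Double promoPrice) {
            this.productId = productId;
            this.timePeriodId = timePeriodId;
            this.geographyId = geographyId;
            this.basePrice = basePrice;
            this.promoPrice = promoPrice;
        }

        // Same fallback as in the post: base price if present, otherwise promo price.
        Double sortPrice() {
            return Optional.ofNullable(basePrice).orElse(promoPrice);
        }
    }

    // Groups by (productId, timePeriodId, geographyId), then sorts each group ascending by sortPrice(),
    // mirroring what groupBy(...).sortGroup(..., Order.ASCENDING) passes to reduceGroup.
    static Map<List<Long>, List<Fact>> groupAndSort(List<Fact> facts) {
        Map<List<Long>, List<Fact>> groups = new HashMap<>();
        for (Fact f : facts) {
            List<Long> key = Arrays.asList(f.productId, f.timePeriodId, f.geographyId);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(f);
        }
        // Assumes sortPrice() is never null; a record with both prices missing would throw a NullPointerException here.
        groups.values().forEach(g -> g.sort(Comparator.comparing(Fact::sortPrice)));
        return groups;
    }

    public static void main(String[] args) {
        List<Fact> facts = Arrays.asList(
                new Fact(1L, 10L, 100L, 5.0, null),
                new Fact(1L, 10L, 100L, null, 3.0),
                new Fact(2L, 10L, 100L, 7.0, 6.0));
        groupAndSort(facts).forEach((key, group) -> System.out.println(key + " -> "
                + group.stream().map(Fact::sortPrice).collect(Collectors.toList())));
    }
}
```

The Arrays.asList key plays the role of the three grouping fields; its element-wise equals/hashCode is what makes it usable as a composite map key. HashMap iteration order is unspecified, so the printed group order may vary.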

Re: InvalidProgramException when trying to sort a group within a dataset

Chesnay Schepler
Your user-defined functions are referencing the class "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl", which isn't serializable.

My guess is that "ThresholdAcvFact" is a non-static inner class; however, I would need to see the entire class to give an accurate analysis.
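
The serialization mechanism behind this reply can be reproduced without Flink. An anonymous class created inside an instance method holds a hidden reference to the enclosing object (javac's this$0 field), so Java serialization, which Flink uses to ship user functions, tries to serialize that enclosing object too. In this thread it is most likely the anonymous KeySelector declared inside ThresholdAcvCalcServiceImpl.transform that drags the service class along. The sketch assumes a hypothetical KeyFn interface standing in for Flink's KeySelector (which is Serializable) and a hypothetical Service class standing in for ThresholdAcvCalcServiceImpl.

```java
import java.io.*;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's KeySelector, which (via Function) extends java.io.Serializable.
    interface KeyFn<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Plays the role of ThresholdAcvCalcServiceImpl: deliberately NOT Serializable.
    static class Service {

        // Anonymous class created in an instance method: it keeps a hidden reference
        // to this Service instance, so serializing it also tries to serialize Service.
        KeyFn<String, Integer> anonymousSelector() {
            return new KeyFn<String, Integer>() {
                @Override
                public Integer getKey(String value) {
                    return value.length();
                }
            };
        }

        // Static nested class: no enclosing-instance reference, so only the selector itself is written.
        static class LengthSelector implements KeyFn<String, Integer> {
            @Override
            public Integer getKey(String value) {
                return value.length();
            }
        }
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        System.out.println(isSerializable(service.anonymousSelector()));  // prints false
        System.out.println(isSerializable(new Service.LengthSelector())); // prints true
    }
}
```

Replacing the anonymous class with a static nested or top-level class removes the hidden reference, which is the usual way to keep a user function from capturing a non-serializable enclosing object.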

On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:
[original message trimmed]

RE: InvalidProgramException when trying to sort a group within a dataset

Papadopoulos, Konstantinos

Hi Chesnay,

 

Thanks for your support. The ThresholdAcvFact class is a simple POJO with the following definition:

 

public class ThresholdAcvFact {

    private Long timePeriodId;
    private Long geographyId;
    private Long productId;
    private Long customerId;
    private Double basePrice;
    private Double promoPrice;
    private Double basePriceAcv;
    private Double promoPriceAcv;
    private Long count;

    public Long getTimePeriodId() {
        return timePeriodId;
    }

    public void setTimePeriodId(Long timePeriodId) {
        this.timePeriodId = timePeriodId;
    }

    public Long getGeographyId() {
        return geographyId;
    }

    public void setGeographyId(Long geographyId) {
        this.geographyId = geographyId;
    }

    public Long getProductId() {
        return productId;
    }

    public void setProductId(Long productId) {
        this.productId = productId;
    }

    public Long getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Long customerId) {
        this.customerId = customerId;
    }

    public Double getBasePrice() {
        return basePrice;
    }

    public void setBasePrice(Double basePrice) {
        this.basePrice = basePrice;
    }

    public Double getPromoPrice() {
        return promoPrice;
    }

    public void setPromoPrice(Double promoPrice) {
        this.promoPrice = promoPrice;
    }

    public Double getBasePriceAcv() {
        return basePriceAcv;
    }

    public void setBasePriceAcv(Double basePriceAcv) {
        this.basePriceAcv = basePriceAcv;
    }

    public Double getPromoPriceAcv() {
        return promoPriceAcv;
    }

    public void setPromoPriceAcv(Double promoPriceAcv) {
        this.promoPriceAcv = promoPriceAcv;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "ThresholdAcvFact{" +
                "timePeriodId=" + timePeriodId +
                ", geographyId=" + geographyId +
                ", productId=" + productId +
                ", customerId=" + customerId +
                ", basePrice=" + basePrice +
                ", promoPrice=" + promoPrice +
                ", basePriceAcv=" + basePriceAcv +
                ", promoPriceAcv=" + promoPriceAcv +
                ", count=" + count +
                '}';
    }
}

 

While the implementation of the function we faced the issue reported is the following:

 

    public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {

        final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(
                thresholdAcvCalcSources.getBasePriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
                new ThresholdAcvBasePriceFactMapper(customerId));

        final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(
                thresholdAcvCalcSources.getPromoPriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
                new ThresholdAcvPromoPriceFactMapper(customerId));

        return basePriceFacts
                .fullOuterJoin(promoPriceFacts)
                .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")
                .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")
                .with(new ThresholdAcvFactBasePromoPriceJoiner())
                .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)
                .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
                    @Override
                    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
                        return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
                    }
                }, Order.ASCENDING)
                .reduceGroup(new ThresholdAcvFactCountGroupReducer());
    }
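For reference, here is a minimal plain-Java sketch (no Flink involved) of what the `groupBy(...)`/`sortGroup(...)` chain above is meant to compute: facts are grouped by product, time period and geography, and each group is ordered by base price, falling back to promo price when the base price is null. The `Fact` class and the `groupKey`, `sortKey` and `groupAndSort` methods are hypothetical names used only for illustration, and putting facts with neither price last is an assumption, since the post does not say how they should sort. Both keys are plain functions of a record, so each could be written as a `KeySelector`, which is the combination the exception message requires once the sort key is a `KeySelector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class GroupSortSketch {

    // Hypothetical stand-in for ThresholdAcvFact, keeping only the fields the keys use.
    static final class Fact {
        final Long productId;
        final Long timePeriodId;
        final Long geographyId;
        final Double basePrice;
        final Double promoPrice;

        Fact(Long productId, Long timePeriodId, Long geographyId, Double basePrice, Double promoPrice) {
            this.productId = productId;
            this.timePeriodId = timePeriodId;
            this.geographyId = geographyId;
            this.basePrice = basePrice;
            this.promoPrice = promoPrice;
        }
    }

    // Grouping key: the three ids used in groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID).
    static List<Long> groupKey(Fact f) {
        return Arrays.asList(f.productId, f.timePeriodId, f.geographyId);
    }

    // Sort key: the base price, or the promo price when the base price is null.
    static Double sortKey(Fact f) {
        return Optional.ofNullable(f.basePrice).orElse(f.promoPrice);
    }

    // Groups facts by the composite key, then sorts each group by ascending sort key.
    // Facts whose sort key is null (both prices missing) are placed last in their group.
    static Map<List<Long>, List<Fact>> groupAndSort(List<Fact> facts) {
        Map<List<Long>, List<Fact>> groups = new LinkedHashMap<>();
        for (Fact f : facts) {
            groups.computeIfAbsent(groupKey(f), k -> new ArrayList<>()).add(f);
        }
        Comparator<Double> priceOrder = Comparator.nullsLast(Comparator.<Double>naturalOrder());
        Comparator<Fact> byPrice = Comparator.comparing(GroupSortSketch::sortKey, priceOrder);
        for (List<Fact> group : groups.values()) {
            group.sort(byPrice);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Fact> facts = Arrays.asList(
                new Fact(1L, 10L, 100L, 5.0, null),
                new Fact(1L, 10L, 100L, null, 2.5),
                new Fact(2L, 10L, 100L, 7.0, 6.0));
        groupAndSort(facts).forEach((key, group) -> {
            StringBuilder prices = new StringBuilder();
            for (Fact f : group) {
                prices.append(sortKey(f)).append(' ');
            }
            System.out.println(key + " -> " + prices.toString().trim());
        });
        // prints:
        // [1, 10, 100] -> 2.5 5.0
        // [2, 10, 100] -> 7.0
    }
}
```

Each sorted list plays the role of the `Iterable` that a group-reduce function such as `ThresholdAcvFactCountGroupReducer` would receive for one group.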

 

Regards,

Konstantinos

 

From: Chesnay Schepler <[hidden email]>
Sent: Wednesday, 3 April 2019 12:59 PM
To: Papadopoulos, Konstantinos <[hidden email]>; [hidden email]
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

 

Your user-defined functions are referencing the class "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl" which isn't serializable.

My guess is that "ThresholdAcvFact" is a non-static inner class; however, I would need to see the entire class to give an accurate analysis.
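The mechanism behind this diagnosis can be reproduced with plain Java serialization, which is what Flink uses to ship user functions (note the `ObjectOutputStream` frames under `InstantiationUtil.serializeObject` in the second stack trace). An anonymous class declared inside an instance method can hold a hidden reference to the enclosing object, so serializing it drags that object along, while a static nested class has no such reference. In this minimal sketch, `SerializableKeySelector` is a hypothetical stand-in for Flink's `KeySelector` (which itself extends `Serializable`), `Service` stands in for `ThresholdAcvCalcServiceImpl`, and the anonymous selector calls an instance method so that the capture is guaranteed on any JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Optional;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's KeySelector, which also extends Serializable.
    interface SerializableKeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical stand-in for ThresholdAcvFact with only the two price fields.
    static final class Fact {
        final Double basePrice;
        final Double promoPrice;

        Fact(Double basePrice, Double promoPrice) {
            this.basePrice = basePrice;
            this.promoPrice = promoPrice;
        }
    }

    // Stand-in for ThresholdAcvCalcServiceImpl: deliberately NOT Serializable.
    static class Service {

        Double pickPrice(Fact f) {
            return Optional.ofNullable(f.basePrice).orElse(f.promoPrice);
        }

        // Anonymous class in an instance method: it keeps a reference to this Service,
        // so serializing the selector also tries to serialize the Service.
        SerializableKeySelector<Fact, Double> anonymousSelector() {
            return new SerializableKeySelector<Fact, Double>() {
                @Override
                public Double getKey(Fact f) {
                    return pickPrice(f); // instance method of the enclosing Service
                }
            };
        }
    }

    // Static nested class: holds no reference to any Service instance.
    static class PriceKeySelector implements SerializableKeySelector<Fact, Double> {
        @Override
        public Double getKey(Fact f) {
            return Optional.ofNullable(f.basePrice).orElse(f.promoPrice);
        }
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Service().anonymousSelector())); // false
        System.out.println(isSerializable(new PriceKeySelector()));            // true
    }
}
```

Under this reading, declaring the `sortGroup` and `groupBy` selectors as static nested or top-level classes, rather than anonymous classes inside `transform`, should keep `ThresholdAcvCalcServiceImpl` out of the serialized functions.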

On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:


Re: InvalidProgramException when trying to sort a group within a dataset

Fabian Hueske-2
Hi,

Your POJO should implement the Serializable interface.
Otherwise, it is not considered serializable.

Best, Fabian 

On Wed, 3 Apr 2019, 07:22, Papadopoulos, Konstantinos <[hidden email]> wrote:


     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)

     ... 52 more

 

Does anyone have any idea how I can overcome these issues?

 

Thanks in advance

 

 

 

 


RE: InvalidProgramException when trying to sort a group within a dataset

Papadopoulos, Konstantinos

Hi Fabian,

 

Thanks for your support. I updated my POJO to implement the Serializable interface with no success.

I got the same NotSerializableException.

 

Best,

Konstantinos

 

From: Fabian Hueske <[hidden email]>
Sent: Saturday, 6 April 2019 2:26 AM
To: Papadopoulos, Konstantinos <[hidden email]>
Cc: Chesnay Schepler <[hidden email]>; user <[hidden email]>
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

 

Hi,

 

Your POJO should implement the Serializable interface.

Otherwise it's not considered to be serializable. 

 

Best, Fabian 

 

On Wed, 3 Apr 2019, 07:22, Papadopoulos, Konstantinos <[hidden email]> wrote:

Hi Chesnay,

 

Thanks for your support. The ThresholdAcvFact class is a simple POJO with the following definition:

 

public class ThresholdAcvFact {

    private Long timePeriodId;
    private Long geographyId;
    private Long productId;
    private Long customerId;
    private Double basePrice;
    private Double promoPrice;
    private Double basePriceAcv;
    private Double promoPriceAcv;
    private Long count;

    public Long getTimePeriodId() {
        return timePeriodId;
    }

    public void setTimePeriodId(Long timePeriodId) {
        this.timePeriodId = timePeriodId;
    }

    public Long getGeographyId() {
        return geographyId;
    }

    public void setGeographyId(Long geographyId) {
        this.geographyId = geographyId;
    }

    public Long getProductId() {
        return productId;
    }

    public void setProductId(Long productId) {
        this.productId = productId;
    }

    public Long getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Long customerId) {
        this.customerId = customerId;
    }

    public Double getBasePrice() {
        return basePrice;
    }

    public void setBasePrice(Double basePrice) {
        this.basePrice = basePrice;
    }

    public Double getPromoPrice() {
        return promoPrice;
    }

    public void setPromoPrice(Double promoPrice) {
        this.promoPrice = promoPrice;
    }

    public Double getBasePriceAcv() {
        return basePriceAcv;
    }

    public void setBasePriceAcv(Double basePriceAcv) {
        this.basePriceAcv = basePriceAcv;
    }

    public Double getPromoPriceAcv() {
        return promoPriceAcv;
    }

    public void setPromoPriceAcv(Double promoPriceAcv) {
        this.promoPriceAcv = promoPriceAcv;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "ThresholdAcvFact{" +
                "timePeriodId=" + timePeriodId +
                ", geographyId=" + geographyId +
                ", productId=" + productId +
                ", customerId=" + customerId +
                ", basePrice=" + basePrice +
                ", promoPrice=" + promoPrice +
                ", basePriceAcv=" + basePriceAcv +
                ", promoPriceAcv=" + promoPriceAcv +
                ", count=" + count +
                '}';
    }
}

 

The function in which we ran into the reported issue is implemented as follows:

 

public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {

    final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getBasePriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvBasePriceFactMapper(customerId));

    final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getPromoPriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvPromoPriceFactMapper(customerId));

    return basePriceFacts
            .fullOuterJoin(promoPriceFacts)
            .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")
            .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")
            .with(new ThresholdAcvFactBasePromoPriceJoiner())
            .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)
            .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
                @Override
                public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
                    return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
                }
            }, Order.ASCENDING)
            .reduceGroup(new ThresholdAcvFactCountGroupReducer());
}
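As a rough in-memory analogy (not how Flink executes it), the grouping and group-sorting that transform() asks for can be pictured with plain Java collections: records are bucketed by (productId, timePeriodId, geographyId), and each bucket is ordered by basePrice, falling back to promoPrice. The trimmed-down Fact class and the sample values are hypothetical, and putting records with neither price last is an assumption, since the thread does not say how such records should be ordered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class GroupSortSketch {

    // Hypothetical, trimmed-down stand-in for ThresholdAcvFact.
    static final class Fact {
        final long productId;
        final long timePeriodId;
        final long geographyId;
        final Double basePrice;
        final Double promoPrice;

        Fact(long productId, long timePeriodId, long geographyId, Double basePrice, Double promoPrice) {
            this.productId = productId;
            this.timePeriodId = timePeriodId;
            this.geographyId = geographyId;
            this.basePrice = basePrice;
            this.promoPrice = promoPrice;
        }

        // Same expression as the sortGroup KeySelector in transform().
        Double sortPrice() {
            return Optional.ofNullable(basePrice).orElse(promoPrice);
        }
    }

    // Buckets facts by the composite key, then sorts each bucket by sortPrice ascending.
    static Map<List<Long>, List<Fact>> groupAndSort(List<Fact> facts) {
        Map<List<Long>, List<Fact>> groups = new LinkedHashMap<>();
        for (Fact f : facts) {
            // List.equals/hashCode compare element-wise, so this works as a composite key.
            List<Long> key = Arrays.asList(f.productId, f.timePeriodId, f.geographyId);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(f);
        }
        // Assumption: facts with neither price are placed at the end of their group.
        Comparator<Fact> bySortPrice =
                Comparator.comparing(Fact::sortPrice, Comparator.nullsLast(Comparator.<Double>naturalOrder()));
        groups.values().forEach(group -> group.sort(bySortPrice));
        return groups;
    }

    public static void main(String[] args) {
        List<Fact> facts = Arrays.asList(
                new Fact(1, 10, 100, 3.0, null),
                new Fact(1, 10, 100, null, 1.5),
                new Fact(2, 10, 100, 9.0, 4.0),
                new Fact(1, 10, 100, 2.0, 8.0));
        groupAndSort(facts).forEach((key, group) -> {
            List<Double> prices = group.stream().map(Fact::sortPrice).collect(Collectors.toList());
            // prints "[1, 10, 100] -> [1.5, 2.0, 3.0]" and then "[2, 10, 100] -> [9.0]"
            System.out.println(key + " -> " + prices);
        });
    }
}
```

In the Flink job, each sorted group would then be handed to the group reducer; here the grouped lists are simply printed.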

 

Regards,

Konstantinos

 

From: Chesnay Schepler <[hidden email]>
Sent: Wednesday, 3 April 2019 12:59 PM
To: Papadopoulos, Konstantinos <[hidden email]>; [hidden email]
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

 

Your user-defined functions are referencing the class "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl" which isn't serializable.

My guess is that "ThresholdAcvFact" is a non-static inner class; however, I would need to see the entire class to give an accurate analysis.

On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:

Hi all,

 

I am trying to sort a group within a dataset using KeySelector as follows:

 

in
  .groupBy("productId", "timePeriodId", "geographyId")
  .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
    }
  }, Order.ASCENDING)
  .reduceGroup(/* do something */)

 

And I am getting the following exception:

 

org.apache.flink.api.common.InvalidProgramException: KeySelector group-sorting keys can only be used with KeySelector grouping keys.

 

     at org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)

     at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)

     at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)

     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

     at java.lang.reflect.Method.invoke(Method.java:498)

     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)

     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at java.util.ArrayList.forEach(ArrayList.java:1257)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at java.util.ArrayList.forEach(ArrayList.java:1257)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)

     at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)

     at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)

     at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)

     at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)

     at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)

     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)

     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)

     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

 

Then I tried to use a KeySelector for both the ‘groupBy’ and ‘sortGroup’ transformations, as follows:

 

in
  .groupBy(new KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>>() {
    @Override
    public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return new Tuple3<>(thresholdAcvFact.getProductId(), thresholdAcvFact.getTimePeriodId(), thresholdAcvFact.getGeographyId());
    }
  })
  .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
    }
  }, Order.ASCENDING)
  .reduceGroup(/* do something */)

 

The job execution still failed with the following exception:

 

org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl

 

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:429)

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:116)

     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)

     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)

     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)

     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:203)

     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:221)

     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)

     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)

     at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)

     at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:95)

     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

     at java.lang.reflect.Method.invoke(Method.java:498)

     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)

     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at java.util.ArrayList.forEach(ArrayList.java:1257)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at java.util.ArrayList.forEach(ArrayList.java:1257)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)

     at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)

     at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)

     at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)

     at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)

     at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)

     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)

     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)

     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl

     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:897)

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:374)

     ... 50 more

Caused by: java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl

     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)

     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)

     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:463)

     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)

     ... 52 more

 

Does anyone have any idea how I can overcome these issues?

 

Thanks in advance

 

 

 

 


Re: InvalidProgramException when trying to sort a group within a dataset

Fabian Hueske-2
Hi,

If you take a closer look at the exception, you'll see that the issue is caused by com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl not being serializable.
It seems that you have a reference to this class somewhere.

Flink requires that all function classes (like KeySelector) are serializable.
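This can be reproduced without Flink. The sketch below is a minimal illustration under stated assumptions: SerializableKeySelector, Fact, PriceService and SortPriceSelector are hypothetical stand-ins for Flink's KeySelector, ThresholdAcvFact, ThresholdAcvCalcServiceImpl and a rewritten selector, and a plain ObjectOutputStream stands in for the Java serialization step visible in the stack trace. It shows that an anonymous class created inside an instance method keeps a hidden reference to the enclosing object, so serializing it fails on the service class even though the record type implements Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's KeySelector, which is also Serializable.
    interface SerializableKeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical stand-in for ThresholdAcvFact, reduced to the two price fields.
    static class Fact implements Serializable {
        Double basePrice;
        Double promoPrice;

        Fact(Double basePrice, Double promoPrice) {
            this.basePrice = basePrice;
            this.promoPrice = promoPrice;
        }
    }

    // Hypothetical stand-in for ThresholdAcvCalcServiceImpl: intentionally NOT Serializable.
    static class PriceService {
        SerializableKeySelector<Fact, Double> anonymousSelector() {
            // Anonymous class declared in an instance method: javac gives it a hidden
            // field (this$0) pointing at the enclosing PriceService instance.
            return new SerializableKeySelector<Fact, Double>() {
                @Override
                public Double getKey(Fact f) {
                    return Optional.ofNullable(f.basePrice).orElse(f.promoPrice);
                }
            };
        }
    }

    // Static nested class: same key logic, but no reference to any enclosing instance.
    static class SortPriceSelector implements SerializableKeySelector<Fact, Double> {
        @Override
        public Double getKey(Fact f) {
            return Optional.ofNullable(f.basePrice).orElse(f.promoPrice);
        }
    }

    // Serializes with plain Java serialization. Returns null on success, or the
    // NotSerializableException message (the offending class name) on failure.
    static String serializationFailure(Object function) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(function);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous:     " + serializationFailure(new PriceService().anonymousSelector()));
        System.out.println("static nested: " + serializationFailure(new SortPriceSelector()));
    }
}
```

If this mirrors what happens in transform(), moving the sort KeySelector (and any other anonymous functions there) into a static nested or top-level class should remove the reference to ThresholdAcvCalcServiceImpl. Making the service Serializable would likely also silence the error, but the whole service object would then be serialized together with each function.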

Best, Fabian

On Mon, 8 Apr 2019 at 09:13, Papadopoulos, Konstantinos <[hidden email]> wrote:

Hi Fabian,

 

Thanks for your support. I updated my POJO to implement the Serializable interface with no success.

I got the same NotSerializableException.

 

Best,

Konstantinos

 

From: Fabian Hueske <[hidden email]>
Sent: Σάββατο, 6 Απριλίου 2019 2:26 πμ
To: Papadopoulos, Konstantinos <[hidden email]>
Cc: Chesnay Schepler <[hidden email]>; user <[hidden email]>
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

 

Hi,

 

You POJO should implement the Serializable interface. 

Otherwise it's not considered to be serializable. 

 

Best, Fabian 

 

Papadopoulos, Konstantinos <[hidden email]> schrieb am Mi., 3. Apr. 2019, 07:22:

Hi Chesnay,

 

Thanks for your support. ThresholdAcvFact class is a simple POJO with the following definition:

 

public class ThresholdAcvFact {

 

    private Long timePeriodId;

    private Long geographyId;

    private Long productId;

    private Long customerId;

    private Double basePrice;

    private Double promoPrice;

    private Double basePriceAcv;

    private Double promoPriceAcv;

    private Long count;

 

    public Long getTimePeriodId() {

        return timePeriodId;

    }

 

    public void setTimePeriodId(Long timePeriodId) {

        this.timePeriodId = timePeriodId;

    }

 

    public Long getGeographyId() {

        return geographyId;

    }

 

    public void setGeographyId(Long geographyId) {

        this.geographyId = geographyId;

    }

 

    public Long getProductId() {

        return productId;

    }

 

    public void setProductId(Long productId) {

        this.productId = productId;

    }

 

    public Long getCustomerId() {

        return customerId;

    }

 

    public void setCustomerId(Long customerId) {

        this.customerId = customerId;

    }

 

    public Double getBasePrice() {

        return basePrice;

    }

 

    public void setBasePrice(Double basePrice) {

        this.basePrice = basePrice;

    }

 

    public Double getPromoPrice() {

        return promoPrice;

    }

 

    public void setPromoPrice(Double promoPrice) {

        this.promoPrice = promoPrice;

    }

 

    public Double getBasePriceAcv() {

        return basePriceAcv;

    }

 

    public void setBasePriceAcv(Double basePriceAcv) {

        this.basePriceAcv = basePriceAcv;

    }

 

    public Double getPromoPriceAcv() {

        return promoPriceAcv;

    }

 

    public void setPromoPriceAcv(Double promoPriceAcv) {

        this.promoPriceAcv = promoPriceAcv;

    }

 

    public Long getCount() {

        return count;

    }

 

    public void setCount(Long count) {

        this.count = count;

    }

 

    @Override

    public String toString() {

        return "ThresholdAcvFact{" +

                "timePeriodId=" + timePeriodId +

                ", geographyId=" + geographyId +

                ", productId=" + productId +

                ", customerId=" + customerId +

                ", basePrice=" + basePrice +

                ", promoPrice=" + promoPrice +

                ", basePriceAcv=" + basePriceAcv +

                ", promoPriceAcv=" + promoPriceAcv +

                ", count=" + count +

                '}';

    }

 

}

 

While the implementation of the function we faced the issue reported is the following:

 

public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {

 

        final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(

                thresholdAcvCalcSources.getBasePriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),

                new ThresholdAcvBasePriceFactMapper(customerId));

 

        final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(

                thresholdAcvCalcSources.getPromoPriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),

                new ThresholdAcvPromoPriceFactMapper(customerId));

 

        return basePriceFacts

                .fullOuterJoin(promoPriceFacts)

                .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")

                .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")

                .with(new ThresholdAcvFactBasePromoPriceJoiner())

                .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)

.sortGroup(new KeySelector<ThresholdAcvFact, Double>() {

    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {

      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
       

     }
  }, Order.ASCENDING)

                .reduceGroup(new ThresholdAcvFactCountGroupReducer());

 

    }

 

Regards,

Konstantinos

 

From: Chesnay Schepler <[hidden email]>
Sent: Τετάρτη, 3 Απριλίου 2019 12:59 μμ
To: Papadopoulos, Konstantinos <[hidden email]>; [hidden email]
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

 

Your user-defined functions are referencing the class "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl" which isn't serializable.

My guess is that "ThresholdAcvFact" is a non-static inner class, however I would need to see the entire class to give an accurate analysis.

On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:

Hi all,

 

I am trying to sort a group within a dataset using KeySelector as follows:

 

in

  .groupBy(“productId”, “timePeriodId”, “geographyId”)

  .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {

    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {

      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
       

     }
  }, Order.ASCENDING)

  .reduceGroup(/* do something */)

 

And I am getting the following exception:

 

org.apache.flink.api.common.InvalidProgramException: KeySelector group-sorting keys can only be used with KeySelector grouping keys.

 

     at org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)

     at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)

     at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)

     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

     at java.lang.reflect.Method.invoke(Method.java:498)

     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)

     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at java.util.ArrayList.forEach(ArrayList.java:1257)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at java.util.ArrayList.forEach(ArrayList.java:1257)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)

     at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)

     at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)

     at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)

     at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)

     at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)

     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)

     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)

     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

 

Then, I tried to use KeySelector for both ‘groupBy’ and ‘sortGroup’ transformations as follows:

 

in
  .groupBy(new KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>>() {
    @Override
    public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return new Tuple3<>(thresholdAcvFact.getProductId(), thresholdAcvFact.getTimePeriodId(), thresholdAcvFact.getGeographyId());
    }
  })
  .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
    @Override
    public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
      return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
    }
  }, Order.ASCENDING)
  .reduceGroup(/* do something */)

 

The job execution still failed with the following exception:

 

org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl

 

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:429)

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:116)

     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)

     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)

     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)

     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:203)

     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:221)

     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)

     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)

     at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)

     at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:95)

     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

     at java.lang.reflect.Method.invoke(Method.java:498)

     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)

     at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)

     at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at java.util.ArrayList.forEach(ArrayList.java:1257)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at java.util.ArrayList.forEach(ArrayList.java:1257)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)

     at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)

     at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)

     at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)

     at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)

     at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)

     at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)

     at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)

     at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)

     at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)

     at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)

     at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)

     at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl

     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:897)

     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:374)

     ... 50 more

Caused by: java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl

     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)

     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)

     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:463)

     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)

     ... 52 more

 

Does anyone have an idea how I can overcome these issues?

 

Thanks in advance
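The pipeline in this message (group by productId, timePeriodId and geographyId, sort each group by basePrice with promoPrice as the fallback, then hand each group to reduceGroup) can be modelled in plain Java to make the intended semantics concrete. This is a sketch of the semantics only, not of how Flink executes the job. `Fact` is a hypothetical, trimmed stand-in for ThresholdAcvFact, and putting null sort keys last is this sketch's own choice: the posted key selector returns null when both prices are null.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Plain-Java model (no Flink) of groupBy(3 ids) -> sortGroup(price key) -> input of reduceGroup.
class GroupSortSketch {

    // Hypothetical trimmed version of ThresholdAcvFact.
    static final class Fact {
        final long productId, timePeriodId, geographyId;
        final Double basePrice, promoPrice;

        Fact(long productId, long timePeriodId, long geographyId, Double basePrice, Double promoPrice) {
            this.productId = productId;
            this.timePeriodId = timePeriodId;
            this.geographyId = geographyId;
            this.basePrice = basePrice;
            this.promoPrice = promoPrice;
        }
    }

    // Same logic as the sorting KeySelector: basePrice if present, else promoPrice (may be null).
    static Double sortKey(Fact f) {
        return Optional.ofNullable(f.basePrice).orElse(f.promoPrice);
    }

    // Composite grouping key, playing the role of the Tuple3 returned by the grouping KeySelector.
    static List<Long> groupKey(Fact f) {
        return Arrays.asList(f.productId, f.timePeriodId, f.geographyId);
    }

    // For each group (in first-seen order), the sort keys in ascending order, nulls last.
    static Map<List<Long>, List<Double>> sortedKeysPerGroup(List<Fact> facts) {
        Map<List<Long>, List<Fact>> groups = facts.stream()
                .collect(Collectors.groupingBy(GroupSortSketch::groupKey, LinkedHashMap::new, Collectors.toList()));
        Map<List<Long>, List<Double>> result = new LinkedHashMap<>();
        groups.forEach((key, group) -> result.put(key, group.stream()
                .map(GroupSortSketch::sortKey)
                .sorted(Comparator.nullsLast(Comparator.naturalOrder()))
                .collect(Collectors.toList())));
        return result;
    }

    public static void main(String[] args) {
        List<Fact> facts = Arrays.asList(
                new Fact(1, 10, 100, 3.0, null),
                new Fact(1, 10, 100, null, 1.5),
                new Fact(1, 10, 100, 2.0, 9.0),
                new Fact(2, 10, 100, null, null));
        // {[1, 10, 100]=[1.5, 2.0, 3.0], [2, 10, 100]=[null]}
        System.out.println(sortedKeysPerGroup(facts));
    }
}
```

Each list in the result is what a group reducer would receive, already in sort-key order.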

 

 

 

 


RE: InvalidProgramException when trying to sort a group within a dataset

Papadopoulos, Konstantinos

Thanks, Fabian.

 

Problem solved by implementing the Serializable interface in all the services of the stack, or by making the ones that are not needed transient.

 

Best,

Konstantinos
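The fix described above (make the services Serializable, mark the members the functions do not need as transient) can be illustrated with plain Java serialization, which is what Flink uses to ship user functions. A minimal sketch; `SerializableCalcService`, `ReportClient` and `PriceKeyFn` are hypothetical names, not classes from the thread or from Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical dependency the functions do not need (think client or repository); NOT Serializable.
class ReportClient {
}

// Hypothetical serializable function type, standing in for a Flink function interface.
interface PriceKeyFn extends Serializable {
    Double key(Double basePrice, Double promoPrice);
}

// Hypothetical service after the fix: Serializable, with the unneeded dependency marked transient.
class SerializableCalcService implements Serializable {
    private static final long serialVersionUID = 1L;

    // Skipped during serialization; it is null in any deserialized copy.
    private transient ReportClient client = new ReportClient();

    // Used by the function, so it is serialized together with the service.
    private final double defaultPrice;

    SerializableCalcService(double defaultPrice) {
        this.defaultPrice = defaultPrice;
    }

    boolean hasClient() {
        return client != null;
    }

    // Anonymous function that reads a service field, so the service is serialized along with it.
    PriceKeyFn keyFn() {
        return new PriceKeyFn() {
            @Override
            public Double key(Double basePrice, Double promoPrice) {
                if (basePrice != null) {
                    return basePrice;
                }
                return promoPrice != null ? promoPrice : defaultPrice;
            }
        };
    }
}

class ServiceRoundTrip {

    // Serialize and deserialize obj, roughly what happens when a function is shipped to a task.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SerializableCalcService service = new SerializableCalcService(0.0);
        System.out.println(service.hasClient() + " -> " + roundTrip(service).hasClient()); // true -> false
        PriceKeyFn fn = roundTrip(service.keyFn());
        System.out.println(fn.key(null, 1.5)); // 1.5
    }
}
```

Transient members come back as null after deserialization, so a function running on a task must not rely on them.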

 

From: Fabian Hueske <[hidden email]>
Sent: Monday, 8 April 2019 11:37 AM
To: Papadopoulos, Konstantinos <[hidden email]>
Cc: Chesnay Schepler <[hidden email]>; user <[hidden email]>
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

 

Hi,

 

If you take a closer look at the exception, you'll see that the issue is caused by com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl not being serializable.

It seems that you have a reference to this class somewhere.

 

Flink requires that all function classes (like KeySelector) are serializable.

 

Best, Fabian
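This diagnosis can be reproduced with plain Java serialization, which is what Flink uses to ship user functions (note InstantiationUtil.serializeObject and ObjectOutputStream in the stack trace). A minimal sketch with hypothetical stand-ins: `SerializableKeySelector` for Flink's KeySelector (which is itself Serializable), `PriceService` for ThresholdAcvCalcServiceImpl, and `PriceFact` for the POJO. An anonymous class created in an instance method holds a reference to the enclosing instance, so serializing it drags the service along; making the POJO Serializable does not change that.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Flink's KeySelector, which extends java.io.Serializable.
interface SerializableKeySelector<IN, KEY> extends Serializable {
    KEY getKey(IN value) throws Exception;
}

// Hypothetical trimmed POJO. It is Serializable, yet that does not prevent the failure below.
class PriceFact implements Serializable {
    Double basePrice;
    Double promoPrice;
}

// Hypothetical stand-in for ThresholdAcvCalcServiceImpl: deliberately NOT Serializable.
class PriceService {

    Double sortPrice(PriceFact fact) {
        return fact.basePrice != null ? fact.basePrice : fact.promoPrice;
    }

    // Anonymous class created in an instance method: it keeps a hidden reference to this PriceService.
    // The body calls sortPrice(...), so the enclosing instance is captured on any JDK version.
    SerializableKeySelector<PriceFact, Double> anonymousSelector() {
        return new SerializableKeySelector<PriceFact, Double>() {
            @Override
            public Double getKey(PriceFact fact) {
                return sortPrice(fact);
            }
        };
    }

    // Static nested class: no enclosing instance, so only the selector itself is serialized.
    static final class StaticSelector implements SerializableKeySelector<PriceFact, Double> {
        @Override
        public Double getKey(PriceFact fact) {
            return fact.basePrice != null ? fact.basePrice : fact.promoPrice;
        }
    }
}

class SerializationCheck {

    // Java-serializes obj and returns null on success, or the
    // NotSerializableException message (the offending class name) on failure.
    static String firstNonSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Names the service class: serializing the anonymous selector drags PriceService along.
        System.out.println(firstNonSerializable(new PriceService().anonymousSelector()));
        // null: the static nested selector serializes on its own.
        System.out.println(firstNonSerializable(new PriceService.StaticSelector()));
    }
}
```

Turning the anonymous KeySelector into a static nested or top-level class is an alternative to making the whole service Serializable, since the service is then never part of the serialized object graph.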

 

On Mon, 8 Apr 2019 at 09:13, Papadopoulos, Konstantinos <[hidden email]> wrote:

Hi Fabian,

 

Thanks for your support. I updated my POJO to implement the Serializable interface, but with no success.

I got the same NotSerializableException.

 

Best,

Konstantinos

 

From: Fabian Hueske <[hidden email]>
Sent: Saturday, 6 April 2019 2:26 AM
To: Papadopoulos, Konstantinos <[hidden email]>
Cc: Chesnay Schepler <[hidden email]>; user <[hidden email]>
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

 

Hi,

 

Your POJO should implement the Serializable interface.

Otherwise, it's not considered serializable.

 

Best, Fabian 

 

On Wed, 3 Apr 2019, 07:22, Papadopoulos, Konstantinos <[hidden email]> wrote:

Hi Chesnay,

 

Thanks for your support. ThresholdAcvFact class is a simple POJO with the following definition:

 

public class ThresholdAcvFact {

    private Long timePeriodId;
    private Long geographyId;
    private Long productId;
    private Long customerId;
    private Double basePrice;
    private Double promoPrice;
    private Double basePriceAcv;
    private Double promoPriceAcv;
    private Long count;

    public Long getTimePeriodId() {
        return timePeriodId;
    }

    public void setTimePeriodId(Long timePeriodId) {
        this.timePeriodId = timePeriodId;
    }

    public Long getGeographyId() {
        return geographyId;
    }

    public void setGeographyId(Long geographyId) {
        this.geographyId = geographyId;
    }

    public Long getProductId() {
        return productId;
    }

    public void setProductId(Long productId) {
        this.productId = productId;
    }

    public Long getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Long customerId) {
        this.customerId = customerId;
    }

    public Double getBasePrice() {
        return basePrice;
    }

    public void setBasePrice(Double basePrice) {
        this.basePrice = basePrice;
    }

    public Double getPromoPrice() {
        return promoPrice;
    }

    public void setPromoPrice(Double promoPrice) {
        this.promoPrice = promoPrice;
    }

    public Double getBasePriceAcv() {
        return basePriceAcv;
    }

    public void setBasePriceAcv(Double basePriceAcv) {
        this.basePriceAcv = basePriceAcv;
    }

    public Double getPromoPriceAcv() {
        return promoPriceAcv;
    }

    public void setPromoPriceAcv(Double promoPriceAcv) {
        this.promoPriceAcv = promoPriceAcv;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "ThresholdAcvFact{" +
                "timePeriodId=" + timePeriodId +
                ", geographyId=" + geographyId +
                ", productId=" + productId +
                ", customerId=" + customerId +
                ", basePrice=" + basePrice +
                ", promoPrice=" + promoPrice +
                ", basePriceAcv=" + basePriceAcv +
                ", promoPriceAcv=" + promoPriceAcv +
                ", count=" + count +
                '}';
    }
}

 

The implementation of the function in which we faced the reported issue is the following:

 

public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {

    final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getBasePriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvBasePriceFactMapper(customerId));

    final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getPromoPriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvPromoPriceFactMapper(customerId));

    return basePriceFacts
            .fullOuterJoin(promoPriceFacts)
            .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")
            .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")
            .with(new ThresholdAcvFactBasePromoPriceJoiner())
            .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)
            .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
                @Override
                public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
                    return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
                }
            }, Order.ASCENDING)
            .reduceGroup(new ThresholdAcvFactCountGroupReducer());
}

 

Regards,

Konstantinos

 

From: Chesnay Schepler <[hidden email]>
Sent: Wednesday, 3 April 2019 12:59 PM
To: Papadopoulos, Konstantinos <[hidden email]>; [hidden email]
Subject: Re: InvalidProgramException when trying to sort a group within a dataset

 

Your user-defined functions are referencing the class "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl" which isn't serializable.

My guess is that "ThresholdAcvFact" is a non-static inner class; however, I would need to see the entire class to give an accurate analysis.
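Chesnay's hypothesis (a non-static inner class carries a hidden reference to its enclosing instance) can be checked mechanically with reflection. A minimal sketch; `InnerClassCheck`, `InnerFact` and `NestedFact` are hypothetical names used only for illustration.

```java
import java.lang.reflect.Modifier;

// Hypothetical diagnostic: is a given class a non-static inner (member) class?
class InnerClassCheck {

    // Non-static inner class: every instance is bound to an enclosing InnerClassCheck instance.
    class InnerFact {
        Double basePrice;
    }

    // Static nested class: no enclosing instance.
    static class NestedFact {
        Double basePrice;
    }

    // True only for member classes declared without `static`.
    // Local and anonymous classes are excluded because isMemberClass() is false for them.
    static boolean isNonStaticInnerMember(Class<?> c) {
        return c.isMemberClass() && !Modifier.isStatic(c.getModifiers());
    }

    public static void main(String[] args) {
        System.out.println(isNonStaticInnerMember(InnerFact.class));       // true
        System.out.println(isNonStaticInnerMember(NestedFact.class));      // false
        System.out.println(isNonStaticInnerMember(InnerClassCheck.class)); // false (top-level)
    }
}
```

For ThresholdAcvFact, which the posted definition shows as a stand-alone public class, such a check would be expected to return false. The exception names ThresholdAcvCalcServiceImpl instead, which points to the anonymous KeySelector created inside that service's transform method.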

On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:
