Hi Chesnay,
Thanks for your support.
The ThresholdAcvFact class is a simple POJO with the following definition:
public class ThresholdAcvFact {
    private Long timePeriodId;
    private Long geographyId;
    private Long productId;
    private Long customerId;
    private Double basePrice;
    private Double promoPrice;
    private Double basePriceAcv;
    private Double promoPriceAcv;
    private Long count;

    public Long getTimePeriodId() {
        return timePeriodId;
    }

    public void setTimePeriodId(Long timePeriodId) {
        this.timePeriodId = timePeriodId;
    }

    public Long getGeographyId() {
        return geographyId;
    }

    public void setGeographyId(Long geographyId) {
        this.geographyId = geographyId;
    }

    public Long getProductId() {
        return productId;
    }

    public void setProductId(Long productId) {
        this.productId = productId;
    }

    public Long getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Long customerId) {
        this.customerId = customerId;
    }

    public Double getBasePrice() {
        return basePrice;
    }

    public void setBasePrice(Double basePrice) {
        this.basePrice = basePrice;
    }

    public Double getPromoPrice() {
        return promoPrice;
    }

    public void setPromoPrice(Double promoPrice) {
        this.promoPrice = promoPrice;
    }

    public Double getBasePriceAcv() {
        return basePriceAcv;
    }

    public void setBasePriceAcv(Double basePriceAcv) {
        this.basePriceAcv = basePriceAcv;
    }

    public Double getPromoPriceAcv() {
        return promoPriceAcv;
    }

    public void setPromoPriceAcv(Double promoPriceAcv) {
        this.promoPriceAcv = promoPriceAcv;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "ThresholdAcvFact{" +
                "timePeriodId=" + timePeriodId +
                ", geographyId=" + geographyId +
                ", productId=" + productId +
                ", customerId=" + customerId +
                ", basePrice=" + basePrice +
                ", promoPrice=" + promoPrice +
                ", basePriceAcv=" + basePriceAcv +
                ", promoPriceAcv=" + promoPriceAcv +
                ", count=" + count +
                '}';
    }
}
The implementation of the function in which we hit the reported issue is the following:
public DataSet<ThresholdAcvFact> transform(ThresholdAcvCalcSources thresholdAcvCalcSources, Long customerId) {
    final DataSet<ThresholdAcvFact> basePriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getBasePriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvBasePriceFactMapper(customerId));
    final DataSet<ThresholdAcvFact> promoPriceFacts = getPriceFacts(
            thresholdAcvCalcSources.getPromoPriceDataSet(), thresholdAcvCalcSources.getIriMapStoreGeographyDataSet(),
            new ThresholdAcvPromoPriceFactMapper(customerId));
    return basePriceFacts
            .fullOuterJoin(promoPriceFacts)
            .where(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "basePrice")
            .equalTo(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID, "promoPrice")
            .with(new ThresholdAcvFactBasePromoPriceJoiner())
            .groupBy(PRODUCT_ID_FIELD, TIME_PERIOD_ID_FIELD, GEOGRAPHY_ID)
            .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
                @Override
                public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
                    return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
                }
            }, Order.ASCENDING)
            .reduceGroup(new ThresholdAcvFactCountGroupReducer());
}
Regards,
Konstantinos
From: Chesnay Schepler <[hidden email]>
Sent: Wednesday, 3 April 2019 12:59 PM
To: Papadopoulos, Konstantinos <[hidden email]>; [hidden email]
Subject: Re: InvalidProgramException when trying to sort a group within a dataset
Your user-defined functions are referencing the class "com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl" which isn't serializable.
My guess is that "ThresholdAcvFact" is a non-static inner class; however, I would need to see the entire class to give an accurate analysis.
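To illustrate the serialization point without pulling in Flink, here is a minimal plain-JDK sketch. The names CaptureDemo, Service, Fact and PriceKeySelector are made up for illustration, and the nested KeySelector interface is only a stand-in for Flink's (which is likewise Serializable). It assumes the failing KeySelector is an anonymous class created inside an instance method of a non-serializable class; the anonymous class here reads a field of its enclosing instance so the captured reference is explicit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's KeySelector; not the real interface.
    public interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical record type with two nullable prices.
    public static class Fact {
        public Double basePrice;
        public Double promoPrice;

        public Fact(Double basePrice, Double promoPrice) {
            this.basePrice = basePrice;
            this.promoPrice = promoPrice;
        }
    }

    // Stand-in for a service class that does NOT implement Serializable.
    public static class Service {
        private final Double defaultPrice = 0.0;

        // The anonymous class is created in an instance method and uses a field
        // of the enclosing Service, so it holds a reference to that Service.
        public KeySelector<Fact, Double> anonymousSelector() {
            return new KeySelector<Fact, Double>() {
                @Override
                public Double getKey(Fact fact) {
                    if (fact.basePrice != null) {
                        return fact.basePrice;
                    }
                    return fact.promoPrice != null ? fact.promoPrice : defaultPrice;
                }
            };
        }
    }

    // Static nested class: carries no reference to any enclosing instance.
    public static class PriceKeySelector implements KeySelector<Fact, Double> {
        @Override
        public Double getKey(Fact fact) {
            return fact.basePrice != null ? fact.basePrice : fact.promoPrice;
        }
    }

    // Tries Java serialization and reports whether it succeeded.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Fails: serializing the selector drags the non-serializable Service along.
        System.out.println(isSerializable(new Service().anonymousSelector())); // prints "false"
        // Works: nothing but the selector itself is written.
        System.out.println(isSerializable(new PriceKeySelector()));            // prints "true"
    }
}
```

If the same holds for the real job, moving the KeySelector into a static nested or top-level class would be the likely fix, though that is untested against the code in this thread.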
On 02/04/2019 12:31, Papadopoulos, Konstantinos wrote:
Hi all,
I am trying to sort a group within a dataset using KeySelector as follows:
in
    .groupBy("productId", "timePeriodId", "geographyId")
    .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
        @Override
        public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
        }
    }, Order.ASCENDING).reduceGroup(/* do something */)
And I am getting the following exception:
org.apache.flink.api.common.InvalidProgramException: KeySelector group-sorting keys can only be used with KeySelector grouping keys.
at org.apache.flink.api.java.operators.UnsortedGrouping.sortGroup(UnsortedGrouping.java:318)
at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl.transform(ThresholdAcvCalcServiceImpl.java:91)
at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Then, I tried to use KeySelector for both ‘groupBy’ and ‘sortGroup’ transformations as follows:
in
    .groupBy(new KeySelector<ThresholdAcvFact, Tuple3<Long, Long, Long>>() {
        @Override
        public Tuple3<Long, Long, Long> getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return new Tuple3<>(thresholdAcvFact.getProductId(), thresholdAcvFact.getTimePeriodId(), thresholdAcvFact.getGeographyId());
        }
    })
    .sortGroup(new KeySelector<ThresholdAcvFact, Double>() {
        @Override
        public Double getKey(ThresholdAcvFact thresholdAcvFact) throws Exception {
            return Optional.ofNullable(thresholdAcvFact.getBasePrice()).orElse(thresholdAcvFact.getPromoPrice());
        }
    }, Order.ASCENDING).reduceGroup(/* do something */)
The job execution still failed with the following exception:
org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:429)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:116)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:203)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:221)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:817)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceTest.transform(ThresholdAcvCalcServiceTest.java:95)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:281)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:897)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:374)
... 50 more
Caused by: java.io.NotSerializableException: com.iri.aa.etl.rgm.service.ThresholdAcvCalcServiceImpl
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:463)
at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:279)
... 52 more
Does anyone have any idea how I can overcome these issues?
Thanks in advance