Hi folks,
I am trying to join two data sets containing POJOs. Each POJO inherits a field "sessionId" from its parent class. The field is private but has a public getter.

The join looks like this:

    DataSet<Tuple2<SourceA, SourceB>> joinedDataSet = sourceA.join(sourceB).where("sessionId").equalTo("sessionId");

But the result is the following exception:

    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<x.y.z.service.eventstore.dto.SourceA>) cannot be used as key.
        at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:287)
        at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.where(JoinOperator.java:890)
        at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:55)

I spent some time googling, but I have no idea what is wrong. I hope some of you can give me a hint...

Greets
Dominique
|
Hi Dominique,
It seems that `SourceA` is not treated as a POJO. Are all fields in SourceA public? POJO classes have to meet some requirements [1].

[1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#pojos

Regards,
Chiwan Park

On Feb 9, 2016, at 7:42 PM, Dominique Rondé <[hidden email]> wrote:
|
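For reference, a minimal sketch of a class shape that meets the POJO rules from the guide linked above: a public class, a public no-argument constructor, and fields that are either public and non-final or private with a public getter and setter. The class names mirror the thread, but the bodies are hypothetical, and `hasPojoClassShape` is an illustrative helper that only checks the class-level rules; it is not part of Flink.

```java
import java.lang.reflect.Modifier;

// Hypothetical example classes; names mirror the thread, bodies are assumptions.
class PojoShapeDemo {

    // Rule: the class must be public (a nested class must also be static).
    public static class Parent {
        // Rule: a private field needs a public getter AND a public setter.
        private String sessionId;

        // Rule: a public constructor without arguments.
        public Parent() {
        }

        public String getSessionId() {
            return sessionId;
        }

        public void setSessionId(String sessionId) {
            this.sessionId = sessionId;
        }
    }

    public static class SourceA extends Parent {
        // Alternative to getter/setter: a public, non-final field.
        public String soapMessage;

        public SourceA() {
            super();
        }
    }

    /** True if the class is public and has a public no-argument constructor. */
    static boolean hasPojoClassShape(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // finds public constructors only
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPojoClassShape(SourceA.class)); // true
        System.out.println(hasPojoClassShape(Integer.class)); // false: no public no-arg constructor
    }
}
```

Field-level rules (getter/setter pairs) are not checked here; they need a per-field inspection.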
Oh, you wrote that the fields in SourceA have public getters. Do they also have public setters? SourceA needs public setters for its private fields.
Regards,
Chiwan Park

On Feb 9, 2016, at 7:45 PM, Chiwan Park <[hidden email]> wrote:
|
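To make the getter-and-setter rule concrete, here is a small plain-Java sketch (no Flink dependency) that lists private fields lacking a matching public `getFoo()`/`setFoo(...)` pair. The rule it encodes is a simplified reading of the POJO documentation; `AccessorCheck`, `GetterOnly`, and `GetterAndSetter` are hypothetical names, and Flink's own type analysis may differ in details.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class AccessorCheck {

    // Hypothetical class with a getter but no setter.
    public static class GetterOnly {
        private String sessionId;
        public GetterOnly() { }
        public String getSessionId() { return sessionId; }
    }

    // Hypothetical class with both accessors.
    public static class GetterAndSetter {
        private String sessionId;
        public GetterAndSetter() { }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
    }

    static String capitalize(String name) {
        return Character.toUpperCase(name.charAt(0)) + name.substring(1);
    }

    /** Lists declared fields that are neither public nor reachable via getFoo()/setFoo(). */
    static List<String> inaccessibleFields(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
                continue; // not part of the record's state
            }
            if (Modifier.isPublic(mod) && !Modifier.isFinal(mod)) {
                continue; // public non-final fields need no accessors
            }
            String suffix = capitalize(f.getName());
            boolean hasGetter = false;
            boolean hasSetter = false;
            try {
                Method g = clazz.getMethod("get" + suffix);
                hasGetter = g.getReturnType() == f.getType();
            } catch (NoSuchMethodException ignored) { }
            try {
                Method s = clazz.getMethod("set" + suffix, f.getType());
                hasSetter = s.getReturnType() == void.class;
            } catch (NoSuchMethodException ignored) { }
            if (!hasGetter || !hasSetter) {
                problems.add(f.getName());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(inaccessibleFields(GetterOnly.class));      // [sessionId]
        System.out.println(inaccessibleFields(GetterAndSetter.class)); // []
    }
}
```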
The fields in SourceA and SourceB are private but have public getters and setters. The classes provide an empty, public constructor.

On 09.02.2016 11:47, "Chiwan Park" <[hidden email]> wrote:
|
Could you share the code for your types?

Cheers,

On Tue, Feb 9, 2016 at 11:53 AM, Dominique Rondé <[hidden email]> wrote:
|
In reply to this post by Dominique Rondé
What is the type of sessionId? It must be a key type in order to be used as a key. If it is a generic class, it must implement Comparable to be used as a key.

2016-02-09 11:53 GMT+01:00 Dominique Rondé <[hidden email]>:
|
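As an illustration of Fabian's point, a hypothetical composite key class that implements `Comparable`. Keeping `equals`/`hashCode` consistent with `compareTo` is general Java good practice and an assumption here, not something stated in the thread. The thread's actual key, a `String`, needs none of this.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical custom key type; not taken from the thread.
final class SessionKey implements Comparable<SessionKey> {
    private final String tenant;
    private final String sessionId;

    SessionKey(String tenant, String sessionId) {
        this.tenant = Objects.requireNonNull(tenant);
        this.sessionId = Objects.requireNonNull(sessionId);
    }

    // Total ordering: first by tenant, then by sessionId.
    @Override
    public int compareTo(SessionKey other) {
        int c = tenant.compareTo(other.tenant);
        return c != 0 ? c : sessionId.compareTo(other.sessionId);
    }

    // equals/hashCode agree with compareTo: keys that compare as 0 are equal
    // and produce the same hash code.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SessionKey)) return false;
        SessionKey k = (SessionKey) o;
        return tenant.equals(k.tenant) && sessionId.equals(k.sessionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tenant, sessionId);
    }

    @Override
    public String toString() {
        return tenant + "/" + sessionId;
    }

    public static void main(String[] args) {
        List<SessionKey> keys = new ArrayList<>();
        keys.add(new SessionKey("b", "s1"));
        keys.add(new SessionKey("a", "s2"));
        keys.add(new SessionKey("a", "s1"));
        Collections.sort(keys);
        System.out.println(keys); // [a/s1, a/s2, b/s1]
    }
}
```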
I wrote a small example of an inherited POJO [1]. The example works with Flink 0.10.1 and 1.0-SNAPSHOT.
[1]: https://gist.github.com/chiwanpark/0389ce946e4fff58d611

Regards,
Chiwan Park

On Feb 9, 2016, at 8:07 PM, Fabian Hueske <[hidden email]> wrote:
|
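Since the key expression "sessionId" names a field declared on the parent class, any POJO analysis has to look at superclass fields as well. The plain-reflection sketch below (not Flink's implementation; class names modeled on the types posted later in the thread, accessors omitted) shows that `getDeclaredFields()` on the subclass alone does not report the inherited field, while walking `getSuperclass()` does.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class InheritedFields {

    // Shapes modeled on the thread's types; accessors omitted for brevity.
    public abstract static class Parent {
        private String sessionId;
    }

    public static class SourceA extends Parent {
        private String soapMessage;
    }

    /** Collects instance fields declared on clazz and all of its superclasses. */
    static List<String> allInstanceFieldNames(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (!Modifier.isStatic(f.getModifiers()) && !f.isSynthetic()) {
                    names.add(c.getSimpleName() + "." + f.getName());
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // getDeclaredFields() on the subclass alone misses the inherited sessionId.
        for (Field f : SourceA.class.getDeclaredFields()) {
            System.out.println(f.getName()); // soapMessage
        }
        System.out.println(allInstanceFieldNames(SourceA.class)); // [SourceA.soapMessage, Parent.sessionId]
    }
}
```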
In reply to this post by Till Rohrmann
Sorry, I was out for lunch. Maybe the problem is that sessionId is a String?

    public abstract class Parent {
        private Date eventDate;
        private EventType eventType;
        private String sessionId;

        public Parent() {
        }

        // GETTER & SETTER
    }

    public class SourceA extends Parent {
        private Boolean outboundMessage;
        private String soapMessage;

        public SourceA() {
            super();
        }

        // GETTER & SETTER
    }

    public class SourceB extends Parent {
        private Integer id;
        private String username;

        public SourceB() {
            super();
        }

        // GETTER & SETTER
    }

On 09.02.2016 at 12:06, Till Rohrmann wrote:
-- Dominique Rondé | Senior Consultant codecentric AG | Kreuznacherstrasse 30 | 60486 Frankfurt | Deutschland mobil: +49 (0) 172.7182592 www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz |
String is perfectly fine as a key. It looks like SourceA / SourceB are not correctly identified as POJOs.

2016-02-09 14:25 GMT+01:00 Dominique Rondé <[hidden email]>:
|
In reply to this post by Dominique Rondé
Could you post the complete example code (the Flink job including the type definitions)? For example, if the data sets are of type

Cheers,

On Tue, Feb 9, 2016 at 2:25 PM, Dominique Rondé <[hidden email]> wrote:
|
Here we go!
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;

    ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("xxx.xxx.xxx.xxx", 53408, "flink-job.jar");

    DataSource<String> datasourceA = env.readTextFile("hdfs://dev//sourceA/");
    DataSource<String> datasourceB = env.readTextFile("hdfs://dev//sourceB/");

    DataSet<SourceA> sourceA = datasourceA.map(new SourceAMapper());
    DataSet<SourceB> sourceB = datasourceB.map(new SourceBMapper());

    sourceA.join(sourceB).where("sessionId").equalTo("sessionId").print();

Thanks a lot!
Dominique

On 09.02.2016 at 14:36, Till Rohrmann wrote:
|
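For readers less familiar with the DataSet API: `join(...).where("sessionId").equalTo("sessionId")` without a join function emits one `Tuple2` per pair of records whose keys are equal. The plain-Java sketch below only illustrates that result semantics with an in-memory hash join; `A`, `B`, and the string output format are hypothetical simplifications, and Flink's optimizer picks the actual distributed execution strategy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EquiJoinSketch {

    // Hypothetical stand-ins for SourceA/SourceB: the join key plus one payload field.
    static final class A {
        final String sessionId;
        final String soapMessage;
        A(String sessionId, String soapMessage) { this.sessionId = sessionId; this.soapMessage = soapMessage; }
    }

    static final class B {
        final String sessionId;
        final String username;
        B(String sessionId, String username) { this.sessionId = sessionId; this.username = username; }
    }

    /** Returns one "soapMessage|username" entry per (A, B) pair with equal sessionId. */
    static List<String> join(List<A> left, List<B> right) {
        // Build phase: index the right side by its key.
        Map<String, List<B>> byKey = new HashMap<>();
        for (B b : right) {
            byKey.computeIfAbsent(b.sessionId, k -> new ArrayList<>()).add(b);
        }
        // Probe phase: pair each left record with every right record sharing its key.
        List<String> out = new ArrayList<>();
        for (A a : left) {
            for (B b : byKey.getOrDefault(a.sessionId, Collections.emptyList())) {
                out.add(a.soapMessage + "|" + b.username);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<A> left = new ArrayList<>();
        left.add(new A("s1", "msg1"));
        left.add(new A("s2", "msg2"));
        List<B> right = new ArrayList<>();
        right.add(new B("s1", "alice"));
        right.add(new B("s1", "bob"));
        right.add(new B("s3", "carol"));
        System.out.println(join(left, right)); // [msg1|alice, msg1|bob]
    }
}
```

Records without a partner on the other side (here `s2` and `s3`) are dropped, as in an inner join.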
I tested the

Cheers,

On Tue, Feb 9, 2016 at 2:46 PM, Dominique Rondé <[hidden email]> wrote:
|
Hi all,
I finally figured out that there is a getter for a boolean field which may be the source of the trouble. It seems that getBooleanField (as we use it) is not the best choice. Now the plan is executed, but it fails with another error. :(

    Caused by: java.lang.Exception: Unsupported driver strategy for join driver: CO_GROUP_RAW

Is there any link to documentation or example code that you would recommend besides the official documentation?

But folks, thanks for your great support! A really nice community here!

Greets
Dominique

On 09.02.2016 at 19:41, Till Rohrmann wrote:
|
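Dominique does not show the exact accessor, so the following is a hypothetical reading of "getBooleanField": the getter name does not match the field name, so a name-based POJO analysis cannot pair it with `outboundMessage`. The checker approximates the documented `getFoo()` convention; accepting `isFoo()` and requiring an exact return-type match are assumptions here, not confirmed Flink behavior.

```java
import java.lang.reflect.Method;

class BooleanGetterCheck {

    // Hypothetical: accessor names that do not match the field name.
    public static class Mismatched {
        private Boolean outboundMessage;
        public Mismatched() { }
        public Boolean getBooleanField() { return outboundMessage; }
        public void setBooleanField(Boolean value) { this.outboundMessage = value; }
    }

    // Accessors named after the field.
    public static class Matched {
        private Boolean outboundMessage;
        public Matched() { }
        public Boolean getOutboundMessage() { return outboundMessage; }
        public void setOutboundMessage(Boolean outboundMessage) { this.outboundMessage = outboundMessage; }
    }

    /**
     * True if clazz has a public no-arg method named get<Field> or is<Field>
     * (case-insensitive) whose return type equals fieldType exactly.
     * An approximation for illustration, not Flink's implementation.
     */
    static boolean hasMatchingGetter(Class<?> clazz, String field, Class<?> fieldType) {
        String lower = field.toLowerCase();
        for (Method m : clazz.getMethods()) {
            String name = m.getName().toLowerCase();
            boolean nameOk = name.equals("get" + lower) || name.equals("is" + lower);
            if (nameOk && m.getParameterCount() == 0 && m.getReturnType() == fieldType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasMatchingGetter(Mismatched.class, "outboundMessage", Boolean.class)); // false
        System.out.println(hasMatchingGetter(Matched.class, "outboundMessage", Boolean.class));    // true
        // Under an exact-type rule, a Boolean getter would not match a primitive boolean field.
        System.out.println(hasMatchingGetter(Matched.class, "outboundMessage", boolean.class));   // false
    }
}
```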
Hi,

glad you could resolve the POJO issue, but the new error doesn't look right. The CO_GROUP_RAW strategy should only be used for programs that are implemented against the Python DataSet API. I guess that's not the case, since all code snippets so far were Java.

2016-02-09 20:13 GMT+01:00 Dominique Rondé <[hidden email]>:
|
Hi,
your guess is correct. I use Java all the time... Here is the complete stack trace:

    Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:367)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:345)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:312)
        at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:212)
        at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:189)
        at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:160)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
        at org.apache.flink.api.java.DataSet.print(DataSet.java:1583)
        at x.y.z.eventstore.processing.pmc.PmcProcessor.main(PmcProcessor.java:103)
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.Exception: The data preparation for task 'CHAIN Join(Join at main(PmcProcessor.java:103)) -> FlatMap (collect())' , caused an error: Unsupported driver strategy for join driver: CO_GROUP_RAW
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.Exception: Unsupported driver strategy for join driver: CO_GROUP_RAW
        at org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:193)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more

On 09.02.2016 at 21:03, Fabian Hueske wrote:
|
Hi Dominique,

can you check whether the versions of the remotely running JobManager (JM) and TaskManagers (TM) are the same as the Flink version that is used to submit the job? The version and commit hash are logged at the top of the JM and TM log files.

2016-02-10 7:33 GMT+01:00 Dominique Rondé <[hidden email]>:
|
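On the client side, one way to see which library version Maven actually put on the classpath is to ask the JVM where a class was loaded from. This is a generic plain-Java sketch, not a Flink tool; `Implementation-Version` is only reported if the jar's manifest declares it, so the jar location (which usually contains the version in its file name) is often the more reliable hint. Comparing it with the version at the top of the JM/TM logs should reveal a mismatch.

```java
import java.net.URL;
import java.security.CodeSource;

class ClasspathVersion {

    /**
     * Describes where a class was loaded from and the Implementation-Version of
     * its package, if the jar manifest declares one (otherwise "unknown").
     */
    static String describe(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        URL location = (source != null) ? source.getLocation() : null; // null for JDK bootstrap classes
        Package pkg = clazz.getPackage();
        String version = (pkg != null) ? pkg.getImplementationVersion() : null;
        return clazz.getName()
                + " from " + (location != null ? location.toString() : "<bootstrap/unknown>")
                + ", version " + (version != null ? version : "unknown");
    }

    public static void main(String[] args) throws Exception {
        // Pass a class from the submitting program's classpath, e.g.
        // org.apache.flink.api.java.ExecutionEnvironment; defaults to a JDK class.
        String className = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(describe(Class.forName(className)));
    }
}
```

Running `mvn dependency:tree` in the job's project additionally shows which Flink artifact versions Maven resolved and where they came from.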
Hi Fabian,
your hint was good! Maven fooled me with its dependency management. Now everything works as expected!

Many, many thanks to all of you!

Greets
Dominique

On 10.02.2016 at 08:45, Fabian Hueske wrote: