I am trying to run a Java Flink streaming job (basic word count with Kafka) locally, but I am stuck with the following error. Any ideas on the cause of the error?
Job execution switched to status RUNNING.
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(1/8) switched to SCHEDULED
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(1/8) switched to DEPLOYING
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(2/8) switched to SCHEDULED
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(2/8) switched to DEPLOYING
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(3/8) switched to SCHEDULED
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(3/8) switched to DEPLOYING
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(4/8) switched to SCHEDULED
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(4/8) switched to DEPLOYING
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(5/8) switched to SCHEDULED
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(5/8) switched to DEPLOYING
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(6/8) switched to SCHEDULED
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(6/8) switched to DEPLOYING
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(7/8) switched to SCHEDULED
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(7/8) switched to DEPLOYING
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(8/8) switched to SCHEDULED
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(8/8) switched to DEPLOYING
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(1/8) switched to SCHEDULED
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(1/8) switched to DEPLOYING
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(2/8) switched to SCHEDULED
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(2/8) switched to DEPLOYING
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(3/8) switched to SCHEDULED
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(3/8) switched to DEPLOYING
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(4/8) switched to SCHEDULED
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(4/8) switched to DEPLOYING
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(5/8) switched to SCHEDULED
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(5/8) switched to DEPLOYING
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(6/8) switched to SCHEDULED
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(6/8) switched to DEPLOYING
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(7/8) switched to SCHEDULED
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(7/8) switched to DEPLOYING
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(8/8) switched to SCHEDULED
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(8/8) switched to DEPLOYING
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(5/8) switched to FAILED
java.lang.Exception: Could not load the task's invokable class.
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:696)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:450)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: class org.apache.flink.streaming.runtime.tasks.SourceStreamTask
    at java.lang.Class.asSubclass(Class.java:3404)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:693)
    ... 2 more
10/29/2015 11:15:54 Custom Source -> Flat Map -> Map(2/8) switched to FAILED
java.lang.Exception: Could not load the task's invokable class.
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:696)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:450)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: class org.apache.flink.streaming.runtime.tasks.SourceStreamTask
    at java.lang.Class.asSubclass(Class.java:3404)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:693)
    ... 2 more
10/29/2015 11:15:54 Job execution switched to status FAILING.
10/29/2015 11:15:54 Grouped Aggregation -> (Map -> Stream Sink, Stream Sink)(1/8) switched to FAILED
java.lang.Exception: Could not load the task's invokable class.
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:696)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:450)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: class org.apache.flink.streaming.runtime.tasks.OneInputStreamTask
    at java.lang.Class.asSubclass(Class.java:3404)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:693)
    ... 2 more
Hi!

You are probably missing some jars in your classpath. Maven/SBT usually resolve that automatically, so I assume you are constructing the classpath manually here?

For this particular error, you are probably missing "flink-streaming-core" (0.9.1) / "flink-streaming-java" (0.10) in your classpath.

I would recommend using the 0.10 version (the latest release candidate) for your experiments.

Stephan

On Mon, Nov 2, 2015 at 10:10 AM, Saleh <[hidden email]> wrote:
> I am trying to run a Java flink streaming job (basic word count with Kafka)
Hi Stephan,
Thanks for the response. Actually, I am using Maven, and below are all the Flink dependencies I have in my pom.xml file.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>0.9.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java8</artifactId>
    <version>0.9.1</version>
</dependency>

The problem is that when I create a jar file using Eclipse's export functionality, I get the previously mentioned "Could not load the task's invokable class" exception. When I create the jar file with Maven's mvn clean package command, I get a different exception, shown below:

org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'kafkaFlinkWordCount(FlinkWordCount.java:53)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:207)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1336)
    at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:1029)

The error message continues, and near the end there is this message:

    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing.
It seems that your compiler has not stored them into the .class file.
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely.
See the documentation for more information about how to compile jobs containing lambda expressions.

I tried to use flink-0.10-SNAPSHOT as you suggested, but it introduced a compile error in my source code with this message:

The method groupBy(0) is undefined for the type SingleOutputStreamOperation<Tuple2<String, Integer>, capture #1 of ?>
It seems that Eclipse's export jar functionality is broken. But since Maven is working properly, I assume that issue is resolved.

For the other error, can you post the source code of the main() method somewhere? The exceptions contain some hints on how to resolve the issue (calling .returns(...) on the result of the transformation, missing type parameters). Maybe one of those hints already resolves the issue.

On Tue, Nov 3, 2015 at 2:14 PM, Saleh <[hidden email]> wrote:
> Hi Stephan,
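For readers who hit the same InvalidTypesException: the sketch below uses only the JDK (no Flink classes) to illustrate the kind of type erasure the message refers to. It compares what reflection can see for an anonymous class and for a lambda that implement the same generic interface. The names ErasureDemo, anonymous, lambda and keepsTypeArguments are made up for this illustration, and the check is a simplification, not Flink's actual type-extraction logic.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical demo class; it only illustrates JDK reflection behavior.
class ErasureDemo {

    // Anonymous class: the compiler records Function<String, Integer>
    // in the generated class file.
    static Function<String, Integer> anonymous() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String word) {
                return word.length();
            }
        };
    }

    // Same logic as a lambda: the implementing class is generated at
    // runtime and implements the raw Function interface.
    static Function<String, Integer> lambda() {
        return word -> word.length();
    }

    // True if the object's class declares at least one interface
    // together with its generic type arguments.
    static boolean keepsTypeArguments(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + keepsTypeArguments(anonymous()));
        System.out.println("lambda:          " + keepsTypeArguments(lambda()));
    }
}
```

On a typical OpenJDK setup this should report that the anonymous class keeps its type arguments while the lambda does not. In the Flink job itself, the options are the ones the exception lists (calling returns(...) on the result of the transformation, or implementing ResultTypeQueryable); replacing the lambda with a class that implements the function interface is another commonly used workaround.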