Hi,

I have been facing issues while trying to read from an HDFS sequence file. This is my code snippet:

    DataSource<Tuple2<Text, Text>> input =
        env.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir));

Upon executing this in YARN cluster mode, I am getting the following error:

    The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
        org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
        flipkart.EnrichementFlink.main(EnrichementFlink.java:31)

When I add the TypeInformation myself as follows, I run into the same issue:

    DataSource<Tuple2<Text, Text>> input =
        env.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir), …)

When I add these libraries in the lib folder,

    flink-hadoop-compatibility_2.11-1.7.0.jar

the error changes to this:

    java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
        at org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
        at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
        at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
        at org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
        at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
        at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
        at org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)

Can someone help me resolve this issue?

Thanks,
Akshay
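[Editorial note: for reference, a self-contained sketch of the pattern the first error message asks for, passing an explicit TypeInformation to createInput. The class name, placeholder path, and the `first(10).print()` call are illustrative; it assumes flink-java and flink-hadoop-compatibility (matching the cluster's Flink version) on the classpath.]

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.Text;

public class ReadSequenceFileSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; in the thread this is the 'ravenDataDir' variable.
        String path = "hdfs:///path/to/sequence/file";

        // Supply the TypeInformation explicitly so Flink does not have to
        // extract it from the Hadoop InputFormat, which fails for Writable types.
        DataSet<Tuple2<Text, Text>> input = env.createInput(
                HadoopInputs.readSequenceFile(Text.class, Text.class, path),
                TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {}));

        input.first(10).print();
    }
}
```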
Could anyone please help me with this?

Thanks,
Akshay

On Mon, 10 Dec 2018, 6:05 pm Akshay Mendole <[hidden email]> wrote:
Hi,
I am a bit confused by the explanation. Is the exception that you mentioned happening in the first code snippet (with the TypeInformation.of(…)) or the second one? From looking into the code, I would expect that the exception can only happen in the second snippet (without TypeInformation), but then I am wondering what the exception is for the first snippet, because from the code I think it cannot be the same one but something different. Can you please clarify? I would expect that it should work once you call the method and provide the type info; if not, what exactly is the exception there?

Best,
Stefan
Hi Stefan,

You are correct. I logged the error messages incorrectly in my previous mail.

When I execute this code snippet:

    DataSource<Tuple2<Text, Text>> input =
        env.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir));

I get this error:

    The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
        org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
        flipkart.EnrichementFlink.main(EnrichementFlink.java:31)

When I give the TypeInformation manually:

    DataSource<Tuple2<Text, Text>> input =
        env.createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir), …)

I start getting this error message:

    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:290)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
    Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
        at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2082)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1701)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1643)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:921)

When I copied flink-hadoop-compatibility_2.11-1.7.0.jar to the Flink lib directory and executed, I got this error message:

    java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
        at org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
        at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
        at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
        at org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
        at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
        at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
        at org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
        at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
        at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)

Let me know if you need more information.
Thanks,
Akshay

On Tue, Dec 11, 2018 at 4:45 PM Stefan Richter <[hidden email]> wrote:
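[Editorial note: a mismatch like the one above can often be spotted from the jar file names alone. A minimal sketch, using hypothetical jar names standing in for a real listing of the Flink lib/ directory:]

```shell
# Extract the Flink version suffix from each jar name, as the names would
# appear in a Flink lib/ directory (these two names are hypothetical examples
# of a mismatched install).
for jar in flink-dist_2.11-1.6.2.jar flink-hadoop-compatibility_2.11-1.7.0.jar; do
  echo "$jar" | sed -E 's/.*_[0-9.]+-([0-9.]+)\.jar$/\1/'
done
# If the printed versions differ, the compatibility jar does not match the runtime.
```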
Hi,
In that case, are you sure that your Flink version corresponds to the version of the flink-hadoop-compatibility jar? You are using version 1.7.0 for that jar, so your cluster needs to run Flink 1.7 as well. IIRC, the TypeSerializerSnapshot class was introduced with 1.7, so combining the 1.7 compatibility jar with an older Flink runtime would be expected to give you exactly this NoClassDefFoundError.

Best,
Stefan
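[Editorial note: Stefan's advice amounts to keeping every Flink artifact on a single version. A sketch of how that might look in a Maven build; the property names and the 1.7.0/2.11 values are illustrative and must match what the cluster actually runs.]

```xml
<properties>
  <!-- Pin one Flink version and one Scala suffix for every Flink artifact. -->
  <flink.version>1.7.0</flink.version>
  <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <!-- Provided by the cluster; do not bundle a second copy. -->
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```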