Hi, guys I am running Flink 1.10 SQL job with UpsertStreamSink to ElasticSearch. In my sql logic, I am using a UDF like ts2Date to handle date format stream fields. However, when I add the `env.enableCheckpointing(time)`, my job failed to submit and throws exception like following. This is really weird, cause when I remove the UDF, the job can submit successfully. Any suggestion is highly appreciated. Besides, my sql logic is like : INSERT INTO realtime_product_sell 2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception. org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96) at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149) at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104) at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777) at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52) at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43) at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57) at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) ... 16 more Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ... 19 more Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 30: Cannot determine simple type name "com" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486) at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394) at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917) at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389) at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786) at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412) at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407) at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768) at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410) at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407) at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407) at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781) at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) ... 25 more
|
Hi fulin, It seems like a bug in the code generation. Could you provide us more information? 1. what planner are you using? blink or legacy planner? 2. how do you register your UDF? 3. does this has a relation with checkpointing? what if you enable checkpointing and not use your udf? and disable checkpointing and use udf? sunfulin <[hidden email]> 于2020年3月1日周日 下午5:41写道:
Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi, Benchao, Thanks for the reply. Could you provide us more information? 1. what planner are you using? blink or legacy planner? I am using Blink Planner. Not test with legacy planner because my program depend a lot of new feature based on blink planner. 2. how do you register your UDF? Just use the code : tableEnv.registerFunction ("ts2Date", new ts2Date()); tableEnv is a StreamTableEnvironment. 3. does this has a relation with checkpointing? what if you enable checkpointing and not use your udf? and disable checkpointing and use udf? I don't think this is related with checkpoint. If I enable checkpointing and not use my udf, I did not see any exception and submit job successfully. If I disable checkpointing and use udf, the job can submit successfully too. I dive a lot with this exception. Maybe it is related with some classloader issue. Hope for your suggestion. 在 2020-03-01 17:54:03,"Benchao Li" <[hidden email]> 写道:
|
Could you show how your UDF `ts2Date` is implemented? sunfulin <[hidden email]> 于2020年3月1日周日 下午6:05写道:
Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Below is the code. The function trans origin field timeStr "2020-03-01 12:01:00.234" to target timeStr accroding to dayTag. public class ts2Date extends ScalarFunction { public String eval (String timeStr, boolean dayTag) { if(timeStr == null) { } } At 2020-03-01 18:14:30, "Benchao Li" <[hidden email]> wrote:
|
The UDF looks good. Could you also paste your DDL? Then we can produce your bug easily. sunfulin <[hidden email]> 于2020年3月1日周日 下午6:39写道:
Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
CREATE TABLE realtime_product_sell (
|
Could you also provide us the DDL for lscsp_sc_order_all and dim_app_cust_info ? sunfulin <[hidden email]> 于2020年3月1日周日 下午9:22写道:
Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
create table lscsp_sc_order_all (
|
Hi fulin, I cannot reproduce your exception on current master using your SQLs. I searched the error message, it seems that this issue[1] is similar with yours, but it seems that current compile util does not have this issue. BTW, do you using 1.10? sunfulin <[hidden email]> 于2020年3月2日周一 上午11:17写道:
Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi, Yep, I am using 1.10 Did you submit the job in the cluster or just run it in your IDE? Because I can also run it successfully in my IDE, but cannot run it through cluster by a shading jar. So I think maybe the problem is related with maven jar classpath. But not sure about that. If you can submit the job by a shade jar through cluster , could you share the project pom settings and sample test code ?
|
I just run it in my IDE. sunfulin <[hidden email]> 于2020年3月2日周一 下午9:04写道:
Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Free forum by Nabble | Edit this page |