Error registering a UDF


Error registering a UDF

cxydevelop
I need to manually load the JAR that contains my custom function from within the main function.
Method for loading the JAR:

The custom function:

Usage:
LoadJar.loadJar("/usr/local/soft/xxx.jar");
String createSqlSource = "create TABLE source(id varchar , name varchar)with( ... )";
tableEnv.sqlUpdate(createSqlSource);
TableFunction o = (TableFunction)Class.forName("udtf.CxyTestSplit").newInstance();
tableEnv.registerFunction("SplitColunm",o);
Table table = tableEnv.sqlQuery("select * from source ,LATERAL TABLE(SplitColunm(name)) as T(uid , user_name) ");
tableEnv.toAppendStream(table, Row.class).print();

The code runs fine in IDEA during development, but it fails with the following error when run on the cluster:

2020-04-20 03:16:02,561 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:58)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 13: Cannot determine simple type name "udtf"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
    at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:76)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:71)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 13 more
2020-04-20 03:16:02,665 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state FAILING to FAILED.


 


Re: Error registering a UDF

dixingxing85@163.com
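That JAR lives on your local machine, and Flink does not automatically distribute it to the TaskManagers, so an error is to be expected when the job runs on the cluster.
We have also been working on custom UDFs recently. Our approach is to introduce a JAR service: whenever a user uploads a JAR on the JAR service page, the service also writes a copy to HDFS, to be used when the job starts.
We would also like to know whether there is a simpler way to do this.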
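
To make the point about the JAR's location concrete, here is a minimal sketch in plain Java with no Flink dependency. It is not the `LoadJar` helper from the original post, whose source was not included; `UdfJarCheck` and `canLoad` are hypothetical names chosen for illustration. It only checks whether a class name can be resolved through a `URLClassLoader` pointed at a given path, which roughly mirrors why a class that resolves on the submitting machine may not resolve on a TaskManager where that file does not exist. Flink's own user-code class loading is more involved than this.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink and not the LoadJar class from the post.
class UdfJarCheck {

    /**
     * Returns true if className can be resolved through a class loader that
     * sees jarPath (plus whatever the parent class loader already provides).
     */
    public static boolean canLoad(Path jarPath, String className) {
        try (URLClassLoader loader = new URLClassLoader(
                new URL[] { jarPath.toUri().toURL() },
                UdfJarCheck.class.getClassLoader())) {
            // initialize=false: only resolve the class, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            // The JAR is missing on this machine or does not contain the class
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Defaults are the path and class name quoted in the original post
        Path jar = Paths.get(args.length > 0 ? args[0] : "/usr/local/soft/xxx.jar");
        String cls = args.length > 1 ? args[1] : "udtf.CxyTestSplit";
        System.out.println(cls + " resolvable from " + jar + ": " + canLoad(jar, cls));
    }
}
```

Running this on the submitting machine and on a TaskManager host with the same arguments would show whether the class is reachable from that path on each of them.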

 
From: [hidden email]
Sent: 2020-04-20 12:05
To: [hidden email]
Subject: Error registering a UDF
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:58)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 13: Cannot determine simple type name "udtf"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
    at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:76)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:71)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 13 more


 


Re: Error registering UDF

Jeff Zhang
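You could take a look at Flink on Zeppelin: it makes it easy to define a UDF inside Zeppelin and then use it. There is an article on this that you can refer to.

You can also join the DingTalk group to discuss; the group number is 30022475.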




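[hidden email] <[hidden email]> wrote on Monday, April 20, 2020 at 2:11 PM:
This jar lives on your local machine. Flink does not automatically distribute it to the TaskManagers, so the job will most likely fail when it runs on the cluster.
We have also been working on custom UDFs recently. Our approach is to introduce a jar service: whenever a user uploads a jar through the jar service page, the service also writes a copy to HDFS, which the job then uses at startup.
We would also like to know whether there is a simpler way to do this.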
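To make the point about the jar not reaching the TaskManagers concrete, here is a minimal sketch of a diagnostic check. It is not part of the original post, and the names `ClassVisibility` and `isVisible` are hypothetical. It only asks whether a class name can be resolved through a given classloader; the assumption is that you would run the same check from code executing on the cluster to confirm that `udtf.CxyTestSplit` is missing there.

```java
// Hypothetical helper, not from the original thread: reports whether a class
// name can be resolved through a given classloader.
final class ClassVisibility {

    /** Returns true if className resolves through loader; the class is not initialized. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class (or one of its dependencies) is not reachable from this loader.
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader current = Thread.currentThread().getContextClassLoader();
        // A JDK class resolves in any JVM.
        System.out.println("java.lang.String visible: "
                + isVisible("java.lang.String", current));
        // The UDF class resolves only where its jar is on the classpath.
        System.out.println("udtf.CxyTestSplit visible: "
                + isVisible("udtf.CxyTestSplit", current));
    }
}
```

If the check succeeds in the client JVM, where the jar was loaded by hand, but fails on the cluster, that would be consistent with the `Cannot determine simple type name "udtf"` compile error, since the generated code is compiled on the TaskManager.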

 
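From: [hidden email]
Sent: 2020-04-20 12:05
To: [hidden email]
Subject: Error registering UDF
I need to manually load the jar that contains my custom function from within the main function.
Method for loading the jar:

Custom function:

Usage: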
LoadJar.loadJar("/usr/local/soft/xxx.jar");
String createSqlSource = "create TABLE source(id varchar , name varchar)with( ... )";
tableEnv.sqlUpdate(createSqlSource);
TableFunction o = (TableFunction)Class.forName("udtf.CxyTestSplit").newInstance();
tableEnv.registerFunction("SplitColunm",o);
Table table = tableEnv.sqlQuery("select * from source ,LATERAL TABLE(SplitColunm(name)) as T(uid , user_name) ");
tableEnv.toAppendStream(table, Row.class).print();

The code runs fine in IDEA during development, but it fails with the following error when deployed to the cluster:

2020-04-20 03:16:02,561 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:58)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 13: Cannot determine simple type name "udtf"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
    at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:76)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:71)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 13 more
2020-04-20 03:16:02,665 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:58)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 13: Cannot determine simple type name "udtf"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
    at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:76)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:71)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 13 more


 



--
Best Regards

Jeff Zhang

Re: Error registering UDF

Xintong Song
Hi all,

Please be aware that this is the general user mailing list. Discussions should be in English for the convenience of non-Chinese-speaker subscribers. 
For discussions in Chinese, please use the user-zh mailing list [1].

Thank you~

Xintong Song



On Mon, Apr 20, 2020 at 2:18 PM Jeff Zhang <[hidden email]> wrote:
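You can take a look at Flink on Zeppelin, which makes it easy to define and use UDFs inside Zeppelin. There is an article on this that you can refer to.

You can also join the DingTalk group for discussion; the group number is 30022475.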




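On Mon, Apr 20, 2020 at 2:11 PM [hidden email] <[hidden email]> wrote:
This jar is on your local machine, and Flink does not automatically distribute it to the TaskManagers, so it is expected to fail when the job runs on a cluster.
We have also been working on user-defined UDFs recently. Our approach is to introduce a jar service: when a user uploads a jar on the jar service page, the service also writes a copy to HDFS, which the job uses at startup.
We would also like to know whether there is a simpler way to do this.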
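
One way to confirm this diagnosis is to check, from code running inside the task (for example in a function's open() method), whether the UDF class is visible to the class loader there. The following is only a minimal sketch in plain Java: `UdfClasspathCheck` and `isLoadable` are hypothetical names introduced for illustration, and `udtf.CxyTestSplit` is the class name from the original post.

```java
final class UdfClasspathCheck {

    /** Returns true if className can be resolved through the given class loader. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only resolve the class, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class (or one of its dependencies) is not on this loader's classpath
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // A JDK class is always resolvable; the UDF class only if its jar is visible here
        System.out.println("java.lang.String loadable: " + isLoadable("java.lang.String", loader));
        System.out.println("udtf.CxyTestSplit loadable: " + isLoadable("udtf.CxyTestSplit", loader));
    }
}
```

If the second check returns false on the TaskManager while it returns true in the client's main function, the jar was only loaded in the client JVM, which would match the "Cannot determine simple type name" error from the generated code.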

 
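From: [hidden email]
Sent: 2020-04-20 12:05
To: [hidden email]
Subject: Error registering UDF
I need to manually load the jar containing my user-defined function in the main function.
Method for loading the jar:

User-defined function:

Usage: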
LoadJar.loadJar("/usr/local/soft/xxx.jar");
String createSqlSource = "create TABLE source(id varchar , name varchar)with( ... )";
tableEnv.sqlUpdate(createSqlSource);
TableFunction o = (TableFunction)Class.forName("udtf.CxyTestSplit").newInstance();
tableEnv.registerFunction("SplitColunm",o);
Table table = tableEnv.sqlQuery("select * from source ,LATERAL TABLE(SplitColunm(name)) as T(uid , user_name) ");
tableEnv.toAppendStream(table, Row.class).print();

The code runs fine in IDEA during development, but it fails when deployed to the cluster:

2020-04-20 03:16:02,561 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:58)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 13: Cannot determine simple type name "udtf"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
    at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:76)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:71)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 13 more
2020-04-20 03:16:02,665 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy


 



--
Best Regards

Jeff Zhang

Re: Error registering UDF

godfrey he
As dixingxing said, user-defined jars are not uploaded along with the JobGraph.
You can refer to the approach in `CliFrontend#getEffectiveConfiguration`:
the Flink client uploads the jars when submitting a job.

You may need to change your code as follows:
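// programOptions and jobJars must be supplied by the caller, as in CliFrontend
Configuration config = new Configuration();
// Write the program options and the jar list into the configuration
ExecutionConfigAccessor.fromProgramOptions(checkNotNull(programOptions), checkNotNull(jobJars))
    .applyToConfiguration(config);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
....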
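
A related route that avoids touching client internals is the `-C` / `--classpath <url>` option of `flink run`, which adds a URL to the user-code classloader on all nodes; the URL needs a protocol such as `file://` and must be reachable from every node. As far as I can tell, these URLs end up in the same program options that the snippet above reads. The sketch below only assembles such a command line in plain Java; `FlinkRunCommand`, `build`, and `job.jar` are hypothetical names used for illustration, and the UDF jar path is the one from the original post.

```java
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class FlinkRunCommand {

    /** Builds the argument list: flink run -C <url> ... <jobJar>. */
    static List<String> build(String jobJar, List<String> udfJars) {
        List<String> cmd = new ArrayList<>();
        cmd.add("flink");
        cmd.add("run");
        for (String jar : udfJars) {
            cmd.add("-C");
            // -C expects a URL with a protocol, so convert the path to a file: URI
            cmd.add(Paths.get(jar).toUri().toString());
        }
        cmd.add(jobJar);
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = build("job.jar",
                Collections.singletonList("/usr/local/soft/xxx.jar"));
        System.out.println(String.join(" ", cmd));
    }
}
```

Note that this only works if the jar exists at that path on every node (or on a shared mount); it does not upload the jar.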

Best,
Godfrey

On Mon, Apr 20, 2020 at 2:32 PM Xintong Song <[hidden email]> wrote:
Hi all,

Please be aware that this is the general user mailing list. Discussions should be in English for the convenience of non-Chinese-speaker subscribers. 
For discussions in Chinese, please use the user-zh mailing list [1].

Thank you~

Xintong Song



On Mon, Apr 20, 2020 at 2:18 PM Jeff Zhang <[hidden email]> wrote:
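You can take a look at Flink on Zeppelin, which makes it easy to define and use UDFs inside Zeppelin. There is an article on this that you can refer to.

You can also join the DingTalk group for discussion; the group number is 30022475.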




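On Mon, Apr 20, 2020 at 2:11 PM [hidden email] <[hidden email]> wrote:
This jar is on your local machine, and Flink does not automatically distribute it to the TaskManagers, so it is expected to fail when the job runs on a cluster.
We have also been working on user-defined UDFs recently. Our approach is to introduce a jar service: when a user uploads a jar on the jar service page, the service also writes a copy to HDFS, which the job uses at startup.
We would also like to know whether there is a simpler way to do this.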

 
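From: [hidden email]
Sent: 2020-04-20 12:05
To: [hidden email]
Subject: Error registering UDF
I need to manually load the jar containing my user-defined function in the main function.
Method for loading the jar:

User-defined function:

Usage: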
LoadJar.loadJar("/usr/local/soft/xxx.jar");
String createSqlSource = "create TABLE source(id varchar , name varchar)with( ... )";
tableEnv.sqlUpdate(createSqlSource);
TableFunction o = (TableFunction)Class.forName("udtf.CxyTestSplit").newInstance();
tableEnv.registerFunction("SplitColunm",o);
Table table = tableEnv.sqlQuery("select * from source ,LATERAL TABLE(SplitColunm(name)) as T(uid , user_name) ");
tableEnv.toAppendStream(table, Row.class).print();

The code runs fine in IDEA during development, but it fails when deployed to the cluster:

2020-04-20 03:16:02,561 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:58)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 13: Cannot determine simple type name "udtf"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
    at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:76)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:71)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 13 more
2020-04-20 03:16:02,665 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state FAILING to FAILED.

--
Best Regards

Jeff Zhang

Re: UDF registration error

godfrey he
Correction to my example below:

"StreamExecutionEnvironment.getExecutionEnvironment(config)"
=>
"new StreamExecutionEnvironment(config)"



On Mon, Apr 20, 2020 at 2:42 PM godfrey he <[hidden email]> wrote:
As dixingxing said, user-defined jars are not uploaded with the JobGraph.
You can refer to the approach of `CliFrontend#getEffectiveConfiguration`:
the Flink client uploads the jars when submitting a job.

You may need to change your code as below:
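Configuration config = new Configuration();
// Copy the program options and the job jars into the configuration,
// as CliFrontend#getEffectiveConfiguration does
ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromProgramOptions(
    checkNotNull(programOptions), checkNotNull(jobJars));
accessor.applyToConfiguration(config);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
....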
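
The `jobJars` argument in the snippet above is not defined in this thread. A minimal sketch of how such a list might be built, assuming it is a list of `java.net.URL` values pointing at the local UDF jars (the class `JobJarUrls` and the method `toJarUrls` are hypothetical names used only for illustration, and the jar path is the one from the original question):

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: converts local jar paths into file: URLs. */
final class JobJarUrls {

    static List<URL> toJarUrls(List<String> jarPaths) throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        for (String path : jarPaths) {
            // Use an absolute file: URL so the location does not depend on the working directory.
            urls.add(Paths.get(path).toAbsolutePath().toUri().toURL());
        }
        return urls;
    }

    public static void main(String[] args) throws MalformedURLException {
        // Assumption: this list is what would be passed as jobJars.
        List<URL> jobJars = toJarUrls(Collections.singletonList("/usr/local/soft/xxx.jar"));
        System.out.println(jobJars);
    }
}
```

This only prepares the URL list on the client side; whether the jars actually reach the cluster still depends on submitting the job with a configuration that carries them.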

Best,
Godfrey

On Mon, Apr 20, 2020 at 2:32 PM Xintong Song <[hidden email]> wrote:
Hi all,

Please be aware that this is the general user mailing list. Discussions should be in English for the convenience of subscribers who do not speak Chinese.
For discussions in Chinese, please use the user-zh mailing list [1].

Thank you~

Xintong Song



On Mon, Apr 20, 2020 at 2:18 PM Jeff Zhang <[hidden email]> wrote:
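You can take a look at Flink on Zeppelin, which makes it easy to define and use UDFs inside Zeppelin. Here is an article for reference:

You can also join the DingTalk group to discuss. DingTalk group number: 30022475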




On Mon, Apr 20, 2020 at 2:11 PM [hidden email] <[hidden email]> wrote:
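Your jar lives on your local machine, and Flink does not automatically distribute it to the TaskManagers, so the job is expected to fail when it runs on the cluster.
We have also been working on user-defined UDFs recently. Our approach is to introduce a jar service: whenever a user uploads a jar on the jar service page, the service also writes a copy to HDFS, which the job then uses at startup.
We would also like to know whether there is a simpler way to achieve this.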
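
One way to check this diagnosis is to test whether the UDF class can be resolved by a given class loader, for example by logging the result from code that runs on the cluster. The following is a minimal sketch using only the JDK; the class `ClassVisibility` and the method `isVisible` are hypothetical names, and `udtf.CxyTestSplit` is the class name from the original question:

```java
/** Diagnostic sketch: reports whether a class can be resolved by a class loader. */
final class ClassVisibility {

    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only resolve the class, do not run its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println("java.lang.String visible: " + isVisible("java.lang.String", loader));
        System.out.println("udtf.CxyTestSplit visible: " + isVisible("udtf.CxyTestSplit", loader));
    }
}
```

If the check succeeds in the client JVM but fails on the cluster, the jar was loaded only locally, which is consistent with the `Cannot determine simple type name "udtf"` error in the stack trace.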

 
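From: [hidden email]
Sent: 2020-04-20 12:05
To: [hidden email]
Subject: UDF registration error
I need to manually load the jar containing a user-defined function inside the main function.
Method for loading the jar:

User-defined function:

Usage: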
LoadJar.loadJar("/usr/local/soft/xxx.jar");
String createSqlSource = "create TABLE source(id varchar , name varchar)with( ... )";
tableEnv.sqlUpdate(createSqlSource);
TableFunction o = (TableFunction)Class.forName("udtf.CxyTestSplit").newInstance();
tableEnv.registerFunction("SplitColunm",o);
Table table = tableEnv.sqlQuery("select * from source ,LATERAL TABLE(SplitColunm(name)) as T(uid , user_name) ");
tableEnv.toAppendStream(table, Row.class).print();

The code runs fine in IDEA during development, but it fails when deployed to the cluster.

2020-04-20 03:16:02,561 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:58)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 13: Cannot determine simple type name "udtf"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
    at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:76)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:71)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 13 more
2020-04-20 03:16:02,665 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (91369d7e00827850cbaba9e0d77b4bca) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
    at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:58)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 13: Cannot determine simple type name "udtf"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
    at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
    at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
    at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
    at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
    at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
    at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
    at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
    at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
    at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
    at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
    at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
    at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
    at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
    at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
    at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:76)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:71)
    at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
    ... 13 more

--
Best Regards

Jeff Zhang