Flink UDF registration from jar at runtime

Flink UDF registration from jar at runtime

Jakub N
The current setup is: data in Kafka -> Kafka connector -> StreamTableEnvironment -> execute Flink SQL queries.

I would like to register Flink user-defined functions from a jar or Java class file at runtime. What I have tried so far: using a Java ClassLoader, I obtain an instance of a ScalarFunction (UDF) and register it in the StreamTableEnvironment. When I then execute a query that uses the UDF, I get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
...

I have verified that the loaded UDF instance behaves as expected when I invoke any of its methods directly.

Do you have any ideas on why this is failing?
Re: Flink UDF registration from jar at runtime

Guowei Ma
Hi, Jakub
In theory there should not be any problem, since you can register the function object directly.
Would you like to share your code and the shell command with which you submit your job?
Best,
Guowei


Re: Flink UDF registration from jar at runtime

Jakub N
Hi Guowei,

Thanks for your help. Here is the relevant code (QueryCommand class):
val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()

val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

val root = new File("target/custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.getDeclaredConstructor().newInstance()
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

//creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
def execute(): Unit = fsEnv.execute()

myFunction.java
import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {

    public String eval(String s) {
        return "myFunction - " + s;
    }

}
Execution works as follows: a QueryCommand instance is created, some properties are set, and execute() is invoked.

Let me know if any other relevant information is missing; alternatively, you can have a look at the source code here: https://github.com/codefeedr/kafkaquery

Kind regards,

Jakub



Re: Flink UDF registration from jar at runtime

Guowei Ma
Hi Jakub,
I think the problem is that the class `cls`, which you load at runtime, is not visible to the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader.
So you could put `myFunction` on the job's classpath.
Best,
Guowei
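[Editor's note: a minimal, self-contained sketch (not from the thread) of the visibility problem described above. A class compiled and loaded through a child URLClassLoader at runtime is invisible to the application classloader, which is what the deserializer ends up consulting here. Class and directory names are hypothetical.]

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.ToolProvider;

public class ClassLoaderDemo {
    public static void main(String[] args) throws Exception {
        // Compile a stand-in UDF class into a temp directory at runtime.
        Path dir = Files.createTempDirectory("udf");
        Path src = dir.resolve("MyFunction.java");
        Files.writeString(src, "public class MyFunction {}");
        ToolProvider.getSystemJavaCompiler().run(null, null, null, src.toString());

        // A child URLClassLoader pointed at that directory can load the class...
        URLClassLoader child = new URLClassLoader(
                new URL[]{dir.toUri().toURL()},
                ClassLoaderDemo.class.getClassLoader());
        Class<?> cls = Class.forName("MyFunction", true, child);
        System.out.println("loaded via child loader: " + cls.getName());

        // ...but the application classloader cannot, which reproduces the
        // ClassNotFoundException seen during deserialization.
        try {
            Class.forName("MyFunction", true, ClassLoaderDemo.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            System.out.println("app classloader: ClassNotFoundException");
        }
    }
}
```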


Re: Flink UDF registration from jar at runtime

Jakub N
Hi Guowei,

Thanks a lot for your help; your suggestion indeed solved the issue. I moved `myFunction` onto the classpath from which execution starts.

Kind regards,

Jakub 


Re: Flink UDF registration from jar at runtime

Jakub N
Hi Guowei,

It turned out that for my application I unfortunately can't have the UDF on the job's classpath: I am using a local Flink environment and `sbt pack` (similar to a fat jar) to create launch scripts, so, to my understanding, I can't add anything to the classpath once the project is packed.
Are there any ways to add these UDFs from outside the classpath?

Kind regards,

Jakub


Re: Flink UDF registration from jar at runtime

Guowei Ma
Hi, Jakub
I am not familiar with `sbt pack`, but I assume you are doing the following (correct me if I misunderstand you):
1. The UDF and the job jar are in the same fat jar.
2. You instantiate ("new") a UDF object in the job.
3. You submit the fat jar to the local Flink environment.

In theory there should not be any problem. Could you share how you changed the code and how you submit your job to the local environment?

Best,
Guowei


On Tue, Dec 8, 2020 at 2:53 AM Jakub N <[hidden email]> wrote:
Hi Guowei,

It turned out that for my application I unfortunately can't have the UDF on the job's classpath. I am using a local Flink environment and `sbt pack` (similar to a fatjar) to create launch scripts; therefore, to my understanding, I can't access the classpath once the project is packed.
Are there any ways to add these UDFs from outside the classpath?

Kind regards,

Jakub


From: Jakub N <[hidden email]>
Sent: Monday, December 7, 2020 12:59
To: Guowei Ma <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi Guowei,

Many thanks for your help. Your suggestion indeed solved the issue: I moved `myFunction` onto the classpath from which execution starts.

Kind regards,

Jakub 


From: Guowei Ma <[hidden email]>
Sent: Monday, December 7, 2020 12:16
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi Jakub,
I think the problem is that the class you load at runtime (`cls`) is not visible to the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader.
So maybe you could put `myFunction` on the job's classpath.
Best,
Guowei
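Guowei's suggestion (making the loader that knows the UDF the thread context classloader around registration and query execution) can be sketched with plain JDK classes. This is a minimal sketch with no Flink involved; `udfLoader` is an illustrative empty loader standing in for one that points at the UDF's directory or jar:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ContextClassLoaderSwap {
    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // In the real setup this loader would point at the UDF's directory or jar.
        URLClassLoader udfLoader = new URLClassLoader(new URL[0], original);
        Thread.currentThread().setContextClassLoader(udfLoader);
        try {
            // ... createTemporaryFunction / executeSql would run here, so that
            // Flink's deserialization sees the UDF via the context classloader ...
            System.out.println(Thread.currentThread().getContextClassLoader() == udfLoader);
        } finally {
            // Always restore the previous loader.
            Thread.currentThread().setContextClassLoader(original);
        }
        System.out.println(Thread.currentThread().getContextClassLoader() == original);
    }
}
```

The try/finally guarantees the swap is undone even if the query fails; the program prints `true` twice.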


On Mon, Dec 7, 2020 at 5:54 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

Thanks for your help; here is the relevant code (QueryCommand class):

import java.io.File
import java.net.{URL, URLClassLoader}
import javax.tools.{JavaCompiler, ToolProvider}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction

val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()

val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

val root = new File("target/custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

//creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
def execute(): Unit = fsEnv.execute()

myFunction.java
import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {

    public String eval(String s) {
        return "myFunction - " + s;
    }

}
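The compile-then-load flow in the snippet above can be reproduced with JDK classes only. This is a self-contained sketch; the class name `EchoFn` and the temp directory are illustrative, and it requires a JDK, since `ToolProvider.getSystemJavaCompiler()` returns null on a bare JRE:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class RuntimeUdfLoad {
    public static void main(String[] args) throws Exception {
        // Write an illustrative UDF-like class to a temp directory.
        Path dir = Files.createTempDirectory("udf");
        Path src = dir.resolve("EchoFn.java");
        Files.writeString(src,
            "public class EchoFn { public String eval(String s) { return \"echo - \" + s; } }");

        // Compile it in-process; the .class file lands next to the source.
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        int result = compiler.run(null, null, null, src.toString());
        if (result != 0) throw new IllegalStateException("compilation failed");

        // Load the freshly compiled class and invoke it reflectively.
        try (URLClassLoader loader =
                 new URLClassLoader(new URL[] { dir.toUri().toURL() },
                                    RuntimeUdfLoad.class.getClassLoader())) {
            Class<?> cls = loader.loadClass("EchoFn");
            Object fn = cls.getDeclaredConstructor().newInstance();
            Object out = cls.getMethod("eval", String.class).invoke(fn, "hi");
            System.out.println(out);
        }
    }
}
```

Note that classes loaded this way are visible only through `loader` (or loaders delegating to it), which is exactly why a different classloader, such as the one Flink's deserializer uses, can throw ClassNotFoundException for the same class.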
Execution works as follows: a QueryCommand instance is created, some properties are set, and execute() is invoked.

Let me know if any other relevant information is missing; alternatively, you can have a look at the source code here (https://github.com/codefeedr/kafkaquery).

Kind regards,

Jakub



From: Guowei Ma <[hidden email]>
Sent: Monday, December 7, 2020 02:55
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
In theory there should not be any problem, because you can register the function object.
So would you like to share your code and the shell command with which you submit your job?
Best,
Guowei



Re: Flink UDF registration from jar at runtime

Jakub N
Hi Guowei,

  1. Unfortunately the UDF and the job are not in the same fatjar. Essentially there is only one "fatjar" containing the Flink environment + the job; the UDF is separate.
  2. Yes, that is correct.
  3. As explained in 1., I don't submit job jars to the Flink environment; instead the job is created and submitted from within the "fatjar".

Code-wise, nothing changed except the location where the UDF is specified.
"Submitting to the environment" works as follows:
  1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
  2. (Register UDFs)
  3. Create tables
  4. Query the tables
  5. Execute the environment
The overall process runs as one program.
Apologies if any of these explanations are unclear or too vague.

Kind regards,

Jakub



Re: Flink UDF registration from jar at runtime

Guowei Ma
Hi, Jakub
If I understand correctly, you want the job you submitted to be able to load a table function that is not in the job jar.
I don't think Flink supports this natively (maybe others know).
But this requirement is similar to code generation: you need to ship the "code" with the job. You could refer to [1].


Best,
Guowei



Re: Flink UDF registration from jar at runtime

Dawid Wysakowicz-2

Hi Jakub,

As Guowei said, the UDF must be present in the user classloader, both when compiling the program and when executing on the cluster. As of now the TableEnvironment uses the thread context classloader as the "user classloader" when compiling the query. Therefore you can do the trick via:

ClassLoader yourClassloader = ...; // create your classloader with the UDF

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(yourClassloader)) {
    fsTableEnv.createTemporaryFunction("myFunction", udf);
    fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic");
}

Take a look at TemporaryClassLoaderContext [1] for a clean way to do this, with automatic restoration of the previous classloader at the end.

As for the second problem, having the UDF on the classpath during execution: if you are just spawning a LocalEnvironment, the above should do the trick, as it will use the context classloader. If you are submitting to a cluster, you can submit multiple jars as part of a single job, either via the RemoteEnvironment or the flink run command.

That's how we submit UDFs from separate jars in the sql-client; you can go through a few classes there to see how it is done. I am afraid it's not the easiest task, as there are quite a few classes to navigate through. You could start from e.g. org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql [2].

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java

[2] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305



at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
...

I have verified that the generated instance of the UDF behaves as expected when invoking any of its methods.

Do you have any ideas on why this is failing?


Re: Flink UDF registration from jar at runtime

Jakub N
In reply to this post by Jakub N
Hi Dawid,

According to your suggestion, given that I spawn a LocalEnvironment, I tried the following:

val root = new File("custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), Thread.currentThread().getContextClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]

val ignored = TemporaryClassLoaderContext.of(classLoader)
try {
  fsTableEnv.createTemporaryFunction("myFunction", udf)
  fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
}
unfortunately this still results in a ClassNotFoundException when executing the environment. (The class is located outside of the classpath and is loaded successfully; instances of it behave as expected.)
Did I possibly misunderstand what you were proposing?

Kind regards,

Jakub




From: Dawid Wysakowicz
Sent: Thursday, 10 December 2020 09:59
To: Guowei Ma; Jakub N
Cc: [hidden email]
Subject: Re: Flink UDF registration from jar at runtime

Hi Jakub,

As Guowei said the UDF must be present in the user classloader. It must be there when compiling the program and when executing on the cluster. As of now the TableEnvironment uses the Thread context classloader as the "user classloader" when compiling the query. Therefore you can do the trick via:

ClassLoader yourClassloader = ... // create your classloader with the UDF

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(yourClassloader)) {

    fsTableEnv.createTemporaryFunction("myFunction", udf)

    fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")

}

Take a look at TemporaryClassLoaderContext[1] for a nice way to do it with a cleanup at the end.

As for the second problem, having the UDF on the classpath when executing: if you are just spawning a LocalEnvironment, the above should do the trick, as it will use the context classloader. If you are submitting to a cluster, you can submit multiple jars as part of a single job, either via the RemoteEnvironment or the flink run command.

That's how we submit UDFs from separate jars in the sql-client. You can try to go through a few classes there and see how it is done. I am afraid it's not the easiest task as there are quite a few classes to navigate through. You could start from e.g. org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java

[2] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305
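To make the context-classloader trick above concrete: TemporaryClassLoaderContext swaps the thread's context classloader and restores it on close(). A minimal plain-JDK sketch of that pattern (the class and variable names here are illustrative, not Flink's actual implementation):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ContextSwapDemo {
    // Illustrative stand-in for Flink's TemporaryClassLoaderContext:
    // swap the thread's context classloader, restore it on close().
    static final class TemporaryContext implements AutoCloseable {
        private final ClassLoader original;

        TemporaryContext(ClassLoader newLoader) {
            original = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(newLoader);
        }

        @Override
        public void close() {
            Thread.currentThread().setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        ClassLoader custom = new URLClassLoader(new URL[0], before);

        try (TemporaryContext ignored = new TemporaryContext(custom)) {
            // Inside the block the context classloader is the custom one.
            System.out.println(Thread.currentThread().getContextClassLoader() == custom);
        }
        // After close() the original classloader is restored.
        System.out.println(Thread.currentThread().getContextClassLoader() == before);
    }
}
```

Using try-with-resources (as in Dawid's snippet) is what guarantees the restore; holding the returned context in a variable without closing it leaves the swapped classloader in place.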



On 10/12/2020 09:15, Guowei Ma wrote:
Hi, Jakub
If I understand correctly, you want the job you submitted to be able to load a table function that is not in the job jar.
I don't think Flink supports this natively (maybe others know).
But this requirement is similar to code generation: you need to submit the "code" with the job. I think you could refer to [1].


Best,
Guowei


On Tue, Dec 8, 2020 at 8:40 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

  1. Unfortunately the UDF and the job are not in the same fatjar. Essentially there is only one "fatjar", containing the Flink environment + the job; the UDF is separate.
  2. Yes, that is correct.
  3. As explained in 1, I don't submit job jars to the Flink environment; instead the job is created and submitted within the "fatjar".

Code-wise, nothing changed except where the UDF's location is specified.
"Submitting to the environment" works as follows:
  1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
  2. (Register UDF's)
  3. Create tables
  4. Query on the tables
  5. Execute the environment
The overall process is executed as one program.
Apologies if any of these explanations are unclear or too vague.

Kind regards,

Jakub


From: Guowei Ma <[hidden email]>
Sent: Tuesday, 8 December 2020 06:34
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
I am not familiar with `sbt pack`. But I assume you are doing the following (correct me if I misunderstand you):
1. The UDF and Job jar are in the same "fatjar" 
2. You "new" a UDF object in the job(). 
3. You submit the  "fatjar" to the local Flink environment. 

In theory there should not be any problem. Could you share how you changed the code and how you submit your job to the local environment?

Best,
Guowei


On Tue, Dec 8, 2020 at 2:53 AM Jakub N <[hidden email]> wrote:
Hi Guowei,

It turned out that for my application I unfortunately can't have the UDF on the job's classpath. I am using a local Flink environment and `sbt pack` (similar to a fatjar) to create launch scripts; therefore, to my understanding, I can't modify the classpath once the project is packed.
Are there any ways to add these UDFs from outside the classpath?

Kind regards,

Jakub


From: Jakub N <[hidden email]>
Sent: Monday, 7 December 2020 12:59
To: Guowei Ma <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi Guowei,

Many thanks for your help. Your suggestion indeed solved the issue: I moved `myFunction` onto the classpath from which execution starts.

Kind regards,

Jakub 


From: Guowei Ma <[hidden email]>
Sent: Monday, 7 December 2020 12:16
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi Jakub,
I think the problem is that the class `cls`, which you load at runtime, is not visible to the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader.
So maybe you could put `myFunction` on the job's classpath.
Best,
Guowei
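The deserialization step Guowei describes can be sketched with plain JDK classes: an ObjectInputStream that resolves classes against a supplied classloader, similar in spirit to the InstantiationUtil$ClassLoaderObjectInputStream seen in the stack trace (this sketch is illustrative, not Flink's code). If the supplied loader cannot see the serialized object's class, readObject() fails with exactly this kind of ClassNotFoundException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;

public class LoaderAwareDeserialization {
    // Illustrative ObjectInputStream that resolves classes against a
    // supplied classloader instead of the default one.
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> payload = new ArrayList<>();
        payload.add("payload");

        // Serialize with the default mechanism.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        }

        // Deserializing succeeds here because ArrayList is visible to the
        // context classloader; a loader that does not know the class would
        // throw ClassNotFoundException from resolveClass() instead.
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()),
                Thread.currentThread().getContextClassLoader())) {
            System.out.println(in.readObject());
        }
    }
}
```

This is why the UDF class must be reachable from the classloader Flink uses when it deserializes the registered function object during code generation.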


On Mon, Dec 7, 2020 at 5:54 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

Thanks for your help,
here is the relevant code (QueryCommand class):
val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()

val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

val root = new File("target/custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

//creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
def execute(): Unit = fsEnv.execute()
myFunction.java
import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {

    public String eval(String s) {
        return "myFunction - " + s;
    }

}
Execution works as follows: a QueryCommand instance is created, some properties are set, and execute() is invoked.
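For reference, the compile-then-load mechanic used in the QueryCommand code above can be reproduced with only JDK classes. A self-contained sketch (file, directory, and class names here are illustrative; the thread's actual class is the lowercase `myFunction`):

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class CompileAndLoadDemo {
    public static void main(String[] args) throws Exception {
        // Write a small Java source file to a directory outside the classpath.
        Path dir = Files.createTempDirectory("custom");
        Path src = dir.resolve("MyFunction.java");
        Files.writeString(src,
            "public class MyFunction {\n"
          + "    public String eval(String s) { return \"myFunction - \" + s; }\n"
          + "}\n");

        // Compile it at runtime; the .class file lands next to the source.
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) throw new IllegalStateException("run on a JDK, not a JRE");
        int rc = compiler.run(null, null, null, src.toString());
        if (rc != 0) throw new IllegalStateException("compilation failed");

        // Load the class via a child URLClassLoader and invoke it reflectively.
        try (URLClassLoader loader = new URLClassLoader(
                new URL[]{dir.toUri().toURL()}, CompileAndLoadDemo.class.getClassLoader())) {
            Class<?> cls = loader.loadClass("MyFunction");
            Object udf = cls.getDeclaredConstructor().newInstance();
            Object result = cls.getMethod("eval", String.class).invoke(udf, "cake");
            System.out.println(result); // myFunction - cake
        }
    }
}
```

Note that the loaded class is visible only through this child URLClassLoader, not through the application classloader — which is the root of the ClassNotFoundException discussed in this thread.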

Let me know if any other relevant information is missing; alternatively, you can also have a look at the source code here (https://github.com/codefeedr/kafkaquery).

Kind regards,

Jakub



From: Guowei Ma <[hidden email]>
Sent: Monday, 7 December 2020 02:55
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
In theory there should not be any problem because you could register the function object.
So would you like to share your code and the shell command with which you submit your job?
Best,
Guowei


On Mon, Dec 7, 2020 at 3:19 AM Jakub N <[hidden email]> wrote:
The current setup is: Data in Kafka -> Kafka Connector -> StreamTableEnvironment -> execute Flink SQL queries

I would like to register Flink's user-defined functions from a jar or Java class file at runtime. What I have tried so far: using a Java ClassLoader to obtain an instance of a ScalarFunction (UDF) and registering it in the StreamTableEnvironment. When I try to execute a query that makes use of the UDF, I get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
...

I have verified that the generated instance of the UDF behaves as expected when invoking any of its methods.

Do you have any ideas on why this is failing?

Re: Flink UDF registration from jar at runtime

Dawid Wysakowicz-2

Hey,

I am not sure exactly what is going wrong in your case, but I put together an example to show you how I would do it:


@Test
public void testClassloader() throws IOException, ClassNotFoundException {
   URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
      folder.newFolder(),
      "BoolToInt.java",
      ""
         + "import org.apache.flink.table.functions.ScalarFunction;"
         + "\n"
         + "public class BoolToInt extends ScalarFunction {\n"
         + "\tpublic int eval(boolean b) {\n"
         + "\t\treturn b ? 1 : 0;\n"
         + "\t}\n"
         + "}"
   );

   TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .build());

   Class<ScalarFunction> boolToInt = (Class<ScalarFunction>) functionClassloader.loadClass("BoolToInt");

   try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
      tEnv.createFunction("BoolToInt", boolToInt);
      TableResult tableResult = tEnv.executeSql("SELECT BoolToInt(TRUE)");
      tableResult.print();
   }
}
I verified this runs on the current master. The ClassLoaderUtils is a Flink utility which writes out the provided code and loads it into a classloader. As far as I can tell it mimics your situation pretty well.

Best,
Dawid

On 10/12/2020 20:16, Jakub N wrote:
Hi Dawid,

According to your suggestion, given that I spawn a LocalEnvironment, I tried the following:

val root = new File("custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), Thread.currentThread().getContextClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]

val ignored = TemporaryClassLoaderContext.of(classLoader)
try {
  fsTableEnv.createTemporaryFunction("myFunction", udf)
  fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
}
unfortunately this still results in a ClassNotFoundException when executing the environment. (The class is located outside of the classpath and is loaded successfully; instances of it behave as expected.)
Did I possibly misunderstand what you were proposing?

Kind regards,

Jakub




From: Dawid Wysakowicz
Sent: Thursday, 10 December 2020 09:59
To: Guowei Ma; Jakub N
Cc: [hidden email]
Subject: Re: Flink UDF registration from jar at runtime

Hi Jakub,

As Guowei said the UDF must be present in the user classloader. It must be there when compiling the program and when executing on the cluster. As of now the TableEnvironment uses the Thread context classloader as the "user classloader" when compiling the query. Therefore you can do the trick via:

ClassLoader yourClassloader = ... // create your classloader with the UDF

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(yourClassloader)) {

    fsTableEnv.createTemporaryFunction("myFunction", udf)

    fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")

}

Take a look at TemporaryClassLoaderContext[1] for a nice way to do it with a cleanup at the end.

As for the second problem, having the UDF on the classpath when executing: if you are just spawning a LocalEnvironment, the above should do the trick, as it will use the context classloader. If you are submitting to a cluster, you can submit multiple jars as part of a single job, either via the RemoteEnvironment or the flink run command.

That's how we submit UDFs from separate jars in the sql-client. You can try to go through a few classes there and see how it is done. I am afraid it's not the easiest task as there are quite a few classes to navigate through. You could start from e.g. org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java

[2] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305



On 10/12/2020 09:15, Guowei Ma wrote:
Hi, Jakub
If I understand correctly, you want the job you submitted to be able to load a table function that is not in the job jar.
I don't think Flink supports this natively (maybe others know).
But this requirement is similar to code generation: you need to submit the "code" with the job. I think you could refer to [1].


Best,
Guowei


On Tue, Dec 8, 2020 at 8:40 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

  1. Unfortunately the UDF and the job are not in the same fatjar. Essentially there is only one "fatjar", containing the Flink environment + the job; the UDF is separate.
  2. Yes, that is correct.
  3. As explained in 1, I don't submit job jars to the Flink environment; instead the job is created and submitted within the "fatjar".

Code-wise, nothing changed except where the UDF's location is specified.
"Submitting to the environment" works as follows:
  1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
  2. (Register UDF's)
  3. Create tables
  4. Query on the tables
  5. Execute the environment
The overall process is executed as one program.
Apologies if any of these explanations are unclear or too vague.

Kind regards,

Jakub


From: Guowei Ma <[hidden email]>
Sent: Tuesday, 8 December 2020 06:34
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
I am not familiar with `sbt pack`. But I assume you are doing the following (correct me if I misunderstand you):
1. The UDF and Job jar are in the same "fatjar" 
2. You "new" a UDF object in the job(). 
3. You submit the  "fatjar" to the local Flink environment. 

In theory there should not be any problem. Could you share how you changed the code and how you submit your job to the local environment?

Best,
Guowei


On Tue, Dec 8, 2020 at 2:53 AM Jakub N <[hidden email]> wrote:
Hi Guowei,

It turned out that for my application I unfortunately can't have the UDF on the job's classpath. I am using a local Flink environment and `sbt pack` (similar to a fatjar) to create launch scripts; therefore, to my understanding, I can't modify the classpath once the project is packed.
Are there any ways to add these UDFs from outside the classpath?

Kind regards,

Jakub


From: Jakub N <[hidden email]>
Sent: Monday, 7 December 2020 12:59
To: Guowei Ma <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi Guowei,

Many thanks for your help. Your suggestion indeed solved the issue: I moved `myFunction` onto the classpath from which execution starts.

Kind regards,

Jakub 


From: Guowei Ma <[hidden email]>
Sent: Monday, 7 December 2020 12:16
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi Jakub,
I think the problem is that the class `cls`, which you load at runtime, is not visible to the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader.
So maybe you could put `myFunction` on the job's classpath.
Best,
Guowei


On Mon, Dec 7, 2020 at 5:54 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

Thanks for your help,
here is the relevant code (QueryCommand class):
val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()

val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

val root = new File("target/custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

//creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
def execute(): Unit = fsEnv.execute()
myFunction.java
import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {

    public String eval(String s) {
        return "myFunction - " + s;
    }

}
Execution works as follows: a QueryCommand instance is created, some properties are set, and execute() is invoked.

Let me know if any other relevant information is missing; alternatively, you can also have a look at the source code here (https://github.com/codefeedr/kafkaquery).

Kind regards,

Jakub



From: Guowei Ma <[hidden email]>
Sent: Monday, 7 December 2020 02:55
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
In theory there should not be any problem because you could register the function object.
So would you like to share your code and the shell command with which you submit your job?
Best,
Guowei


On Mon, Dec 7, 2020 at 3:19 AM Jakub N <[hidden email]> wrote:
The current setup is: Data in Kafka -> Kafka Connector -> StreamTableEnvironment -> execute Flink SQL queries

I would like to register Flink's user-defined functions from a jar or Java class file at runtime. What I have tried so far: using a Java ClassLoader to obtain an instance of a ScalarFunction (UDF) and registering it in the StreamTableEnvironment. When I try to execute a query that makes use of the UDF, I get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
...

I have verified that the generated instance of the UDF behaves as expected when invoking any of its methods.

Do you have any ideas on why this is failing?


Re: Flink UDF registration from jar at runtime

Jakub N
In reply to this post by Jakub N
Hi Dawid,

thanks a lot for your help. Unfortunately our issue still persists, but with your example we managed to reproduce our problem with the following code:
File folder = new File("custom");
URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
    folder,
    "StringFunc.java",
    ""
        + "import org.apache.flink.table.functions.ScalarFunction;"
        + "\n"
        + "public class StringFunc extends ScalarFunction {\n"
        + "\tpublic String eval(String b) {\n"
        + "\t\treturn b + \" : udf invoked\";\n"
        + "\t}\n"
        + "}"
);

EnvironmentSettings fsSettings = EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();

StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
fsEnv.getConfig().enableObjectReuse();

Class<ScalarFunction> stringFunc = (Class<ScalarFunction>) functionClassloader.loadClass("StringFunc");

List<String> data = new ArrayList<>();
data.add("cake");
Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
table.printSchema();
fsTableEnv.registerTable("recipes", table);

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
    fsTableEnv.createFunction("StringFunc", stringFunc);
    fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM recipes").print();

    //Surprisingly the following line can find the StringFunc
    //fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM recipes").print();
}
This leads to a `java.lang.ClassNotFoundException: StringFunc`. As mentioned in the code, the commented line surprisingly works as intended. 
Do you have any ideas on why this is the case?

Kind regards,

Jakub



From: Dawid Wysakowicz
Sent: Thursday, 10 December 2020 21:32
To: Jakub N
Cc: [hidden email]
Subject: Re: Flink UDF registration from jar at runtime

Hey,

I am not sure exactly what is going wrong in your case, but I put together an example to show you how I would do it:


@Test
public void testClassloader() throws IOException, ClassNotFoundException {
   URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
      folder.newFolder(),
      "BoolToInt.java",
      ""
         + "import org.apache.flink.table.functions.ScalarFunction;"
         + "\n"
         + "public class BoolToInt extends ScalarFunction {\n"
         + "\tpublic int eval(boolean b) {\n"
         + "\t\treturn b ? 1 : 0;\n"
         + "\t}\n"
         + "}"
   );

   TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .build());

   Class<ScalarFunction> boolToInt = (Class<ScalarFunction>) functionClassloader.loadClass("BoolToInt");

   try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
      tEnv.createFunction("BoolToInt", boolToInt);
      TableResult tableResult = tEnv.executeSql("SELECT BoolToInt(TRUE)");
      tableResult.print();
   }
}
I verified this runs on the current master. The ClassLoaderUtils is a Flink utility which writes out the provided code and loads it into a classloader. As far as I can tell it mimics your situation pretty well.

Best,
Dawid

On 10/12/2020 20:16, Jakub N wrote:
Hi Dawid,

According to your suggestion, given that I spawn a LocalEnvironment, I tried the following:

val root = new File("custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), Thread.currentThread().getContextClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]

val ignored = TemporaryClassLoaderContext.of(classLoader)
try {
  fsTableEnv.createTemporaryFunction("myFunction", udf)
  fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
}
unfortunately this still results in a ClassNotFoundException when executing the environment. (The class is located outside of the classpath and is loaded successfully; instances of it behave as expected.)
Did I possibly misunderstand what you were proposing?

Kind regards,

Jakub




From: Dawid Wysakowicz
Sent: Thursday, 10 December 2020 09:59
To: Guowei Ma; Jakub N
Cc: [hidden email]
Subject: Re: Flink UDF registration from jar at runtime

Hi Jakub,

As Guowei said the UDF must be present in the user classloader. It must be there when compiling the program and when executing on the cluster. As of now the TableEnvironment uses the Thread context classloader as the "user classloader" when compiling the query. Therefore you can do the trick via:

ClassLoader yourClassloader = ... // create your classloader with the UDF

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(yourClassloader)) {

    fsTableEnv.createTemporaryFunction("myFunction", udf)

    fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")

}

Take a look at TemporaryClassLoaderContext [1] for a nice way to do this with a cleanup at the end.
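For reference, the pattern that TemporaryClassLoaderContext implements can be sketched in plain Java. This is a simplified illustration, not the actual Flink source; the class name SwapContextClassLoader is made up:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Minimal sketch of the swap-and-restore pattern behind
// TemporaryClassLoaderContext (assumption: simplified for illustration).
final class SwapContextClassLoader implements AutoCloseable {
    private final ClassLoader original;

    SwapContextClassLoader(ClassLoader temporary) {
        // Remember the current context classloader and install the new one.
        this.original = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(temporary);
    }

    @Override
    public void close() {
        // Restore the previous context classloader even if the body threw.
        Thread.currentThread().setContextClassLoader(original);
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        ClassLoader custom = new URLClassLoader(new URL[0], before);
        try (SwapContextClassLoader ignored = new SwapContextClassLoader(custom)) {
            // Inside the block the context classloader is the custom one.
            System.out.println(Thread.currentThread().getContextClassLoader() == custom);
        }
        // After the block the original classloader is back.
        System.out.println(Thread.currentThread().getContextClassLoader() == before);
    }
}
```

The try-with-resources block guarantees the previous context classloader is restored even if the query submission throws.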

As for the second problem, having the UDF on the classpath during execution: if you are just spawning a LocalEnvironment, the above should do the trick, as it will use the context classloader. If you are submitting to a cluster, you can submit multiple jars as part of a single job, either via the RemoteEnvironment or the flink run command.

That's how we submit UDFs from separate jars in the sql-client. You can try to go through a few classes there and see how it is done. I am afraid it's not the easiest task as there are quite a few classes to navigate through. You could start from e.g. org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java

[2] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305



On 10/12/2020 09:15, Guowei Ma wrote:
Hi Jakub,
If I understand correctly, you want the job you submitted to be able to load a table function that is not in the job jar.
I don't think Flink supports this natively (maybe others know).
But I think this requirement is similar to code generation: you need to ship the "code" with the job. You could refer to [1].


Best,
Guowei


On Tue, Dec 8, 2020 at 8:40 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

  1. Unfortunately the UDF and the job are not in the same fatjar. Essentially there is only one "fatjar" containing the Flink environment + the job, the UDF is separate. 
  2. Yes,  that is correct.
  3. As explained in 1.  I don't submit job jars to the Flink environment, instead the job is created and submitted within the "fatjar"

Codewise nothing changed except for where the location of the UDF was specified. 
"Submitting to the environment" works as follows:
  1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
  2. (Register UDF's)
  3. Create tables
  4. Query on the tables
  5. Execute the environment
The overall process is executed as one program.
Apologies if any of these explanations are unclear or too vague.

Kind regards,

Jakub


Von: Guowei Ma <[hidden email]>
Gesendet: Dienstag, 8. Dezember 2020 06:34
An: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Betreff: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
I am not familiar with `sbt pack`, but I assume you are doing the following (correct me if I misunderstand you):
1. The UDF and Job jar are in the same "fatjar" 
2. You "new" a UDF object in the job(). 
3. You submit the  "fatjar" to the local Flink environment. 

In theory there should not be any problem. Could you share how you changed the code and how you submit your job to the local environment?

Best,
Guowei


On Tue, Dec 8, 2020 at 2:53 AM Jakub N <[hidden email]> wrote:
Hi Guowei,

It turned out that for my application I unfortunately can't have the UDF on the job's classpath. As I am using a local Flink environment and `sbt pack` (similar to a fatjar) to create launch scripts, to my understanding I can't access the classpath once the project is packed.
Are there any ways to add these UDFs from outside the classpath?

Kind regards,

Jakub


Von: Jakub N <[hidden email]>
Gesendet: Montag, 7. Dezember 2020 12:59
An: Guowei Ma <[hidden email]>
Cc: [hidden email] <[hidden email]>
Betreff: Re: Flink UDF registration from jar at runtime
 
Hi Guowei,

Many thanks for your help. Your suggestion indeed solved the issue: I moved `myFunction` to the classpath from which execution starts.

Kind regards,

Jakub 


Von: Guowei Ma <[hidden email]>
Gesendet: Montag, 7. Dezember 2020 12:16
An: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Betreff: Re: Flink UDF registration from jar at runtime
 
Hi Jakub,
I think the problem is that the `cls` you load at runtime is not visible to the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader.
So maybe you could put `myFunction` on the job's classpath.
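To illustrate why the context classloader matters for deserialization, here is a minimal, self-contained sketch of an ObjectInputStream that resolves classes against an explicit classloader, in the spirit of the InstantiationUtil$ClassLoaderObjectInputStream frame in the stack trace (a simplified analogue, not Flink's actual code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// An ObjectInputStream that resolves classes against a supplied classloader
// instead of the caller's. If the serialized class (e.g. a UDF only visible
// to a child URLClassLoader) cannot be found by that loader, readObject
// fails with ClassNotFoundException, exactly like in the reported trace.
class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        // Resolve against the supplied loader instead of the default one.
        return Class.forName(desc.getName(), false, loader);
    }

    // Serialize a value and read it back through the supplied classloader.
    static Object roundTrip(Object value, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), loader)) {
            return in.readObject();
        }
    }
}
```

The round trip succeeds only when the given loader can see the serialized class, which is why the UDF class must be reachable from the classloader Flink uses.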
Best,
Guowei


On Mon, Dec 7, 2020 at 5:54 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

Thanks for your help,
here is the relevant code (QueryCommand class):
val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()

val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

val root = new File("target/custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

//creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
def execute(): Unit = fsEnv.execute()
myFunction.java
import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {

    public String eval(String s) {
        return "myFunction - " + s;
    }

}
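For comparison, the compile-then-load steps above can be written as a self-contained Java sketch using only the JDK (class and directory names here are illustrative; a JDK is required, since ToolProvider.getSystemJavaCompiler() returns null on a plain JRE):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

// Compile a Java source string at runtime and instantiate the resulting
// class from a directory that is outside the application classpath.
public class CompileAndLoad {
    public static Object newInstanceOf(String className, String source) throws Exception {
        // Write the source to a temporary directory, outside the classpath.
        Path dir = Files.createTempDirectory("custom");
        Path javaFile = dir.resolve(className + ".java");
        Files.write(javaFile, source.getBytes(StandardCharsets.UTF_8));

        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("no system compiler: running on a JRE?");
        }
        if (compiler.run(null, null, null, javaFile.toString()) != 0) {
            throw new IllegalStateException("compilation failed");
        }

        // Load the freshly compiled class through a child URLClassLoader.
        try (URLClassLoader loader = new URLClassLoader(
                new URL[]{dir.toUri().toURL()}, CompileAndLoad.class.getClassLoader())) {
            Class<?> cls = loader.loadClass(className);
            return cls.getDeclaredConstructor().newInstance();
        }
    }
}
```

This mirrors the Scala snippet: the instance works fine when invoked directly, because the child classloader resolved it; the trouble only starts when Flink later tries to resolve the class through a different loader.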
Execution works as follows: a QueryCommand instance is created, some properties are set, and execute() is invoked.

Let me know if any other relevant information is missing, alternatively you can also have a look at the source code here (https://github.com/codefeedr/kafkaquery).

Kind regards,

Jakub



Von: Guowei Ma <[hidden email]>
Gesendet: Montag, 7. Dezember 2020 02:55
An: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Betreff: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
In theory there should not be any problem, because you can register the function object.
So would you like to share your code and the shell command you use to submit your job?
Best,
Guowei


On Mon, Dec 7, 2020 at 3:19 AM Jakub N <[hidden email]> wrote:
The current setup is: Data in Kafka -> Kafka Connector -> StreamTableEnvironment -> execute Flink SQL queries

I would like to register Flink's User-defined Functions from a jar or java class file during runtime. What I have tried so far is using Java's Classloader getting an instance of a ScalarFunction (UDF) and registering it in the StreamTableEnvironment. When I try executing a query making use of the UDF I get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
...

I have verified that the generated instance of the UDF behaves as expected when invoking any of its methods.

Do you have any ideas on why this is failing?

Re: Flink UDF registration from jar at runtime

Dawid Wysakowicz-2

Hi Jakub,


Sorry for the late reply. I've just come back from the Christmas holidays.


Unfortunately you're right and it's not as easy as I originally thought. Apologies for that. It works when you use a constant because of constant expression reduction, so the function is executed before the job is actually submitted to the cluster.


It does not work if the function needs to be invoked on the cluster, because tasks use a classloader built from the jar files shipped through the BlobManager. Tasks cannot simply use a thread context classloader, because that does not make much sense in a distributed setup. What you could try, though it is rather a hack, is to add the files with the generated classes to the cluster classpath. You could try this approach:


        URLClassLoader functionClassloader = ...

        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
            EnvironmentSettings fsSettings = EnvironmentSettings
                    .newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();

            Configuration effectiveConfiguration = new Configuration();
            effectiveConfiguration.set(DeploymentOptions.TARGET, "local");
            effectiveConfiguration.set(DeploymentOptions.ATTACHED, true);
            ConfigUtils.encodeCollectionToConfig(
                    effectiveConfiguration,
                    PipelineOptions.CLASSPATHS,
                    Arrays.asList(functionClassloader.getURLs().clone()),
                    URL::toString);
            StreamExecutionEnvironment fsEnv = new StreamExecutionEnvironment(effectiveConfiguration);

            StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

            fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            fsEnv.getConfig().enableObjectReuse();

            List<String> data = new ArrayList<>();
            data.add("cake");
            Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
            table.printSchema();
            fsTableEnv.registerTable("recipes", table);
            fsTableEnv.createFunction("StringFunc", stringFunc);
            fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM recipes").print();

            //Surprisingly the following line can find the StringFunc
            //fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM recipes").print();
        }


This manually creates a LocalEnvironment with the URLs added to the cluster classpath. If you use a RemoteEnvironment, you could use the constructor that accepts a global classpath. Bear in mind this uses lower-level APIs that have no stability guarantees.


Best,

Dawid


On 13/12/2020 21:22, Jakub N wrote:
Hi Dawid,

thanks a lot for your help. Unfortunately our issue still persists, but with your example we managed to reproduce our problem in the following code:
File folder = new File("custom");
URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
    folder,
    "StringFunc.java",
    ""
        + "import org.apache.flink.table.functions.ScalarFunction;"
        + "\n"
        + "public class StringFunc extends ScalarFunction {\n"
        + "\tpublic String eval(String b) {\n"
        + "\t\treturn b + \" : udf invoked\";\n"
        + "\t}\n"
        + "}"
);

EnvironmentSettings fsSettings = EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();

StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
fsEnv.getConfig().enableObjectReuse();

Class<ScalarFunction> stringFunc = (Class<ScalarFunction>) functionClassloader.loadClass("StringFunc");

List<String> data = new ArrayList<>();
data.add("cake");
Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
table.printSchema();
fsTableEnv.registerTable("recipes", table);

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
    fsTableEnv.createFunction("StringFunc", stringFunc);
    fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM recipes").print();

    //Surprisingly the following line can find the StringFunc
    //fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM recipes").print();
}
This leads to a `java.lang.ClassNotFoundException: StringFunc`. As mentioned in the code, the commented line surprisingly works as intended.
Do you have any ideas on why this is the case?

Kind regards,

Jakub



Von: Dawid Wysakowicz
Gesendet: Donnerstag, 10. Dezember 2020 21:32
Bis: Jakub N
Cc: [hidden email]
Betreff: Re: Flink UDF registration from jar at runtime

Hey,

I am not sure exactly what is going wrong in your case, but I put together an example to show you how I would do it:


@Test
public void testClassloader() throws IOException, ClassNotFoundException {
   URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
      folder.newFolder(),
      "BoolToInt.java",
      ""
         + "import org.apache.flink.table.functions.ScalarFunction;"
         + "\n"
         + "public class BoolToInt extends ScalarFunction {\n"
         + "\tpublic int eval(boolean b) {\n"
         + "\t\treturn b ? 1 : 0;\n"
         + "\t}\n"
         + "}"
   );

   TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .build());

   Class<ScalarFunction> boolToInt = (Class<ScalarFunction>) functionClassloader.loadClass("BoolToInt");

   try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
      tEnv.createFunction("BoolToInt", boolToInt);
      TableResult tableResult = tEnv.executeSql("SELECT BoolToInt(TRUE)");
      tableResult.print();
   }
}
I verified this runs on the current master. ClassLoaderUtils is a Flink test utility that writes out the provided source and loads it into a classloader. As far as I can tell it mimics your situation pretty well.

Best,
Dawid

On 10/12/2020 20:16, Jakub N wrote:
Hi Dawid,

Following your suggestion, given that I spawn a LocalEnvironment, I tried the following:

val root = new File("custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), Thread.currentThread().getContextClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]

val ignored = TemporaryClassLoaderContext.of(classLoader)
try {
  fsTableEnv.createTemporaryFunction("myFunction", udf)
  fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
}
Unfortunately this still results in a ClassNotFoundException when executing the environment. (The class is located outside of the classpath and is loaded successfully; instances of it behave as expected.)
Did I possibly misunderstand what you were proposing?

Kind regards,

Jakub




Von: Dawid Wysakowicz
Gesendet: Donnerstag, 10. Dezember 2020 09:59
Bis: Guowei Ma; Jakub N
Cc: [hidden email]
Betreff: Re: Flink UDF registration from jar at runtime

Hi Jakub,

As Guowei said, the UDF must be present in the user classloader. It must be there both when compiling the program and when executing on the cluster. As of now the TableEnvironment uses the thread context classloader as the "user classloader" when compiling the query. Therefore you can do the trick via:

ClassLoader yourClassloader = ... // create your classloader with the UDF

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(yourClassloader)) {

    fsTableEnv.createTemporaryFunction("myFunction", udf)

    fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")

}

Take a look at TemporaryClassLoaderContext [1] for a nice way to do this with a cleanup at the end.

As for the second problem, having the UDF on the classpath during execution: if you are just spawning a LocalEnvironment, the above should do the trick, as it will use the context classloader. If you are submitting to a cluster, you can submit multiple jars as part of a single job, either via the RemoteEnvironment or the flink run command.

That's how we submit UDFs from separate jars in the sql-client. You can try to go through a few classes there and see how it is done. I am afraid it's not the easiest task as there are quite a few classes to navigate through. You could start from e.g. org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java

[2] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305



On 10/12/2020 09:15, Guowei Ma wrote:
Hi Jakub,
If I understand correctly, you want the job you submitted to be able to load a table function that is not in the job jar.
I don't think Flink supports this natively (maybe others know).
But I think this requirement is similar to code generation: you need to ship the "code" with the job. You could refer to [1].


Best,
Guowei


On Tue, Dec 8, 2020 at 8:40 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

  1. Unfortunately the UDF and the job are not in the same fatjar. Essentially there is only one "fatjar" containing the Flink environment + the job, the UDF is separate. 
  2. Yes,  that is correct.
  3. As explained in 1.  I don't submit job jars to the Flink environment, instead the job is created and submitted within the "fatjar"

Codewise nothing changed except for where the location of the UDF was specified. 
"Submitting to the environment" works as follows:
  1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
  2. (Register UDF's)
  3. Create tables
  4. Query on the tables
  5. Execute the environment
The overall process is executed as one program.
Apologies if any of these explanations are unclear or too vague.

Kind regards,

Jakub


Von: Guowei Ma <[hidden email]>
Gesendet: Dienstag, 8. Dezember 2020 06:34
An: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Betreff: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
I am not familiar with `sbt pack`, but I assume you are doing the following (correct me if I misunderstand you):
1. The UDF and Job jar are in the same "fatjar" 
2. You "new" a UDF object in the job(). 
3. You submit the  "fatjar" to the local Flink environment. 

In theory there should not be any problem. Could you share how you changed the code and how you submit your job to the local environment?

Best,
Guowei


On Tue, Dec 8, 2020 at 2:53 AM Jakub N <[hidden email]> wrote:
Hi Guowei,

It turned out that for my application I unfortunately can't have the UDF on the job's classpath. As I am using a local Flink environment and `sbt pack` (similar to a fatjar) to create launch scripts, to my understanding I can't access the classpath once the project is packed.
Are there any ways to add these UDFs from outside the classpath?

Kind regards,

Jakub


Von: Jakub N <[hidden email]>
Gesendet: Montag, 7. Dezember 2020 12:59
An: Guowei Ma <[hidden email]>
Cc: [hidden email] <[hidden email]>
Betreff: Re: Flink UDF registration from jar at runtime
 
Hi Guowei,

Many thanks for your help. Your suggestion indeed solved the issue: I moved `myFunction` to the classpath from which execution starts.

Kind regards,

Jakub 


Von: Guowei Ma <[hidden email]>
Gesendet: Montag, 7. Dezember 2020 12:16
An: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Betreff: Re: Flink UDF registration from jar at runtime
 
Hi Jakub,
I think the problem is that the `cls` you load at runtime is not visible to the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader.
So maybe you could put `myFunction` on the job's classpath.
Best,
Guowei


On Mon, Dec 7, 2020 at 5:54 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

Thanks for your help,
here is the relevant code (QueryCommand class):
val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()

val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

val root = new File("target/custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

//creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
def execute(): Unit = fsEnv.execute()
myFunction.java
import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {

    public String eval(String s) {
        return "myFunction - " + s;
    }

}
Execution works as follows: a QueryCommand instance is created, some properties are set, and execute() is invoked.

Let me know if any other relevant information is missing, alternatively you can also have a look at the source code here (https://github.com/codefeedr/kafkaquery).

Kind regards,

Jakub



Von: Guowei Ma <[hidden email]>
Gesendet: Montag, 7. Dezember 2020 02:55
An: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Betreff: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
In theory there should not be any problem, because you can register the function object.
So would you like to share your code and the shell command you use to submit your job?
Best,
Guowei


On Mon, Dec 7, 2020 at 3:19 AM Jakub N <[hidden email]> wrote:
The current setup is: Data in Kafka -> Kafka Connector -> StreamTableEnvironment -> execute Flink SQL queries

I would like to register Flink's User-defined Functions from a jar or java class file during runtime. What I have tried so far is using Java's Classloader getting an instance of a ScalarFunction (UDF) and registering it in the StreamTableEnvironment. When I try executing a query making use of the UDF I get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
...

I have verified that the generated instance of the UDF behaves as expected when invoking any of its methods.

Do you have any ideas on why this is failing?


Re: Flink UDF registration from jar at runtime

Jakub N
Hi Dawid,

The approach you sent indeed solved our problem. 

You helped me and my colleague tremendously, great thanks.

Kind regards,

Jakub



From: Dawid Wysakowicz
Sent: Tuesday, January 5, 2021 16:57
To: Jakub N
Cc: [hidden email]
Subject: Re: Flink UDF registration from jar at runtime

Hi Jakub,


Sorry for the late reply. I've just come back from the Christmas holidays.


Unfortunately you're right and it's not as easy as I originally thought. Apologies for that. It works when you use a constant because of constant expression reduction, so the function is executed before the job is actually submitted to the cluster.


It does not work if the function needs to be invoked on the cluster, because tasks use a classloader built from the jar files shipped through the BlobManager. Tasks cannot simply use a thread context classloader, because that does not make much sense in a distributed setup. What you could try, though it is rather a hack, is to add the files with the generated classes to the cluster classpath. You could try this approach:


        URLClassLoader functionClassloader = ...

        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
            EnvironmentSettings fsSettings = EnvironmentSettings
                    .newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();

            Configuration effectiveConfiguration = new Configuration();
            effectiveConfiguration.set(DeploymentOptions.TARGET, "local");
            effectiveConfiguration.set(DeploymentOptions.ATTACHED, true);
            ConfigUtils.encodeCollectionToConfig(
                    effectiveConfiguration,
                    PipelineOptions.CLASSPATHS,
                    Arrays.asList(functionClassloader.getURLs().clone()),
                    URL::toString);
            StreamExecutionEnvironment fsEnv = new StreamExecutionEnvironment(effectiveConfiguration);

            StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

            fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            fsEnv.getConfig().enableObjectReuse();

            List<String> data = new ArrayList<>();
            data.add("cake");
            Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
            table.printSchema();
            fsTableEnv.registerTable("recipes", table);
            fsTableEnv.createFunction("StringFunc", stringFunc);
            fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM recipes").print();

            //Surprisingly the following line can find the StringFunc
            //fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM recipes").print();
        }


This manually creates a LocalEnvironment with the URLs added to the cluster classpath. If you use a RemoteEnvironment, you could use the constructor that accepts a global classpath. Bear in mind this uses lower-level APIs that have no stability guarantees.


Best,

Dawid


On 13/12/2020 21:22, Jakub N wrote:
Hi Dawid,

thanks a lot for your help. Unfortunately our issue still persists, but with your example we managed to reproduce our problem in the following code:
File folder = new File("custom");
URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
    folder,
    "StringFunc.java",
    ""
        + "import org.apache.flink.table.functions.ScalarFunction;"
        + "\n"
        + "public class StringFunc extends ScalarFunction {\n"
        + "\tpublic String eval(String b) {\n"
        + "\t\treturn b + \" : udf invoked\";\n"
        + "\t}\n"
        + "}"
);

EnvironmentSettings fsSettings = EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();

StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
fsEnv.getConfig().enableObjectReuse();

Class<ScalarFunction> stringFunc = (Class<ScalarFunction>) functionClassloader.loadClass("StringFunc");

List<String> data = new ArrayList<>();
data.add("cake");
Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
table.printSchema();
fsTableEnv.registerTable("recipes", table);

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
    fsTableEnv.createFunction("StringFunc", stringFunc);
    fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM recipes").print();

    //Surprisingly the following line can find the StringFunc
    //fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM recipes").print();
}
This leads to a `java.lang.ClassNotFoundException: StringFunc`. As mentioned in the code, the commented-out line surprisingly works as intended.
Do you have any ideas on why this is the case?

Kind regards,

Jakub



From: Dawid Wysakowicz
Sent: Thursday, December 10, 2020 21:32
To: Jakub N
Cc: [hidden email]
Subject: Re: Flink UDF registration from jar at runtime

Hey,

I am not sure exactly what is going wrong in your case, but I put together an example to show you how I would do it:


@Test
public void testClassloader() throws IOException, ClassNotFoundException {
   URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
      folder.newFolder(),
      "BoolToInt.java",
      ""
         + "import org.apache.flink.table.functions.ScalarFunction;"
         + "\n"
         + "public class BoolToInt extends ScalarFunction {\n"
         + "\tpublic int eval(boolean b) {\n"
         + "\t\treturn b ? 1 : 0;\n"
         + "\t}\n"
         + "}"
   );

   TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .build());

   Class<ScalarFunction> boolToInt = (Class<ScalarFunction>) functionClassloader.loadClass("BoolToInt");

   try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(functionClassloader)) {
      tEnv.createFunction("BoolToInt", boolToInt);
      TableResult tableResult = tEnv.executeSql("SELECT BoolToInt(TRUE)");
      tableResult.print();
   }
}
I verified this runs on the current master. The ClassLoaderUtils is a Flink utility which writes out the provided code and loads it into a classloader. As far as I can tell it mimics your situation pretty well.

Best,
Dawid

On 10/12/2020 20:16, Jakub N wrote:
Hi Dawid,

According to your suggestion, given that a I spawn a LocalEnvironment, I tried the following:

val root = new File("custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), Thread.currentThread().getContextClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]

val ignored = TemporaryClassLoaderContext.of(classLoader)
try {
  fsTableEnv.createTemporaryFunction("myFunction", udf)
  fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
}
unfortunately this still results in a ClassNotFoundException when executing the environment. (The class is located outside of the classpath and is loaded successfully; instances of it behave as expected.)
Did I possibly misunderstand what you were proposing?

Kind regards,

Jakub




From: Dawid Wysakowicz
Sent: Thursday, December 10, 2020 09:59
To: Guowei Ma; Jakub N
Cc: [hidden email]
Subject: Re: Flink UDF registration from jar at runtime

Hi Jakub,

As Guowei said, the UDF must be present in the user classloader: it must be there both when compiling the program and when executing on the cluster. As of now, the TableEnvironment uses the thread context classloader as the "user classloader" when compiling the query. Therefore you can do the trick via:

ClassLoader yourClassloader = ... // create your classloader with the UDF

try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(yourClassloader)) {

    fsTableEnv.createTemporaryFunction("myFunction", udf)

    fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")

}

Take a look at the TemporaryClassLoaderContext[1] for a nice way how to do it with a cleanup at the end.
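The pattern `TemporaryClassLoaderContext` implements can be sketched in a few lines of plain JDK code. This is only an illustration of the save/swap/restore idea, not Flink's actual implementation:

```java
import java.net.URL;
import java.net.URLClassLoader;

public class Main {
    // Sketch of the save/swap/restore pattern: install a temporary thread
    // context classloader and restore the previous one on close.
    static final class TemporaryContext implements AutoCloseable {
        private final ClassLoader original;

        TemporaryContext(ClassLoader temporary) {
            Thread current = Thread.currentThread();
            this.original = current.getContextClassLoader();
            current.setContextClassLoader(temporary);
        }

        @Override
        public void close() {
            Thread.currentThread().setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        ClassLoader custom = new URLClassLoader(new URL[0], before);

        try (TemporaryContext ignored = new TemporaryContext(custom)) {
            // Inside the block the context classloader is the custom one, so
            // code that resolves classes through it (as the planner does)
            // sees the extra classes.
            System.out.println("swapped=" + (Thread.currentThread().getContextClassLoader() == custom));
        }
        System.out.println("restored=" + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

The try-with-resources form guarantees the original classloader is restored even if the body throws, which is the cleanup the class exists for.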

To solve the second problem of having the UDF on the classpath when executing: if you are just spawning a LocalEnvironment, the above should do the trick, as it will use the context classloader. If you are submitting to a cluster, you can submit multiple jars as part of a single job, either via the RemoteEnvironment or the flink run command.
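For the `flink run` route, the `-C`/`--classpath` flag adds a URL to the user code classloaders on all nodes. A sketch (the paths are placeholders, and the URL must be reachable from every node):

```shell
flink run -C file:///path/to/udf.jar path/to/job.jar
```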

That's how we submit UDFs from separate jars in the sql-client. You can try to go through a few classes there and see how it is done. I am afraid it's not the easiest task as there are quite a few classes to navigate through. You could start from e.g. org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java

[2] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305



On 10/12/2020 09:15, Guowei Ma wrote:
Hi Jakub,
If I understand correctly, you want the job you submitted to be able to load a table function that is not in the job jar.
I don't think Flink can support this natively (maybe other guys know).
But I think this requirement is similar to code generation: you need to submit the "code" along with the job. I think you could refer to [1].


Best,
Guowei


On Tue, Dec 8, 2020 at 8:40 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

  1. Unfortunately the UDF and the job are not in the same fatjar. Essentially there is only one "fatjar" containing the Flink environment + the job; the UDF is separate.
  2. Yes, that is correct.
  3. As explained in 1., I don't submit job jars to the Flink environment; instead the job is created and submitted within the "fatjar".

Code-wise, nothing changed except for where the location of the UDF is specified.
"Submitting to the environment" works as follows:
  1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
  2. (Register UDF's)
  3. Create tables
  4. Query on the tables
  5. Execute the environment
The overall process is executed as one program.
Apologies if any of these explanations are unclear or too vague.

Kind regards,

Jakub


From: Guowei Ma <[hidden email]>
Sent: Tuesday, December 8, 2020 06:34
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
I am not familiar with `sbt pack`. But I assume you are doing the following (correct me if I misunderstand you):
1. The UDF and the job jar are in the same "fatjar".
2. You "new" a UDF object in the job.
3. You submit the "fatjar" to the local Flink environment.

In theory there should not be any problem. Could you share how you changed the code and how you submit your job to the local environment?

Best,
Guowei


On Tue, Dec 8, 2020 at 2:53 AM Jakub N <[hidden email]> wrote:
Hi Guowei,

It turned out that for my application I unfortunately can't have the UDF in the job's classpath. As I am using a local Flink environment and `sbt pack` (similar to a fatjar) to create launch scripts, to my understanding I can't access the classpath (once the project is packed).
Are there any ways to add these UDFs from outside the classpath?

Kind regards,

Jakub


From: Jakub N <[hidden email]>
Sent: Monday, December 7, 2020 12:59
To: Guowei Ma <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi Guowei,

Many thanks for your help. Your suggestion indeed solved the issue: I moved `myFunction` to the classpath from which execution starts.

Kind regards,

Jakub 


From: Guowei Ma <[hidden email]>
Sent: Monday, December 7, 2020 12:16
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi Jakub,
I think the problem is that the `cls`, which you load at runtime, is not in the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader.
So maybe you could put `myFunction` on the job's classpath.
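Guowei's diagnosis (the class resolves through your own loader but not through the loader used during deserialization) can be reproduced with plain JDK tooling. A self-contained sketch; the class name and the temp directory are illustrative, and it requires running on a JDK so the system compiler is available:

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class Main {
    public static void main(String[] args) throws Exception {
        // Compile a class at runtime into a directory outside the classpath.
        Path dir = Files.createTempDirectory("udf");
        Path src = dir.resolve("MyFunction.java");
        Files.writeString(src,
            "public class MyFunction { public String eval(String s) { return \"udf: \" + s; } }");
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        compiler.run(null, null, null, src.toString());

        // A URLClassLoader pointed at that directory can resolve the class...
        URLClassLoader loader = new URLClassLoader(
            new URL[]{dir.toUri().toURL()}, Main.class.getClassLoader());
        Class<?> cls = Class.forName("MyFunction", true, loader);
        System.out.println("custom=" + cls.getName());

        // ...but the application classloader cannot. This is the same failure
        // mode Flink hits when it deserializes the UDF with a classloader
        // that does not know about the extra directory or jar.
        try {
            Class.forName("MyFunction", true, Main.class.getClassLoader());
            System.out.println("app=found");
        } catch (ClassNotFoundException e) {
            System.out.println("app=ClassNotFoundException");
        }
    }
}
```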
Best,
Guowei


On Mon, Dec 7, 2020 at 5:54 PM Jakub N <[hidden email]> wrote:
Hi Guowei,

Thanks for your help,
here is the relevant code (QueryCommand class):
val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()

val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

val root = new File("target/custom")
val classLoader: URLClassLoader = new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance();
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

//creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
def execute(): Unit = fsEnv.execute()
myFunction.java
import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {

    public String eval(String s) {
        return "myFunction - " + s;
    }

}
Execution works as follows: a QueryCommand instance is created, some properties are set, and execute() is invoked.

Let me know if any other relevant information is missing, alternatively you can also have a look at the source code here (https://github.com/codefeedr/kafkaquery).

Kind regards,

Jakub



From: Guowei Ma <[hidden email]>
Sent: Monday, December 7, 2020 02:55
To: Jakub N <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink UDF registration from jar at runtime
 
Hi, Jakub
In theory there should not be any problem, because you register the function object itself.
So would you like to share your code and the shell command you use to submit your job?
Best,
Guowei


On Mon, Dec 7, 2020 at 3:19 AM Jakub N <[hidden email]> wrote:
The current setup is: Data in Kafka -> Kafka Connector -> StreamTableEnvironment -> execute Flink SQL queries

I would like to register Flink's User-defined Functions from a jar or java class file during runtime. What I have tried so far is using Java's Classloader getting an instance of a ScalarFunction (UDF) and registering it in the StreamTableEnvironment. When I try executing a query making use of the UDF I get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
...

I have verified that the generated instance of the UDF behaves as expected when invoking any of its methods.

Do you have any ideas on why this is failing?