Why can't a UDF be a non-static inner class?


Jeff Zhang
I ask because I am trying to use a UDF in the Flink Scala shell, but I hit the following error.

image.png

After some investigation, I found that it is a Scala issue: https://issues.scala-lang.org/browse/SI-2034

The workaround is to pass the argument "-Yrepl-class-based" to the Scala shell.
But then I hit another error in 

After I removed the code above, everything worked, so I wonder: why can't a UDF be a non-static inner class?

 
--
Best Regards

Jeff Zhang
Re: Why UDF can not be non-static inner class ?

Timo Walther
Hi Jeff,

The main reason why UDFs in SQL, and also functions in the DataSet and DataStream APIs, should be static is serializability and thread safety. The classes should be self-contained entities that can be serialized and shipped to the cluster. The instances are duplicated, and one instance is used per thread in the task managers.
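
To illustrate the serializability point, here is a minimal plain-Java sketch. It is not Flink code, and the names InnerClassSerialization, Job, InnerUdf, and StaticUdf are made up for this example. It assumes an enclosing class that is not serializable: a non-static inner class that uses state of its enclosing instance keeps a hidden reference to it, so Java serialization fails, whereas a static nested class is self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InnerClassSerialization {

    // Hypothetical enclosing class; deliberately NOT Serializable.
    static class Job {
        private final String prefix;

        Job(String prefix) {
            this.prefix = prefix;
        }

        // Non-static inner class: it reads `prefix` from the enclosing Job,
        // so every instance carries a hidden reference to that Job.
        class InnerUdf implements Serializable {
            String eval(String s) {
                return prefix + s;
            }
        }

        // Static nested class: no hidden reference, self-contained.
        static class StaticUdf implements Serializable {
            String eval(String s) {
                return s;
            }
        }
    }

    // Returns true if Java serialization of obj succeeds,
    // false if some reachable object is not serializable.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Job job = new Job("job-");
        System.out.println("static nested: " + canSerialize(new Job.StaticUdf()));
        System.out.println("inner:         " + canSerialize(job.new InnerUdf()));
    }
}
```

In this sketch the static nested class serializes and the inner class does not, because writing the inner instance also tries to write the enclosing Job.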

I'm not very familiar with the Scala shell. How do you declare a UDF there?

Regards,
Timo


In reply to Jeff Zhang's message of 7 January 2019, 02:22.

Re: Why UDF can not be non-static inner class ?

Jeff Zhang
Hi Timo,

I added the flink-table jar to the classpath of the Scala shell, then created the UDF there.

image.png

In reply to Timo Walther's message of Mon, 7 Jan 2019, 5:34 PM.

--
Best Regards

Jeff Zhang
Re: Why UDF can not be non-static inner class ?

Stephan Ewen
Is there a reason why Table UDFs should be treated differently from functions in DataStream / DataSet?


In reply to Jeff Zhang's message of Mon, Jan 7, 2019, 10:39 AM.

Re: Why UDF can not be non-static inner class ?

Timo Walther
In theory, we could also allow non-static UDFs. I guess we just made the decision for simplicity. Otherwise, we would also have to run the ClosureCleaner on those functions.


In reply to Stephan Ewen's message of 7 January 2019, 10:45.

Re: Why UDF can not be non-static inner class ?

jincheng sun
In reply to this post by Jeff Zhang
Hi Jeff, 
I guess this is a Scala inner class naming issue: the JDK requires the inner class name to start with $. So I think you can try naming the UDF with a $ prefix. I tested it as follows:

import org.apache.flink.table.functions._
class $MyUDF extends ScalarFunction { def eval(str: String): String = str}
class MyUDF2 extends ScalarFunction { def eval(str: String): String = str}
btenv.registerFunction("myUdf", new $MyUDF())
btenv.registerFunction("myUdf2", new MyUDF2())
val data = Seq("Flink","Bob", "Bob", "something", "Hello", "Flink","Bob")
val source = benv.fromCollection(data).toTable(btenv, 'word) 
source.select("myUdf(word)").toDataSet[Row].print()   // works
source.select("myUdf2(word)").toDataSet[Row].print()  // throws the exception

scala> import org.apache.flink.table.functions._
import org.apache.flink.table.functions._
scala> class $MyUDF extends ScalarFunction { def eval(str: String): String = str}
defined class $MyUDF
scala> class MyUDF2 extends ScalarFunction { def eval(str: String): String = str}
defined class MyUDF2
scala> btenv.registerFunction("myUdf", new $MyUDF())
scala> btenv.registerFunction("myUdf2", new MyUDF2())
scala> val data = Seq("Flink","Bob", "Bob", "something", "Hello", "Flink","Bob")
data: Seq[String] = List(Flink, Bob, Bob, something, Hello, Flink, Bob)
scala> val source = benv.fromCollection(data).toTable(btenv, 'word) 
source: org.apache.flink.table.api.Table = UnnamedTable$0
scala> source.select("myUdf(word)").toDataSet[Row].print()
Flink
Bob
Bob
something
Hello
Flink
Bob
scala> source.select("myUdf2(word)").toDataSet[Row].print()
java.lang.InternalError: Malformed class name
  at java.lang.Class.getSimpleName(Class.java:1330)
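
For background on the "Malformed class name" error, here is a small plain-Java sketch of how nested class names are normally formed. The names NestedClassNames, MyUdf, and binaryNameSuffix are hypothetical. As far as I recall, JDK 8's Class.getSimpleName takes the part of the binary name that follows the enclosing class's name and expects it to begin with $, so a class whose name does not fit that pattern (as apparently happens with classes defined in the Scala shell) triggers the InternalError shown above. Treat that explanation as an assumption rather than a confirmed diagnosis.

```java
public class NestedClassNames {

    // Hypothetical nested class standing in for a UDF declared inside another class.
    static class MyUdf {
        String eval(String s) {
            return s;
        }
    }

    // Returns the part of the binary name that follows the enclosing class's name.
    // For a class compiled by javac this starts with '$', for example "$MyUdf".
    static String binaryNameSuffix(Class<?> nested) {
        Class<?> enclosing = nested.getEnclosingClass();
        return nested.getName().substring(enclosing.getName().length());
    }

    public static void main(String[] args) {
        Class<?> udf = MyUdf.class;
        System.out.println(binaryNameSuffix(udf));   // $MyUdf
        System.out.println(udf.getSimpleName());     // MyUdf
    }
}
```

This would be consistent with the observation above that a class named $MyUDF works while MyUDF2 does not.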

Hope this helps; any feedback is welcome.

Thanks,
Jincheng 
 

In reply to Jeff Zhang's message of Mon, 7 Jan 2019, 5:39 PM.

Re: Why UDF can not be non-static inner class ?

Jeff Zhang

Thanks, Jincheng, for the workaround, but I don't think it makes sense to force users to always prefix their UDF names with $.

Based on the discussion above, the only side effect of allowing non-static UDFs is having to run the ClosureCleaner, am I right? Regarding serializability and thread safety, each slot could still own its own UDF instance even for a non-static class. Or am I missing anything here?
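
A minimal plain-Java sketch of the "one instance per slot" idea for a non-static class follows. It is not Flink code; PerSlotCopies, Job, InnerUdf, owner, and the variable names are hypothetical, and it assumes the enclosing class is itself serializable. Each deserialized copy is an independent UDF instance, but each copy also brings along its own copy of the enclosing instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class PerSlotCopies {

    // Hypothetical enclosing class; Serializable in this sketch.
    static class Job implements Serializable {
        private final String prefix;

        Job(String prefix) {
            this.prefix = prefix;
        }

        // Non-static inner class that depends on its enclosing Job.
        class InnerUdf implements Serializable {
            String eval(String s) {
                return prefix + s;
            }

            Job owner() {
                return Job.this;
            }
        }
    }

    // Serializes an object graph to bytes, as if shipping it to a worker.
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Creates a fresh copy of the object graph from the shipped bytes.
    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Job job = new Job("job-");
        byte[] shipped = serialize(job.new InnerUdf());

        // One copy per (imaginary) slot.
        Job.InnerUdf slot1 = deserialize(shipped);
        Job.InnerUdf slot2 = deserialize(shipped);

        System.out.println(slot1 != slot2);                 // true: separate UDF instances
        System.out.println(slot1.owner() != slot2.owner()); // true: separate enclosing copies
        System.out.println(slot1.eval("x"));                // job-x
    }
}
```

So per-slot instances are possible in principle, at the cost of serializing whatever the enclosing instance holds.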


In reply to jincheng sun's message of Wed, 9 Jan 2019, 8:22 AM.

--
Best Regards

Jeff Zhang