PyFlink :: Bootstrap UDF function

Rinat
Hi mates!

I'm continuing my research into the new features of PyFlink, and I'm really excited about this functionality.
My main goal is to understand how to integrate our ML registry, powered by MLflow, with PyFlink jobs, and what restrictions we have.

I need to bootstrap the UDF on startup, when it's instantiated in the Apache Beam process, but I don't know whether PyFlink calls it from a single thread or shares it among multiple threads. In other words, do I need to care about synchronizing my bootstrap logic or not?

Here is a code example of my UDF function:
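from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf


class MyFunction(ScalarFunction):
    def __init__(self):
        self.initialized = False

    def __bootstrap(self):
        return "bootstrapped"

    def eval(self, urls):
        # Bootstrap lazily on the first call, then remember that it ran.
        if not self.initialized:
            self.__bootstrap()
            self.initialized = True
        return "my-result"


my_function = udf(MyFunction(), [DataTypes.ARRAY(DataTypes.STRING())], DataTypes.STRING())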

Thx a lot for your help!

Re: PyFlink :: Bootstrap UDF function

Dian Fu
Hi Rinat,

It's called in a single-threaded fashion, so there is no need for synchronization.
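Because eval is invoked from a single thread, a plain flag is enough for lazy, one-time initialization and no lock is needed. A minimal sketch, assuming the `pyflink.table.udf.ScalarFunction` base class; the fallback stub class, the `bootstrap_calls` counter and the `_bootstrap` body are hypothetical placeholders added so the snippet also runs without PyFlink installed.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:  # hypothetical stand-in so the sketch runs without PyFlink
    class ScalarFunction:
        def open(self, function_context):
            pass

        def close(self):
            pass


class MyFunction(ScalarFunction):
    def __init__(self):
        self.initialized = False
        self.bootstrap_calls = 0  # counts how often the bootstrap ran

    def _bootstrap(self):
        # Hypothetical placeholder for expensive setup (e.g. fetching a model).
        self.bootstrap_calls += 1
        return "bootstrapped"

    def eval(self, urls):
        # eval is called from one thread at a time, so this check-then-set
        # needs no lock: no other thread can interleave between the two steps.
        if not self.initialized:
            self._bootstrap()
            self.initialized = True
        return "my-result"


f = MyFunction()
results = [f.eval(["http://a"]), f.eval(["http://b"])]
print(results, f.bootstrap_calls)  # ['my-result', 'my-result'] 1
```

If the same instance were ever shared across threads, the check-then-set in eval would need a lock; in the single-threaded case described here it is safe as written.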

Besides, ScalarFunction provides a pair of open/close methods; you could also override them and perform the initialization work in the open method.
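The open/close variant moves one-time setup out of eval. A minimal sketch, assuming PyFlink's `open(self, function_context)` and `close(self)` hooks on `ScalarFunction`. The names `load_model`, `ModelFunction`, `run_lifecycle` and the model URI are hypothetical: `load_model` stands in for a real registry call (with MLflow this might be something like `mlflow.pyfunc.load_model(model_uri)`), and `run_lifecycle` only simulates the open, eval, close order; it is not a PyFlink API.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:  # hypothetical stand-in so the sketch runs without PyFlink
    class ScalarFunction:
        def open(self, function_context):
            pass

        def close(self):
            pass


def load_model(uri):
    """Hypothetical loader; replace with a real registry call."""
    return lambda urls: "result-for-%d-urls" % len(urls)


class ModelFunction(ScalarFunction):
    def __init__(self, model_uri):
        # Keep only cheap, picklable configuration here: the instance is
        # pickled on the client and shipped to the Python worker.
        self.model_uri = model_uri
        self.model = None

    def open(self, function_context):
        # Called once per function instance in the worker, before any eval.
        self.model = load_model(self.model_uri)

    def eval(self, urls):
        return self.model(urls)

    def close(self):
        # Called once when the function is torn down; release resources here.
        self.model = None


def run_lifecycle(func, rows):
    """Hypothetical driver mimicking the open -> eval* -> close order."""
    func.open(None)
    try:
        return [func.eval(row) for row in rows]
    finally:
        func.close()


fn = ModelFunction("models:/my-model/1")  # hypothetical model URI
print(run_lifecycle(fn, [["a"], ["a", "b"]]))
# ['result-for-1-urls', 'result-for-2-urls']
```

In a real job the instance would be wrapped with `udf(...)` the same way as `MyFunction`, and the runtime, not `run_lifecycle`, would call open and close.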

Regards,
Dian

On Oct 15, 2020, at 3:19 AM, Sharipov, Rinat <[hidden email]> wrote:

Re: PyFlink :: Bootstrap UDF function

Rinat
Hi Dian!

Thx a lot for your reply; it's very helpful for us.

On Thu, Oct 15, 2020 at 04:30, Dian Fu <[hidden email]> wrote: