Python DataStream API Questions -- Java/Scala Interoperability?


Python DataStream API Questions -- Java/Scala Interoperability?

Kevin Lam
Hello everyone,

I have some questions about the Python API that hopefully folks in the Apache Flink community can help with.

A little background: I'm interested in using the Python DataStream API because some of our stakeholders don't have a background in Scala/Java and would prefer Python if possible. Our team is open to maintaining Scala constructs on our end; however, we are looking to expose Flink for stateful streaming to end-users via a Python API.

Questions:

1/ The docs mention that custom Sources and Sinks cannot be defined in Python, but must be written in Java/Scala [1]. What is the recommended approach for using custom sources/sinks written in Scala from the Python API? If nothing is currently supported, is it on the road map?

2/ Also, I've noticed that the Python DataStream API has several connectors [2] that use Py4J + Java gateways to interoperate with Java sources/sinks. Is there a way for users to build their own connectors? What would this process entail?

Ideally, we’d like to be able to define custom sources/sinks in Scala and use them in our Python API Flink Applications. For example, defining a BigTable sink in Scala for use in the Python API:


[3]

where `MyBigTableSink` would somehow wrap a Scala-defined sink.

More generally, we’re interested in learning more about Scala/Python interoperability in Flink, and how we can expose the power of Flink’s Scala APIs to Python. Open to any suggestions, strategies, etc.  

Looking forward to any thoughts!


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks

[2] https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py

[3] Plaintext paste of code in screenshot, in case of attachment issues:
```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import MyBigTableSink

def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    ...
    ds.add_sink(MyBigTableSink, ...)
    env.execute("Application with Custom Sink")

if __name__ == '__main__':
    example()
```

Re: Python DataStream API Questions -- Java/Scala Interoperability?

Shuiqiang Chen
Hi Kevin,

Thank you for your questions. Currently, users are not able to define custom sources/sinks purely in Python. This would be a great feature, since it could unify end-to-end PyFlink application development in Python, but it is a large topic that we have no plan to support at present.

As you have noticed, the Python DataStream API has several connectors [2] that use Py4J + Java gateways to interoperate with Java sources/sinks. These connectors are extensions of the Python abstract classes `SourceFunction` and `SinkFunction`. These two classes can accept a Java source/sink instance and hold it to enable interoperation between Python and Java. They can also accept the fully qualified name of a Java/Scala-defined SourceFunction/SinkFunction class and create the corresponding Java instance. Below are the definitions of these classes:
```python
from typing import Union

from py4j.java_gateway import JavaObject

from pyflink.java_gateway import get_gateway


class JavaFunctionWrapper(object):
    """
    A wrapper class that maintains a Function implemented in Java.
    """

    def __init__(self, j_function: Union[str, JavaObject]):
        # TODO we should move this part to the get_java_function() to perform a lazy load.
        if isinstance(j_function, str):
            j_func_class = get_gateway().jvm.__getattr__(j_function)
            j_function = j_func_class()
        self._j_function = j_function

    def get_java_function(self):
        return self._j_function


class SourceFunction(JavaFunctionWrapper):
    """
    Base class for all stream data sources in Flink.
    """

    def __init__(self, source_func: Union[str, JavaObject]):
        """
        Constructor of SourceFunction.

        :param source_func: The Java SourceFunction object or the full name of the
                            SourceFunction class.
        """
        super(SourceFunction, self).__init__(source_func)


class SinkFunction(JavaFunctionWrapper):
    """
    The base class for SinkFunctions.
    """

    def __init__(self, sink_func: Union[str, JavaObject]):
        """
        Constructor of SinkFunction.

        :param sink_func: The Java SinkFunction object or the full name of the
                          SinkFunction class.
        """
        super(SinkFunction, self).__init__(sink_func)
```
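The string branch of `JavaFunctionWrapper.__init__` resolves the fully qualified class name through the gateway's `jvm` view and then invokes the class's no-argument constructor. To show just that dispatch in isolation, here is a JVM-free sketch; `FakeJvm` and `FakeJavaSink` are illustrative stand-ins for Py4J's `gateway.jvm` and a real Java sink, not part of PyFlink:

```python
from typing import Union


class FakeJavaSink:
    """Stand-in for an already-constructed Java SinkFunction instance."""
    pass


class FakeJvm:
    """Stand-in for gateway.jvm: resolves a fully qualified class name to a class."""
    _registry = {"com.mycompany.MyBigTableSink": FakeJavaSink}

    def __getattr__(self, name):
        # py4j's JVMView resolves dotted names like this to a Java class
        return self._registry[name]


class JavaFunctionWrapper:
    """Simplified copy of PyFlink's wrapper, with the gateway injectable."""

    def __init__(self, j_function: Union[str, object], jvm=FakeJvm()):
        if isinstance(j_function, str):
            # full class name: resolve it, then call the no-arg constructor
            j_function = jvm.__getattr__(j_function)()
        self._j_function = j_function

    def get_java_function(self):
        return self._j_function


# both spellings end up holding an instance of the underlying sink class
w1 = JavaFunctionWrapper("com.mycompany.MyBigTableSink")
w2 = JavaFunctionWrapper(FakeJavaSink())
print(type(w1.get_java_function()).__name__)  # FakeJavaSink
```

The practical consequence is that the string form only works for classes with a no-argument constructor; to pass constructor arguments you must build the Java instance yourself and hand it to the wrapper.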
Therefore, you are able to define custom sources/sinks in Scala and use them from Python. Here is the recommended approach:
```python
class MyBigTableSink(SinkFunction):

    def __init__(self, class_name: str):
        super(MyBigTableSink, self).__init__(class_name)


def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars('/the/path/of/your/MyBigTableSink.jar')
    # ...
    ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
    env.execute("Application with Custom Sink")


if __name__ == '__main__':
    example()
```
Remember that you must add the jar containing the Scala-defined SinkFunction by calling `env.add_jars()` before adding the sink, and your custom source/sink wrappers must extend `SourceFunction`/`SinkFunction`.

Any further questions are welcome!

Best,
Shuiqiang 
  


Re: Python DataStream API Questions -- Java/Scala Interoperability?

Kevin Lam
Thanks Shuiqiang! That's really helpful, we'll give the connectors a try. 


Re: Python DataStream API Questions -- Java/Scala Interoperability?

Kevin Lam
A follow-up question: in the example you provided, Shuiqiang, there were no arguments passed to the constructor of the custom sink/source.

What's the best way to pass arguments to the constructor?


Re: Python DataStream API Questions -- Java/Scala Interoperability?

Shuiqiang Chen
Hi Kevin,

Sorry for the late reply.

Actually, you are able to pass arguments to the constructor of the Java object when instantiating it from Python. Basic data types (char/boolean/int/long/float/double/string, etc.) can be passed directly, while complex types (array/list/map/POJO, etc.) must be converted to Java objects before passing. Please refer to https://www.py4j.org/py4j_java_collections.html for more information.
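To make the conversion step concrete, here is a JVM-free sketch of copying a Python list into a Java list before handing it to a constructor. `FakeArrayList` and the commented class/argument names (`MyBigTableSink`, `"cf1"`, etc.) are illustrative assumptions; with a real gateway you would use `get_gateway().jvm.java.util.ArrayList`:

```python
class FakeArrayList:
    """JVM-free stand-in for gateway.jvm.java.util.ArrayList."""

    def __init__(self):
        self._items = []

    def add(self, item):
        self._items.append(item)


def to_java_list(py_list, j_array_list_cls):
    """Copy a Python list into a Java list, element by element."""
    j_list = j_array_list_cls()
    for item in py_list:
        j_list.add(item)
    return j_list


# With a real gateway this would read roughly:
#   j_families = to_java_list(["cf1", "cf2"], get_gateway().jvm.java.util.ArrayList)
#   j_sink = get_gateway().jvm.com.mycompany.MyBigTableSink("my-table", j_families)
#   ds.add_sink(SinkFunction(j_sink))
j_families = to_java_list(["cf1", "cf2"], FakeArrayList)
print(j_families._items)  # ['cf1', 'cf2']
```

Py4J also ships converters for this (see the `py4j.java_collections` module linked above), so manual element-by-element copying is only one option.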

Best,
Shuiqiang
