Hello everyone,

I have some questions about the Python API that hopefully folks in the Apache Flink community can help with.

A little background: I'm interested in using the Python DataStream API because of stakeholders who don't have a background in Scala/Java and would prefer Python if possible. Our team is open to maintaining Scala constructs on our end; however, we are looking to expose Flink for stateful streaming via a Python API to end users.

Questions:

1/ The docs mention that custom Sources and Sinks cannot be defined in Python, but must be written in Java/Scala [1]. What is the recommended approach for interoperating between custom sinks/sources written in Scala and the Python API? If nothing is currently supported, is it on the roadmap?

2/ Also, I've noted that the Python DataStream API has several connectors [2] that use Py4J+Java gateways to interoperate with Java sources/sinks. Is there a way for users to build their own connectors? What would this process entail?

Ideally, we'd like to be able to define custom sources/sinks in Scala and use them in our Python API Flink applications. For example, defining a BigTable sink in Scala for use in the Python API: [3]

Where MyBigTableSink is just somehow importing a Scala-defined sink.

More generally, we're interested in learning more about Scala/Python interoperability in Flink, and how we can expose the power of Flink's Scala APIs to Python. Open to any suggestions, strategies, etc.

Looking forward to any thoughts!
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
[2] https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
[3] Plaintext paste of code in screenshot, in case of attachment issues:

```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import MyBigTableSink

def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    ...
    ds.add_sink(MyBigTableSink, ...)
    env.execute("Application with Custom Sink")

if __name__ == '__main__':
    example()
```
|
Hi Kevin,

Thank you for your questions. Currently, users are not able to define custom sources/sinks in Python. That would be a great feature, since it would unify end-to-end PyFlink application development in Python, but it is a large topic and we have no plan to support it at present.

As you noticed, `the Python DataStream API has several connectors [2] that use Py4J+Java gateways to interoperate with Java source/sinks`. These connectors extend the Python abstract classes `SourceFunction` and `SinkFunction`. These two classes can accept a Java source/sink instance and hold on to it, which enables the interoperation between Python and Java. They can also accept a string containing the fully qualified name of a Java/Scala-defined Source/SinkFunction class and create the corresponding Java instance. Below is the definition of these classes:

class JavaFunctionWrapper(object):

Therefore, you are able to define custom sources/sinks in Scala and apply them in Python. Here is the recommended approach for implementation:

class MyBigTableSink(SinkFunction):

Remember that you must add the jar containing the Scala-defined SinkFunction by calling `env.add_jars()` before adding the SinkFunction. Your custom source and sink functions must extend `SourceFunction` and `SinkFunction`, respectively.

Any further questions are welcome!

Best,
Shuiqiang

Kevin Lam <[hidden email]> wrote on Wed, Mar 3, 2021 at 2:50 AM:
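The approach described in this reply could look roughly like the sketch below. It is only an illustration under several assumptions: the Scala class name `com.example.MyBigTableSink`, the jar path, and the helpers `jar_url`, `scala_sink` and `build_job` are hypothetical, the Scala class is assumed to have a no-argument constructor, and the `file://` URL form for `add_jars()` follows the examples in the PyFlink dependency-management docs. The PyFlink imports sit inside the functions so that the module can be loaded without starting a JVM.

```python
from pathlib import Path


def jar_url(path: str) -> str:
    """Turn a local jar path into a file:// URL (the form assumed here for add_jars())."""
    return Path(path).absolute().as_uri()


def scala_sink(class_name: str):
    """Wrap a Scala/Java SinkFunction, identified by its fully qualified class name."""
    from pyflink.datastream.functions import SinkFunction

    class MyBigTableSink(SinkFunction):
        # Python-side handle only; the sink logic lives in the Scala class.
        def __init__(self, name: str):
            super().__init__(name)

    return MyBigTableSink(class_name)


def build_job(jar_path: str, class_name: str):
    """Assemble a tiny pipeline that writes to the Scala-defined sink."""
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # The jar must be registered before the sink is created, because the
    # wrapper instantiates the Java class as soon as it is constructed.
    env.add_jars(jar_url(jar_path))

    ds = env.from_collection(
        [("row-1", 1), ("row-2", 2)],
        type_info=Types.ROW([Types.STRING(), Types.INT()]),
    )
    ds.add_sink(scala_sink(class_name))
    return env


# Hypothetical usage (needs PyFlink, a JVM and the jar on disk):
# build_job("/path/to/my-sink.jar", "com.example.MyBigTableSink").execute("Application with Custom Sink")
```

Passing the class name as a string keeps the Python side free of any Scala details; the alternative of handing over an already constructed Java object is what makes constructor arguments possible.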
|
Thanks Shuiqiang! That's really helpful, we'll give the connectors a try. On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen <[hidden email]> wrote:
|
A follow-up question: in the example you provided, Shuiqiang, there were no arguments passed to the constructor of the custom sink/source. What's the best way to pass arguments to the constructor? On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam <[hidden email]> wrote:
|
Hi Kevin,

Sorry for the late reply. You can pass arguments to the constructor of the Java object when instantiating it in Python. Basic data types (char/boolean/int/long/float/double/string, etc.) can be passed directly, while complex types (array/list/map/POJO, etc.) must be converted to Java objects before passing. Please refer to https://www.py4j.org/py4j_java_collections.html for more information.

Best,
Shuiqiang

Kevin Lam <[hidden email]> wrote on Thu, Mar 11, 2021 at 4:28 AM:
|