```python
from typing import Union

from py4j.java_gateway import JavaObject
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway


class JavaFunctionWrapper(object):
    """
    A wrapper class that maintains a Function implemented in Java.
    """

    def __init__(self, j_function: Union[str, JavaObject]):
        # TODO we should move this part to the get_java_function() to perform a lazy load.
        if isinstance(j_function, str):
            # Look up the JVM class by its fully-qualified name and call its no-arg constructor.
            j_func_class = get_gateway().jvm.__getattr__(j_function)
            j_function = j_func_class()
        self._j_function = j_function

    def get_java_function(self):
        return self._j_function


class SourceFunction(JavaFunctionWrapper):
    """
    Base class for all stream data sources in Flink.
    """

    def __init__(self, source_func: Union[str, JavaObject]):
        """
        Constructor of SourceFunction.

        :param source_func: The Java SourceFunction object or the full name of the SourceFunction class.
        """
        super(SourceFunction, self).__init__(source_func)


class SinkFunction(JavaFunctionWrapper):
    """
    The base class for SinkFunctions.
    """

    def __init__(self, sink_func: Union[str, JavaObject]):
        """
        Constructor of SinkFunction.

        :param sink_func: The Java SinkFunction object or the full name of the SinkFunction class.
        """
        super(SinkFunction, self).__init__(sink_func)


class MyBigTableSink(SinkFunction):
    # Wraps a sink implemented in Java/Scala; class_name is its fully-qualified JVM class name.
    def __init__(self, class_name: str):
        super(MyBigTableSink, self).__init__(class_name)


def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    # The jar containing the Java/Scala sink must be registered as a URL (note the file:// scheme).
    env.add_jars('file:///the/path/of/your/MyBigTableSink.jar')
    # ... build the DataStream `ds` here ...
    ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
    env.execute("Application with Custom Sink")


if __name__ == '__main__':
    example()
```
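The TODO in JavaFunctionWrapper suggests resolving the Java class lazily in get_java_function() instead of in the constructor. Below is a minimal sketch of that idea. `LazyJavaFunctionWrapper` and the `jvm_provider` hook are hypothetical names, not PyFlink API; the hook exists only so the lookup can be swapped out (for example, in a test) instead of always going through `pyflink.java_gateway.get_gateway()`.

```python
from typing import Any, Callable, Optional


class LazyJavaFunctionWrapper(object):
    """
    Hypothetical variant of JavaFunctionWrapper that defers instantiating
    the Java function until get_java_function() is first called.
    """

    def __init__(self, j_function: Any,
                 jvm_provider: Optional[Callable[[], Any]] = None):
        # Either a fully-qualified class name (resolved later) or an existing Java object.
        self._j_function = j_function
        # Hypothetical hook returning a py4j JVM view; defaults to PyFlink's gateway.
        self._jvm_provider = jvm_provider

    def _jvm(self) -> Any:
        if self._jvm_provider is not None:
            return self._jvm_provider()
        # Imported here so the class can be defined without PyFlink installed.
        from pyflink.java_gateway import get_gateway
        return get_gateway().jvm

    def get_java_function(self) -> Any:
        if isinstance(self._j_function, str):
            # Resolve the class by name and call its no-arg constructor, only once.
            j_func_class = getattr(self._jvm(), self._j_function)
            self._j_function = j_func_class()
        return self._j_function
```

With this variant, constructing the wrapper never touches the JVM, so a misspelled class name only surfaces when get_java_function() is first called.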
Hello everyone,
I have some questions about the Python API that hopefully folks in the Apache Flink community can help with.
A little background: I’m interested in using the Python DataStream API because some of our stakeholders don’t have a background in Scala/Java and would prefer Python if possible. Our team is open to maintaining Scala constructs on our end; however, we want to expose Flink’s stateful streaming to end users through a Python API.
Questions:
1/ The docs mention that custom sources and sinks cannot be defined in Python but must be written in Java/Scala [1]. What is the recommended approach for using custom sources/sinks written in Scala from the Python API? If nothing is currently supported, is it on the roadmap?
2/ I’ve also noticed that the Python DataStream API has several connectors [2] that use Py4J and the Java gateway to interoperate with Java sources/sinks. Is there a way for users to build their own connectors? What would this process entail?
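On building your own connectors: the JavaFunctionWrapper sample at the top of this thread only calls the JVM class's no-argument constructor, while a real connector usually needs configuration. py4j can pass arguments straight to a Java or Scala constructor. A minimal sketch, assuming the JVM class has a public constructor matching the arguments; `new_java_object` is a hypothetical helper name, and the optional `jvm` parameter exists only so it can be exercised without a running gateway.

```python
from typing import Any, Optional


def new_java_object(class_name: str, *args: Any, jvm: Optional[Any] = None) -> Any:
    """
    Hypothetical helper: instantiate a JVM class by its fully-qualified
    name, forwarding constructor arguments through py4j.
    """
    if jvm is None:
        # Default to the gateway PyFlink already uses (imported lazily).
        from pyflink.java_gateway import get_gateway
        jvm = get_gateway().jvm
    # Same name-based lookup as JavaFunctionWrapper, but arguments are passed on.
    j_class = getattr(jvm, class_name)
    return j_class(*args)
```

The returned object can then be handed to the SinkFunction wrapper, e.g. `SinkFunction(new_java_object("com.mycompany.MyBigTableSink", project_id, table_id))`, where `project_id` and `table_id` are hypothetical stand-ins for whatever the Scala sink's constructor expects. py4j converts simple Python values such as `str` and `int` automatically.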
Ideally, we’d like to define custom sources/sinks in Scala and use them in our Python Flink applications. For example, we could define a BigTable sink in Scala and use it from the Python API [3], where MyBigTableSink would somehow import the Scala-defined sink.
More generally, we’re interested in learning more about Scala/Python interoperability in Flink and how we can expose the power of Flink’s Scala APIs to Python. We’re open to any suggestions, strategies, etc.
Looking forward to any thoughts!
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
[2] https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
[3] Plaintext paste of code in screenshot, in case of attachment issues:
```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import MyBigTableSink


def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    ...
    ds.add_sink(MyBigTableSink, ...)
    env.execute("Application with Custom Sink")


if __name__ == '__main__':
    example()
```