Hi Manas,

If you want to return a RowType in a Python UDF, you can use the Row class, which extends Python's tuple. You can use the following statement to import Row:

from pyflink.table import Row
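For example, a minimal sketch of a UDF whose eval() builds a Row matching a two-field ROW result type (the field names and types are taken from your earlier mails and are only an assumption):

import json

from pyflink.table import DataTypes, Row
from pyflink.table.udf import ScalarFunction, udf

class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        # Return a Row whose positional values line up with the declared ROW fields.
        # Cast to str to match the STRING field types; use DOUBLE() instead if you
        # want to keep the numeric values.
        return Row(str(data['0001']), str(data['0002']))

data_converter = udf(DataConverter(),
                     input_types=[DataTypes.STRING()],
                     result_type=DataTypes.ROW([
                         DataTypes.FIELD("feature1", DataTypes.STRING()),
                         DataTypes.FIELD("feature2", DataTypes.STRING())]))

It can then be registered with t_env.register_function("data_converter", data_converter) as in your snippet below.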
Best,
Xingbo


I also tried doing this with a user-defined function:

class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ??  # I want to return data['0001'] in field 'feature1', data['0002'] in field 'feature2', etc.

t_env.register_function("data_converter",
                        udf(DataConverter(),
                            input_types=[DataTypes.STRING()],
                            result_type=DataTypes.ROW([DataTypes.FIELD("feature1", DataTypes.STRING())])))

# here "data" is the field "data" from the previous mail
t_env.from_path(INPUT_TABLE) \
    .select("data_converter(data)") \
    .insert_into(OUTPUT_TABLE)
I used a ROW to hold multiple values, but I can't figure out how to return a populated ROW object from the eval() method. What is the right way to construct a row/field object and return it?
Thanks!
Hi Xingbo,

Thanks for the reply. I didn't know that a table schema also needs to be declared after the connector, but I understand now.

I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example:

{
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
    "state": 2,
    "time": 1593687809180
}
The field "data" is a string of valid JSON with string:number objects. I'm currently trying using JSON schema object and DataTypes.ROW, but am getting deserialization errors. .with_format( Json() .json_schema( """ { "type": "object", "properties": { "monitorId": { "type": "string" }, "deviceId": { "type": "string" }, "data": { "type": "object" }, "state": { "type": "integer" }, "time": { "type": "string" } } } """ ) ) \ .with_schema( Schema() .field("monitorId", DataTypes.STRING()) .field("deviceId", DataTypes.STRING()) .field("data", DataTypes.ROW()) ) Regards, Manas
Hi Manas,

You need to define the schema. You can refer to the following example:
t_env.connect(
    Kafka()
    .version('0.11')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .property("zookeeper.connect", "localhost:2181")
    .start_from_latest()
) \
.with_format(
    Json()
    .json_schema(
        "{"
        "  type: 'object',"
        "  properties: {"
        "    lon: {"
        "      type: 'number'"
        "    },"
        "    rideTime: {"
        "      type: 'string',"
        "      format: 'date-time'"
        "    }"
        "  }"
        "}"
    )
) \
.with_schema(  # declare the schema of the table
    Schema()
    .field("lon", DataTypes.DECIMAL(20, 10))
    .field("rideTime", DataTypes.TIMESTAMP(6))
).register_table_source(INPUT_TABLE)
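The sink registration in your original code needs a schema declared in the same way. A sketch, reusing the same fields (derive_schema() here is an assumption; you can also repeat the json_schema as before):

t_env.connect(
    Kafka()
    .version('universal')
    .topic(OUTPUT_TOPIC)
    .property("bootstrap.servers", LOCAL_KAFKA)
) \
.with_format(
    Json().derive_schema()  # derive the JSON format from the declared table schema
) \
.with_schema(
    Schema()
    .field("lon", DataTypes.DECIMAL(20, 10))
    .field("rideTime", DataTypes.TIMESTAMP(6))
).register_table_sink(OUTPUT_TABLE)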
Best,
Xingbo
Hi,

I'm trying to get a simple consumer/producer running using the following code, based on the provided links:

from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TOPIC = 'xyz'
INPUT_TABLE = 'raw_message'
PROD_ZOOKEEPER = '...'
PROD_KAFKA = '...'

OUTPUT_TOPIC = 'summary_output'
OUTPUT_TABLE = 'feature_summary'
LOCAL_ZOOKEEPER = 'localhost:2181'
LOCAL_KAFKA = 'localhost:9092'
t_env.connect(
    Kafka()
    .version('universal')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .start_from_latest()
) \
.with_format(
    Json()
    .json_schema(
        "{"
        "  type: 'object',"
        "  properties: {"
        "    lon: {"
        "      type: 'number'"
        "    },"
        "    rideTime: {"
        "      type: 'string',"
        "      format: 'date-time'"
        "    }"
        "  }"
        "}"
    )
).register_table_source(INPUT_TABLE)
t_env.connect(
    Kafka()
    .version('universal')
    .topic(OUTPUT_TOPIC)
    .property("bootstrap.servers", LOCAL_KAFKA)
    .start_from_latest()
) \
.with_format(
    Json()
    .json_schema(
        "{"
        "  type: 'object',"
        "  properties: {"
        "    lon: {"
        "      type: 'number'"
        "    },"
        "    rideTime: {"
        "      type: 'string',"
        "      format: 'date-time'"
        "    }"
        "  }"
        "}"
    )
).register_table_sink(OUTPUT_TABLE)
t_env.from_path(INPUT_TABLE) \
    .insert_into(OUTPUT_TABLE)
t_env.execute('IU pyflink job')
However, I am getting the following exception:

Traceback (most recent call last):
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
    at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
    ... 13 more
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
    ).register_table_source(INPUT_TABLE)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
    self._j_connect_table_descriptor.registerTableSource(name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
The relevant part seems to be:

Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.

This is probably a basic error, but I can't figure out what's wrong with the schema. Is the schema not properly declared? Is some field missing? FWIW, I have included the JSON and Kafka connector JARs in the required location.
Regards,
Manas
Hi Xingbo,

Thank you for the information, it certainly helps!
Regards, Manas
Hi,

I want to consume from and write to Kafka using Flink's Python API.
The only way I found to do this was through a question on SO where the user essentially copies the Flink Kafka connector JARs into the Flink runtime's lib/ directory.

- Is this the recommended method to do this? If not, what is?
- Is there any official documentation for using Kafka with pyFlink? Is this officially supported?
- How does the method described in the link work? Does the Flink runtime load and expose all JARs in lib/ to the Python script? Can I write custom operators in Java and use them through Python?
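(A possible alternative, sketched here as an assumption about newer PyFlink versions rather than something confirmed in this thread: connector JARs can also be handed to the job via the pipeline.jars configuration instead of being copied into lib/. The paths below are placeholders.)

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Point pipeline.jars at the connector JARs; multiple JARs are separated by ';'.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka.jar;file:///path/to/flink-json.jar")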
Thanks, Manas