Hi,
I want to consume from and write to Kafka from Flink's Python API. The only way I found to do this was through this question on SO, where the user essentially copies the Flink Kafka connector JARs into the Flink runtime's lib/ directory.
Thanks,
Manas
Hi Manas,
Since Flink 1.9, the entire architecture of PyFlink has been redesigned, so the method described in that link won't work. Instead, you can use the more convenient DDL[1] or descriptor API[2] to read Kafka data. You can also refer to the common questions about PyFlink[3].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html

Best,
Xingbo

On Mon, Jun 29, 2020 at 8:10 PM Manas Kale <[hidden email]> wrote:
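For reference, a minimal, untested sketch of the DDL approach from [1], assuming a PyFlink 1.10 setup with the Blink planner; the table name, topic, broker address, field names and the 'connector.*' properties below are placeholders in the 1.10 property style, not taken from this thread:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()
        .build())

# Register a Kafka-backed source table via SQL DDL (placeholder names).
t_env.sql_update("""
    CREATE TABLE input_table (
        id BIGINT,
        msg STRING
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'input-topic',
        'connector.properties.bootstrap.servers' = 'localhost:9092',
        'format.type' = 'json',
        'update-mode' = 'append'
    )
""")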
Hi Xingbo,
Thank you for the information, it certainly helps!

Regards,
Manas

On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <[hidden email]> wrote:
Hi,
I'm trying to get a simple consumer/producer running using the following code, referred from the links provided:

from pyflink.dataset import ExecutionEnvironment

However, I am getting the following exception:

Traceback (most recent call last):

The relevant part seems to be:

Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.

This is probably a basic error, but I can't figure out how to find what's wrong with the schema. Is the schema not properly declared? Is some field missing? FWIW, I have included the JSON and Kafka connector JARs in the required location.

Regards,
Manas

On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <[hidden email]> wrote:
Hi Manas,
You need to define the schema. You can refer to the following example:

t_env.connect(
    Kafka()
    .version('0.11')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .property("zookeeper.connect", "localhost:2181")
    .start_from_latest()
) \
    .with_format(
        Json()
        .json_schema(
            "{"
            "  type: 'object',"
            "  properties: {"
            "    lon: {"
            "      type: 'number'"
            "    },"
            "    rideTime: {"
            "      type: 'string',"
            "      format: 'date-time'"
            "    }"
            "  }"
            "}"
        )
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
        .field("lon", DataTypes.DECIMAL(20, 10))
        .field("rideTime", DataTypes.TIMESTAMP(6))
    ).register_table_source(INPUT_TABLE)

Best,
Xingbo

On Thu, Jul 2, 2020 at 7:59 PM Manas Kale <[hidden email]> wrote:
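For the producer side of the original question, a minimal, untested sketch of registering a Kafka sink with the same descriptor API; OUTPUT_TOPIC, OUTPUT_TABLE and the derive_schema()/in_append_mode() calls are assumptions about the 1.10 descriptor API, not something shown in this thread:

t_env.connect(
    Kafka()
    .version('0.11')
    .topic(OUTPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
) \
    .with_format(Json().derive_schema()) \
    .with_schema(
        Schema()
        .field("lon", DataTypes.DECIMAL(20, 10))
        .field("rideTime", DataTypes.TIMESTAMP(6))
    ) \
    .in_append_mode() \
    .register_table_sink(OUTPUT_TABLE)

# Pipe the registered source into the sink and run the job.
t_env.from_path(INPUT_TABLE).insert_into(OUTPUT_TABLE)
t_env.execute("kafka_to_kafka")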
Hi Xingbo,
Thanks for the reply. I didn't know that a table schema also needs to be declared after the connector, but I understand now.

I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example:

{
  "monitorId": 865,
  "deviceId": "94:54:93:49:96:13",
  "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
  "state": 2,
  "time": 1593687809180
}

The field "data" is a string of valid JSON with string:number pairs. I'm currently trying a JSON schema object with DataTypes.ROW, but am getting deserialization errors:

.with_format(

Regards,
Manas

On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang <[hidden email]> wrote:
I also tried doing this by using a user-defined function:

class DataConverter(ScalarFunction):

t_env.register_function("data_converter",
                        udf(DataConverter(), input_types = [DataTypes.STRING()],
I used a ROW to hold multiple values, but I can't figure out how to return a populated ROW object from the eval() method. Where is the method to construct a row/field object and return it?

Thanks!

On Fri, Jul 3, 2020 at 12:40 PM Manas Kale <[hidden email]> wrote:
Hi Manas,
If you want to return a RowType in a Python UDF, you can use the Row class, which extends Python's tuple. You can use the following statement to import Row:

from pyflink.table import Row

Best,
Xingbo

On Mon, Jul 6, 2020 at 8:08 PM Manas Kale <[hidden email]> wrote:
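A minimal, untested sketch of what that could look like for the nested "data" field from the earlier message; the registered name, field names and result types are assumptions, not confirmed by this thread:

import json

from pyflink.table import DataTypes, Row
from pyflink.table.udf import udf, ScalarFunction

class DataConverter(ScalarFunction):
    def eval(self, data_str):
        # Parse the nested JSON string and return its values as a Row;
        # Row extends tuple, so it is constructed positionally.
        data = json.loads(data_str)
        return Row(data.get("0001"), data.get("0002"))

t_env.register_function(
    "data_converter",
    udf(DataConverter(),
        input_types=[DataTypes.STRING()],
        result_type=DataTypes.ROW(
            [DataTypes.FIELD("v0001", DataTypes.DOUBLE()),
             DataTypes.FIELD("v0002", DataTypes.DOUBLE())])))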