Flink Kafka connector in Python

Flink Kafka connector in Python

Manas Kale
Hi,
I want to consume from and write to Kafka using Flink's Python API.

The only way I found to do this was through this question on SO, where the user essentially copies the Flink Kafka connector JARs into the Flink runtime's lib/ directory.
  • Is this the recommended way to do it? If not, what is?
  • Is there any official documentation for using Kafka with PyFlink? Is this officially supported?
  • How does the method described in the link work? Does the Flink runtime load all JARs in lib/ and expose them to the Python script? Can I write custom operators in Java and use them from Python?
Thanks,
Manas

Re: Flink Kafka connector in Python

Xingbo Huang
Hi Manas,

Since Flink 1.9, the entire architecture of PyFlink has been redesigned, so the method described in the link won't work.
Instead, you can use the more convenient DDL [1] or descriptor [2] API to read Kafka data. You can also refer to the common questions about PyFlink [3].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
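
For illustration, here is a minimal sketch of the DDL route. It only builds the CREATE TABLE statement as a string. The helper name kafka_source_ddl, the table name, the columns lon and rideTime, and the broker address are hypothetical, and the property keys are assumed to follow the legacy 'connector.*' style described in the Flink 1.10 documentation linked above, so please check them against your Flink version.

```python
def kafka_source_ddl(table, topic, bootstrap_servers):
    """Build a CREATE TABLE statement for a JSON-encoded Kafka topic.

    The column list and the property keys are assumptions for illustration.
    """
    return f"""
CREATE TABLE {table} (
    lon DECIMAL(20, 10),
    rideTime TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = '{topic}',
    'connector.properties.bootstrap.servers' = '{bootstrap_servers}',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json'
)"""


ddl = kafka_source_ddl("raw_message", "xyz", "localhost:9092")
print(ddl)
# With a StreamTableEnvironment named t_env (Flink 1.10), the statement
# would then be submitted with: t_env.sql_update(ddl)
```

Unlike the descriptor API, the DDL declares the columns and the connector in one statement, so the table schema cannot be forgotten.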

Best,
Xingbo


Re: Flink Kafka connector in Python

Manas Kale
Hi Xingbo,
Thank you for the information, it certainly helps!

Regards,
Manas


Re: Flink Kafka connector in Python

Manas Kale
Hi,
I'm trying to get a simple consumer/producer running with the following code, adapted from the links you provided:
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema

exec_env = StreamExecutionEnvironment.get_execution_environment()

t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TOPIC = 'xyz'
INPUT_TABLE = 'raw_message'
PROD_ZOOKEEPER = '...'
PROD_KAFKA = '...'

OUTPUT_TOPIC = 'summary_output'
OUTPUT_TABLE = 'feature_summary'
LOCAL_ZOOKEEPER = 'localhost:2181'
LOCAL_KAFKA = 'localhost:9092'


t_env.connect(
    Kafka()
        .version('universal')
        .topic(INPUT_TOPIC)
        .property("bootstrap.servers", PROD_KAFKA)

        .start_from_latest()
) \
    .with_format(
        Json()
            .json_schema(
                "{"
                "  type: 'object',"
                "  properties: {"
                "    lon: {"
                "      type: 'number'"
                "    },"
                "    rideTime: {"
                "      type: 'string',"
                "      format: 'date-time'"
                "    }"
                "  }"
                "}"
            )
    ).register_table_source(INPUT_TABLE)

t_env.connect(Kafka()
              .version('universal')
              .topic(OUTPUT_TOPIC)
              .property("bootstrap.servers", LOCAL_KAFKA)

              .start_from_latest()
              ) \
    .with_format(
        Json()
            .json_schema(
                "{"
                "  type: 'object',"
                "  properties: {"
                "    lon: {"
                "      type: 'number'"
                "    },"
                "    rideTime: {"
                "      type: 'string',"
                "      format: 'date-time'"
                "    }"
                "  }"
                "}"
            )).register_table_sink(OUTPUT_TABLE)

t_env.from_path(INPUT_TABLE) \
    .insert_into(OUTPUT_TABLE)

t_env.execute('IU pyflink job')
However, I get the following exception:
Traceback (most recent call last):
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
... 13 more


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
    ).register_table_source(INPUT_TABLE)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
    self._j_connect_table_descriptor.registerTableSource(name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'

The relevant part seems to be: Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
This is probably a basic error, but I can't figure out what is wrong with the schema. Is it not declared properly? Is some field missing?
FWIW, I have placed the JSON and Kafka connector JARs in the required location.

Regards,
Manas


Re: Flink Kafka connector in Python

Xingbo Huang
Hi Manas,
You need to define the table schema with with_schema(), in addition to the format. You can refer to the following example:
t_env.connect(
    Kafka()
        .version('0.11')
        .topic(INPUT_TOPIC)
        .property("bootstrap.servers", PROD_KAFKA)
        .property("zookeeper.connect", "localhost:2181")
        .start_from_latest()
) \
    .with_format(
        Json()
            .json_schema(
                "{"
                "  type: 'object',"
                "  properties: {"
                "    lon: {"
                "      type: 'number'"
                "    },"
                "    rideTime: {"
                "      type: 'string',"
                "      format: 'date-time'"
                "    }"
                "  }"
                "}"
            )
    ) \
    .with_schema(  # declare the schema of the table
        Schema()
            .field("lon", DataTypes.DECIMAL(20, 10))
            .field("rideTime", DataTypes.TIMESTAMP(6))
    ).register_table_source(INPUT_TABLE)
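
To see why the validation complained about the property 'schema', it may help to picture the descriptor as a flat map of string properties in which the format and the table schema live under separate key prefixes. The sketch below is a simplified illustration only, not Flink's actual validation code. The helper has_table_schema is hypothetical and the exact key names are assumed.

```python
def has_table_schema(properties):
    """Return True if any key belongs to the 'schema.' section."""
    return any(key.startswith("schema.") for key in properties)


# Roughly what a descriptor without with_schema() would produce (assumed keys).
without_schema = {
    "connector.type": "kafka",
    "connector.topic": "xyz",
    "format.type": "json",
}

# The same descriptor after with_schema(Schema().field("lon", ...)).
with_schema = dict(without_schema, **{
    "schema.0.name": "lon",
    "schema.0.data-type": "DECIMAL(20, 10)",
})

print(has_table_schema(without_schema))  # False
print(has_table_schema(with_schema))     # True
```

The JSON schema passed to json_schema() only describes how to parse each message. It does not declare the table's columns, which is why with_schema() is still required.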

Best,
Xingbo


Re: Flink Kafka connector in Python

Manas Kale
Hi Xingbo,
Thanks for the reply. I didn't know that a table schema also needs to be declared after the connector, but I understand now.
I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example:
{
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
    "state": 2,
    "time": 1593687809180
}
The field "data" is a string containing valid JSON that maps strings to numbers. I'm currently trying to parse it with a JSON schema object and DataTypes.ROW, but I am getting deserialization errors.
.with_format(
    Json()
        .json_schema(
            """
            {
                "type": "object",
                "properties": {
                    "monitorId": {
                        "type": "string"
                    },
                    "deviceId": {
                        "type": "string"
                    },
                    "data": {
                        "type": "object"
                    },
                    "state": {
                        "type": "integer"
                    },
                    "time": {
                        "type": "string"
                    }
                }
            }
            """
        )
) \
.with_schema(
    Schema()
        .field("monitorId", DataTypes.STRING())
        .field("deviceId", DataTypes.STRING())
        .field("data", DataTypes.ROW())
)
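
As a plain-Python illustration of the problem (no Flink involved), the sketch below rebuilds a message shaped like the sample above and shows that "data" is still a string after the outer message is parsed, so it needs a second parsing pass. The variable names are hypothetical and the inner payload is shortened.

```python
import json

# Rebuild a message like the sample: "data" holds JSON serialized as a string.
inner = {"0001": 105.0, "0002": 1.21, "0003": 0.69}
message = json.dumps({
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": json.dumps(inner),
    "state": 2,
    "time": 1593687809180,
})

outer = json.loads(message)           # first pass: the Kafka record itself
data_type = type(outer["data"]).__name__
features = json.loads(outer["data"])  # second pass: the embedded JSON string

print(data_type, features["0001"])    # str 105.0
```

This suggests one possible approach: declare "data" as a string in both the format and the table schema, and parse it in a later step.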
Regards,
Manas


Re: Flink Kafka connector in Python

Manas Kale
I also tried doing this with a user-defined function:
import json
from pyflink.table.udf import ScalarFunction, udf

class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ??  # I want to return data['0001'] in field 'feature1', data['0002'] in field 'feature2' etc.

t_env.register_function("data_converter", udf(DataConverter(), input_types=[DataTypes.STRING()],
                                              result_type=DataTypes.ROW([
                                                  DataTypes.FIELD("feature1", DataTypes.STRING())
                                              ])))

# Here "data" is the field "data" from the previous mail.
t_env.from_path(INPUT_TABLE) \
    .select("data_converter(data)") \
    .insert_into(OUTPUT_TABLE)

I used a ROW to hold multiple values, but I can't figure out how to return a populated ROW object from the eval() method. Where is the method to construct a row/field object and return it?
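
To make the intended mapping concrete, here is a plain-Python sketch of the logic I would like eval() to perform. The names FEATURE_KEYS and convert are hypothetical. How the resulting values should be wrapped into a row for a ROW result type is exactly the part I am unsure about, so it is not shown here.

```python
import json

# Keys of the embedded JSON, in the order feature1 .. feature6.
FEATURE_KEYS = ["0001", "0002", "0003", "0004", "0005", "0006"]


def convert(str_data):
    """Parse the JSON string and return the values ordered as feature1..feature6."""
    data = json.loads(str_data)
    # Missing keys become None rather than raising KeyError.
    return tuple(data.get(key) for key in FEATURE_KEYS)


sample = '{"0001":105.0,"0002":1.21,"0003":0.69,"0004":1.46,"0005":47.43,"0006":103.3}'
print(convert(sample))  # (105.0, 1.21, 0.69, 1.46, 47.43, 103.3)
```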


Thanks!


at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
... 13 more


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
    ).register_table_source(INPUT_TABLE)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
    self._j_connect_table_descriptor.registerTableSource(name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'

The relevant part seems to be: Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
This is probably a basic error, but I can't figure out what is wrong with the schema. Is the schema not properly declared? Is some field missing?
FWIW, I have placed the JSON and Kafka connector JARs in the required location.
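
For comparison, the DDL route mentioned earlier in the thread declares the columns and the connector properties in a single statement, so the table schema cannot be left out by accident. The following is only a sketch: the helper name kafka_json_ddl is made up for illustration, and the property keys assume the Flink 1.10 DDL style, which may differ in other releases.

```python
# Sketch only: kafka_json_ddl is a hypothetical helper, and the property keys
# assume the Flink 1.10 DDL style; check the docs for the release you run.


def kafka_json_ddl(table, topic, bootstrap_servers, columns):
    """Build a CREATE TABLE statement for a Kafka topic carrying JSON records.

    `columns` is a list of (name, sql_type) pairs; it is the table schema.
    """
    column_sql = ",\n".join(
        "  {} {}".format(name, sql_type) for name, sql_type in columns
    )
    return (
        "CREATE TABLE {table} (\n"
        "{columns}\n"
        ") WITH (\n"
        "  'connector.type' = 'kafka',\n"
        "  'connector.version' = 'universal',\n"
        "  'connector.topic' = '{topic}',\n"
        "  'connector.properties.bootstrap.servers' = '{servers}',\n"
        "  'connector.startup-mode' = 'latest-offset',\n"
        "  'format.type' = 'json'\n"
        ")"
    ).format(table=table, columns=column_sql, topic=topic, servers=bootstrap_servers)


ddl = kafka_json_ddl(
    "raw_message",
    "xyz",
    "localhost:9092",
    [("lon", "DECIMAL(20, 10)"), ("rideTime", "TIMESTAMP(6)")],
)
print(ddl)

# With a StreamTableEnvironment named t_env on Flink 1.10, the statement
# would then be submitted with: t_env.sql_update(ddl)
```

The column list plays the same role as the with_schema(...) call in the descriptor API.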

Regards,
Manas

On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <[hidden email]> wrote:
Hi Xingbo,
Thank you for the information, it certainly helps!

Regards,
Manas


Re: Flink Kafka connector in Python

Xingbo Huang
Hi Manas,
If you want to return a ROW type from a Python UDF, you can use the Row class, which extends Python's tuple.
You can import it with the following statement: from pyflink.table import Row
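
As a rough sketch of how this could fit the UDF from the quoted mail: the field extraction below is plain Python and runs without PyFlink, while the Row wrapping is shown only in comments. The key-to-field mapping (FEATURE_KEYS) and the helper name extract_features are assumptions made for illustration, and the values are converted to strings because the quoted result_type declares STRING fields.

```python
import json

# Hypothetical mapping: feature1 <- "0001", feature2 <- "0002".
FEATURE_KEYS = ("0001", "0002")


def extract_features(str_data, keys=FEATURE_KEYS):
    """Return the selected values of a JSON string as a tuple of strings.

    Missing keys yield None so the tuple always has len(keys) entries.
    """
    data = json.loads(str_data)
    return tuple(None if data.get(k) is None else str(data[k]) for k in keys)


print(extract_features('{"0001": 105.0, "0002": 1.21}'))

# Inside the PyFlink UDF, the tuple can be wrapped in a Row:
#
#     from pyflink.table import Row
#
#     class DataConverter(ScalarFunction):
#         def eval(self, str_data):
#             return Row(*extract_features(str_data))
#
# The declared result_type then needs one DataTypes.FIELD per entry,
# in the same order as FEATURE_KEYS.
```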

Best,
Xingbo

On Mon, Jul 6, 2020 at 8:08 PM Manas Kale <[hidden email]> wrote:
I also tried doing this with a user-defined function:
class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ??  # I want to return data['0001'] in field 'feature1', data['0002'] in field 'feature2', etc.

t_env.register_function("data_converter", udf(DataConverter(), input_types=[DataTypes.STRING()],
                                              result_type=DataTypes.ROW([
                                                  DataTypes.FIELD("feature1", DataTypes.STRING())
                                              ])))

# "data" below is the field "data" from the previous mail
t_env.from_path(INPUT_TABLE) \
    .select("data_converter(data)") \
    .insert_into(OUTPUT_TABLE)

I used a ROW to hold multiple values, but I can't figure out how to return a populated ROW object from the eval() method. Where is the method to construct a row/field object and return it?


Thanks!


On Fri, Jul 3, 2020 at 12:40 PM Manas Kale <[hidden email]> wrote:
Hi Xingbo,
Thanks for the reply. I didn't know that a table schema also needs to be declared after the connector, but I understand now.
I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example:
{
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
    "state": 2,
    "time": 1593687809180
}
The field "data" is a string containing valid JSON that maps string keys to numbers. I'm currently trying a JSON schema of type "object" together with DataTypes.ROW, but I am getting deserialization errors.
.with_format(
    Json()
        .json_schema(
        """
        {
            "type": "object",
            "properties": {
                "monitorId": {
                    "type": "string"
                },
                "deviceId": {
                    "type": "string"
                },
                "data": {
                    "type": "object"
                },
                "state": {
                    "type": "integer"
                },
                "time": {
                    "type": "string"
                }
            }
        }
        """
    )
) \
    .with_schema(
    Schema()
        .field("monitorId", DataTypes.STRING())
        .field("deviceId", DataTypes.STRING())
        .field("data", DataTypes.ROW())
)
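
One possible way to look at the problem, not confirmed anywhere in this thread: in the sample message the value of "data" is a JSON string rather than a nested object, so it needs a second decoding pass after the outer message has been parsed. The plain-Python sketch below shows that two-step decoding; the function name decode_nested is made up for illustration, and the sample values are shortened.

```python
import json


def decode_nested(message, field="data"):
    """Parse a JSON message whose `field` value is itself a JSON-encoded string."""
    record = json.loads(message)               # outer pass: "data" is still a str
    record[field] = json.loads(record[field])  # inner pass: decode the embedded JSON
    return record


# Rebuild a shortened version of the sample message from the mail above.
inner = json.dumps({"0001": 105.0, "0002": 1.21})
message = json.dumps({
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": inner,
    "state": 2,
    "time": 1593687809180,
})

record = decode_nested(message)
print(record["data"]["0001"])  # prints 105.0
```

In a table program this would suggest declaring "data" as a string column and doing the inner decoding in a UDF, though that is an assumption rather than something stated in the thread. Note also that the sample message carries "monitorId" and "time" as numbers, while the schema above declares them as strings.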
Regards,
Manas
