Re: Flink Kafka connector in Python

Posted by Xingbo Huang on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-Kafka-connector-in-Python-tp36270p36640.html

Hi Manas,
If you want to return a RowType in a Python UDF, you can use the Row class, which extends Python's tuple.
You can import Row with the following statement: from pyflink.table import Row
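For example, a rough (untested) sketch based on your DataConverter could look like the following. Note the assumptions: I added a second field and used DOUBLE for the field types, since the values inside your "data" JSON are numbers; adjust both to your needs. t_env is the StreamTableEnvironment from your earlier mail.

import json

from pyflink.table import DataTypes, Row
from pyflink.table.udf import ScalarFunction, udf


class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        # Build a Row whose fields line up with the declared result_type
        # (feature1, feature2).
        return Row(data['0001'], data['0002'])


t_env.register_function("data_converter", udf(
    DataConverter(),
    input_types=[DataTypes.STRING()],
    result_type=DataTypes.ROW([
        DataTypes.FIELD("feature1", DataTypes.DOUBLE()),
        DataTypes.FIELD("feature2", DataTypes.DOUBLE())
    ])))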

Best,
Xingbo

Manas Kale <[hidden email]> wrote on Monday, July 6, 2020 at 8:08 PM:
I also tried doing this by using a User Defined Function. 
class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ??  # I want to return data['0001'] in field 'feature1', data['0002'] in field 'feature2' etc.

t_env.register_function("data_converter", udf(
    DataConverter(),
    input_types=[DataTypes.STRING()],
    result_type=DataTypes.ROW([
        DataTypes.FIELD("feature1", DataTypes.STRING())
    ])))

# here "data" is the field "data" from the previous mail
t_env.from_path(INPUT_TABLE) \
    .select("data_converter(data)") \
    .insert_into(OUTPUT_TABLE)

I used a ROW to hold multiple values, but I can't figure out how to return a populated ROW object from the eval() method. Where is the method to construct a row/field object so I can return it?


Thanks!


On Fri, Jul 3, 2020 at 12:40 PM Manas Kale <[hidden email]> wrote:
Hi Xingbo,
Thanks for the reply, I didn't know that a table schema also needs to be declared after the connector, but I understand now.
I have another question: how do I write the parsing schema for a field that is itself a valid JSON string? For example:
{
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
    "state": 2,
    "time": 1593687809180
}
The field "data" is a string of valid JSON with string:number objects. I'm currently trying using JSON schema object and DataTypes.ROW, but am getting deserialization errors.
.with_format(
    Json()
        .json_schema(
            """
            {
                "type": "object",
                "properties": {
                    "monitorId": {
                        "type": "string"
                    },
                    "deviceId": {
                        "type": "string"
                    },
                    "data": {
                        "type": "object"
                    },
                    "state": {
                        "type": "integer"
                    },
                    "time": {
                        "type": "string"
                    }
                }
            }
            """
        )
) \
.with_schema(
    Schema()
        .field("monitorId", DataTypes.STRING())
        .field("deviceId", DataTypes.STRING())
        .field("data", DataTypes.ROW())
)
Regards,
Manas

On Thu, Jul 2, 2020 at 6:25 PM Xingbo Huang <[hidden email]> wrote:
Hi, Manas
You need to define the schema. You can refer to the following example:
 t_env.connect(
    Kafka()
        .version('0.11')
        .topic(INPUT_TOPIC)
        .property("bootstrap.servers", PROD_KAFKA)
        .property("zookeeper.connect", "localhost:2181")
        .start_from_latest()
) \
    .with_format(
    Json()
        .json_schema(
        "{"
        "  type: 'object',"
        "  properties: {"
        "    lon: {"
        "      type: 'number'"
        "    },"
        "    rideTime: {"
        "      type: 'string',"
        "      format: 'date-time'"
        "    }"
        "  }"
        "}"
    )
) \
    .with_schema(  # declare the schema of the table
    Schema()
        .field("lon", DataTypes.DECIMAL(20, 10))
        .field("rideTime", DataTypes.TIMESTAMP(6))
).register_table_source(INPUT_TABLE)

Best,
Xingbo

Manas Kale <[hidden email]> wrote on Thursday, July 2, 2020 at 7:59 PM:
Hi,
I'm trying to get a simple consumer/producer running using the following code, adapted from the links you provided:
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema

exec_env = StreamExecutionEnvironment.get_execution_environment()

t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TOPIC = 'xyz'
INPUT_TABLE = 'raw_message'
PROD_ZOOKEEPER = '...'
PROD_KAFKA = '...'

OUTPUT_TOPIC = 'summary_output'
OUTPUT_TABLE = 'feature_summary'
LOCAL_ZOOKEEPER = 'localhost:2181'
LOCAL_KAFKA = 'localhost:9092'


t_env.connect(
    Kafka()
        .version('universal')
        .topic(INPUT_TOPIC)
        .property("bootstrap.servers", PROD_KAFKA)
        .start_from_latest()
) \
    .with_format(
        Json()
            .json_schema(
                "{"
                "  type: 'object',"
                "  properties: {"
                "    lon: {"
                "      type: 'number'"
                "    },"
                "    rideTime: {"
                "      type: 'string',"
                "      format: 'date-time'"
                "    }"
                "  }"
                "}"
            )
    ).register_table_source(INPUT_TABLE)

t_env.connect(
    Kafka()
        .version('universal')
        .topic(OUTPUT_TOPIC)
        .property("bootstrap.servers", LOCAL_KAFKA)
        .start_from_latest()
) \
    .with_format(
        Json()
            .json_schema(
                "{"
                "  type: 'object',"
                "  properties: {"
                "    lon: {"
                "      type: 'number'"
                "    },"
                "    rideTime: {"
                "      type: 'string',"
                "      format: 'date-time'"
                "    }"
                "  }"
                "}"
            )
    ).register_table_sink(OUTPUT_TABLE)

t_env.from_path(INPUT_TABLE) \
    .insert_into(OUTPUT_TABLE)

t_env.execute('IU pyflink job')
However, I am getting the following exception:
Traceback (most recent call last):
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
... 13 more


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
    ).register_table_source(INPUT_TABLE)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
    self._j_connect_table_descriptor.registerTableSource(name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'

The relevant part seems to be Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
This is probably a basic error, but I can't figure out what's wrong with the schema. Is the schema not properly declared? Is some field missing?
FWIW I have included the JSON and Kafka connector JARs in the required location.

Regards,
Manas

On Tue, Jun 30, 2020 at 11:58 AM Manas Kale <[hidden email]> wrote:
Hi Xingbo,
Thank you for the information, it certainly helps!

Regards,
Manas

On Mon, Jun 29, 2020 at 6:18 PM Xingbo Huang <[hidden email]> wrote:
Hi Manas,

Since Flink 1.9, the entire architecture of PyFlink has been redesigned. So the method described in the link won't work.
But you can use the more convenient DDL[1] or descriptor API[2] to read Kafka data; a rough DDL sketch follows the links below. Besides, you can refer to the common questions about PyFlink[3].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html
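For example, with DDL a Kafka source table could be declared roughly as follows. This is only a sketch: the table name, columns, topic, and broker address are placeholders, and the property keys follow the Flink 1.10 connector documentation linked above.

source_ddl = """
CREATE TABLE my_kafka_source (
    lon DOUBLE,
    rideTime STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'my_topic',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
)
"""
# In Flink 1.10 a DDL statement is submitted with sql_update();
# later releases use execute_sql() instead.
t_env.sql_update(source_ddl)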

Best,
Xingbo

Manas Kale <[hidden email]> wrote on Monday, June 29, 2020 at 8:10 PM:
Hi,
I want to consume from and write to Kafka from Flink's Python API.

The only way I found to do this was through this question on SO, where the user essentially copies the Flink Kafka connector JARs into the Flink runtime's lib/ directory.
  • Is this the recommended method to do this? If not, what is?
  • Is there any official documentation for using Kafka with PyFlink? Is this officially supported?
  • How does the method described in the link work? Does the Flink runtime load and expose all JARs in /lib to the Python script? Can I write custom operators in Java and use those through Python?
Thanks,
Manas