from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment, DataTypes, StreamTableEnvironment
from pyflink.table.descriptors import Kafka, Json, FileSystem, Schema
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)
INPUT_TOPIC = 'xyz'
INPUT_TABLE = 'raw_message'
PROD_ZOOKEEPER = '...'
PROD_KAFKA = '...'
OUTPUT_TOPIC = 'summary_output'
OUTPUT_TABLE = 'feature_summary'
LOCAL_ZOOKEEPER = 'localhost:2181'
LOCAL_KAFKA = 'localhost:9092'
# Register the Kafka source table, reading JSON from the production cluster.
t_env.connect(
    Kafka()
    .version('universal')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .start_from_latest()
) \
    .with_format(
        Json()
        .json_schema(
            "{"
            "  type: 'object',"
            "  properties: {"
            "    lon: {"
            "      type: 'number'"
            "    },"
            "    rideTime: {"
            "      type: 'string',"
            "      format: 'date-time'"
            "    }"
            "  }"
            "}"
        )
    ) \
    .register_table_source(INPUT_TABLE)
# Register the Kafka sink table, writing JSON to the local cluster.
t_env.connect(
    Kafka()
    .version('universal')
    .topic(OUTPUT_TOPIC)
    .property("bootstrap.servers", LOCAL_KAFKA)
    .start_from_latest()
) \
    .with_format(
        Json()
        .json_schema(
            "{"
            "  type: 'object',"
            "  properties: {"
            "    lon: {"
            "      type: 'number'"
            "    },"
            "    rideTime: {"
            "      type: 'string',"
            "      format: 'date-time'"
            "    }"
            "  }"
            "}"
        )
    ) \
    .register_table_sink(OUTPUT_TABLE)
# Pipe the source table straight into the sink and run the job.
t_env.from_path(INPUT_TABLE) \
    .insert_into(OUTPUT_TABLE)
t_env.execute('IU pyflink job')
However, when I run this job I get the following exception:
Traceback (most recent call last):
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o32.registerTableSource.
: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:42)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.
    at org.apache.flink.table.descriptors.SchemaValidator.validate(SchemaValidator.java:90)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getValidatedProperties(KafkaTableSourceSinkFactoryBase.java:269)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:158)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:53)
    ... 13 more
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 46, in <module>
    ).register_table_source(INPUT_TABLE)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/descriptors.py", line 1295, in register_table_source
    self._j_connect_table_descriptor.registerTableSource(name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'findAndCreateTableSource failed.'
The relevant part seems to be:

Caused by: org.apache.flink.table.api.ValidationException: Could not find the required schema in property 'schema'.

This is probably a basic error, but I can't figure out what is actually wrong with the schema. Is it not declared properly? Is some field missing?
FWIW, I have included the JSON and Kafka connector JARs in the required location.
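Reading the error again, I wonder whether the descriptor also needs an explicit with_schema(...) call to declare the table schema, separate from the JSON format. Below is a rough sketch of what I mean for the source table, using the Schema and DataTypes already imported above; the field types (DataTypes.DOUBLE() and DataTypes.TIMESTAMP(3)) are only my guesses for the lon and rideTime fields, so I'm not at all sure this is the right fix:

# Sketch only: same source registration as above, plus an explicit table
# schema. The field types here are guesses on my part.
t_env.connect(
    Kafka()
    .version('universal')
    .topic(INPUT_TOPIC)
    .property("bootstrap.servers", PROD_KAFKA)
    .start_from_latest()
) \
    .with_format(
        Json()
        .json_schema("{type: 'object', properties: {lon: {type: 'number'}, rideTime: {type: 'string', format: 'date-time'}}}")
    ) \
    .with_schema(  # is this the missing piece?
        Schema()
        .field('lon', DataTypes.DOUBLE())
        .field('rideTime', DataTypes.TIMESTAMP(3))
    ) \
    .register_table_source(INPUT_TABLE)

Is something along these lines what the ValidationException is asking for?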
Regards,
Manas