PyFlink cluster runtime issue


Manas Kale
Hi,
I am trying to deploy a PyFlink application on a local cluster. The application runs without any problems when I execute it as a normal Python program with the command:
python myApplication.py
My PyFlink version is 1.11.0 (__version__ = "1.11.0").
I installed PyFlink through conda or pip (I don't remember which).

Per the instructions given in [1], I have ensured that running the command "python" opens a Python 3.7 shell with PyFlink installed.
I have also ensured that my local Flink cluster version is 1.11.0 (same as above).
However, if I execute the application using the command:
bin/flink run -py myApplication.py

I get the error:

Traceback (most recent call last):
  File "basic_streaming_job.py", line 65, in <module>
    main()
  File "basic_streaming_job.py", line 43, in main
    """)
  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.raw_message'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'topic'='basic_features_normalized'
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
.....

The definition of the offending table:
CREATE TABLE {INPUT_TABLE} (
    monitorId STRING,
    deviceId STRING,
    state INT,
    feature_1 DOUBLE,
    feature_2 DOUBLE,
    feature_3 DOUBLE,
    feature_4 DOUBLE,
    feature_5 DOUBLE,
    feature_6 DOUBLE,
    time_str TIMESTAMP(3),
    WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '{INPUT_TOPIC}',
    'properties.bootstrap.servers' = '{KAFKA}',
    'format' = 'json'
)
Clearly, even though my standalone PyFlink version and the cluster's Flink version are the same, something is different in the cluster runtime.
What could that be?
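
For readers following along, here is a minimal sketch of how the placeholders in the table definition above might be filled in before the statement is passed to execute_sql. The template and the use of str.format are assumptions (the post does not show how the string is built); the table name, topic and bootstrap server are taken from the error output, while the watermark delay of 5 seconds is purely a made-up value.

```python
# Hypothetical reconstruction: the post only shows the DDL with placeholders,
# not the Python code that fills them in.
DDL_TEMPLATE = """
CREATE TABLE {INPUT_TABLE} (
    monitorId STRING,
    deviceId STRING,
    state INT,
    feature_1 DOUBLE,
    feature_2 DOUBLE,
    feature_3 DOUBLE,
    feature_4 DOUBLE,
    feature_5 DOUBLE,
    feature_6 DOUBLE,
    time_str TIMESTAMP(3),
    WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '{INPUT_TOPIC}',
    'properties.bootstrap.servers' = '{KAFKA}',
    'format' = 'json'
)
"""


def render_ddl(input_table, input_topic, kafka, watermark_delay):
    """Substitute the four placeholders and return the final DDL string."""
    return DDL_TEMPLATE.format(
        INPUT_TABLE=input_table,
        INPUT_TOPIC=input_topic,
        KAFKA=kafka,
        WATERMARK_DELAY=watermark_delay,
    )


# Values from the error message; the delay (5) is an assumed example value.
ddl = render_ddl("raw_message", "basic_features_normalized", "localhost:9092", 5)
print(ddl)
```

The rendered statement carries the same 'connector', 'topic', 'properties.bootstrap.servers' and 'format' options that the ValidationException lists.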



Re: PyFlink cluster runtime issue

Xingbo Huang
Hi Manas,

I think you forgot to add the Kafka connector jar [1] as a dependency. You can specify the jar either with the -j argument on the command line [2] or through the Python Table API. For details on the APIs for adding Java dependencies, see the relevant documentation [3].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
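
As a rough sketch of the Python Table API route, the jar can be registered through the "pipeline.jars" configuration option, which takes a ';'-separated list of file:// URLs. The helper names and the jar path below are hypothetical, and table_env is assumed to be an already created PyFlink TableEnvironment; the actual jar file name depends on the connector and Scala version you downloaded.

```python
from pathlib import Path


def pipeline_jars_value(jar_paths):
    """Turn local jar paths into the ';'-separated file:// URL list
    expected by the 'pipeline.jars' option."""
    return ";".join(Path(p).absolute().as_uri() for p in jar_paths)


def register_jars(table_env, jar_paths):
    """Set 'pipeline.jars' on an existing TableEnvironment (assumed to be
    created elsewhere) and return the value that was set."""
    value = pipeline_jars_value(jar_paths)
    table_env.get_config().get_configuration().set_string("pipeline.jars", value)
    return value


# Usage inside the job, before the CREATE TABLE statement is executed
# (the path is a placeholder, not a real location):
# register_jars(t_env, ["/path/to/flink-sql-connector-kafka_2.11-1.11.0.jar"])
```

With the command-line route, the same jar would instead be passed via -j, for example: bin/flink run -py myApplication.py -j /path/to/flink-sql-connector-kafka_2.11-1.11.0.jar (again with a placeholder path).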

Best,
Xingbo

On Fri, Aug 28, 2020 at 9:06 PM Manas Kale <[hidden email]> wrote:



Re: PyFlink cluster runtime issue

Manas Kale
Hi Xingbo,
Thanks, that worked. Just to make sure: the only way to submit a PyFlink job currently is through the command line, right? Or can I also do that through the GUI?

On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang <[hidden email]> wrote:



Re: PyFlink cluster runtime issue

Xingbo Huang
Hi Manas,

Currently, a PyFlink job can't be submitted through the Flink web UI. The only way to submit a PyFlink job is through the command line.

Best,
Xingbo

On Sat, Aug 29, 2020 at 12:51 PM Manas Kale <[hidden email]> wrote:



Re: PyFlink cluster runtime issue

Manas Kale
Ok, thank you!

On Sat, 29 Aug, 2020, 4:07 pm Xingbo Huang, <[hidden email]> wrote: