Hello,
I am trying to deploy a Python job with the Kafka connector:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    t_config = TableConfig()
    t_env = StreamTableEnvironment.create(exec_env, t_config)

    t_env.connect(Kafka()
                  .version("0.11")
                  .topic("my-topic")
                  .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
                  ) \
        .in_append_mode() \
        .with_format(Csv()
                     .line_delimiter("\r\n")
                     .derive_schema()) \
        .with_schema(Schema()
                     .field("tbd", DataTypes.INT())) \
        .register_table_source('mySource')

    t_env.connect(FileSystem().path('../production_data/kafkaoutput')) \
        .with_format(OldCsv()
                     .field('tbd', DataTypes.INT())) \
        .with_schema(Schema()
                     .field("tbd", DataTypes.INT())) \
        .register_table_sink('mySink')

    t_env.scan('mySource') \
        .select('tbd') \
        .where("tbd = 1") \
        .insert_into('mySink')

    t_env.execute("tutorial_job")

When I run the deploy command:

    bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar

I get an error:

    Traceback (most recent call last):
      File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 9, in <module>
        t_env = StreamTableEnvironment.create(exec_env, t_config)
      File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1478, in create
      File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
      File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
      File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create.
    : java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase
        at java.base/java.lang.ClassLoader.defineClass1(Native Method)
        at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017)
        at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
        at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550)
        at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
        at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:398)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264)
        at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299)
        at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
        at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
        at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
        at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
        at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135)
        at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.ClassNotFoundException:
    org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        ... 39 more

    org.apache.flink.client.program.ProgramAbortException
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

What is the correct way to deploy a Python job on Flink that uses Kafka? It seems like it cannot resolve the Kafka dependency correctly.

I wonder if there is a simpler solution, and whether it matters that I would like to deploy the job on a K8s cluster.

Thanks,
Wojtek
|
Hi Wojtek,

you need to use the fat jar 'flink-sql-connector-kafka_2.11-1.11.0.jar', which you can download from the doc[1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
Xingbo
|
Thank you for your answer.

I have replaced that .jar with the Kafka "universal" version - the links to the other versions are dead.

After attempting to deploy:

    bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar

another error occurs:

    Traceback (most recent call last):
      File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 20, in <module>
        .field("tbd", DataTypes.INT())) \
    AttributeError: 'StreamTableDescriptor' object has no attribute 'register_table_source'
    org.apache.flink.client.program.ProgramAbortException
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

Maybe the way the Python program is written is incorrect. Could it be deprecated, given that the installed Flink version is 1.11?

Best regards,
Wojtek
|
Hi Wojtek,

In Flink 1.11, the methods register_table_source and register_table_sink of ConnectTableDescriptor have been removed. You need to use create_temporary_table instead of these two methods. Besides, it seems that your PyFlink version is 1.10, while the corresponding Flink is 1.11.

Best,
Xingbo
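For illustration, a minimal untested sketch of that change, reusing the source definition from the first message - only the registration call at the end differs:

    # hedged sketch: same Kafka source as in the original program, but registered
    # with create_temporary_table (Flink 1.11) instead of register_table_source
    t_env.connect(Kafka()
                  .version("universal")
                  .topic("my-topic")
                  .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')) \
        .in_append_mode() \
        .with_format(Csv()
                     .line_delimiter("\r\n")
                     .derive_schema()) \
        .with_schema(Schema()
                     .field("tbd", DataTypes.INT())) \
        .create_temporary_table('mySource')  # replaces register_table_source('mySource')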
|
Hi,

thank you for your answer, it is very helpful.

Currently my Python program looks like:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    t_config = TableConfig()
    t_env = StreamTableEnvironment.create(exec_env, t_config)

    t_env.connect(Kafka()
                  .version("universal")
                  .topic("my-topic")
                  .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
                  ) \
        .in_append_mode() \
        .with_format(Csv()
                     .line_delimiter("\r\n")
                     .derive_schema()) \
        .with_schema(Schema()
                     .field("value", DataTypes.STRING())) \
        .create_temporary_table('mySource')

    t_env.connect(Kafka()
                  .version("universal")
                  .topic("my-topic-out")
                  .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
                  ) \
        .with_format(Csv()
                     .line_delimiter("\r\n")
                     .derive_schema()) \
        .with_schema(Schema()
                     .field("value", DataTypes.STRING())) \
        .in_append_mode() \
        .create_temporary_table('mySink')

    t_env.scan('mySource') \
        .select('"flink_job_" + value') \
        .insert_into('mySink')

    t_env.execute("tutorial_job")

I have installed PyFlink 1.11, and the IDE points out that the commands connect, scan, insert_into and execute are deprecated. What is the correct way to write this program for the 1.11 version of PyFlink?

Kind regards,
Wojtek
|
Hi Wojtek,

The following ways of using PyFlink are my personal recommendation:

1. Use DDL[1] to create your source and sink instead of the descriptor way, because as of Flink 1.11 there are some bugs in the descriptor way.

2. Use `execute_sql` for a single statement, use `create_statement_set` for multiple DML statements.[2]

3. Use `execute_insert` for a single sink, use `TableEnvironment#create_statement_set` for multiple sinks.

4. Use the `from_path` method instead of the `scan` method.

5. Call the method `get_job_client().get_job_execution_result().result()` on the TableResult, which is the return type of execute_insert or execute_sql, after calling an `execute_*` method.

For all PyFlink-related common questions you can refer to the doc[3].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html

Best,
Xingbo
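For illustration, a minimal untested sketch of points 2-5; the table names src, sink_a and sink_b are placeholders and are assumed to have been created via DDL as in [1]:

    # 4. from_path instead of scan
    table = t_env.from_path("src")

    # 3. + 5. single sink: execute_insert returns a TableResult; block until the job finishes
    result = table.execute_insert("sink_a")
    result.get_job_client().get_job_execution_result().result()

    # 2. + 3. multiple sinks: bundle several INSERT statements into one job
    stmt_set = t_env.create_statement_set()
    stmt_set.add_insert("sink_a", table)
    stmt_set.add_insert_sql("INSERT INTO sink_b SELECT * FROM src")
    stmt_set.execute()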
|
Hi,

I've done it like you recommended:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
    from pyflink.table.udf import udf

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    t_config = TableConfig()
    t_env = StreamTableEnvironment.create(exec_env, t_config)

    INPUT_TABLE = "my_topic"
    INPUT_TOPIC = "my-topic"
    LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
    OUTPUT_TABLE = "my_topic_output"
    OUTPUT_TOPIC = "my-topic-output"

    ddl_source = f"""
        CREATE TABLE {INPUT_TABLE} (
            message STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{INPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'format' = 'json'
        )
    """

    ddl_sink = f"""
        CREATE TABLE {OUTPUT_TABLE} (
            message STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{OUTPUT_TOPIC}',
            'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
            'format' = 'json'
        )
    """

    t_env.execute_sql(ddl_source)
    t_env.execute_sql(ddl_sink)

    result = t_env.execute_sql(f"""
        INSERT INTO {OUTPUT_TABLE}
        SELECT message
        FROM {INPUT_TABLE}
    """)

    result.get_job_client().get_job_execution_result().result()

I think it is correctly written. However, after deploying that job I'm getting an error:

    wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar) to field java.util.Properties.serialVersionUID
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
    Traceback (most recent call last):
      File "kafka2flink.py", line 62, in <module>
        result.get_job_client().get_job_execution_result().result()
      File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py", line 78, in result
      File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
      File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
      File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
    : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
        ...
    Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116)
        ...
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:114)
        ... 18 more
    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
        ...
    Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
        ...
    Caused by: java.io.InvalidClassException: org.apache.flink.table.types.logical.RowType$RowField; local class incompatible: stream classdesc serialVersionUID = 3988094341871744603, local class serialVersionUID = -7902169369767750595
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)
        ...
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1290)
        ...
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
        ... 8 more

    org.apache.flink.client.program.ProgramAbortException
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
        ...
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

I presume something is incorrect between the Flink versions, since I want to deploy that job on Kubernetes. To create the cluster on K8s I used the image flink:1.11.0-scala_2.11. The local version of Flink is release-1.11.

What can cause that problem?

Thanks,
Wojtek
|
Hi Wojciech,

In many cases you can first make sure that your code runs correctly in local mode, and then submit the job to the cluster for testing. For how to add jar packages in local mode, you can refer to the doc[1]. Besides, you'd better use the blink planner, which is the default planner. For how to use the blink planner, you can refer to the doc[2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment

Best,
Xingbo
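For illustration, a minimal untested sketch of both suggestions; the jar path is a placeholder:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings

    env = StreamExecutionEnvironment.get_execution_environment()
    # blink planner (the default in 1.11), selected explicitly as in [2]
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # [1]: in local mode, connector jars are added through the pipeline.jars config
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")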
|
Hi,

when I try it locally it runs well. The problem appears when I run it on Kubernetes; I don't know how to make Flink and Kubernetes work well together in that case.

Best,
Wojtek
|
Hi,
the InvalidClassException indicates that you are using different versions of the same class. Are you sure you are using the same Flink minor version (including the Scala suffix) for all dependencies and Kubernetes?

Regards,
Timo
> > However after deploying that job I'm getting an error: > > wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar > WARNING: An illegal reflective access operation has occurred > WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar) to field java.util.Properties.serialVersionUID > WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner > WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations > WARNING: All illegal access operations will be denied in a future release > Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa > Traceback (most recent call last): > File "kafka2flink.py", line 62, in <module> > result.get_job_client().get_job_execution_result().result() > File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py", line 78, in result > File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ > File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco > File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o52.get. 
> : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa) > at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa) > at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116) > at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602) > at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309) > at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) > at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) > at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > ... 1 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:114) > ... 18 more > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) > at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) > at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) > at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) > at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) > at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) > at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. 
> at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155) > at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > *Caused by: java.io.InvalidClassException: > org.apache.flink.table.types.logical.RowType$RowField; local > class incompatible: stream classdesc serialVersionUID = > 3988094341871744603, local class serialVersionUID = > -7902169369767750595 > = -7902169369767750595* > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699) > at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2121) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441) > at java.util.ArrayList.readObject(ArrayList.java:799) > at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) > at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:613) > at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1290) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) > at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) > ... 8 more > > org.apache.flink.client.program.ProgramAbortException > at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > > I presume that something is incorrect between the > FlinkversioncauseI want to deploy that job on Kubernetes. To > create a clusteron K8S I used image *flink:1.11.0-scala_2.11. *Local version of Flink is *release-1.11*. > > What can cause that problem? 
> > Thanks, > Wojtek > > pt., 24 lip 2020 o 11:32 Xingbo Huang <[hidden email] > <mailto:[hidden email]>> napisał(a): > > Hi Wojtek, > The following ways of using Pyflink is my personal > recommendation: > > 1. Use DDL[1] to create your source and sink instead of the > descriptor way, because as of flink 1.11, there are some > bugs in the descriptor way. > > 2. Use `execute_sql` for single statement, use > `create_statement_set` for multiple DML statements.[2] > > 3. Use `execute_insert` for single sink, use > `TableTableEnvironment#create_statement_set` for multiple sinks > > 4. Use `from_path` method instead of `scan` method > > 5. Call the method > `get_job_client().get_job_execution_result().result()` of > TableResult which is the returned type of execute_insert or > execute_sql after calling the method `excute_*` > > > All PyFlink related common questions you can refer to the doc[3] > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/ > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html > [3] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html > > Best, > Xingbo > > Wojciech Korczyński <[hidden email] > <mailto:[hidden email]>> 于2020年7月24日周 > 五 下午4:44写道: > > Hi, > thank you for your answer, it is very helpful. > > Currently my python program looks like: > > from pyflink.datastreamimport StreamExecutionEnvironment > from pyflink.datasetimport ExecutionEnvironment > from pyflink.tableimport TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment > from pyflink.table.descriptorsimport Schema, OldCsv, FileSystem, Kafka, Json, Csv > > exec_env = StreamExecutionEnvironment.get_execution_environment() > t_config = TableConfig() > t_env = StreamTableEnvironment.create(exec_env, t_config) > > t_env.connect(Kafka() > .version("universal") > .topic("my-topic") > .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092') > ) \ > .in_append_mode() \ > .with_format(Csv() > .line_delimiter("\r\n") \ > .derive_schema()) \ > .with_schema(Schema() > .field("value", DataTypes.STRING())) \ > .create_temporary_table('mySource') > > t_env.connect(Kafka() > .version("universal") > .topic("my-topic-out") > .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092') > ) \ > .with_format(Csv() > .line_delimiter("\r\n") \ > .derive_schema()) \ > .with_schema(Schema() > .field("value", DataTypes.STRING())) \ > .in_append_mode() \ > .create_temporary_table('mySink') > > > t_env.scan('mySource') \ > .select('"flink_job_" + value') \ > .insert_into('mySink') > > t_env.execute("tutorial_job") > > I have installed PyFlink 1.11 so the IDE is pointing me > out the commandsconnect, scan, insert_into, *execute > *are deprectade. What is the correct way the program > should be different following 1.11 version of PyFlink? > > Kind regards, > Wojtek > > > pt., 24 lip 2020 o 04:21 Xingbo Huang > <[hidden email] <mailto:[hidden email]>> napisał(a): > > Hi Wojtek, > In flink 1.11, the methods register_table_source and > register_table_sink of ConnectTableDescriptor have > been removed. You need to use createTemporaryTable > instead of these two methods.Besides, it seems that > the version of your pyflink is 1.10, but the > corresponding flink is 1.11. 
> > Best, > Xingbo > > Wojciech Korczyński > <[hidden email] > <mailto:[hidden email]>> 于2020年7 > 月23日周四 下午9:01写道: > > Thank you for your answer. > > I have replaced that .jar with Kafka version > universal - the links to other versions are extinct. > > After the attempt of deploying: > bin/flink run -py > /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py > --jarfile > /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar > > there another error occurs: > Traceback (most recent call last): > File > "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", > line 20, in <module> > .field("tbd", DataTypes.INT())) \ > AttributeError: 'StreamTableDescriptor' object > has no attribute 'register_table_source' > org.apache.flink.client.program.ProgramAbortException > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at > java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > > Maybe the way the python program is written is > incorrect. Can it be deprecated taking into > account that the installed flink version is 1.11? 
> > Best regards, > Wojtek > > czw., 23 lip 2020 o 12:01 Xingbo Huang > <[hidden email] <mailto:[hidden email]>> > napisał(a): > > Hi Wojtek, > you need to use the fat jar > 'flink-sql-connector-kafka_2.11-1.11.0.jar' > which you can download in the doc[1] > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html > > Best, > Xingbo > > Wojciech Korczyński > <[hidden email] > <mailto:[hidden email]>> > 于2020年7月23日周四 下午4:57写道: > > Hello, > > I am trying to deploy a Python job with > Kafka connector: > > from pyflink.datastream import > StreamExecutionEnvironment > from pyflink.dataset import > ExecutionEnvironment > from pyflink.table import TableConfig, > DataTypes, BatchTableEnvironment, > StreamTableEnvironment > from pyflink.table.descriptors import > Schema, OldCsv, FileSystem, Kafka, Json, Csv > > exec_env = > StreamExecutionEnvironment.get_execution_environment() > t_config = TableConfig() > t_env = > StreamTableEnvironment.create(exec_env, > t_config) > > t_env.connect(Kafka() > .version("0.11") > .topic("my-topic") > > .property("bootstrap.servers", > 'my-cluster-kafka-bootstrap:9092') > ) \ > .in_append_mode() \ > .with_format(Csv() > > .line_delimiter("\r\n") \ > .derive_schema()) \ > .with_schema(Schema() > .field("tbd", > DataTypes.INT())) \ > .register_table_source('mySource') > > t_env.connect(FileSystem().path('../production_data/kafkaoutput')) > \ > .with_format(OldCsv() > .field('tbd', > DataTypes.INT())) \ > .with_schema(Schema() > .field("tbd", > DataTypes.INT())) \ > .register_table_sink('mySink') > > t_env.scan('mySource') \ > .select('tbd') \ > .where("tbd = 1") \ > .insert_into('mySink') > > t_env.execute("tutorial_job") > > When I run a deploying command: > bin/flink run -py > /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py > --jarfile > /home/wojtek/flink/flink-connectors/flink-connector-kafka-0.11/target/flink-connector-kafka-0.11_2.11-1.11-SNAPSHOT.jar > > > I get an error: > Traceback (most recent call last): > File > "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", > line 9, in <module> > t_env = > StreamTableEnvironment.create(exec_env, > t_config) > File > "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/table/table_environment.py", > line 1478, in create > File > "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > File > "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", > line 147, in deco > File > "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error > occurred while calling > z:org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create. 
> : java.lang.NoClassDefFoundError: > org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase > at > java.base/java.lang.ClassLoader.defineClass1(Native > Method) > at > java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017) > at > java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174) > at > java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:550) > at > java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458) > at > java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452) > at > java.base/java.security.AccessController.doPrivileged(Native > Method) > at > java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451) > at > java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) > at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) > at > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) > at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) > at > java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) > at > java.base/java.lang.Class.forName0(Native Method) > at > java.base/java.lang.Class.forName(Class.java:398) > at > java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1209) > at > java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1220) > at > java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1264) > at > java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1299) > at > java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1384) > at > java.base/java.util.Iterator.forEachRemaining(Iterator.java:132) > at > org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214) > at > org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170) > at > org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125) > at > org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:158) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:135) > at > org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:143) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at > java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at > java.base/java.lang.Thread.run(Thread.java:834) > Caused by: > java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase > at > java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471) > at > java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) > at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) > at > org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65) > at > org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) > at > java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) > ... 39 more > > org.apache.flink.client.program.ProgramAbortException > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at > java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > > What is the correct way to deploy python > job on Flink which uses Kafka? It seems > like it cannot get a correct dependency > of Kafka. > > I wonder if there is some more simply > solution and if it matters that i would > like deploy a job on the K8s cluster. > > Thanks, > Wojtek > > > UWAGA - Wiadomość oraz załączone do niej > dokumenty zawierają informacje poufne, które > mogą być również objęte tajemnicą handlową lub > służbową. Jeśli nie jesteś zamierzonym odbiorcą > wiadomości, proszę bezzwłocznie skontaktuj się z > nadawcą oraz usuń wiadomość ze swojego systemu. > Ujawnianie, kopiowanie, rozpowszechnianie czy > publikacja tej wiadomości oraz zawartych w niej > informacji jest zabronione. > > Alphamoon Sp. z o.o., ul. Pawła Włodkowica 21/3, > 50-072 Wrocław, > wpisana pod numerem KRS 0000621513 do Krajowego > Rejestru Sądowego, prowadzonego przez Sąd > Rejonowy dla Wrocławia-Fabrycznej VI Wydział > Gospodarczy Krajowego Rejestru Sądowego, NIP: > 8943079568, REGON 364634116.; Kapitał zakładowy: > 5.000 PLN w pełni opłacony. > > NOTE - Message and the documents attached > thereto contain confidential information, which > may also be a trade secret or confidential. If > you are not the intended recipient of the > message, please contact the sender without delay > and delete the message from your system. 
> Disclosure, copying, dissemination or > publication of this message and information > contained therein is prohibited. > > Alphamoon Sp. z o.o. (Ltd.), ul. Pawła > Włodkowica 21/3, 50-072 Wrocław, Poland; > Registered under the KRS number 0000621513 to > the National Court Register, kept by the > District Court for Wrocław-Fabryczna VI Economic > Department of the National Court Register, > VAT-ID: PL8943079568, REGON 364634116; Share > capital: PLN 5.000 fully paid-up. > > > UWAGA - Wiadomość oraz załączone do niej dokumenty > zawierają informacje poufne, które mogą być również > objęte tajemnicą handlową lub służbową. Jeśli nie jesteś > zamierzonym odbiorcą wiadomości, proszę bezzwłocznie > skontaktuj się z nadawcą oraz usuń wiadomość ze swojego > systemu. Ujawnianie, kopiowanie, rozpowszechnianie czy > publikacja tej wiadomości oraz zawartych w niej > informacji jest zabronione. > > Alphamoon Sp. z o.o., ul. Pawła Włodkowica 21/3, 50-072 > Wrocław, > wpisana pod numerem KRS 0000621513 do Krajowego Rejestru > Sądowego, prowadzonego przez Sąd Rejonowy dla > Wrocławia-Fabrycznej VI Wydział Gospodarczy Krajowego > Rejestru Sądowego, NIP: 8943079568, REGON 364634116.; > Kapitał zakładowy: 5.000 PLN w pełni opłacony. > > NOTE - Message and the documents attached thereto > contain confidential information, which may also be a > trade secret or confidential. If you are not the > intended recipient of the message, please contact the > sender without delay and delete the message from your > system. Disclosure, copying, dissemination or > publication of this message and information contained > therein is prohibited. > > Alphamoon Sp. z o.o. (Ltd.), ul. Pawła Włodkowica 21/3, > 50-072 Wrocław, Poland; > Registered under the KRS number 0000621513 to the > National Court Register, kept by the District Court for > Wrocław-Fabryczna VI Economic Department of the National > Court Register, VAT-ID: PL8943079568, REGON 364634116; > Share capital: PLN 5.000 fully paid-up. > > > UWAGA - Wiadomość oraz załączone do niej dokumenty zawierają > informacje poufne, które mogą być również objęte tajemnicą > handlową lub służbową. Jeśli nie jesteś zamierzonym odbiorcą > wiadomości, proszę bezzwłocznie skontaktuj się z nadawcą oraz > usuń wiadomość ze swojego systemu. Ujawnianie, kopiowanie, > rozpowszechnianie czy publikacja tej wiadomości oraz zawartych w > niej informacji jest zabronione. > > Alphamoon Sp. z o.o., ul. Pawła Włodkowica 21/3, 50-072 Wrocław, > wpisana pod numerem KRS 0000621513 do Krajowego Rejestru > Sądowego, prowadzonego przez Sąd Rejonowy dla > Wrocławia-Fabrycznej VI Wydział Gospodarczy Krajowego Rejestru > Sądowego, NIP: 8943079568, REGON 364634116.; Kapitał zakładowy: > 5.000 PLN w pełni opłacony. > > NOTE - Message and the documents attached thereto contain > confidential information, which may also be a trade secret or > confidential. If you are not the intended recipient of the > message, please contact the sender without delay and delete the > message from your system. Disclosure, copying, dissemination or > publication of this message and information contained therein is > prohibited. > > Alphamoon Sp. z o.o. (Ltd.), ul. Pawła Włodkowica 21/3, 50-072 > Wrocław, Poland; > Registered under the KRS number 0000621513 to the National Court > Register, kept by the District Court for Wrocław-Fabryczna VI > Economic Department of the National Court Register, VAT-ID: > PL8943079568, REGON 364634116; Share capital: PLN 5.000 fully > paid-up. 
@Wojtek I just found that serialVersionUID is not defined in org.apache.flink.table.types.logical.RowType$RowField, so you have to make sure that the JDK version is the same on the client side and the server side. Could you check that?

@Timo I think we should define the serialVersionUID for all the classes which implement Serializable. What do you think?

Regards,
Dian
Hi Dian,
we had this discussion in the past. Yes, it might help in certain cases. But on the other hand, it also helps in finding version mismatches when people have misconfigured their dependencies. Different JVM versions should not result in incompatible classes, as the default serialVersionUID is standardized, no?

Regards,
Timo

On 27.07.20 10:53, Dian Fu wrote:
> @Wojtek I just found that serialVersionUID is not defined in
> org.apache.flink.table.types.logical.RowType$RowField, so you have to
> make sure that the JDK version is the same on the client side and the
> server side. Could you check that?
>
> @Timo I think we should define the serialVersionUID for all the classes
> which implement Serializable. What do you think?
>
> Regards,
> Dian
>
>> On 27 Jul 2020, at 16:38, Timo Walther <[hidden email]> wrote:
>>
>> Hi,
>>
>> the InvalidClassException indicates that you are using different
>> versions of the same class. Are you sure you are using the same Flink
>> minor version (including the Scala suffix) for all dependencies and
>> Kubernetes?
>>
>> Regards,
>> Timo
>>
>> On 27.07.20 09:51, Wojciech Korczyński wrote:
>>> Hi,
>>> when I try it locally it runs well. The problem is when I run it
>>> using Kubernetes. I don't know how to make Flink and Kubernetes go
>>> well together in that case.
>>> Best, Wojtek
>>>
>>> On Fri, 24 Jul 2020 at 17:51, Xingbo Huang <[hidden email]> wrote:
>>> Hi Wojciech,
>>> In many cases, you can make sure that your code runs correctly in
>>> local mode first, and then submit the job to the cluster for testing.
>>> For how to add jar packages in local mode, you can refer to the doc[1].
>>> Besides, you'd better use the blink planner, which is the default
>>> planner. For how to use the blink planner, you can refer to the doc[2].
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
>>> Best,
>>> Xingbo
>>>
>>> On Fri, 24 Jul 2020 at 21:40, Wojciech Korczyński <[hidden email]> wrote:
>>> Hi,
>>> I've done like you recommended:
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
>>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
>>> from pyflink.table.udf import udf
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> t_config = TableConfig()
>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>
>>> INPUT_TABLE = "my_topic"
>>> INPUT_TOPIC = "my-topic"
>>> LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
>>> OUTPUT_TABLE = "my_topic_output"
>>> OUTPUT_TOPIC = "my-topic-output"
>>>
>>> ddl_source = f"""
>>>     CREATE TABLE {INPUT_TABLE} (
>>>         message STRING
>>>     ) WITH (
>>>         'connector' = 'kafka',
>>>         'topic' = '{INPUT_TOPIC}',
>>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>         'format' = 'json'
>>>     )
>>> """
>>>
>>> ddl_sink = f"""
>>>     CREATE TABLE {OUTPUT_TABLE} (
>>>         message STRING
>>>     ) WITH (
>>>         'connector' = 'kafka',
>>>         'topic' = '{OUTPUT_TOPIC}',
>>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>         'format' = 'json'
>>>     )
>>> """
>>>
>>> t_env.execute_sql(ddl_source)
>>> t_env.execute_sql(ddl_sink)
>>>
>>> result = t_env.execute_sql(f"""
>>>     INSERT INTO {OUTPUT_TABLE}
>>>     SELECT message
>>>     FROM {INPUT_TABLE}
>>> """)
>>> result.get_job_client().get_job_execution_result().result()
>>>
>>> I think it is correctly written.
>>> However, after deploying that job I'm getting an error:
>>>
>>> wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
>>> WARNING: An illegal reflective access operation has occurred
>>> WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar) to field java.util.Properties.serialVersionUID
>>> WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
>>> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
>>> WARNING: All illegal access operations will be denied in a future release
>>> Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
>>> Traceback (most recent call last):
>>>   File "kafka2flink.py", line 62, in <module>
>>>     result.get_job_client().get_job_execution_result().result()
>>>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py", line 78, in result
>>>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>>>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>>>   File "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
>>> : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
>>>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>>>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>     at java.base/java.lang.Thread.run(Thread.java:834)
>>> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
>>>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116)
>>>     at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>>>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>>>     at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>>>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
>>>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>>>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>>>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>>>     at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
>>>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>>>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>>>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>>>     at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
>>>     at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
>>>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>     ... 1 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:114)
>>>     ... 18 more
>>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
>>>     at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> *Caused by: java.io.InvalidClassException: org.apache.flink.table.types.logical.RowType$RowField; local class incompatible: stream classdesc serialVersionUID = 3988094341871744603, local class serialVersionUID = -7902169369767750595*
>>>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)
>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2121)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
>>>     at java.util.ArrayList.readObject(ArrayList.java:799)
>>>     at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
>>>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:613)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1290)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2257)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
>>>     ... 8 more
>>> org.apache.flink.client.program.ProgramAbortException
>>>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>>
>>> I presume that something is incorrect between the Flink versions, because I want to deploy that job on Kubernetes. To create the cluster on K8s I used the image *flink:1.11.0-scala_2.11*. The local version of Flink is *release-1.11*.
>>> What can cause that problem?
>>> Thanks,
>>> Wojtek
>>>
>>> On Fri, 24 Jul 2020 at 11:32, Xingbo Huang <[hidden email]> wrote:
>>> Hi Wojtek,
>>> The following ways of using PyFlink are my personal recommendations:
>>> 1. Use DDL[1] to create your source and sink instead of the descriptor way, because as of Flink 1.11 there are some bugs in the descriptor way.
>>> 2. Use `execute_sql` for a single statement, and `create_statement_set` for multiple DML statements.[2]
>>> 3. Use `execute_insert` for a single sink, and `TableEnvironment#create_statement_set` for multiple sinks.
>>> 4. Use the `from_path` method instead of the `scan` method.
>>> 5. Call `get_job_client().get_job_execution_result().result()` on the TableResult returned by `execute_insert` or `execute_sql` after calling the `execute_*` method.
>>> All PyFlink related common questions can be found in the doc[3].
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html
>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html
>>> Best,
>>> Xingbo
>>>
>>> On Fri, 24 Jul 2020 at 16:44, Wojciech Korczyński <[hidden email]> wrote:
>>> Hi,
>>> thank you for your answer, it is very helpful.
>>> Currently my Python program looks like:
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.dataset import ExecutionEnvironment
>>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment
>>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> t_config = TableConfig()
>>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>>
>>> t_env.connect(Kafka()
>>>               .version("universal")
>>>               .topic("my-topic")
>>>               .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
>>>               ) \
>>>     .in_append_mode() \
>>>     .with_format(Csv()
>>>                  .line_delimiter("\r\n")
>>>                  .derive_schema()) \
>>>     .with_schema(Schema()
>>>                  .field("value", DataTypes.STRING())) \
>>>     .create_temporary_table('mySource')
>>>
>>> t_env.connect(Kafka()
>>>               .version("universal")
>>>               .topic("my-topic-out")
>>>               .property("bootstrap.servers", 'my-cluster-kafka-bootstrap:9092')
>>>               ) \
>>>     .with_format(Csv()
>>>                  .line_delimiter("\r\n")
>>>                  .derive_schema()) \
>>>     .with_schema(Schema()
>>>                  .field("value", DataTypes.STRING())) \
>>>     .in_append_mode() \
>>>     .create_temporary_table('mySink')
>>>
>>> t_env.scan('mySource') \
>>>     .select('"flink_job_" + value') \
>>>     .insert_into('mySink')
>>>
>>> t_env.execute("tutorial_job")
>>>
>>> I have installed PyFlink 1.11, and the IDE is pointing out that the methods connect, scan, insert_into and execute are deprecated. What is the correct way to write this program for PyFlink 1.11?
>>> Kind regards,
>>> Wojtek
>>>
>>> On Fri, 24 Jul 2020 at 04:21, Xingbo Huang <[hidden email]> wrote:
>>> Hi Wojtek,
>>> In Flink 1.11, the methods register_table_source and register_table_sink of ConnectTableDescriptor have been removed. You need to use create_temporary_table instead of these two methods. Besides, it seems that the version of your PyFlink is 1.10, but the corresponding Flink is 1.11.
>>> Best,
>>> Xingbo
>>>
>>> On Thu, 23 Jul 2020 at 21:01, Wojciech Korczyński <[hidden email]> wrote:
>>> Thank you for your answer.
>>> I have replaced that .jar with the Kafka version "universal" - the links to the other versions are dead.
>>> After the attempt at deploying:
>>>
>>> bin/flink run -py /home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py --jarfile /home/wojtek/Downloads/flink-sql-connector-kafka_2.11-1.11.0.jar
>>>
>>> another error occurs:
>>>
>>> Traceback (most recent call last):
>>>   File "/home/wojtek/PycharmProjects/k8s_kafka_demo/kafka2flink.py", line 20, in <module>
>>>     .field("tbd", DataTypes.INT())) \
>>> AttributeError: 'StreamTableDescriptor' object has no attribute 'register_table_source'
>>> org.apache.flink.client.program.ProgramAbortException
>>>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>>
>>> Maybe the way the Python program is written is incorrect. Could it be using deprecated APIs, taking into account that the installed Flink version is 1.11?
>>> Best regards,
>>> Wojtek
>>>
>>> On Thu, 23 Jul 2020 at 12:01, Xingbo Huang <[hidden email]> wrote:
>>> Hi Wojtek,
>>> you need to use the fat jar 'flink-sql-connector-kafka_2.11-1.11.0.jar', which you can download in the doc[1].
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>>> Best,
>>> Xingbo
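For reference, Xingbo's recommendations quoted above (DDL instead of the removed descriptor methods, statement sets, from_path, and waiting on the job result) can be combined into a minimal PyFlink 1.11 sketch along the following lines. The table definitions mirror the ones in the thread, while the local jar path is an assumption for illustration:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    # The blink planner is the default in 1.11; shown explicitly for clarity.
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

    # For local runs, connector jars can be added via configuration
    # (see the "adding jar files" doc linked above); the path is an example.
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")

    t_env.execute_sql("""
        CREATE TABLE my_source (
            message STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
            'format' = 'json'
        )
    """)
    t_env.execute_sql("""
        CREATE TABLE my_sink (
            message STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'my-topic-output',
            'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',
            'format' = 'json'
        )
    """)

    # A statement set groups multiple INSERTs into a single job; with only
    # one sink, t_env.from_path('my_source').execute_insert('my_sink')
    # would work just as well.
    stmt_set = t_env.create_statement_set()
    stmt_set.add_insert("my_sink", t_env.from_path("my_source"))
    result = stmt_set.execute()
    result.get_job_client().get_job_execution_result().result()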
Thank you for your help! I downloaded Apache Flink 1.11.1 for Scala 2.11 and used the flink:1.11.1-scala_2.11 Docker Hub image for the Kubernetes deployments (jobmanager, taskmanager), and now it started working on Kubernetes. So the exact versions were the issue.

Now my Python program is:

from pyflink.datastream import StreamExecutionEnvironment

I want to add a UDF, so I added the snippet:

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())

and changed the result code:

result = t_env.execute_sql(f"""
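The program and snippets above were truncated by the archive. A minimal sketch of how such a UDF could be wired into the DDL-based job quoted earlier in the thread might look like this; the function name add_prefix and its body are hypothetical illustrations, not taken from the original message:

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    # Hypothetical scalar function matching the decorator above; the
    # prefixing logic is only an illustration.
    @udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
    def add_prefix(message):
        return "flink_job_" + message

    # t_env, INPUT_TABLE and OUTPUT_TABLE as in the DDL program quoted
    # earlier. In PyFlink 1.11 a Python UDF has to be registered before
    # it can be used in SQL.
    t_env.register_function("add_prefix", add_prefix)

    result = t_env.execute_sql(f"""
        INSERT INTO {OUTPUT_TABLE}
        SELECT add_prefix(message)
        FROM {INPUT_TABLE}
    """)
    result.get_job_client().get_job_execution_result().result()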
Hi Wojciech,

The Python UDF code will be executed in a Python process, so you need to make sure that Python 3 (3.5, 3.6 or 3.7) and PyFlink are installed in your execution environment[1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/installation.html#environment-requirements

Best,
Xingbo
Hi Timo,
Thanks a lot for the reply. I got your point and it sounds reasonable to me!

The Java(TM) Object Serialization Specification [1] defines how to calculate the default serialVersionUID, so I would guess that different JVMs should get the same default serialVersionUID given the same class file. However, relying on the generated serialVersionUID also has a side effect: we may lose compatibility in some cases where it would have been possible to keep it, had we maintained the serialVersionUID ourselves. Both ways have their pros and cons.

Thanks,
Dian
Hi Xingbo,

yes, you are right. I had to create a custom Docker image:

FROM flink:1.11.1-scala_2.11

Now everything works. Thanks a lot!

Best,
Wojtek
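The Dockerfile was cut off by the archive. Based on Xingbo's note about the environment requirements, a plausible minimal version might have looked roughly like this; the apt-get and pip commands are assumptions for illustration, not the original file:

    FROM flink:1.11.1-scala_2.11

    # Python UDFs run in a Python process on the TaskManager, so the image
    # needs Python 3 (3.5, 3.6 or 3.7) and PyFlink itself (assumed commands).
    RUN apt-get update -y && \
        apt-get install -y python3 python3-pip && \
        ln -s /usr/bin/python3 /usr/bin/python
    RUN pip3 install apache-flink==1.11.1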