I'm trying to read data from my Event Hub in Azure, but I end up with the Flink error message 'findAndCreateTableSource failed', using Flink 1.13-SNAPSHOT.

source_ddl = f"""CREATE TABLE dms_source (
    x_value VARCHAR
) WITH (
    'connector.type' = 'Kafka',
    'connector.version' = 'universal',
    'connector.partition' = '0',
    'connector.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://**EVENT_HUB_NAME**.servicebus.windows.net/;SharedAccessKeyName=**KEY_NAME**;SharedAccessKey=***PRIMARY_KEY***;EntityPath=**EVENT_HUB_INSTANCE_NAME**";',
    'connector.sasl.mechanism' = 'PLAIN',
    'connector.security.protocol' = 'SASL_SSL',
    'connector.properties.bootstrap.servers' = '**EVENT_HUB_NAME**.servicebus.windows.net:9093',
    'connector.properties.group.id' = '$Default',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json'
)"""

Any tips on how to debug this?
Hi,

Could you provide the exception stack trace?

Regards,
Roman
Traceback (most recent call last):
File "streaming-dms.py", line 309, in <module> anomalies() File "streaming-dms.py", line 142, in anomalies t_env.sql_query(query).insert_into("ark_sink") File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 748, in sql_query j_table = self._j_tenv.sqlQuery(query) File "/Users/jag002/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco raise java_exception pyflink.util.exceptions.TableException: findAndCreateTableSource failed. at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49) at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193) at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at 
java.base/java.lang.Thread.run(Thread.java:834) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
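The LegacyCatalogSourceTable / findAndCreateLegacyTableSource frames in the trace suggest the 'connector.type'-style properties in the DDL are being resolved through the old factory lookup path. For comparison, here is a rough sketch (not from the thread) of the same table using the flat option keys the Kafka SQL connector has documented since Flink 1.11; the topic name assumes the Event Hubs Kafka endpoint exposes the event hub instance as the topic, and all **...** values are placeholders as in the original DDL:

source_ddl = f"""CREATE TABLE dms_source (
    x_value VARCHAR
) WITH (
    -- non-legacy option keys (Flink 1.11+); these are matched by the 'kafka'
    -- factory rather than the legacy connector.type path in the trace above
    'connector' = 'kafka',
    'topic' = '**EVENT_HUB_INSTANCE_NAME**',
    'properties.bootstrap.servers' = '**EVENT_HUB_NAME**.servicebus.windows.net:9093',
    'properties.group.id' = '$Default',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://**EVENT_HUB_NAME**.servicebus.windows.net/;SharedAccessKeyName=**KEY_NAME**;SharedAccessKey=***PRIMARY_KEY***;EntityPath=**EVENT_HUB_INSTANCE_NAME**";',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)"""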
Hi,

Have you also included the kafka-connector related jar in the classpath?

Best,
Yun
These are the JAR files included in the same folder where I run the Python code:

flink-connector-kafka_2.11-1.13-SNAPSHOT.jar
flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar
kafka-clients-2.7.0.jar
Hi,

Could you try adding the jars via the Python configuration explicitly? See [1].

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program
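In code, the approach described in [1] looks roughly like this — a minimal sketch with a placeholder path, not the poster's actual setup. 'pipeline.jars' takes a semicolon-separated list of file:// URLs whose jars are shipped with the job; [1] also covers 'pipeline.classpaths' for URLs that should only be appended to the classpath:

# Sketch of the jar configuration from [1]; the path below is a placeholder.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    # semicolon-separated file:// URLs; these jars are uploaded with the job
    "file:///path/to/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar")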
Hi Yun,

Thanks for the help! If I point the Kafka connector in the DDL to a local Kafka cluster, it works fine, so I assume access to the JAR files should not be the issue. This is how I referred to the JAR files from Python:

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar")

All the best,
Joris
Hi Joris,

Are you sure that all nodes have access to these jars? Usually, shared resources reside on some kind of distributed file system or network directory.

I'm pulling in Dian, who can probably help better.

Best,
Arvid
Hi Arvid,

I'm currently running PyFlink locally in the JVM with a parallelism of 1, and the same file works fine if I point it at a Kafka cluster (running in a local Docker instance). I assumed the JAR pipeline definition in the Python file would make sure they are made available on the cluster (or, in this case, the local JVM), right? But if I need to verify that the JVM has access to the JAR files, I'd need a bit of guidance on how to do that.
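One low-tech way to check this — a sketch assuming the same t_env and pipeline.jars setting from earlier in the thread are in scope — is to read the configuration back and confirm each referenced file actually exists on disk:

import os
from urllib.parse import urlparse

# Read back what the table environment actually holds; an empty string
# would mean the pipeline.jars setting never took effect.
jar_urls = t_env.get_config().get_configuration().get_string("pipeline.jars", "")
print("pipeline.jars =", jar_urls)

for url in filter(None, jar_urls.split(";")):
    path = urlparse(url).path  # strip the file:// scheme
    print(path, "exists:", os.path.isfile(path))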