Connecting Pyflink DDL to Azure Eventhubs using Kafka connector


Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

joris.vanagtmaal
I'm trying to read data from my Event Hub in Azure, but I end up with the
Flink error message 'findAndCreateTableSource failed'.

Using Flink 1.13-SNAPSHOT.

source_ddl = f"""CREATE TABLE dms_source(
        x_value VARCHAR
    ) WITH (
        'connector.type' = 'Kafka',
        'connector.version' = 'universal',
        'connector.partition' = '0',
        'connector.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://**EVENT_HUB_NAME**.servicebus.windows.net/;SharedAccessKeyName=**KEY_NAME**;SharedAccessKey=***PRIMARY_KEY***;EntityPath=**EVENT_HUB_INSTANCE_NAME**";',
        'connector.sasl.mechanism' = 'PLAIN',
        'connector.security.protocol' = 'SASL_SSL',
        'connector.properties.bootstrap.servers' = '**EVENT_HUB_NAME**.servicebus.windows.net:9093',
        'connector.properties.group.id' = '$Default',
        'connector.startup-mode' = 'latest-offset',
        'format.type' = 'json')
    """

Any tips on how to debug this?
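For comparison, here is a hedged sketch of the same table using the non-legacy option style introduced around Flink 1.11 ('connector' = 'kafka', with Kafka client settings passed through under a 'properties.' prefix instead of ad-hoc 'connector.*' keys). Every **...** value and the connection string are placeholders, and this has not been run against a live Event Hub:

```python
# Sketch only: newer-style Kafka connector DDL. All **...** values are
# placeholders; the Event Hub name doubles as the Kafka topic.
event_hub_ns = "**EVENT_HUB_NAME**"            # Event Hubs namespace (placeholder)
event_hub = "**EVENT_HUB_INSTANCE_NAME**"      # Event Hub instance / Kafka topic (placeholder)
conn_str = "Endpoint=sb://**EVENT_HUB_NAME**.servicebus.windows.net/;SharedAccessKeyName=**KEY_NAME**;SharedAccessKey=***PRIMARY_KEY***"  # placeholder

source_ddl = f"""CREATE TABLE dms_source (
    x_value STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '{event_hub}',
    'properties.bootstrap.servers' = '{event_hub_ns}.servicebus.windows.net:9093',
    'properties.group.id' = '$Default',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{conn_str}";',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)"""
```

With this style, any option under 'properties.' is handed straight to the Kafka client, which is what Event Hubs' SASL_SSL/PLAIN authentication relies on.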



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

r_khachatryan
Hi,

Could you provide the exception stack trace?

Regards,
Roman


On Mon, Feb 8, 2021 at 3:46 PM joris.vanagtmaal <[hidden email]> wrote the message quoted above.

Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

joris.vanagtmaal
Traceback (most recent call last):
  File "streaming-dms.py", line 309, in <module>
    anomalies()
  File "streaming-dms.py", line 142, in anomalies
    t_env.sql_query(query).insert_into("ark_sink")
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 748, in sql_query
    j_table = self._j_tenv.sqlQuery(query)
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
    at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
    at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)





Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Yun Gao
Hi,

Have you also included the Kafka-connector-related jar in the classpath?

Best,
Yun
On Tue, Feb 9, 2021, joris.vanagtmaal <[hidden email]> wrote the message with the traceback quoted above.

Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

joris.vanagtmaal
These are the JAR files included in the same folder where I run the Python code:

flink-connector-kafka_2.11-1.13-SNAPSHOT.jar
flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar
kafka-clients-2.7.0.jar




Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Yun Gao

Hi,

Could you try adding the jars explicitly via the Python configuration? See [1].

Best,
 Yun

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program 
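The approach in the linked dependency-management guide can be sketched as follows. The jar paths are illustrative placeholders; the key point is that 'pipeline.jars' takes a semicolon-separated list of file:// URLs:

```python
# Sketch: build a 'pipeline.jars' value from local jar paths.
# The jar file names below are placeholders for illustration.
import os

def pipeline_jars_value(jar_paths):
    """Join local jar paths into the semicolon-separated file:// URL list
    that Flink's 'pipeline.jars' option expects."""
    return ";".join("file://" + os.path.abspath(p) for p in jar_paths)

jars = pipeline_jars_value([
    "flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar",
    "flinkjars/kafka-clients-2.7.0.jar",
])
# With a TableEnvironment in hand, this would be applied as:
# t_env.get_config().get_configuration().set_string("pipeline.jars", jars)
```

Setting the option programmatically ensures the jars travel with the job, rather than relying on whatever happens to be in the working directory.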

Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

joris.vanagtmaal
Hi Yun,

Thanks for the help!

If I point the Kafka connector in the DDL at a local Kafka cluster, it works fine, so I assume access to the JAR files is not the issue.

This is how I referred to the JAR files from Python:

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar")

All the best,
Joris




Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Arvid Heise-4
Hi Joris,

Are you sure that all nodes have access to these jars? Usually, shared resources reside on some kind of distributed file system or network directory.

I'm pulling in Dian who can probably help better.

Best,

Arvid

On Tue, Feb 9, 2021 at 1:50 PM joris.vanagtmaal <[hidden email]> wrote the message quoted above.

Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

joris.vanagtmaal
Hi Arvid,

I'm currently running PyFlink locally in the JVM with a parallelism of 1, and the same file works fine if I point it at a Kafka cluster (running in a local Docker instance).

I assumed that the JAR pipeline definition in the Python file would make sure the jars are made available on the cluster (or, in this case, the local JVM), right?

But if I need to verify that the JVM has access to the JAR files, I need a bit of guidance on how to do that.
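One way to sanity-check this (a sketch; the example path and class name at the end are assumptions based on the jars mentioned in this thread, not verified contents) is to confirm that each jar exists and actually packages the class you expect. A .jar file is just a zip archive, so the standard library is enough:

```python
# Sketch: check whether a given class is packaged inside a jar file.
import os
import zipfile

def jar_contains_class(jar_path, class_name):
    """Return True if the dotted class_name is present in the jar.

    Jars are zip archives, so we look for the corresponding .class entry.
    """
    entry = class_name.replace(".", "/") + ".class"
    if not os.path.isfile(jar_path):
        return False
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()

# Illustrative usage (path and factory class name are assumptions):
# jar_contains_class(
#     "/Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar",
#     "org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory")
```

If the expected table factory class is missing from every jar on 'pipeline.jars', the factory lookup behind 'findAndCreateTableSource' has nothing to find.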



