Hi,
I am trying to connect to Kafka through Flink, but I am having some difficulty getting the right table source factory. I currently get the error:

NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

My sbt file looks like this:

name := "writeToSQL"
…

From the documentation https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#define-a-tablefactory I can see what is missing, but I do not know how to solve it. The documentation says the following:

"Define a TableFactory: Factories leverage Java's Service Provider Interfaces (SPI) for discovery. This means that every dependency and JAR file should contain a file …"

But how do I do that? I thought the sbt file would take care of this.

Any help is highly appreciated!

Martin Frank Hansen
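(A common cause of this symptom, for what it's worth: when a fat JAR is built with sbt-assembly, the default merge strategy keeps only one of the `META-INF/services/org.apache.flink.table.factories.TableFactory` files, discarding the SPI registrations of the other connectors. A sketch of a build.sbt that avoids this, assuming sbt-assembly is used; the Scala/Flink versions and dependency list are illustrative, not taken from the original build file:)

```scala
// build.sbt -- illustrative sketch; versions and dependency list are assumptions
name := "writeToSQL"

scalaVersion := "2.11.12"

val flinkVersion = "1.10.0"

libraryDependencies ++= Seq(
  // Core table API/planner are typically "provided" by the Flink cluster.
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-planner"          % flinkVersion % "provided",
  // The Kafka connector and JSON format must be on the classpath at runtime,
  // so they are bundled into the fat JAR (not "provided").
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" %% "flink-json"            % flinkVersion
)

// Concatenate (rather than pick one of) the SPI registration files,
// so every connector's TableFactory entry survives the fat-JAR merge.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```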
Hi,

Could you share the SQL DDL and the full exception message? It might be that you are using the wrong `connector.version` or other options.

Best,
Jark

On Fri, 15 May 2020 at 20:14, Martin Frank Hansen <[hidden email]> wrote:
Hi Jark Wu,

Thanks for your answer. Here is what I have so far:

import java.util.Properties
…

Here is my error message:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
	at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
	at FlinkToSQL$.delayedEndpoint$FlinkToSQL$1(FlinkToSQL.scala:65)
	at FlinkToSQL$delayedInit$body.apply(FlinkToSQL.scala:15)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.App$class.main(App.scala:76)
	at FlinkToSQL$.main(FlinkToSQL.scala:15)
	at FlinkToSQL.main(FlinkToSQL.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
	... 12 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=security.protocol
connector.properties.0.value=SSL
connector.properties.1.key=key.deserializer
connector.properties.1.value=org.apache.kafka.common.serialization.ByteArrayDeserializer
connector.properties.2.key=value.deserializer
connector.properties.2.value=org.apache.kafka.common.serialization.ByteArrayDeserializer
connector.properties.3.key=zookeeper.connect
connector.properties.3.value=z-1.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:2181,z-2.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:2181,z-3.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:2181
connector.properties.4.key=ssl.endpoint.identification.algorithm
connector.properties.4.value=
connector.properties.5.key=group.id
connector.properties.5.value=very_small_test
connector.properties.6.key=bootstrap.servers
connector.properties.6.value=b-2.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:9094,b-1.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:9094,b-3.realtime-tracking-poc.3ubjky.c3.kafka.eu-west-1.amazonaws.com:9094
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=very_small_test
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=fullVisitorId
schema.0.type=VARCHAR
schema.1.name=eventID
schema.1.type=VARCHAR
schema.2.name=CustomDimensions
schema.2.type=VARCHAR
schema.3.name=page
schema.3.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
	at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
	at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
	at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
	at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
	... 30 more

Command exiting with ret '1'

On Sat, 16 May 2020 at 04:57, Jark Wu <[hidden email]> wrote:
Martin Frank Hansen
Data Engineer
Pilestræde 34 | DK-1147 København K | T: +45 33 75 75 75 | berlingskemedia.dk
Hi,
Looks like you missed two required parameters, `version` and `topic` [1]; you need to add them for both the source table and the sink table.
Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
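(For reference, in the Flink 1.10 Table API these two options correspond to the `version(...)` and `topic(...)` calls on the `Kafka` descriptor. A minimal configuration sketch; the topic, schema fields, and update mode are taken from the error output above, while `tableEnv`, the table name, and the broker address are placeholders:)

```scala
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}

// Sketch only: `tableEnv` stands for an existing StreamTableEnvironment.
tableEnv
  .connect(
    new Kafka()
      .version("universal")        // required: Kafka connector version
      .topic("very_small_test")    // required: topic to read from
      .property("bootstrap.servers", "broker:9094") // placeholder address
      .startFromEarliest()
  )
  .withFormat(new Json().failOnMissingField(false))
  .withSchema(
    new Schema()
      .field("fullVisitorId", "VARCHAR")
      .field("eventID", "VARCHAR")
      .field("CustomDimensions", "VARCHAR")
      .field("page", "VARCHAR")
  )
  .inAppendMode()
  .createTemporaryTable("sql-source") // placeholder table name
```

The same two calls are needed on the descriptor used for the sink table.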
More precisely: the sink table `sql-sink` is missing the required `version` option.
Hi Leonard,

Thank you so much! It worked. I did get a new error, but it is unrelated to this question.

On Mon, 18 May 2020 at 15:21, Leonard Xu <[hidden email]> wrote:
Best regards