[table-walkthrough exception] Unable to create a source for reading table...

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[table-walkthrough exception] Unable to create a source for reading table...

Lingfeng Pu
Hi,

I'm following the tutorial to run the "flink-playground/table-walkthrough" project on IDEA. However, I got the exception as follows:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.transactions'.

The key localhost environment info shows below:
1. OS: Fedora 34; 2. Flink version: 1.13.1;
3. Java version: 1.8; 4. Maven version: 3.6.3;
5. Docker version: 20.10.7 (API version: 1.41).

The entire error report shows below:
/usr/java/jdk1.8.0_291-amd64/bin/java -javaagent:/var/lib/snapd/snap/intellij-idea-community/302/lib/idea_rt.jar=46805:/var/lib/snapd/snap/intellij-idea-community/302/bin -Dfile.encoding=UTF-8 -classpath /usr/java/jdk1.8.0_291-amd64/jre/lib/charsets.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/deploy.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/cldrdata.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/dnsns.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/jaccess.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/jfxrt.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/localedata.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/nashorn.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunec.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunjce_provider.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunpkcs11.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/zipfs.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/javaws.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jce.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jfr.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jfxswt.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jsse.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/management-agent.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/plugin.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/resources.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/rt.jar:/home/AkatsukiG5/IdeaProjects/flink-playgrounds-master/table-walkthrough/target/classes:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-java/1.12.1/flink-table-api-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-common/1.12.1/flink-table-common-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-connector-files/1.12.1/flink-connector-files-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-connector-base/1.12.1/flink-connector-base-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-asm-7/7.1-12.0/flink-shaded-asm-7-7.1-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/slf4j/slf4j-api/1.7.15/slf4j-api-1.7.15.jar:/home/AkatsukiG5/Documents/mavenRepo/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/force-shading/1.12.1/force-shading-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-java-bridge_2.11/1.12.1/flink-table-api-java-bridge_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-java/1.12.1/flink-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-streaming-java_2.11/1.12.1/flink-streaming-java_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-file-sink-common/1.12.1/flink-file-sink-common-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-guava/18.0-12.0/flink-shaded-guava-18.0-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-streaming-scala_2.11/1.12.1/flink-streaming-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-scala_2.11/1.12.1/flink-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-reflect/2.11.12/scala-reflect-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-compiler/2.11.12/scala-compiler-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-xml_2.11/1.0.5/scala-xml_2.11-1.0.5.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0.4.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-planner-blink_2.11/1.12.1/flink-table-planner-blink_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-scala_2.11/1.12.1/flink-table-api-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-scala-bridge_2.11/1.12.1/flink-table-api-scala-bridge_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-runtime-blink_2.11/1.12.1/flink-table-runtime-blink_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/codehaus/janino/janino/3.0.11/janino-3.0.11.jar:/home/AkatsukiG5/Documents/mavenRepo/org/codehaus/janino/commons-compiler/3.0.11/commons-compiler-3.0.11.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/calcite/avatica/avatica-core/1.17.0/avatica-core-1.17.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/reflections/reflections/0.9.10/reflections-0.9.10.jar:/home/AkatsukiG5/Documents/mavenRepo/org/javassist/javassist/3.19.0-GA/javassist-3.19.0-GA.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-clients_2.11/1.12.1/flink-clients_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-core/1.12.1/flink-core-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-annotations/1.12.1/flink-annotations-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-metrics-core/1.12.1/flink-metrics-core-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/home/AkatsukiG5/Documents/mavenRepo/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/home/AkatsukiG5/Documents/mavenRepo/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/home/AkatsukiG5/Documents/mavenRepo/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-compress/1.20/commons-compress-1.20.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-runtime_2.11/1.12.1/flink-runtime_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-queryable-state-client-java/1.12.1/flink-queryable-state-client-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-hadoop-fs/1.12.1/flink-hadoop-fs-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/commons-io/commons-io/2.7/commons-io-2.7.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-netty/4.1.49.Final-12.0/flink-shaded-netty-4.1.49.Final-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-jackson/2.10.1-12.0/flink-shaded-jackson-2.10.1-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-zookeeper-3/3.4.14-12.0/flink-shaded-zookeeper-3-3.4.14-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/akka/akka-actor_2.11/2.5.21/akka-actor_2.11-2.5.21.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/config/1.3.3/config-1.3.3.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-java8-compat_2.11/0.7.0/scala-java8-compat_2.11-0.7.0.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/akka/akka-stream_2.11/2.5.21/akka-stream_2.11-2.5.21.jar:/home/AkatsukiG5/Documents/mavenRepo/org/reactivestreams/reactive-streams/1.0.2/reactive-streams-1.0.2.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/ssl-config-core_2.11/0.3.7/ssl-config-core_2.11-0.3.7.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/akka/akka-protobuf_2.11/2.5.21/akka-protobuf_2.11-2.5.21.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/akka/akka-slf4j_2.11/2.5.21/akka-slf4j_2.11-2.5.21.jar:/home/AkatsukiG5/Documents/mavenRepo/org/clapper/grizzled-slf4j_2.11/1.3.2/grizzled-slf4j_2.11-1.3.2.jar:/home/AkatsukiG5/Documents/mavenRepo/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar:/home/AkatsukiG5/Documents/mavenRepo/com/twitter/chill_2.11/0.7.6/chill_2.11-0.7.6.jar:/home/AkatsukiG5/Documents/mavenRepo/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar:/home/AkatsukiG5/Documents/mavenRepo/org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-optimizer_2.11/1.12.1/flink-optimizer_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/commons-cli/commons-cli/1.3.1/commons-cli-1.3.1.jar:/home/AkatsukiG5/Documents/mavenRepo/junit/junit/4.12/junit-4.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/home/AkatsukiG5/Documents/mavenRepo/org/hamcrest/hamcrest-all/1.3/hamcrest-all-1.3.jar:/home/AkatsukiG5/Documents/mavenRepo/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar:/home/AkatsukiG5/Documents/mavenRepo/log4j/log4j/1.2.17/log4j-1.2.17.jar org.apache.flink.playgrounds.spendreport.SpendReport
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.transactions'.

Table options are:

'connector'='kafka'
'format'='csv'
'properties.bootstrap.servers'='kafka:9092'
'topic'='transactions'
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:254)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:346)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:74)
at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:52)
at org.apache.flink.table.operations.AggregateQueryOperation.accept(AggregateQueryOperation.java:82)
at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:74)
at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:217)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:554)
at org.apache.flink.playgrounds.spendreport.SpendReport.main(SpendReport.java:84)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:367)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:354)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
... 48 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:235)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:363)
... 50 more

Process finished with exit code 1

I've also attached the pom.xml of this project to this mail. Could anyone helps me figure out what happens? I'll appreciate that.


pom.xml (15K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: [table-walkthrough exception] Unable to create a source for reading table...

Arvid Heise-4
Hi Lingfeng,

could you try
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>test</scope>-->
        </dependency>

to your pom?

On Wed, Jun 9, 2021 at 5:04 AM Lingfeng Pu <[hidden email]> wrote:
Hi,

I'm following the tutorial to run the "flink-playground/table-walkthrough" project on IDEA. However, I got the exception as follows:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.transactions'.

The key localhost environment info shows below:
1. OS: Fedora 34; 2. Flink version: 1.13.1;
3. Java version: 1.8; 4. Maven version: 3.6.3;
5. Docker version: 20.10.7 (API version: 1.41).

The entire error report shows below:
/usr/java/jdk1.8.0_291-amd64/bin/java -javaagent:/var/lib/snapd/snap/intellij-idea-community/302/lib/idea_rt.jar=46805:/var/lib/snapd/snap/intellij-idea-community/302/bin -Dfile.encoding=UTF-8 -classpath /usr/java/jdk1.8.0_291-amd64/jre/lib/charsets.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/deploy.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/cldrdata.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/dnsns.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/jaccess.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/jfxrt.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/localedata.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/nashorn.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunec.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunjce_provider.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/sunpkcs11.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/ext/zipfs.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/javaws.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jce.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jfr.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jfxswt.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/jsse.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/management-agent.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/plugin.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/resources.jar:/usr/java/jdk1.8.0_291-amd64/jre/lib/rt.jar:/home/AkatsukiG5/IdeaProjects/flink-playgrounds-master/table-walkthrough/target/classes:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-java/1.12.1/flink-table-api-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-common/1.12.1/flink-table-common-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-connector-files/1.12.1/flink-connector-files-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-connector-base/1.12.1/flink-connector-base-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-asm-7/7.1-12.0/flink-shaded-asm-7-7.1-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/slf4j/slf4j-api/1.7.15/slf4j-api-1.7.15.jar:/home/AkatsukiG5/Documents/mavenRepo/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/force-shading/1.12.1/force-shading-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-java-bridge_2.11/1.12.1/flink-table-api-java-bridge_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-java/1.12.1/flink-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-streaming-java_2.11/1.12.1/flink-streaming-java_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-file-sink-common/1.12.1/flink-file-sink-common-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-guava/18.0-12.0/flink-shaded-guava-18.0-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-streaming-scala_2.11/1.12.1/flink-streaming-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-scala_2.11/1.12.1/flink-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-reflect/2.11.12/scala-reflect-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/scala-compiler/2.11.12/scala-compiler-2.11.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-xml_2.11/1.0.5/scala-xml_2.11-1.0.5.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0.4.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-planner-blink_2.11/1.12.1/flink-table-planner-blink_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-scala_2.11/1.12.1/flink-table-api-scala_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-api-scala-bridge_2.11/1.12.1/flink-table-api-scala-bridge_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-table-runtime-blink_2.11/1.12.1/flink-table-runtime-blink_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/codehaus/janino/janino/3.0.11/janino-3.0.11.jar:/home/AkatsukiG5/Documents/mavenRepo/org/codehaus/janino/commons-compiler/3.0.11/commons-compiler-3.0.11.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/calcite/avatica/avatica-core/1.17.0/avatica-core-1.17.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/reflections/reflections/0.9.10/reflections-0.9.10.jar:/home/AkatsukiG5/Documents/mavenRepo/org/javassist/javassist/3.19.0-GA/javassist-3.19.0-GA.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-clients_2.11/1.12.1/flink-clients_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-core/1.12.1/flink-core-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-annotations/1.12.1/flink-annotations-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-metrics-core/1.12.1/flink-metrics-core-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/home/AkatsukiG5/Documents/mavenRepo/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/home/AkatsukiG5/Documents/mavenRepo/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/home/AkatsukiG5/Documents/mavenRepo/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/commons/commons-compress/1.20/commons-compress-1.20.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-runtime_2.11/1.12.1/flink-runtime_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-queryable-state-client-java/1.12.1/flink-queryable-state-client-java-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-hadoop-fs/1.12.1/flink-hadoop-fs-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/commons-io/commons-io/2.7/commons-io-2.7.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-netty/4.1.49.Final-12.0/flink-shaded-netty-4.1.49.Final-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-jackson/2.10.1-12.0/flink-shaded-jackson-2.10.1-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-shaded-zookeeper-3/3.4.14-12.0/flink-shaded-zookeeper-3-3.4.14-12.0.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/akka/akka-actor_2.11/2.5.21/akka-actor_2.11-2.5.21.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/config/1.3.3/config-1.3.3.jar:/home/AkatsukiG5/Documents/mavenRepo/org/scala-lang/modules/scala-java8-compat_2.11/0.7.0/scala-java8-compat_2.11-0.7.0.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/akka/akka-stream_2.11/2.5.21/akka-stream_2.11-2.5.21.jar:/home/AkatsukiG5/Documents/mavenRepo/org/reactivestreams/reactive-streams/1.0.2/reactive-streams-1.0.2.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/ssl-config-core_2.11/0.3.7/ssl-config-core_2.11-0.3.7.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/akka/akka-protobuf_2.11/2.5.21/akka-protobuf_2.11-2.5.21.jar:/home/AkatsukiG5/Documents/mavenRepo/com/typesafe/akka/akka-slf4j_2.11/2.5.21/akka-slf4j_2.11-2.5.21.jar:/home/AkatsukiG5/Documents/mavenRepo/org/clapper/grizzled-slf4j_2.11/1.3.2/grizzled-slf4j_2.11-1.3.2.jar:/home/AkatsukiG5/Documents/mavenRepo/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar:/home/AkatsukiG5/Documents/mavenRepo/com/twitter/chill_2.11/0.7.6/chill_2.11-0.7.6.jar:/home/AkatsukiG5/Documents/mavenRepo/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar:/home/AkatsukiG5/Documents/mavenRepo/org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.jar:/home/AkatsukiG5/Documents/mavenRepo/org/apache/flink/flink-optimizer_2.11/1.12.1/flink-optimizer_2.11-1.12.1.jar:/home/AkatsukiG5/Documents/mavenRepo/commons-cli/commons-cli/1.3.1/commons-cli-1.3.1.jar:/home/AkatsukiG5/Documents/mavenRepo/junit/junit/4.12/junit-4.12.jar:/home/AkatsukiG5/Documents/mavenRepo/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/home/AkatsukiG5/Documents/mavenRepo/org/hamcrest/hamcrest-all/1.3/hamcrest-all-1.3.jar:/home/AkatsukiG5/Documents/mavenRepo/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar:/home/AkatsukiG5/Documents/mavenRepo/log4j/log4j/1.2.17/log4j-1.2.17.jar org.apache.flink.playgrounds.spendreport.SpendReport
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.transactions'.

Table options are:

'connector'='kafka'
'format'='csv'
'properties.bootstrap.servers'='kafka:9092'
'topic'='transactions'
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:254)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:346)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:74)
at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:52)
at org.apache.flink.table.operations.AggregateQueryOperation.accept(AggregateQueryOperation.java:82)
at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:74)
at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:217)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:554)
at org.apache.flink.playgrounds.spendreport.SpendReport.main(SpendReport.java:84)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:367)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:354)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
... 48 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:235)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:363)
... 50 more

Process finished with exit code 1

I've also attached the pom.xml of this project to this mail. Could anyone helps me figure out what happens? I'll appreciate that.