Hi All,

Do I need to use the DataStream API or the Table API to construct sources? I am just trying to read from Kafka and print it to the console. Yes, I tried it with DataStreams and it works fine, but I want to do it using the Table-related APIs. I don't see any documentation or sample on how to create a Kafka table source (or any other source) using the table source APIs, so after some digging I wrote the following code. My ultimate goal is to avoid the DataStream API as much as possible and just use the Table API & SQL, but somehow I feel the Flink framework focuses more on DataStream than on the SQL interface. Am I wrong? From the user perspective, wouldn't it make more sense to focus on SQL interfaces for both streaming and batch?

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

I get the following error:

The program finished with the following exception:

The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
    org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
    org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
    org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
    org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
    scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    scala.collection.Iterator$class.foreach(Iterator.scala:891)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    scala.collection.AbstractTraversable.map(Traversable.scala:104)
    org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
    org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
    org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
    Test.main(Test.java:40)

The error seems to be on the line tableEnvironment.toAppendStream(resultTable, Row.class).print(); and I am not sure why it is not able to serialize it.

Thanks!
Also, why do I need to convert to a DataStream to print the rows of a table? Why not have a print method on the Table itself?

On Sat, Feb 29, 2020 at 3:40 AM kant kodali <[hidden email]> wrote:
Hi,
You shouldn't be using `KafkaTableSource`, as it's marked @Internal; it's not part of any public API. You don't have to convert a DataStream into a Table to read from Kafka in the Table API. You could, if you had used the DataStream API's FlinkKafkaConsumer as documented here [1], but you should be able to use the Kafka table connector directly, as described in the docs [2][3].

Piotrek

[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
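For reference, a minimal sketch of what using the Kafka table connector directly can look like with the Flink 1.10 descriptor API. This is an assumed example, not the original poster's code: the topic name, bootstrap servers and the single VARCHAR field f0 are placeholders taken from the discussion, and the Csv format needs the flink-csv module discussed further down in this thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class KafkaTablePrintSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register a Kafka-backed table through the connector descriptors
        // instead of instantiating the internal KafkaTableSource directly.
        tableEnv.connect(
                new Kafka()
                        .version("universal")                             // Kafka 1.0+ clients
                        .topic("test-topic1")                             // placeholder topic
                        .property("bootstrap.servers", "localhost:9092")
                        .startFromLatest())
                .withFormat(new Csv())                                    // needs flink-csv on the classpath
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("kafkaSource");

        Table resultTable = tableEnv.sqlQuery("SELECT f0 FROM kafkaSource");

        // There is no print sink in the 1.10 Table API, so convert to a DataStream to print.
        tableEnv.toAppendStream(resultTable, Row.class).print();
        env.execute("kafka-table-print");
    }
}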
Hi,

Thanks for the pointer. It looks like the documentation says to use tableEnv.registerTableSink, but in my IDE that method shows as deprecated in Flink 1.10, so I am still not seeing a way to add a sink that can print to stdout. What sink should I use to print to stdout, and how do I add it without converting to a DataStream?

Thanks!

On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski <[hidden email]> wrote:
Hi kant,

AFAIK, there is no "print to stdout" sink for the Table API right now; you can implement a custom sink following this doc [1]. IMO, an out-of-the-box print table sink would be very useful, and I've created an issue [2] to track this.

kant kodali <[hidden email]> wrote on Sun, Mar 1, 2020 at 2:30 AM:
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: [hidden email]; [hidden email]
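In case a concrete starting point helps, below is a rough, hypothetical sketch of such a custom print sink against the AppendStreamTableSink interface as it exists in Flink 1.10. The class name and the fixed-schema shortcut in configure() are assumptions for illustration, not taken from the linked doc or issue.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// Hypothetical stdout sink for append-only tables.
public class PrintTableSink implements AppendStreamTableSink<Row> {

    private final TableSchema schema;

    public PrintTableSink(TableSchema schema) {
        this.schema = schema;
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public DataType getConsumedDataType() {
        return schema.toRowDataType();
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // The planner calls this with the schema of the query; this sketch
        // simply keeps the schema it was constructed with.
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Delegate to the DataStream print sink, which writes each Row to stdout.
        return dataStream.print();
    }
}

Such a sink would still need to be registered (for example through the deprecated registerTableSink, or a matching TableSinkFactory) before it can be used in insertInto or INSERT INTO.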
Hi Benchao,

Agreed, a ConsoleSink would be very useful, but that is not the only problem here. The documentation says to use tableEnv.registerTableSink all over the place (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink), yet that function is deprecated. So how do I add any other sink?

Thanks!

On Sat, Feb 29, 2020 at 6:05 PM Benchao Li <[hidden email]> wrote:
Here is my updated code after digging through the source code (not sure if it is correct). It still doesn't work, because it says that for CSV the connector.type should be filesystem, not kafka, although the documentation says Kafka is supported.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

On Sat, Feb 29, 2020 at 7:48 PM kant kodali <[hidden email]> wrote:
Hi,
> Thanks for the pointer. Looks like the documentation says to use tableEnv.registerTableSink however in my IDE it shows the method is deprecated in Flink 1.10.

It looks like not all of the documentation was updated after methods were deprecated. However, if you look at the Javadocs of the `registerTableSink` method, you can find the answer [1].

> It still doesn't work because it says for CSV the connector.type should be filesystem, not Kafka.

Can you post the full stack trace? As I'm not that familiar with the Table API, maybe Timo or Dawid know what's going on here?

Piotrek
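For context, the deprecation note on registerTableSink points to the descriptor-based registration via connect(...).createTemporaryTable(...). Below is a hedged sketch of registering a sink that way in 1.10; the output topic, field name and job name are invented for illustration, and the example assumes the universal Kafka connector and flink-csv are on the classpath.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class KafkaToKafkaSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Source table, registered through descriptors.
        tableEnv.connect(new Kafka()
                        .version("universal")
                        .topic("test-topic1")
                        .property("bootstrap.servers", "localhost:9092")
                        .startFromLatest())
                .withFormat(new Csv())
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("kafkaSource");

        // Sink table, registered the same way instead of via the deprecated registerTableSink.
        tableEnv.connect(new Kafka()
                        .version("universal")
                        .topic("output-topic")                            // hypothetical output topic
                        .property("bootstrap.servers", "localhost:9092"))
                .withFormat(new Csv())
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("kafkaSink");

        // Write the query result into the registered sink table.
        tableEnv.sqlUpdate("INSERT INTO kafkaSink SELECT f0 FROM kafkaSource");
        tableEnv.execute("kafka-to-kafka");
    }
}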
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
    at Test.main(Test.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
    ... 34 more

On Sat, Feb 29, 2020 at 11:35 PM Piotr Nowojski <[hidden email]> wrote:
Hi kant,

The CSV format is an independent module; you need to add it as a dependency:

<dependency>

kant kodali <[hidden email]> wrote on Sun, Mar 1, 2020 at 3:43 PM:
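(The dependency snippet above was cut off in the archive. The independent CSV module Benchao is referring to is presumably flink-csv, so the Maven block would look roughly like the following assumed reconstruction.)

<!-- Assumed reconstruction: the CSV format factory used by the Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>

The Gradle equivalent, matching the build files later in the thread, would be flinkShadowJar "org.apache.flink:flink-csv:${flinkVersion}".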
Hi kant,

> Also why do I need to convert to DataStream to print the rows of a table? Why not have a print method in the Table itself?

Flink 1.10 introduces a utility class named TableUtils to convert a Table to a List<Row>. This utility class is mainly meant for demonstration or testing and is only applicable to small batch jobs and small finite append-only streaming jobs. The code looks like:

Table table = tEnv.sqlQuery("select ...");
List<Row> result = TableUtils.collectToList(table);
result.....

Currently we are planning to implement Table#collect [1]; after that, Table#head and Table#print may also be introduced soon.

> The program finished with the following exception:

Please make sure that the Kafka version in the Test class and the Kafka version in the pom dependency are the same. I tested your code successfully.

Bests,
Godfrey

Benchao Li <[hidden email]> wrote on Sun, Mar 1, 2020 at 4:44 PM:
The dependency was already there. Below is my build.gradle. I also checked the Kafka version, and it looks like the jar flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}" downloads kafka-clients version 2.2.0, so I changed my code to version 2.2.0 and the same problem persists.

buildscript {

On Sun, Mar 1, 2020 at 12:50 AM godfrey he <[hidden email]> wrote:
I think you should use `flink-sql-connector-kafka-0.11_2.11` instead of `flink-connector-kafka_2.11`.

Bests,
Godfrey

kant kodali <[hidden email]> wrote on Sun, Mar 1, 2020 at 5:15 PM:
I don't know how Gradle works, but in Maven, packaging dependencies into one fat jar requires specifying how SPI property files should be handled, like

<transformers>

Could you check that your final jar contains the correct resource files?

godfrey he <[hidden email]> wrote on Sun, Mar 1, 2020 at 5:25 PM:
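(The <transformers> element above was also truncated. In Maven this is typically the ServicesResourceTransformer, and the Gradle shadow plugin has an equivalent mergeServiceFiles() call; both are sketched below assuming a standard shade/shadow setup, not copied from anyone's actual build file.)

<!-- Maven shade plugin: merge META-INF/services files so the TableFactory
     entries of all bundled connectors survive in the fat jar -->
<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>

// Gradle shadow plugin equivalent in build.gradle
shadowJar {
    mergeServiceFiles()
}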
Hi Benchao,

That worked! Pasting the build.gradle file here. However, this only works for 0.11, and it needs zookeeper.connect(), which shouldn't be required; I am not sure why the Flink Kafka connector requires it. If I change the version to 2.2 in the code and specify this jar

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"

or

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka-0.11_2.11:${flinkVersion}" // Not sure if I should use this one for Kafka >= 0.11

it doesn't work either.

buildscript {

On Sun, Mar 1, 2020 at 1:38 AM Benchao Li <[hidden email]> wrote:
Hi Kant,

If you want to use the universal Kafka connector, use "universal" for the version. The community decided to no longer distinguish different Kafka connector versions, but to use the newest Kafka client version for all versions of Kafka 1.0+. So if you want to use the connector from flink-sql-connector-kafka_2.11, use "universal" for the version.

As for the collect/print sink: we do realize the importance of such a sink, and there have been a few approaches to implement one, including the TableUtils mentioned by Godfrey. It does not have strong consistency guarantees and is recommended only for experiments/testing. There is also an ongoing discussion on how to implement such a sink for both batch and streaming here: https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17046455#comment-17046455

Best,
Dawid

On 01/03/2020 12:00, kant kodali wrote:
* What went wrong:
Could not determine the dependencies of task ':shadowJar'.
> Could not resolve all dependencies for configuration ':flinkShadowJar'.
   > Could not find org.apache.flink:flink-sql-connector-kafka_2.11:universal.
     Searched in the following locations:
       - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
       - https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
       - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.pom
       - https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-sql-connector-kafka_2.11/universal/flink-sql-connector-kafka_2.11-universal.jar
     Required by:
         project :

On Sun, Mar 1, 2020 at 6:43 AM Dawid Wysakowicz <[hidden email]> wrote:
Hi Kant,

I think Dawid meant not to add the Kafka version number to the dependency, i.e. to use:

flinkShadowJar "org.apache.flink:flink-sql-connector-kafka_2.11:${flinkVersion}"

On Sun, Mar 1, 2020 at 7:31 PM kant kodali <[hidden email]> wrote:
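(To make the distinction explicit: the Gradle/Maven coordinate keeps ${flinkVersion}, while "universal" is only a value for the connector's version property. A small illustrative fragment of the Table API descriptor, not Kant's actual code:)

// The Kafka *connector* version is chosen in the descriptor (or the
// 'connector.version' property), not in the dependency coordinate.
new Kafka()
        .version("universal")              // modern Kafka client, for brokers 1.0+
        .topic("test-topic1")
        .property("bootstrap.servers", "localhost:9092");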
Hi Arvid,

Yes, I got it, and it works as described in my previous email. Thanks!

On Mon, Mar 2, 2020 at 12:10 AM Arvid Heise <[hidden email]> wrote: