Hi to all,
I'm trying to read and print the contents of my Parquet directory with Flink 1.11 (using the bridge API). However, Flink complains that there is no topology to execute. What am I doing wrong?

The exception is:

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
    at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
    at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
    at it.okkam.datalinks.batch.flink.ProfileTest.main(ProfileTest.java:52)

This is the code:
------------------------
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
    EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnv.executeSql(-----see below [1] ----);
Table inputTable = tableEnv.sqlQuery("SELECT * FROM source");
tableEnv.toAppendStream(inputTable, new RowTypeInfo(inputTable.getSchema().getFieldTypes())).print();
final JobExecutionResult jobRes = tableEnv.execute("test-job");

[1]
----------
CREATE TABLE `source` (
  `col1` BIGINT,
  `col2` STRING
) WITH (
  'connector' = 'filesystem',
  'format' = 'parquet',
  'update-mode' = 'append',
  'path' = '/tmp/parquet-test',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'success-file',
  'format.parquet.compression' = 'snappy',
  'format.parquet.enable.dictionary' = 'true',
  'format.parquet.block.size' = '0',
  'sink.shuffle-by-partition.enable' = 'true'
)
-----------

Thanks in advance,
Flavio
Hi, Flavio
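In Flink 1.11, once a Table has been converted to a DataStream, only the StreamExecutionEnvironment can execute the resulting DataStream program. tableEnv.execute() does not see the operators added by toAppendStream(...).print(), which is why it reports that no operators are defined. Please use env.execute("test-job") in this case. You can find more information in [1].

Best,
Leonard Xu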
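For illustration, here is a minimal sketch of the original program with that one change applied. It assumes Flink 1.11 with the Blink planner and the Parquet format on the classpath. The DDL is a trimmed version of the one in the question (only connector, format and path are kept), and Row.class is used in place of the explicit RowTypeInfo for brevity. Treat it as a sketch under those assumptions, not a tested configuration.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ProfileTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Trimmed DDL (assumption): only the options needed to read the directory.
        tableEnv.executeSql(
                "CREATE TABLE `source` (`col1` BIGINT, `col2` STRING) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'format' = 'parquet',"
                        + " 'path' = '/tmp/parquet-test')");

        Table inputTable = tableEnv.sqlQuery("SELECT * FROM source");

        // Converting to a DataStream adds operators to env, not to tableEnv.
        tableEnv.toAppendStream(inputTable, Row.class).print();

        // So the job has to be started from the StreamExecutionEnvironment.
        env.execute("test-job");
    }
}
```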
Hi Flavio,

In 1.11 we have provided an easier way to print table content: once you have the `table` object, all you need to do is call `table.execute().print();`

Best,
Kurt

On Thu, Jul 16, 2020 at 9:35 AM Leonard Xu <[hidden email]> wrote:
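A minimal sketch of this approach, under the same assumptions as the question (Flink 1.11, bridge API, a Parquet directory at /tmp/parquet-test). The class name PrintTableExample and the trimmed DDL are made up for illustration. No explicit env.execute() or tableEnv.execute() call is needed here, because table.execute() submits the job itself.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical class name, for illustration only.
public class PrintTableExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Trimmed DDL (assumption): only the options needed to read the directory.
        tableEnv.executeSql(
                "CREATE TABLE `source` (`col1` BIGINT, `col2` STRING) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'format' = 'parquet',"
                        + " 'path' = '/tmp/parquet-test')");

        Table table = tableEnv.sqlQuery("SELECT * FROM source");

        // Runs the query, collects the rows to the client and prints them there.
        table.execute().print();
    }
}
```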
Hi Flavio,

For print:
- As Kurt said, you can use `table.execute().print();`. The records will be collected to the client (NOTE: it is the client) and printed to the client console.
- But if you want to print records in the runtime tasks, like DataStream.print does, you can use [1].

Best,
Jingsong Lee

On Thu, Jul 16, 2020 at 10:18 AM Kurt Young <[hidden email]> wrote:
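The link behind [1] did not survive in this archive. One way to print inside the runtime tasks from SQL in Flink 1.11 is the built-in `print` connector, and the sketch below assumes that is the kind of thing meant. The class name PrintSinkExample, the table name print_sink and the trimmed source DDL are made up for illustration. In 1.11 an INSERT issued through executeSql is submitted asynchronously, so the sketch waits on the JobClient before exiting.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical class name, for illustration only.
public class PrintSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Trimmed source DDL (assumption): only the options needed to read the directory.
        tableEnv.executeSql(
                "CREATE TABLE `source` (`col1` BIGINT, `col2` STRING) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'format' = 'parquet',"
                        + " 'path' = '/tmp/parquet-test')");

        // Sink backed by the 'print' connector: each task writes its rows to
        // the stdout of the task manager it runs on, not to the client.
        tableEnv.executeSql(
                "CREATE TABLE print_sink (`col1` BIGINT, `col2` STRING) WITH ("
                        + " 'connector' = 'print')");

        // executeSql submits the INSERT job and returns immediately.
        TableResult result = tableEnv.executeSql("INSERT INTO print_sink SELECT * FROM source");

        // Flink 1.11 way of blocking until the job has finished.
        result.getJobClient().get()
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get();
    }
}
```

On a real cluster the output shows up in the task manager logs or stdout files. When running locally from an IDE it appears in the same console as the client.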
Thanks for the suggestions, Kurt and Jingsong! Very helpful.

On Thu, Jul 16, 2020 at 4:30 AM Jingsong Li <[hidden email]> wrote: