Print table content in Flink 1.11

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Print table content in Flink 1.11

Flavio Pompermaier
Hi to all,
I'm trying to read and print out the content of my parquet directory with Flink 1.11 (using the bridge API). However Flink complains that there is no topology to execute..what am I doing wrong? The exception is:

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at it.okkam.datalinks.batch.flink.ProfileTest.main(ProfileTest.java:52)

This is the code: ------------------------

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
        EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnv.executeSql(-----see below  [1] ----);
Table inputTable = tableEnv.sqlQuery("SELECT * FROM source");
tableEnv.toAppendStream(inputTable,
   new RowTypeInfo(inputTable.getSchema().getFieldTypes());).print()
final JobExecutionResult jobRes = tableEnv.execute("test-job");

[1] ----------
CREATE TABLE `source` (
`col1` BIGINT,
`col2` STRING
) WITH (
'connector' = 'filesystem',
'format' = 'parquet',
'update-mode' = 'append',
'path' = '/tmp/parquet-test',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file',
'format.parquet.compression'='snappy',
'format.parquet.enable.dictionary'='true',
'format.parquet.block.size'='0',
'sink.shuffle-by-partition.enable' = 'true'
)
-----------

Thanks in advance,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Print table content in Flink 1.11

Leonard Xu
Hi, Flavio


在 2020年7月16日,00:19,Flavio Pompermaier <[hidden email]> 写道:

final JobExecutionResult jobRes = tableEnv.execute("test-job");


In Flink 1.11, once a Table has transformed to DataStream, only StreamExecutionEnvironment can execute the DataStream program, please use env.execute(“test-job”) in this case, you can get mote information from [1].


Best,
Leonard Xu

Reply | Threaded
Open this post in threaded view
|

Re: Print table content in Flink 1.11

Kurt Young
Hi Flavio,

In 1.11 we have provided an easier way to print table content, after you got the `table` object,
all you need to to is calling `table.execute().print();`

Best,
Kurt


On Thu, Jul 16, 2020 at 9:35 AM Leonard Xu <[hidden email]> wrote:
Hi, Flavio


在 2020年7月16日,00:19,Flavio Pompermaier <[hidden email]> 写道:

final JobExecutionResult jobRes = tableEnv.execute("test-job");


In Flink 1.11, once a Table has transformed to DataStream, only StreamExecutionEnvironment can execute the DataStream program, please use env.execute(“test-job”) in this case, you can get mote information from [1].


Best,
Leonard Xu

Reply | Threaded
Open this post in threaded view
|

Re: Print table content in Flink 1.11

Jingsong Li
Hi Flavio,

For print:
- As Kurt said, you can use `table.execute().print();`, records will be collected to the client (NOTE it is client) and print to client console.
- But if you want print records in runtime tasks like DataStream.print, you can use [1]


Best,
Jingsong

On Thu, Jul 16, 2020 at 10:18 AM Kurt Young <[hidden email]> wrote:
Hi Flavio,

In 1.11 we have provided an easier way to print table content, after you got the `table` object,
all you need to to is calling `table.execute().print();`

Best,
Kurt


On Thu, Jul 16, 2020 at 9:35 AM Leonard Xu <[hidden email]> wrote:
Hi, Flavio


在 2020年7月16日,00:19,Flavio Pompermaier <[hidden email]> 写道:

final JobExecutionResult jobRes = tableEnv.execute("test-job");


In Flink 1.11, once a Table has transformed to DataStream, only StreamExecutionEnvironment can execute the DataStream program, please use env.execute(“test-job”) in this case, you can get mote information from [1].


Best,
Leonard Xu



--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: Print table content in Flink 1.11

Flavio Pompermaier
Thanks for the suggestions Kurt and Jingsong! Very helpful

On Thu, Jul 16, 2020 at 4:30 AM Jingsong Li <[hidden email]> wrote:
Hi Flavio,

For print:
- As Kurt said, you can use `table.execute().print();`, records will be collected to the client (NOTE it is client) and print to client console.
- But if you want print records in runtime tasks like DataStream.print, you can use [1]


Best,
Jingsong

On Thu, Jul 16, 2020 at 10:18 AM Kurt Young <[hidden email]> wrote:
Hi Flavio,

In 1.11 we have provided an easier way to print table content, after you got the `table` object,
all you need to to is calling `table.execute().print();`

Best,
Kurt


On Thu, Jul 16, 2020 at 9:35 AM Leonard Xu <[hidden email]> wrote:
Hi, Flavio


在 2020年7月16日,00:19,Flavio Pompermaier <[hidden email]> 写道:

final JobExecutionResult jobRes = tableEnv.execute("test-job");


In Flink 1.11, once a Table has transformed to DataStream, only StreamExecutionEnvironment can execute the DataStream program, please use env.execute(“test-job”) in this case, you can get mote information from [1].


Best,
Leonard Xu



--
Best, Jingsong Lee