Hi to all,
I was trying to update my legacy code to Flink 1.11. Before, I was using a BatchTableEnv and now I've tried to use the following:

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

Unfortunately, in the StreamTableEnvironmentImpl code there's:

if (!settings.isStreamingMode()) {
    throw new TableException(
        "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
}

What should I do here?

Thanks in advance,
Flavio
You should be good with using the TableEnvironment. The StreamTableEnvironment is needed only if you want to convert to DataStream. We do not support converting batch Table programs to DataStream yet. The following code should work:

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment.create(settings);

Best,
Dawid

On 10/07/2020 11:48, Flavio Pompermaier wrote:
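For reference, a minimal end-to-end sketch of what this looks like (MySourceDataset is a placeholder table name, not something registered in the thread):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class BatchTableJob {
    public static void main(String[] args) {
        // Batch mode on the Blink planner (the default in 1.11); no
        // StreamExecutionEnvironment/ExecutionEnvironment is involved.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Tables are registered via DDL or the (deprecated) register* calls,
        // then queried as usual:
        Table result = tEnv.sqlQuery("SELECT * FROM MySourceDataset");
        result.execute().print();
    }
}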
Thanks, but I still can't understand how to migrate my legacy code. The main problem is that I can't create a BatchTableEnv anymore, so I can't call createInput. Is there a way to reuse InputFormats? Should I migrate them to TableSource instead?

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
    MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
    DataSet<Row> rows = env.createInput(myInputformat);
    Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
    CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
    btEnv.registerTableSink("out", dsFields, ft, outSink);
    btEnv.insertInto(table, "out", btEnv.queryConfig());
    env.execute();
}

On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <[hidden email]> wrote:

Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809
hi Flavio,

Only the old planner supports BatchTableEnvironment (which can convert to/from DataSet), while the Blink planner in batch mode only supports TableEnvironment, because the Blink planner converts batch queries to Transformation (corresponding to DataStream) instead of DataSet.

One approach is to migrate them to TableSource (the InputFormat can be reused), but TableSource will be deprecated later, so you can try the new table source [1].

Best,
Godfrey

Flavio Pompermaier <[hidden email]> wrote on Fri, Jul 10, 2020 at 8:54 PM:
How can you reuse an InputFormat to write a TableSource? I think that, at least initially, this could be the simplest way to test the migration...then I could try to implement the new TableSource interface.

On Fri, Jul 10, 2020 at 3:38 PM godfrey he <[hidden email]> wrote:
Is it correct to do something like this?

TableSource<Row> myTableSource = new BatchTableSource<Row>() {

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(dsFields, ft);
    }

    @Override
    public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
        return execEnv.createInput(myInputformat);
    }
};

On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <[hidden email]> wrote:
hi Flavio,

`BatchTableSource` can only be used with the old planner. If you want to use the Blink planner to run a batch job, your table source should implement `StreamTableSource` and its `isBounded` method should return true.

Best,
Godfrey

Flavio Pompermaier <[hidden email]> wrote on Fri, Jul 10, 2020 at 10:32 PM:
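As an illustration only (not from the thread), a minimal sketch of such a bounded source wrapping the InputFormat from the earlier snippets; the class name is made up, and the schema handling reuses the dsFields/ft idea from Flavio's code:

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class MyBoundedTableSource implements StreamTableSource<Row> {

    private final InputFormat<Row, ?> inputFormat;
    private final TableSchema schema;

    // usage idea: new MyBoundedTableSource(myInputformat, new TableSchema(dsFields, ft))
    public MyBoundedTableSource(InputFormat<Row, ?> inputFormat, TableSchema schema) {
        this.inputFormat = inputFormat;
        this.schema = schema;
    }

    @Override
    public boolean isBounded() {
        return true; // tells the Blink planner this source is finite, i.e. batch
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(inputFormat, schema.toRowType());
    }

    @Override
    public DataType getProducedDataType() {
        return schema.toRowDataType();
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }
}

If I recall correctly, Flink also ships an InputFormatTableSource base class that provides the getDataStream/isBounded boilerplate for exactly this wrap-an-InputFormat case.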
Now I'm able to run my code but there's something I don't understand: what is the difference between the following two?

// common code
final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink);
The second option fails with the following exception:

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Non-query expression encountered in illegal context
    at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)

Best,
Flavio

On Sun, Jul 12, 2020 at 5:04 PM godfrey he <[hidden email]> wrote:
A typo of "INSERTO"? Try this:

tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");

Best,
Jark

On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <[hidden email]> wrote:
You're right, Jark...sorry, I didn't see the typo. The backticks are also mandatory. Maybe the exception message could be more meaningful and specify the token that caused the error, instead of the general "SQL parse failed. Non-query expression encountered in illegal context".

Thanks a lot for the support,
Flavio

On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <[hidden email]> wrote:
I agree with you, Flavio, the exception message definitely should be improved. We created a similar issue a long time ago, https://issues.apache.org/jira/browse/CALCITE-3038, but the fix might be complicated.

Best,
Jark

On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <[hidden email]> wrote:
Ok...just one last thing: to use my TableSource I use the deprecated API registerTableSource:

tableEnv.registerTableSource("MySourceDataset", tableSource);

The javadoc says to use executeSql, but this requires some extra steps that are not mentioned in the documentation. I have to create a TableFactory, right? How do I register it? Is there an example somewhere?

On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <[hidden email]> wrote:
Hi Flavio,

tableEnv.registerTableSource is deprecated in order to migrate to DDL and the new connector interfaces (i.e. FLIP-95 [1]). You may need to implement a `ScanTableSource` that returns an `InputFormatProvider` from `ScanTableSource#getScanRuntimeProvider`.

Best,
Jark

On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <[hidden email]> wrote:
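As an illustration (not from the thread), a minimal sketch of such a FLIP-95 source; MyScanTableSource is a hypothetical name, and the InputFormat is assumed to produce the planner's internal RowData type:

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;

public class MyScanTableSource implements ScanTableSource {

    private final InputFormat<RowData, ?> inputFormat;

    public MyScanTableSource(InputFormat<RowData, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // a plain bounded scan only emits insertions
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // this is where the existing InputFormat gets plugged in
        return InputFormatProvider.of(inputFormat);
    }

    @Override
    public DynamicTableSource copy() {
        return new MyScanTableSource(inputFormat);
    }

    @Override
    public String asSummaryString() {
        return "MyScanTableSource";
    }
}

To create such a table from DDL you would additionally need a DynamicTableSourceFactory registered for SPI discovery in a META-INF/services/org.apache.flink.table.factories.Factory file, which is the "extra step" behind the TableFactory question above.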
And what about env.registerTypeWithKryoSerializer? Now, to create the table environment, I don't use the ExecutionEnvironment anymore...how can I register those serializers? For example, I used to run:

env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);

Best,
Flavio

On Mon, Jul 13, 2020 at 3:16 PM Jark Wu <[hidden email]> wrote:
You can set string-based configuration via `tEnv.getConfig().getConfiguration().setString(..)` to replace them. Maybe you can try pipeline.default-kryo-serializers [1].

Best,
Jark

On Mon, 13 Jul 2020 at 21:57, Flavio Pompermaier <[hidden email]> wrote:
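For instance, a sketch of the Joda registration above moved onto the TableConfig (assuming the serializer class is the one from the de.javakaffee.kryoserializers library; adjust the names to your actual classes):

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());

// format: class:<type to serialize>,serializer:<Kryo serializer class>;
// multiple entries are separated by semicolons (see the
// pipeline.default-kryo-serializers documentation)
tEnv.getConfig().getConfiguration().setString(
    "pipeline.default-kryo-serializers",
    "class:org.joda.time.DateTime,serializer:de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");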
Thanks, that was definitely helpful!

On Mon, Jul 13, 2020 at 4:39 PM Jark Wu <[hidden email]> wrote: