http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-1-11-Simple-pipeline-data-stream-table-with-egg-data-stream-failed-tp36886p36957.html
planner is not built on top of the DataStream API. It uses lower-level
features (StreamOperator, Transformation). In the mid-term we want to
> Hi Timo,
> Thank you for the response.
>
> Well, it was working.
> We have a number of pipelines in production on Flink 1.10 that reuse DataStream and Table API parts, both for stream and batch.
> The same simple case without the aggregation would still work in Flink 1.11.
>
> But let's assume there are some incompatible changes and this approach will not work anymore.
>
> In the case of TableEnvironment, there is no way to create a table from a stream or to convert one to a retract stream.
> I assume it is possible to wrap the stream in a bounded StreamTableSource/StreamTableSink
> and use the deprecated TableEnvironment methods to register them, but I'm wondering if there is a better way to do it.
>
> It sounds quite strange that, with the Blink planner optimising DataStream pipelines for both stream and batch jobs,
> there is still a need to write the same things in both the DataStream and DataSet APIs.
>
>
> On 24/07/2020, 15:36, "Timo Walther" <[hidden email]> wrote:
>
> Hi Dmytro,
>
> `StreamTableEnvironment` does not support batch mode currently. Only
> `TableEnvironment` supports the unified story. I saw that you disabled
> the check in the `create()` method. This check exists for a reason.
>
> For batch execution, the planner sets specific properties on the stream
> graph that the StreamExecutionEnvironment cannot handle (e.g. blocking
> inputs). My guess would be that this is the reason for your exception.
>
> Have you tried to use the unified `TableEnvironment`?
>
> Regards,
> Timo
>
>
>
>
> On 23.07.20 15:14, Dmytro Dragan wrote:
> > Hi All,
> >
> > We are working on migrating existing pipelines from Flink 1.10 to Flink
> > 1.11.
> >
> > We are using the Blink planner and have unified pipelines which can be used
> > in stream and batch mode.
> >
> > Stream pipelines work as expected, but batch ones fail on Flink 1.11 if
> > they have any table aggregation transformation.
> >
> > Simple example of a failing pipeline:
> >
> > StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> >
> > TableConfig tableConfig = new TableConfig();
> > tableConfig.setIdleStateRetentionTime(
> >     org.apache.flink.api.common.time.Time.minutes(10),
> >     org.apache.flink.api.common.time.Time.minutes(30)
> > );
> > EnvironmentSettings settings =
> >     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> >
> > // is created using a workaround that ignores the settings.isStreamingMode() check
> > StreamTableEnvironment tEnv = create(env, settings, tableConfig);
> >
> > DataStreamSource<A> streamSource =
> >     env.fromCollection(asList(new A("1"), new A("2")));
> >
> > Table table = tEnv.fromDataStream(streamSource);
> > tEnv.createTemporaryView("A", table);
> >
> > String sql = "select s from A group by s";
> >
> > tEnv
> >     .toRetractStream(tEnv.sqlQuery(sql), Row.class)
> >     .flatMap(new RetractFlatMap())
> >     .map(Row::toString)
> >     .addSink(new TestSinkFunction<>());
> >
> > env.execute("");
> >
> > values.forEach(System.out::println);
> >
> > Exception:
> >
> > Caused by: java.lang.IllegalStateException: Trying to consume an input
> > partition whose producer is not ready (result type: BLOCKING, partition
> > consumable: false, producer state: DEPLOYING, partition id:
> > 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
> >
> > at org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
> >
> > …
> >
> > Adding a StreamTableEnvironment execute call does not help.
> >
> > Could you please advise what I'm missing?
> >
>
>
>
>
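
For readers unfamiliar with what `toRetractStream` hands to the `flatMap` in the example above: in streaming mode it emits (flag, row) pairs, where `true` adds a result and `false` retracts a previously emitted one. The plain-Java sketch below only models that behaviour and does not use Flink. It assumes a counting aggregate (`COUNT(*)` per key) rather than the thread's plain `GROUP BY`, so that retractions actually occur, and `keepAdds` is only a guess at what the unshown `RetractFlatMap` might do. `RetractStreamSketch`, `Change`, `countPerKey`, and `keepAdds` are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractStreamSketch {

    /** One changelog message: add == true inserts a result, false retracts one. */
    public static final class Change {
        public final boolean add;
        public final String key;
        public final long count;

        public Change(boolean add, String key, long count) {
            this.add = add;
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + "(" + key + "=" + count + ")";
        }
    }

    /**
     * Models the changelog of a per-key count (an assumed aggregate, not the
     * query from the thread): every update first retracts the old result,
     * then adds the new one.
     */
    public static List<Change> countPerKey(List<String> input) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String s : input) {
            Long previous = counts.get(s);
            if (previous != null) {
                out.add(new Change(false, s, previous)); // retract the outdated count
            }
            long updated = (previous == null) ? 1L : previous + 1L;
            counts.put(s, updated);
            out.add(new Change(true, s, updated)); // add the current count
        }
        return out;
    }

    /** Hypothetical stand-in for RetractFlatMap: forwards adds, drops retractions. */
    public static List<String> keepAdds(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            if (c.add) {
                out.add(c.key + "=" + c.count);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> changes = countPerKey(Arrays.asList("1", "2", "1"));
        System.out.println(changes);
        System.out.println(keepAdds(changes));
    }
}
```

A sink that sees only the forwarded adds receives every intermediate count, so a real pipeline would typically still need to keep the latest value per key downstream.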