Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed
Posted by
Timo Walther on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-1-11-Simple-pipeline-data-stream-table-with-egg-data-stream-failed-tp36886p36914.html
Hi Dmytro,
`StreamTableEnvironment` does not support batch mode currently. Only
`TableEnvironment` supports the unified story. I saw that you disabled
the check in the `create()` method. This check exists for a reason.
For batch execution, the planner sets specific properties on the stream
graph that the StreamExecutionEnvironment cannot handle (e.g. blocking
inputs). My guess would be that this is the reason for your exception.
Have you tried to use the unified `TableEnvironment`?
Regards,
Timo
On 23.07.20 15:14, Dmytro Dragan wrote:
> Hi All,
>
> We are working on migration existing pipelines from Flink 1.10 to Flink
> 1.11.
>
> We are using Blink planner and have unified pipelines which can be used
> in stream and batch mode.
>
> Stream pipelines works as expected, but batch once fail on Flink 1.11 if
> they have any table aggregation transformation.
>
> Simple example of failed pipeline:
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment./getExecutionEnvironment/();
> env.setStreamTimeCharacteristic(TimeCharacteristic./ProcessingTime/);
>
> TableConfig tableConfig = new TableConfig();
> tableConfig.setIdleStateRetentionTime(
> org.apache.flink.api.common.time.Time./minutes/(10),
> org.apache.flink.api.common.time.Time./minutes/(30)
> );
> EnvironmentSettings settings =
> EnvironmentSettings./newInstance/().useBlinkPlanner().inBatchMode().build();
>
> // is created using work around with ignoring settings.isStreamingMode()
> check
> StreamTableEnvironment tEnv = /create/(env, settings, tableConfig);
>
> DataStreamSource<A> streamSource = env.fromCollection(/asList/(new
> A("1"), new A("2")));
>
> Table table = tEnv.fromDataStream(streamSource);
> tEnv.createTemporaryView("A", table);
>
> String sql = "select s from A group by s";
>
> tEnv
> .toRetractStream(tEnv.sqlQuery(sql), Row.class)
> .flatMap(new RetractFlatMap())
> .map(Row::toString)
> .addSink(new TestSinkFunction<>());
>
> env.execute("");
>
> /values/.forEach(System./out/::println);
>
> Exception:
>
> Caused by: java.lang.IllegalStateException: Trying to consume an input
> partition whose producer is not ready (result type: BLOCKING, partition
> consumable: false, producer state: DEPLOYING, partition id:
> 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
>
> at
> org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
>
> …
>
> Adding StreamTableEnvironment execute does not help.
>
> Could you please advise what I`m missing?
>