Re: ConnectedIterativeStreams and processing state 1.4.2
Posted by Aljoscha Krettek
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/ConnectedIterativeStreams-and-processing-state-1-4-2-tp19870p19917.html
Hi,
Why do you want to do the enrichment downstream and send the data back up? The problem is that feedback edges (or iterations, they are the same in Flink) have some issues with fault tolerance. Could you maybe outline in a bit more depth what you're doing and what the flow of data and enrichment is?
Best,
Aljoscha
Hi.
Because the data that I want to cache comes from a downstream operator, and as far as I know iterations are the only way to loop data back to a previous operator.
Best regards
Lasse Nedergaard
Hi,
Why can't you use a simple CoProcessFunction and handle cache updates within its processElement1 or processElement2 method?
Piotrek
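A minimal sketch of Piotrek's suggestion, written as plain Java without a Flink dependency so it runs on its own. `CachingEnricher` stands in for a CoProcessFunction applied to two streams keyed by the same key, and its `HashMap` stands in for keyed state such as MapState. The class names, fields, and the two output callbacks are hypothetical; in a real job the inputs would be connected keyed streams and the outputs would go through the function's Collector (or, for example, a side output).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the thread's InputObject type.
class InputObject {
    final String key;
    String enrichment; // filled in when a cached value is available

    InputObject(String key) { this.key = key; }
}

// Hypothetical stand-in for the thread's EnrichData type.
class EnrichData {
    final String key;
    final String value;

    EnrichData(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Plain-Java model of a keyed two-input function: both inputs share the same
// key, and the map plays the role of the per-key cache held in keyed state.
class CachingEnricher {
    private final Map<String, String> cache = new HashMap<>();

    // Input 1: events to enrich. Use the cached value if present,
    // otherwise route the event to the "needs external lookup" output.
    void processElement1(InputObject in,
                         Consumer<InputObject> enriched,
                         Consumer<InputObject> needsLookup) {
        String cached = cache.get(in.key);
        if (cached != null) {
            in.enrichment = cached;
            enriched.accept(in);
        } else {
            needsLookup.accept(in);
        }
    }

    // Input 2: enrichment data. Only updates the cache and emits nothing.
    void processElement2(EnrichData data) {
        cache.put(data.key, data.value);
    }
}
```

Because both inputs are keyed the same way, the lookup in `processElement1` and the update in `processElement2` see the same per-key cache entry, which is what keeps the two object types on separate inputs while still sharing state.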
Hi.
I have a case where I have an input stream that I want to enrich with external data. I want to cache some of the external lookup data to improve overall performance.
To update my cache (a CoProcessFunction), I use an iteration to send the externally enriched information back to the cache and update a MapState. I use a CoProcessFunction because the input stream and the enrich stream contain two different object types and I don't want to mix them.
Because I use ConnectedIterativeStreams, I can't use state in my CoProcessFunction: ConnectedIterativeStreams creates the feedback DataStream from the feedback type signature, not from the stream I close the iteration with, and it is not possible to provide a KeySelector in withFeedbackType.
From the Flink source:
public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
super(input.getExecutionEnvironment(), input, new DataStream(input.getExecutionEnvironment(), new CoFeedbackTransformation(input.getParallelism(), feedbackType, waitTime)));
}
Both streams need to be keyed before state can be assigned to the operator.
Any ideas on how to work around this problem?
My pseudocode is below:
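import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;

IterativeStream.ConnectedIterativeStreams<InputObject, EnrichData> iteration = inputStream
        .keyBy(obj -> obj.getKey())
        .iterate(maxWaitForIterations)
        .withFeedbackType(TypeInformation.of(new TypeHint<EnrichData>() {}));
DataStream<ReportMessageBase> enrichedStream = iteration
        .process(new EnrichFromState());
// Keep only the records that could not be enriched from the cache
DataStream<ReportMessageBase> notEnrichedOutput = enrichedStream
        .filter(obj -> !obj.enriched);
EnrichService enrichService = new EnrichService();
DataStream<InputObject> enrichedFromApi = enrichService.parse(notEnrichedOutput);
// Turn the API results into cache updates, keyed like the input stream
DataStream<EnrichData> newEnrich = enrichedFromApi
        .map(obj -> {
            EnrichData newData = new EnrichData();
            newData.xx = obj.xx();
            return newData;
        })
        .keyBy(obj -> obj.getKey());
// Send the cache updates back to the head of the iteration
iteration.closeWith(newEnrich);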
....
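To make the data flow of the pseudocode easier to follow, here is a single-threaded plain-Java model of the loop. All names are hypothetical: the head operator enriches from its cache, cache misses go to an external lookup (a function passed in by the caller), and the lookup results travel back over a queue that stands in for the feedback edge. It is only a sketch of the flow and does not reproduce Flink's iteration semantics such as parallelism, the iteration wait time, or checkpointing behavior.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

// Single-threaded model of the iteration: cache at the head, external lookup
// downstream, and a queue playing the role of the feedback edge.
class FeedbackLoopModel {
    private final Map<String, String> cache = new HashMap<>();
    private final Queue<String[]> feedback = new ArrayDeque<>(); // {key, value}
    private final Function<String, String> externalLookup;
    int externalCalls = 0;

    FeedbackLoopModel(Function<String, String> externalLookup) {
        this.externalLookup = externalLookup;
    }

    // Processes one input key and returns its enrichment value.
    String process(String key) {
        // Apply pending feedback first, like the second input updating state.
        while (!feedback.isEmpty()) {
            String[] kv = feedback.poll();
            cache.put(kv[0], kv[1]);
        }
        String cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        // Cache miss: do the downstream lookup, then send the result back upstream.
        externalCalls++;
        String value = externalLookup.apply(key);
        feedback.add(new String[] {key, value});
        return value;
    }
}
```

Feeding the keys a, b, a, a, b through `process` with a lookup such as `k -> "enriched-" + k` triggers only two external lookups. The model drains the feedback before every input; in a real job the feedback edge is asynchronous, so several events for the same key may still miss the cache before the update arrives.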