How are timestamps treated within an iterative DataStream loop within Flink?

How are timestamps treated within an iterative DataStream loop within Flink?

John Tipper

Hi All,

How are timestamps treated within an iterative DataStream loop within Flink?

For example, here is a simple iterative loop within Flink where the feedback stream is of a different type to the input stream:

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }

    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

My questions revolve around how Flink uses timestamps within a feedback loop:

  • Within the ConnectedIterativeStreams, how does Flink treat ordering of the input objects across the streams of regular inputs and feedback objects? If I emit an object into the feedback loop, when will it be seen by the head of the loop with respect to the regular stream of input objects?
  • How does the behaviour change when using event time processing?

Many thanks,

John


Re: How are timestamps treated within an iterative DataStream loop within Flink?

Yun Gao

Hi John,

    On the whole, I think Flink currently does not have any special mechanism for event time in iterations. This means the IterationHead treats the initial input and the feedback input as two normal inputs and uses the same mechanism as the tasks outside the iteration.
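To make "two normal inputs" concrete, here is a minimal sketch of how a two-input task combines watermarks, assuming the usual rule that the task forwards the minimum of the latest watermark seen on each input. The class and method names (TwoInputWatermarkSketch, onWatermark) are hypothetical and only model the behaviour; they are not Flink classes.

```java
// Simplified model, NOT Flink's actual implementation: a two-input task
// remembers the latest watermark per input and forwards the minimum of the two.
final class TwoInputWatermarkSketch {
    // Index 0 models the initial input, index 1 models the feedback input.
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private long emitted = Long.MIN_VALUE;

    /** Records a watermark on input 0 or 1 and returns the task's current output watermark. */
    long onWatermark(int input, long timestamp) {
        // Watermarks never move backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        // The task can only advance as far as its slowest input.
        long aligned = Math.min(inputWatermarks[0], inputWatermarks[1]);
        emitted = Math.max(emitted, aligned);
        return emitted;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch head = new TwoInputWatermarkSketch();
        // Only the initial input has reported: the output watermark stays at Long.MIN_VALUE.
        System.out.println(head.onWatermark(0, 10));
        // The feedback input reports the same watermark: the output advances to 10.
        System.out.println(head.onWatermark(1, 10));
    }
}
```

Nothing in this rule knows that input 1 is a feedback edge, which is why the loop gets no special treatment.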

    This may cause event-time disorder inside the iteration. Event time relies on the watermark alignment mechanism, in which a watermark marks the minimum event time of the records that follow it. Suppose we have a watermark with event time 10. The iteration head will first receive the watermark from the initial input, and then receive it again from the feedback input after the first round of iteration. The IterationHead will then consider the watermarks aligned at event time 10 and emit the watermark with event time 10 to the final output, which means that it should no longer receive or emit records whose event time is less than 10. However, since records may iterate for multiple rounds, the IterationHead may still receive records whose event time is less than 10 in the following rounds of iteration. That is when the event-time disorder occurs.
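The scenario above can be replayed with a small stand-alone simulation. This is only a sketch under stated assumptions: the head is modelled as a two-input task that forwards the minimum of its input watermarks, a record counts as late when its timestamp is less than the emitted watermark (following the wording above), and a single record with event time 7 is assumed to circulate for two rounds. All names are hypothetical and none of them are Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of an iteration head; not Flink code.
final class IterationLatenessSketch {
    // Index 0 models the initial input, index 1 models the feedback input.
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private long emittedWatermark = Long.MIN_VALUE;
    private final List<Long> lateRecords = new ArrayList<>();

    void onWatermark(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        long aligned = Math.min(inputWatermarks[0], inputWatermarks[1]);
        emittedWatermark = Math.max(emittedWatermark, aligned);
    }

    void onRecord(long timestamp) {
        // A record behind the already emitted watermark is out of order downstream.
        if (timestamp < emittedWatermark) {
            lateRecords.add(timestamp);
        }
    }

    /** Replays the scenario and returns the timestamps that arrived late at the head. */
    static List<Long> runScenario() {
        IterationLatenessSketch head = new IterationLatenessSketch();
        head.onRecord(7);         // record enters through the initial input
        head.onWatermark(0, 10);  // watermark 10 arrives on the initial input
        head.onRecord(7);         // round 1: record returns via feedback, still in order
        head.onWatermark(1, 10);  // watermark 10 returns via feedback, head emits 10
        head.onRecord(7);         // round 2: record returns again, now behind watermark 10
        return head.lateRecords;
    }

    public static void main(String[] args) {
        System.out.println("Late records: " + runScenario());
    }
}
```

Only the second-round arrival is flagged, because it is the first one to reach the head after both inputs have reported watermark 10.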

Best,
Yun Gao

------------------------------------------------------------------
From:John Tipper <[hidden email]>
Send Time:2019 Jun. 8 (Sat.) 21:19
Subject:How are timestamps treated within an iterative DataStream loop within Flink?
