Re: Timestamp and key preservation over operators

Posted by Averell on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Timestamp-and-key-preservation-over-operators-tp27627p27655.html

Hi Fabian, Guowei

Thanks for the help. My flow is shown in the attached screenshot, where (1) and (2) are
the main data streams from file sources, and (3) and (4) are the
enrichment data, also from file sources.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-05-01_at_08.png>

(5) merges and parses (1) and (2). It consists of:
        A tumbling window function with an early trigger (based on the number of
records in the window: FIRE as soon as there has been at least one msg from each of
streams 1 & 2, without waiting for the window end-time)
        A flat map function to parse the incoming msg
        A filter and a map
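
A minimal sketch of the early-trigger decision described above, in plain Java rather than against the Flink Trigger API. The class, enum, and method names are hypothetical, and it assumes the trigger keeps firing for every later element once both streams have been seen; in a real job this per-window flag set would live in the trigger's partitioned state.

```java
import java.util.EnumSet;

/** Hypothetical model of the early trigger in (5); not Flink code. */
class BothStreamsTriggerModel {
    enum Source { STREAM_1, STREAM_2 }
    enum Decision { CONTINUE, FIRE }

    // Which of the two main streams have contributed to the current window.
    private final EnumSet<Source> seen = EnumSet.noneOf(Source.class);

    /** Called once per record added to the window. */
    Decision onElement(Source source) {
        seen.add(source);
        // FIRE as soon as both streams are represented (assumption: and on
        // every later element too), otherwise keep collecting.
        return seen.size() == Source.values().length ? Decision.FIRE : Decision.CONTINUE;
    }

    /** Called when the window is purged, so the next window starts empty. */
    void clear() {
        seen.clear();
    }

    public static void main(String[] args) {
        BothStreamsTriggerModel trigger = new BothStreamsTriggerModel();
        System.out.println(trigger.onElement(Source.STREAM_1));
        System.out.println(trigger.onElement(Source.STREAM_2));
    }
}
```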

(6) works as a data enricher: it enriches the output of (5) with data from (3) and
(4). As (4) is broadcast, my implementation of (6) looks like:
        /stream5.union(stream3).keyBy(key2).connect(stream4).process(MyFunction6
extends KeyedBroadcastProcessFunction)/
In this KeyedBroadcastProcessFunction, one msg from (5) triggers one
output, while a msg from (3) or (4) doesn't emit any records and only updates
the state.
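
A rough model of that behaviour, in plain Java and not using the Flink KeyedBroadcastProcessFunction API. The type names follow the ones listed in this mail, but every field, the String output (standing in for EventType2_3), and the state layout are invented for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of (6); not Flink code. */
class EnricherModel {
    // Stand-ins for the message types; all fields are assumptions.
    static class EventType2 {
        final String key;
        EventType2(String key) { this.key = key; }
    }
    static class EventType2_1 extends EventType2 {      // enrichment data from (3)
        final String info;
        EventType2_1(String key, String info) { super(key); this.info = info; }
    }
    static class EventType2_2 extends EventType2 {      // main data from (5)
        EventType2_2(String key) { super(key); }
    }
    static class Type3 {                                // broadcast data from (4)
        final String tag;
        Type3(String tag) { this.tag = tag; }
    }

    private final Map<String, String> keyedState = new HashMap<>(); // per-key data from (3)
    private String broadcastState;                                  // latest data from (4)

    /** Keyed side: the union of (5) and (3). */
    List<String> processElement(EventType2 event) {
        List<String> out = new ArrayList<>();
        if (event instanceof EventType2_1) {
            // A msg from (3) only updates the state.
            keyedState.put(event.key, ((EventType2_1) event).info);
        } else if (event instanceof EventType2_2) {
            // A msg from (5) produces exactly one enriched output.
            out.add(event.key + "|" + keyedState.get(event.key) + "|" + broadcastState);
        }
        return out;
    }

    /** Broadcast side: a msg from (4) only updates the state. */
    void processBroadcastElement(Type3 value) {
        broadcastState = value.tag;
    }

    public static void main(String[] args) {
        EnricherModel enricher = new EnricherModel();
        enricher.processElement(new EventType2_1("k", "info"));
        enricher.processBroadcastElement(new Type3("tag"));
        System.out.println(enricher.processElement(new EventType2_2("k")));
    }
}
```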

Regarding message types:
        Outputs of (1) and (2) are of the same type EventType1.
        Output of (3) is of type EventType2_1 extends EventType2
        Output of (5) is of type EventType2_2 extends EventType2
        Input of (6) is of type EventType2 (from the unioned-keyed-stream), and of
type Type3 (from the broadcast stream)
        Output of (6) is of the type EventType2_3, which is mapped from EventType2_1

As seen in my screenshot, only (5) showed a watermark; neither (6) nor (7) did. I
noticed the problem because my (7) didn't work as expected. When I put
an eventTimeExtractor between (6) and (7), (7) worked.

Having typed all of this, I think I now know where my issue comes
from: I have not assigned timestamps/watermarks for (3) and (4), because I
thought that they were just idle sources of enrichment data.
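
To illustrate why this would stall (6): an operator with several inputs advances its event-time clock to the minimum of the watermarks seen on its inputs, and an input that has never emitted a watermark counts as Long.MIN_VALUE. The sketch below models only that rule in plain Java. It is not Flink code, the names are hypothetical, and treating (5), (3) and (4) as three flat inputs of (6) is a simplification.

```java
import java.util.Arrays;

/** Hypothetical model of how a multi-input operator combines watermarks. */
class WatermarkModel {
    // An input that has not emitted any watermark yet.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long[] inputWatermarks;

    WatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, NO_WATERMARK);
    }

    /** Records a watermark on one input; watermarks never move backwards. */
    void onWatermark(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
    }

    /** The operator's event-time clock is the minimum over all inputs. */
    long currentWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        // Inputs of (6): 0 = output of (5), 1 = stream (3), 2 = broadcast stream (4).
        WatermarkModel op6 = new WatermarkModel(3);
        op6.onWatermark(0, 1_000L);
        // (3) and (4) emitted nothing, so (6) is still at Long.MIN_VALUE.
        System.out.println(op6.currentWatermark() == NO_WATERMARK);

        // If the enrichment inputs reported Long.MAX_VALUE, (6) would follow (5).
        op6.onWatermark(1, Long.MAX_VALUE);
        op6.onWatermark(2, Long.MAX_VALUE);
        System.out.println(op6.currentWatermark());
    }
}
```

In this model the enrichment inputs stop holding the clock back only once they report a watermark at least as large as the one from (5).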

/*Because of this, I have another question:*/
I read the text regarding idling sources [1], but I am not sure how to implement
that for my file sources. Could you please recommend a
solution or good practice here?

I have one more question about the recommendation [2] to emit timestamps and
watermarks from within the source function. Is there any way to do that with
the file sources?

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#idling-sources
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html#source-functions-with-timestamps-and-watermarks


