Problem getting watermark right with event time

Sudan S
Hi,

I am having a problem getting the watermarks right. The setup is as follows:
- I have a Flink job that reads from a Kafka topic, deserializes the records with Protobuf, applies a sliding window of (size 120 seconds, slide 30 seconds), sums up the values, and finally returns the result.
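To make the window setup concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a sliding window of size 120 s and slide 30 s assigns a single event to four overlapping windows. The start-alignment rule is modelled on what Flink's SlidingEventTimeWindows does with a zero offset; the class and method names are hypothetical. It also prints the watermark each window needs before it can fire, which is what decides whether any output ever reaches the sink.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch (no Flink dependency) of sliding event-time window assignment.
 * Hypothetical names; the alignment rule mirrors a sliding assigner with zero offset.
 */
public class SlidingWindowSketch {

    private static final long SIZE_MS = 120_000L; // window size: 120 seconds
    private static final long SLIDE_MS = 30_000L; // window slide: 30 seconds

    /** Returns the [start, end) range of every window that contains the timestamp. */
    public static List<long[]> assignWindows(long timestampMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide.
        long lastStart = timestampMs - Math.floorMod(timestampMs, SLIDE_MS);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= SLIDE_MS) {
            windows.add(new long[] {start, start + SIZE_MS});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 100_000L; // an event at t = 100 s
        for (long[] w : assignWindows(ts)) {
            // A window is evaluated only once the watermark reaches end - 1.
            System.out.println("window [" + w[0] + ", " + w[1] + ") fires at watermark >= " + (w[1] - 1));
        }
    }
}
```

For an event at t = 100 s the sketch yields windows starting at 90 s, 60 s, 30 s and 0 s; none of them produces a result until the watermark has reached its end minus 1 ms.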

The code is pasted below. 

The problem is that nothing ever reaches the sink. Execution reaches the timestamp extractor in assignTimestampsAndWatermarks whenever an event arrives, but past that point neither the process function nor the sink function is invoked, even though I am pumping in events regularly. I can't figure out how to debug this issue.
Please help.
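One way to reason about a window function that is never invoked is to track event time the way the window operator sees it. The plain-Java model below is a sketch under simplifying assumptions: each of the five parallel source subtasks produces the watermark an AscendingTimestampExtractor-style assigner would (latest timestamp minus 1 ms, Long.MIN_VALUE before the first event), and the window operator's clock is the minimum over all of its inputs. Class and method names are hypothetical. It shows that if some subtask never sees an event (for instance, a Kafka source subtask that was assigned no partition), the minimum stays at Long.MIN_VALUE and no window fires.

```java
import java.util.Arrays;

/** Plain-Java model (no Flink dependency) of watermark progress across parallel inputs. */
public class WatermarkProgressSketch {

    // Watermark per parallel source subtask; Long.MIN_VALUE means "no event seen yet".
    private final long[] inputWatermarks;

    public WatermarkProgressSketch(int parallelism) {
        inputWatermarks = new long[parallelism];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Subtask `subtask` sees an event; assumed ascending-style watermark = timestamp - 1. */
    public void onEvent(int subtask, long timestampMs) {
        long candidate = timestampMs - 1;
        // Watermarks never move backwards.
        inputWatermarks[subtask] = Math.max(inputWatermarks[subtask], candidate);
    }

    /** The window operator only advances as far as its slowest input. */
    public long operatorWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    /** A window [start, end) fires once the operator watermark reaches end - 1. */
    public boolean windowFires(long windowEndMs) {
        return operatorWatermark() >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        WatermarkProgressSketch job = new WatermarkProgressSketch(5);
        // Events reach only subtasks 0-2; subtasks 3 and 4 stay idle.
        for (long t = 0; t <= 300_000; t += 10_000) {
            for (int s = 0; s < 3; s++) {
                job.onEvent(s, t);
            }
        }
        System.out.println("operator watermark: " + job.operatorWatermark()); // -9223372036854775808
        System.out.println("window [0, 120000) fired: " + job.windowFires(120_000)); // false
        // Once the idle subtasks also see events, the minimum moves forward.
        job.onEvent(3, 300_000);
        job.onEvent(4, 300_000);
        System.out.println("operator watermark: " + job.operatorWatermark()); // 299999
        System.out.println("window [0, 120000) fired: " + job.windowFires(120_000)); // true
    }
}
```

Logging the current watermark in a similar way inside the real job is one hedged way to check whether event time is advancing at all.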

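import com.google.protobuf.Timestamp; // assuming Eventi.Event#getEventTime() returns a protobuf Timestamp
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
// Eventi (the Protobuf-generated event class) and EventiSchema (its deserialization schema)
// are the job's own classes, assumed to live in the same package.

public class StreamingJob {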

    public static void main(String[] args) throws Exception {

        Properties kafkaConsumerProps = new Properties();
        kafkaConsumerProps.setProperty("bootstrap.servers", "{bootstrap_servers}");
        kafkaConsumerProps.setProperty("group.id", "{group_id}");


        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(100);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setMaxParallelism(5);
        env.setParallelism(5);

        SingleOutputStreamOperator<Eventi.Event> texStream = env
                .addSource(new FlinkKafkaConsumer011<>("auth", new EventiSchema(), kafkaConsumerProps)).setParallelism(5).setMaxParallelism(5);
        SlidingEventTimeWindows window = SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));
        texStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Eventi.Event>() {
            @Override
            public long extractAscendingTimestamp(Eventi.Event element) {
                return element.getEventTime().getSeconds() * 1000;
            }
        }).keyBy(Eventi.Event::getEventTime).window(window).process(new ProcessWindowFunction<Eventi.Event, Object, Timestamp, TimeWindow>() {
            @Override
            public void process(Timestamp timestamp, Context context, Iterable<Eventi.Event> elements, Collector<Object> out) throws Exception {
                int sum = 0;
                for (Eventi.Event element : elements) {
                    sum++;
                }
                out.collect(sum);
            }
        }).print();

        env.execute();
    }
}



"The information contained in this e-mail and any accompanying documents may contain information that is confidential or otherwise protected from disclosure. If you are not the intended recipient of this message, or if this message has been addressed to you in error, please immediately alert the sender by replying to this e-mail and then delete this message, including any attachments. Any dissemination, distribution or other use of the contents of this message by anyone other than the intended recipient is strictly prohibited. All messages sent to and from this e-mail address may be monitored as permitted by applicable law and regulations to ensure compliance with our internal policies and to protect our business."

Re: Problem getting watermark right with event time

Fabian Hueske-2
Hi Sudan,

I noticed a few issues with your code:

1) Please check the computation of timestamps. Your code

public long extractAscendingTimestamp(Eventi.Event element) {
      return element.getEventTime().getSeconds() * 1000;
}

only seems to look at the seconds of a timestamp. Typically, you would just return the whole timestamp encoded as a long that represents the milliseconds since the epoch (1970-01-01 00:00:00.000).
Why do you multiply by 1000?

2) An AscendingTimestampExtractor assumes that records arrive with monotonically ascending timestamps.
If the timestamps in your data are slightly out of order, you probably want a different watermark assigner, for example a BoundedOutOfOrdernessTimestampExtractor [1].

3) You probably don't want to key on event time:

keyBy(Eventi.Event::getEventTime)

Usually, you choose a partitioning key here. If you cannot partition your data and all records should be grouped into a single stream of windows, you should use DataStream.windowAll().
Note, however, that this means the window computation cannot run in parallel. See [2] for details.
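Points 1) and 2) can be sketched together in plain Java (no Flink dependency): a full epoch-millisecond timestamp is built from a seconds/nanos pair, and the watermark trails the largest timestamp seen so far by a fixed bound, which is the idea behind a bounded-out-of-orderness assigner. The seconds/nanos split assumes the event time is a protobuf Timestamp (getSeconds()/getNanos()); the 5-second bound and all class and method names are illustrative assumptions.

```java
/** Plain-Java sketch of full-precision timestamps plus a bounded-out-of-orderness watermark. */
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    // Largest timestamp seen so far; starts low enough that the watermark cannot underflow.
    private long maxTimestampMs;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Epoch milliseconds from whole seconds plus the nanosecond fraction. */
    public static long toEpochMillis(long seconds, int nanos) {
        return seconds * 1000L + nanos / 1_000_000;
    }

    /** Returns the event's timestamp; out-of-order events never lower the maximum. */
    public long extractTimestamp(long seconds, int nanos) {
        long ts = toEpochMillis(seconds, nanos);
        maxTimestampMs = Math.max(maxTimestampMs, ts);
        return ts;
    }

    /** Watermark = highest timestamp seen minus the tolerated out-of-orderness. */
    public long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch assigner = new BoundedOutOfOrdernessSketch(5_000L);
        assigner.extractTimestamp(1_587_324_000L, 250_000_000);
        assigner.extractTimestamp(1_587_323_998L, 0); // 2 s out of order, within the bound
        System.out.println(assigner.currentWatermark()); // prints 1587323995250
    }
}
```

This rule tolerates events up to 5 s out of order, at the cost of delaying every window result by the same 5 s of event time.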

 Best, Fabian


On Sun, 19 Apr 2020 at 21:37, Sudan S <[hidden email]> wrote: