Event-time tumbling window doesn't fire- Flink 1.2.0, Kafka-0.8_2.10


Sam Huang

Hi,

I'm using Flink 1.2.0 to read from Kafka-0.8.1.1_2.10.

I have written a Flink streaming job that creates an event-time window and then computes some stats. However, the window function is never called. I used the debug watermark code and noticed that no watermark is generated. If I read from a file instead, only one watermark is generated. Here is my code (reading from Kafka):

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkUtil.createExecutionEnvironment(args);

        // Read from kafka (it works as the following print statement works)
        DataStream<String> jsonEventStream = JsonEventStreamReader.readStream(env);
        // jsonEventStream.print();

        jsonEventStream
            .flatMap(new strToTupleFlatMapFunImpl())
            .assignTimestampsAndWatermarks(getRawJsonTimestampsAndWatermarksAssigner())
            .flatMap(new jsonToTupleListFlatMapFunImpl())
            .keyBy(0, 1, 2)
            .timeWindow(Time.seconds(60))
            .allowedLateness(Time.seconds(10))
            .reduce(new ReduceFunImpl(), new WindowFunImpl()) // reduce fun gets called but not window fun
            .addSink(new InfluxDBSink(INFLUXDB_DB));

        env.execute();
    }

    private static BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> getRawJsonTimestampsAndWatermarksAssigner() {
        return new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(WINDOW_LATENESS)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> tuple) {
                return tuple.f1;
            }
        };
    }

    // In FlinkUtil:
    public static StreamExecutionEnvironment createExecutionEnvironment(String[] args) throws IOException {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        //env.getConfig().setAutoWatermarkInterval(1000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        return env;
    }


Any help will be appreciated.

Thank you,

Sam


Re: Event-time tumbling window doesn't fire- Flink 1.2.0, Kafka-0.8_2.10

Aljoscha Krettek
Hi Sam,
do you have an idea what the timestamps in your data are, i.e. the tuple.f1 field you're extracting? Instead of windowing, you could try simply printing your data and observing the timestamps. Maybe we can learn something from this about why the window doesn't trigger.

Best,
Aljoscha
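
One way to make the printed timestamps easier to read is to show each one as a UTC instant together with the 60-second window it would fall into. The following is only a minimal sketch in plain Java, not code from the job: the class and method names and the sample values are made up for illustration, and it assumes the timestamps are epoch milliseconds and that tumbling windows are aligned to multiples of the window size with no offset.

```java
import java.time.Instant;

public class TimestampInspector {

    // Start of the tumbling window containing the timestamp, assuming windows
    // are aligned to multiples of the window size (offset 0). Values in ms.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Human-readable line: the timestamp and the half-open window [start, end).
    public static String describe(long timestampMs, long windowSizeMs) {
        long start = windowStart(timestampMs, windowSizeMs);
        return Instant.ofEpochMilli(timestampMs) + " -> window ["
                + Instant.ofEpochMilli(start) + ", "
                + Instant.ofEpochMilli(start + windowSizeMs) + ")";
    }

    public static void main(String[] args) {
        // Hypothetical sample values standing in for the extracted tuple.f1 fields.
        long[] samples = {1488327180500L, 1488327215000L};
        for (long ts : samples) {
            System.out.println(describe(ts, 60_000L));
        }
    }
}
```

If the printed instants look implausible (for example, dates near 1970), that would suggest the field is not in epoch milliseconds, which is what the extractTimestamp method is expected to return.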

(Replying to Sam Huang's original message of Wed, Mar 1, 2017, 12:13 AM.)



Re: Event-time tumbling window doesn't fire- Flink 1.2.0, Kafka-0.8_2.10

Sam Huang
Sorry, I forgot to reply. I've solved the problem: it turns out I never sent any data that generated a watermark past the end time of my first window, so no window was triggered.
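
To illustrate the arithmetic behind this, here is a small sketch in plain Java with no Flink dependency. It assumes the watermark is the largest timestamp seen so far minus the out-of-orderness bound, which is how BoundedOutOfOrdernessTimestampExtractor computes it, and that an event-time window [start, end) fires once the watermark reaches end - 1. The class name, the 10-second bound (the actual value of WINDOW_LATENESS is not shown in the thread), and the sample timestamps are assumptions for illustration.

```java
public class WatermarkSketch {

    // Watermark derived from the largest event timestamp seen so far.
    public static long watermarkFor(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    // A window [start, end) fires once the watermark reaches its last
    // included timestamp, end - 1.
    public static boolean windowFires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;       // first 60 s window: [0, 60000)
        long outOfOrderness = 10_000L;  // assumed bound, not taken from the thread
        long[] eventTimestamps = {5_000L, 30_000L, 59_000L, 69_000L, 70_000L};

        long maxSeen = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            maxSeen = Math.max(maxSeen, ts);
            long watermark = watermarkFor(maxSeen, outOfOrderness);
            System.out.println("event=" + ts + " watermark=" + watermark
                    + " fires=" + windowFires(windowEnd, watermark));
        }
    }
}
```

Under these assumptions, events that all fall inside the first minute can never trigger that window on their own. Only an event with a timestamp of at least 69,999 ms moves the watermark far enough, so test input has to extend beyond the window end plus the out-of-orderness bound.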

Re: Event-time tumbling window doesn't fire- Flink 1.2.0, Kafka-0.8_2.10

Aljoscha Krettek
Great you could figure it out! And thanks for letting us know.

(Replying to Sam Huang's message of Wed, Mar 8, 2017, 03:03.)