Hi,
We are using a sliding event-time window with a Kafka consumer. The window size is 6 minutes and the slide is 2 minutes. For the keyed stream, we have written a window function that selects certain windows out of the many that fire, based on a particular condition; in our data, about 16 windows satisfy it.
On a normal execution, 16 windows pass the condition inside the window function, and we write each of these windows to its own file (every record is written together with the window's start and end times; the file itself is named after the record's latitude and longitude, as in writeWindowToFile below).
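For reference, with a 6-minute window and a 2-minute slide, every element is assigned to size/slide = 3 overlapping windows, and window starts are aligned to multiples of the slide. A minimal sketch of that assignment logic (it mirrors what Flink's SlidingEventTimeWindows does internally for non-negative timestamps and zero offset; the method and variable names here are illustrative only):
// Which sliding windows does an event with this timestamp fall into?
// Mirrors SlidingEventTimeWindows#assignWindows for offset 0 and timestamp >= 0.
static void printAssignedWindows(long timestamp) {
    long size  = 6 * 60 * 1000L;  // window size: 6 minutes, in ms
    long slide = 2 * 60 * 1000L;  // slide: 2 minutes, in ms
    long lastStart = timestamp - (timestamp % slide); // latest window start containing the event
    for (long start = lastStart; start > timestamp - size; start -= slide) {
        System.out.println("window [" + start + ", " + (start + size) + ")");
    }
}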
The code is as follows:
Calling code
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class RealTimeProcessingSlidingWindow {

    public static void main(String[] args) throws Exception {
        // set up the execution environment with event-time semantics
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", DEMO_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
                new FlinkKafkaConsumer09<>(
                        "test", // Kafka topic name
                        new dataSchema(),
                        kafkaProps);

        DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
        // assign event-time timestamps and watermarks before windowing
        DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
                stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());

        keyedStream.keyBy(4)
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2))) // 6-minute window sliding every 2 minutes
                .apply(new CustomSlidingWindowFunction());

        env.execute("Sliding Event Time Window Processing");
    }
}
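The timestamp/watermark assigner BoundedOutOfOrdernessGenerator2 referenced above is not shown in this post. A plausible sketch, assuming it builds on Flink's BoundedOutOfOrdernessTimestampExtractor and reads the event time from the date string in field f0 (the date format and the 10-second out-of-orderness bound are assumptions):
// Hypothetical sketch of the assigner used above; the timestamp field (f0),
// the date format, and the 10-second out-of-orderness bound are assumptions.
public static class BoundedOutOfOrdernessGenerator2
        extends BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, String, Float, Float, String>> {

    public BoundedOutOfOrdernessGenerator2() {
        super(Time.seconds(10)); // maximum expected out-of-orderness
    }

    @Override
    public long extractTimestamp(Tuple5<String, String, Float, Float, String> element) {
        try {
            // f0 is assumed to carry the record's date/time as a string
            return new SimpleDateFormat("dd-MM-yyyy HH:mm:ss").parse(element.f0.trim()).getTime();
        } catch (ParseException e) {
            throw new RuntimeException("Unparseable event timestamp: " + element.f0, e);
        }
    }
}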
public static class CustomSlidingWindowFunction implements
        WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Tuple5<String, String, Float, Float, String>> input,
            Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
        // index the window's records by the trimmed f1 field
        HashMap<String, Tuple5<String, String, Float, Float, String>> windowMap = new HashMap<>();
        for (Tuple5<String, String, Float, Float, String> wr : input) {
            windowMap.put(wr.f1.toString().trim(), wr);
        }
        ...
        SimpleDateFormat sf = new SimpleDateFormat(IndianDateTimeFormat);
        if (windowMap.containsKey(tk)) {
            Tuple5<String, String, Float, Float, String> t = windowMap.get(tk); // map is typed, no cast needed
            Date d = sf.parse(t.f0.trim());
            ...
            // condition for selecting a window
            if (d.after(x) && d.before(y)) {
                // write the window's records to a separate file named after the record's Lat and Lon
                writeWindowToFile(t, window, input);
            }
        }
    }
}
// Get a buffered writer that appends to the given file
private static synchronized BufferedWriter getWriter(String fileName) throws IOException {
    return new BufferedWriter(new FileWriter(fileName, true));
}

// Writes an entire window to a file, one line per record in that window
private static synchronized void writeWindowToFile(Tuple5<String, String, Float, Float, String> target,
        TimeWindow window, Iterable<Tuple5<String, String, Float, Float, String>> input) throws IOException {
    // name the file after the selected record's Lat and Lon fields
    String fileName = target.f2.toString() + "-" + target.f3.toString() + ".csv";
    BufferedWriter br = getWriter(fileName);
    // iterate over the window's records and write them to the file
    for (Tuple5<String, String, Float, Float, String> tr : input) {
        br.write(tr.f1.toString().trim() + ", " +
                convertLongIntoDate(window.getStart()) + ", " + convertLongIntoDate(window.getEnd()) + ", " +
                tr.f0 + ", " + tr.f2 + ", " + tr.f3 + '\n');
    }
    // close the writer (close() also flushes)
    br.close();
}
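Aside: if a write throws an IOException, the BufferedWriter above is never closed. A try-with-resources variant of the same helper avoids leaking the file handle (a sketch, keeping the original names):
// Same helper, but the writer is closed even if a write fails.
private static synchronized void writeWindowToFile(Tuple5<String, String, Float, Float, String> target,
        TimeWindow window, Iterable<Tuple5<String, String, Float, Float, String>> input) throws IOException {
    String fileName = target.f2.toString() + "-" + target.f3.toString() + ".csv";
    try (BufferedWriter br = getWriter(fileName)) {
        for (Tuple5<String, String, Float, Float, String> tr : input) {
            br.write(tr.f1.toString().trim() + ", " +
                    convertLongIntoDate(window.getStart()) + ", " + convertLongIntoDate(window.getEnd()) + ", " +
                    tr.f0 + ", " + tr.f2 + ", " + tr.f3 + '\n');
        }
    } // close() runs here even on exception, and close() flushes
}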
We have written the file-creation and file-writing code to be thread-safe.
When we run this job repeatedly over the same Kafka topic (the records do not change, and auto.offset.reset is "earliest", so every run reads the topic from the beginning), most runs produce 16 files with the corresponding window records, which is the correct behavior. However, some runs produce only 4 files, or 1 file, or some other number less than 16, seemingly at random; this is the anomalous behavior.
What could be the cause of this inconsistent behavior, and how can we resolve it? Any pointers toward a solution would be appreciated.
Thanks.