Hi,
We are using a sliding event-time window with a Kafka consumer. The window size is 6 minutes and the slide is 2 minutes. We have written a window function that selects particular windows out of the many windows produced for a keyed stream; for example, we select about 16 windows based on a particular condition. In a normal execution, 16 windows pass the condition inside the window function. We write each of these windows to a separate file, named after the window start and end times. The code is below.

Calling code:

public class RealTimeProcessingSlidingWindo
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.get
        env.setStreamTimeCharacteristi

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookee
        kafkaProps.setProperty("bootst
        kafkaProps.setProperty("group.
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.o

        FlinkKafkaConsumer09<Tuple5<St
            "test", // kafka topic name
            new dataSchema(), kafkaProps);

        DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
        DataStream<Tuple5<String, String, Float, Float, String>> keyedStream = stream1

        keyedStream.keyBy(4)
            .window(SlidingEventTimeWindow
            .apply(new CustomSlidingWindowFunction())

        env.execute("Sliding Event Time Window Processing");
    }
}

public static class CustomSlidingWindowFunction implements WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow>{

    @Override
    public void apply(Tuple key, TimeWindow window,
            Iterable<Tuple5<String, String, Float, Float, String>> input,
            Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {

        HashMap<String, Tuple5<String, String, Float, Float, String>> windowMap = new HashMap<String, Tuple5<String, String, Float, Float, String>>();
        for (Tuple5<String, String, Float, Float, String> wr: input){
            windowMap.put(wr.f1.toString()
        }
        ...
        SimpleDateFormat sf = new SimpleDateFormat(IndianDateTim
        if(windowMap.containsKey(tk)){
            Tuple5<String, String, Float, Float, String> t = (Tuple5<String, String, Float, Float, String>) windowMap.get(tk);
            Date d = sf.parse(t.f0.trim());
            ...
            // Condition for selecting a window
            if(d.after(x) && d.before(y)){
                // Write the window output to separate files named after window Lat and Lon
                writeWindowToFile(t, window, input);
            }
        }
    }
}

// Get the buffered writer
private static synchronized BufferedWriter getWriter(String fileName) throws IOException{
    return new BufferedWriter(new FileWriter(fileName, true));
}

// Writes an entire window to file for the records in that window
private static synchronized void writeWindowToFile(Tuple5<Strin
    // Create a file to write a window to
    String fileName = target.f2.toString() + "-" + target.f3.toString()+".csv";
    BufferedWriter br = getWriter(fileName);
    // Iterate and put the records in file
    for (Tuple5<String, String, Float, Float, String> tr: input){
        br.write(tr.f1.toString().trim
            convertLongIntoDate(window.get
            tr.f0+", "+tr.f2+", "+tr.f3+'\n');
    }
    // flush the writer and close it
    br.close();
}

We have written the code to be thread-safe when creating and writing to files.

When we run this code multiple times on the Kafka stream (with certain records), most runs produce 16 files with the corresponding window records, which is the correct behavior. However, sometimes only 4 files are created, or 1 file, or some other number less than 16, seemingly at random. This is the anomalous behavior.

What could be the cause of such behavior, and how do we resolve it? Could you please help identify the cause and suggest a solution?

Thanks.
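For reference, here is a minimal sketch of how we understand records are assigned to sliding windows with our settings (size 6 minutes, slide 2 minutes). It is plain Java and does not use Flink. The class name SlidingWindowAssignment and the method windowStarts are hypothetical names for illustration only, and the sketch assumes windows are aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of our job: lists the sliding windows
// that contain a given event timestamp.
class SlidingWindowAssignment {
    static final long SIZE_MS = 6 * 60 * 1000L;   // window size: 6 minutes
    static final long SLIDE_MS = 2 * 60 * 1000L;  // window slide: 2 minutes

    // Returns the start timestamps (ms) of every window containing timestampMs,
    // latest window first. Assumes epoch-aligned windows with zero offset.
    static List<Long> windowStarts(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, SLIDE_MS);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 7 * 60 * 1000L; // an event at minute 7
        for (long start : windowStarts(ts)) {
            System.out.println("[" + (start / 60000) + " min, "
                    + ((start + SIZE_MS) / 60000) + " min)");
        }
    }
}
```

With these settings every record falls into three overlapping windows; for an event at minute 7 the sketch lists [6 min, 12 min), [4 min, 10 min) and [2 min, 8 min). So the same record can reach the window function, and the file-writing code, up to three times.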
Sujit Sakre