Hi,
We are using a sliding window function to process data read from a Kafka stream, with FlinkKafkaConsumer09 as the source. The window function and sink run correctly. To test the program, we generate a stream of data from the command line. This works when we add a set of records once. When we add records again, it does not work: Flink produces no result, even though the records are added to the same Kafka topic from the same command-line instance. Could you please suggest what could be wrong? Many thanks.
Sujit Sakre
This email is sent on behalf of Northgate Public Services (UK) Limited and its associated companies including Rave Technologies (India) Pvt Limited (together "Northgate Public Services") and is strictly confidential and intended solely for the addressee(s). If you are not the intended recipient of this email you must: (i) not disclose, copy or distribute its contents to any other person nor use its contents in any way or you may be acting unlawfully; (ii) contact Northgate Public Services immediately on +44(0)1908 264500 quoting the name of the sender and the addressee then delete it from your system. Northgate Public Services has taken reasonable precautions to ensure that no viruses are contained in this email, but does not accept any responsibility once this email has been transmitted. You should scan attachments (if any) for viruses. Northgate Public Services (UK) Limited, registered in England and Wales under number 00968498 with a registered address of Peoplebuilding 2, Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2 4NN. Rave Technologies (India) Pvt Limited, registered in India under number 117068 with a registered address of 2nd Floor, Ballard House, Adi Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
|
Hi, a bit more information would be useful. Are you using event time? Is the Flink program kept running after adding the first batch of events and while adding the second batch, or are these two invocations of your Flink program? Do you have a custom timestamp/watermark assigner? Cheers, Aljoscha On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <[hidden email]> wrote:
|
Hi Aljoscha,
Thanks. Yes, we are using event time. Yes, the Flink program is kept running in the IDE (Eclipse) and is not closed between the first batch of events and the second batch. Yes, we do have a custom timestamp/watermark assigner, implemented as BoundedOutOfOrdernessGenerator2.
Are we using the properties for Kafka correctly? We are using Flink 1.1.1 and the Flink Kafka connector flink-connector-kafka-0.9_2.11.
More about the behavior: I have noticed that sometimes, even after the first write to the Kafka queue while the Flink program is running, it does not process the queue immediately. We need to restart. This is quite random.
Following is the rough outline of our code.
public class SlidingWindo
public static void main(String[] args) throws Exception {
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.get
env.setStreamTimeCharacteristi
// configure the Kafka consumer
Properties kafkaProps = new Properties();
kafkaProps.setProperty("zookee
kafkaProps.setProperty("bootst
kafkaProps.setProperty("group.
// always read the Kafka topic from the start
kafkaProps.setProperty("auto.o
FlinkKafkaConsumer09<Tuple5<St
"test", // kafka topic name
new dataSchema(),
kafkaProps);
DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
DataStream<Tuple5<String, String, Float, Float, String>> keyedStream = stream1
keyedStream.keyBy(4)
.window(SlidingEventTimeWindow
.apply(new CustomSlidingWindowFunction())
env.execute("Sliding Event Time Window Processing");
}
}
public static class CustomSlidingWindowFunction implements WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow>{
@Override
public void apply(Tuple key, TimeWindow window, Iterable<Tuple5<String, String, Float, Float, String>> input, Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
....
} // Implemented custom Periodic Watermark as below from
public static class BoundedOutOfOrdernessGenerator2 implements AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {
/**
*
*/
private static final long serialVersionUID = 1L;
private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(Tuple5<String, String, Float, Float, String> element, long previousElementTimestamp) {
//System.out.println("inside extractTimestamp");
Date parseDate = null;
SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
try {
parseDate = dateFormat.parse(element.f0);
} catch (ParseException e) {
e.printStackTrace();
}
long timestamp = parseDate.getTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as twice the current highest timestamp minus the out-of-orderness bound
// this is because it is not covering the lateness sufficiently; now it does
// in future this may be multiple of 3 or more if necessary to cover the gap in records received
return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
}
} Sujit Sakre On 24 January 2017 at 22:34, Aljoscha Krettek <[hidden email]> wrote:
|
Hi, I would guess that the watermark generation does not work as expected. I would recommend logging the extracted timestamps and the watermarks to understand how time is progressing and when watermarks are generated that trigger a window computation. On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre <[hidden email]> wrote:
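As an illustration of the logging suggested here, the following is a minimal standalone sketch that mirrors the timestamp and watermark arithmetic of the posted BoundedOutOfOrdernessGenerator2 without using any Flink classes. The class name WatermarkTrace, the 10-second bound, the sample event times, and the assumption that event times are in UTC are all hypothetical and chosen only for the example.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Standalone stand-in for the posted assigner; it does not use any Flink classes.
final class WatermarkTrace {
    // Same pattern as the SimpleDateFormat in the posted code.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    private final long maxOutOfOrdernessMs; // hypothetical bound, in milliseconds
    private long currentMaxTimestamp;

    WatermarkTrace(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Mirrors extractTimestamp: parse the event time (UTC assumed) and track the maximum.
    long extractTimestamp(String eventTime) {
        long ts = LocalDateTime.parse(eventTime, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        currentMaxTimestamp = Math.max(ts, currentMaxTimestamp);
        return ts;
    }

    // Mirrors the posted getCurrentWatermark, including the doubling.
    long currentWatermark() {
        return currentMaxTimestamp * 2 - maxOutOfOrdernessMs;
    }

    // One log line with the event time and the resulting watermark in readable form.
    String logLine(String eventTime) {
        long ts = extractTimestamp(eventTime);
        return "event=" + Instant.ofEpochMilli(ts)
                + " watermark=" + Instant.ofEpochMilli(currentWatermark());
    }

    public static void main(String[] args) {
        WatermarkTrace trace = new WatermarkTrace(10_000L);
        System.out.println(trace.logLine("24-01-2017 14:28:00"));
        System.out.println(trace.logLine("24-01-2017 14:28:05"));
    }
}
```

Printing the watermark as a date rather than a raw long makes it easy to spot a watermark that has jumped decades ahead of the events. In a real job the same two values could be logged from inside extractTimestamp and getCurrentWatermark.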
|
Hi Robert, Aljoscha,
Many thanks for pointing this out. Watermark generation is the problem. It is generating timestamps far ahead of the current year, because our code tried to cover all records but inadvertently produced a very large watermark. We have tried to fix this with other ways of generating watermarks; however, we are unable to find the right combination and end up not processing at least one record from our dataset. Is there a formula or algorithm for generating the right watermark? Could you please suggest one? Thanks again. Sujit Sakre On 26 January 2017 at 20:17, Robert Metzger <[hidden email]> wrote:
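For reference, the bounded-out-of-orderness pattern described in the Flink documentation lets the watermark trail the highest timestamp seen so far by a fixed bound, that is, watermark = currentMaxTimestamp - maxOutOfOrderness. The sketch below shows only that arithmetic in plain Java, without Flink classes. The class name BoundedLag, the helper isBehindWatermark, and the sample values are hypothetical, and the right bound depends on how late records can arrive in the actual data.

```java
// Plain-Java illustration of a watermark that trails the maximum timestamp by a fixed bound.
final class BoundedLag {
    private final long maxOutOfOrdernessMs; // hypothetical bound, in milliseconds
    private long currentMaxTimestamp;

    BoundedLag(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the subtraction in watermark() cannot overflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Record an event timestamp; only the maximum matters for the watermark.
    void observe(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
    }

    // Watermark trails the highest timestamp seen so far by the bound.
    long watermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    // True if an event with this timestamp would arrive at or behind the current watermark.
    boolean isBehindWatermark(long timestampMs) {
        return timestampMs <= watermark();
    }

    public static void main(String[] args) {
        BoundedLag lag = new BoundedLag(5_000L);
        lag.observe(10_000L);
        System.out.println("watermark after 10000: " + lag.watermark());
        lag.observe(8_000L); // out-of-order event within the bound
        System.out.println("8000 behind watermark? " + lag.isBehindWatermark(8_000L));
        System.out.println("4000 behind watermark? " + lag.isBehindWatermark(4_000L));
    }
}
```

With this formula the watermark only advances when newer events arrive, so in a test with a finite batch the windows holding the last records may not fire until further events push the watermark past their end.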
|
Hi, what about using BoundedOutOfOrdernessGenerator? Why did that not work for your case? Cheers, Aljoscha On Mon, 30 Jan 2017 at 17:20 Sujit Sakre <[hidden email]> wrote:
|
Hi Aljoscha,
Thanks for your response. We wanted to customize the watermark period calculation since we were not getting the desired results with the BoundedOutOfOrdernessGenerator class. As for the current problem, we have identified why the records were not processed as expected. It is related to the watermark calculation. The formula is to use the sliding window size (in milliseconds) as the maximum out-of-orderness parameter and add it to the current maximum timestamp when generating a new watermark, i.e.
@Override
public Watermark getCurrentWatermark() {
Watermark watermark = new Watermark(currentMaxTimestamp + maxOutOfOrderness);
return watermark;
}
Sujit Sakre On 2 February 2017 at 21:09, Aljoscha Krettek <[hidden email]> wrote:
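The effect of the formula above can be checked with a small calculation. The sketch below is plain Java with no Flink classes and rests on two assumptions about Flink's behavior: sliding windows are aligned to multiples of the slide, and an event-time window fires once the watermark reaches the window's last included millisecond (end - 1). The class name SlidingWindowFiring and the sample size, slide, and timestamp are hypothetical.

```java
// Arithmetic check of when the last sliding window containing an event can fire.
final class SlidingWindowFiring {
    // Exclusive end of the latest window that contains ts, for windows aligned to the slide.
    static long lastWindowEnd(long ts, long sizeMs, long slideMs) {
        long lastStart = ts - Math.floorMod(ts, slideMs);
        return lastStart + sizeMs;
    }

    // Assumed firing condition: the watermark has reached the window's last millisecond.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;  // hypothetical window size
        long slide = 10_000L; // hypothetical slide
        long ts = 125_000L;   // latest event timestamp seen so far

        long end = lastWindowEnd(ts, size, slide);
        System.out.println("last window end: " + end);
        System.out.println("fires with max + size: " + fires(end, ts + size));
        System.out.println("fires with max - size: " + fires(end, ts - size));
    }
}
```

Because the latest window containing an event ends no later than the event's timestamp plus the window size, a watermark of currentMaxTimestamp + size is always past that end, so every window holding the newest record fires without waiting for further input. The trade-off, under the same assumptions, is that a record arriving afterwards with a timestamp at or behind that watermark may be treated as late.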
|
Implementing this formula seems to have solved our problem now. Thanks. On 2 February 2017 at 21:21, Sujit Sakre <[hidden email]> wrote:
|
Ok, thanks for letting us know! On Thu, 2 Feb 2017 at 17:40 Sujit Sakre <[hidden email]> wrote:
|