Kafka data not read in FlinkKafkaConsumer09 second time from command line

Sujit Sakre
Hi,

We are using a sliding window function to process data read from a Kafka stream, using FlinkKafkaConsumer09 to read the data. The window function and sink are running correctly.

To test the program, we are generating a stream of data from the command line.
This works when we add a set of records once. When we add another set, it does not work: Flink produces no result, even though the records are added to the same Kafka topic from the same command-line instance.

Please could you suggest what could be wrong.

Many thanks.


Sujit Sakre


This email is sent on behalf of Northgate Public Services (UK) Limited and its associated companies including Rave Technologies (India) Pvt Limited (together "Northgate Public Services") and is strictly confidential and intended solely for the addressee(s). 
If you are not the intended recipient of this email you must: (i) not disclose, copy or distribute its contents to any other person nor use its contents in any way or you may be acting unlawfully;  (ii) contact Northgate Public Services immediately on +44(0)1908 264500 quoting the name of the sender and the addressee then delete it from your system.
Northgate Public Services has taken reasonable precautions to ensure that no viruses are contained in this email, but does not accept any responsibility once this email has been transmitted.  You should scan attachments (if any) for viruses.

Northgate Public Services (UK) Limited, registered in England and Wales under number 00968498 with a registered address of Peoplebuilding 2, Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2 4NN.  Rave Technologies (India) Pvt Limited, registered in India under number 117068 with a registered address of 2nd Floor, Ballard House, Adi Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.

Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

Aljoscha Krettek
Hi,
a bit more information would be useful. Are you using event time? Is the Flink program kept running between adding the first batch of events and adding the second batch, or are these two separate invocations of your Flink program? Do you have a custom timestamp/watermark assigner?

Cheers,
Aljoscha

In reply to: Sujit Sakre, Tue, 24 Jan 2017 at 14:28

Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

Sujit Sakre
Hi Aljoscha,

Thanks.

Yes, we are using event time.
Yes, the Flink program is kept running in the IDE (Eclipse) and is not closed between adding the first batch of events and adding the second batch.
Yes, we do have a custom timestamp/watermark assigner, implemented as BoundedOutOfOrdernessGenerator2.

Are we using the Kafka properties correctly?
We are using Flink 1.1.1 and the Flink Kafka connector flink-connector-kafka-0.9_2.11.

More about the behavior:
I have noticed that sometimes, even after the first write to the Kafka topic, the running Flink program does not process it immediately, and we need to restart the program. This is quite random.

Following is the rough outline of our code.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;

public class SlidingWindow2 {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "demo");
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer = new FlinkKafkaConsumer09<>(
                "test",            // Kafka topic name
                new dataSchema(),  // deserialization schema (not shown)
                kafkaProps);
        DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
        DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
                stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());

        keyedStream.keyBy(4)
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction());

        env.execute("Sliding Event Time Window Processing");
    }

    public static class CustomSlidingWindowFunction implements WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Tuple5<String, String, Float, Float, String>> input,
                Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
            ....
        }
    }

    // Implemented custom periodic watermark assigner as below
    public static class BoundedOutOfOrdernessGenerator2 implements AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {
        private static final long serialVersionUID = 1L;
        private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Tuple5<String, String, Float, Float, String> element, long previousElementTimestamp) {
            //System.out.println("inside extractTimestamp");
            Date parseDate = null;
            SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
            try {
                parseDate = dateFormat.parse(element.f0);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            long timestamp = parseDate.getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as twice the current highest timestamp minus the out-of-orderness bound
            // this is because it is not covering the lateness sufficiently; now it does
            // in future this may be multiple of 3 or more if necessary to cover the gap in records received
            return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
        }
    }
}
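On the question of whether the Kafka properties are right: as far as I know, the 0.9 connector is built on Kafka's new consumer API, which finds the cluster through `bootstrap.servers`, so `zookeeper.connect` (needed by the 0.8 connector) can be dropped. Kafka also applies `auto.offset.reset` only when the consumer group has no committed offset, so the comment "always read the Kafka topic from the start" holds only until offsets for group `demo` have been committed. A minimal sketch, assuming a throwaway group id per local test run is acceptable (the class `KafkaTestProps` and its method are hypothetical helpers, not part of the original code):

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper: consumer properties for one local test run of the 0.9 connector.
final class KafkaTestProps {

    private KafkaTestProps() {}

    static Properties testConsumerProps(String bootstrapServers) {
        Properties props = new Properties();
        // The 0.9 consumer discovers the cluster through the brokers; no ZooKeeper address is set.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: a fresh group per run is fine for testing. A new group has no committed
        // offsets, so "earliest" really does start from the beginning of the topic.
        props.setProperty("group.id", "demo-" + UUID.randomUUID());
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

The returned `Properties` would take the place of `kafkaProps` when constructing `FlinkKafkaConsumer09`. These settings only change where a restarted job begins reading, so they are unlikely to explain the missing second batch while the job keeps running.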





Sujit Sakre



In reply to: Aljoscha Krettek, 24 January 2017 at 22:34

Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

rmetzger0
Hi,
I would guess that the watermark generation does not work as expected.
I would recommend logging the extracted timestamps and the watermarks to understand how event time is progressing, and when the watermarks that trigger a window computation are generated.
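Robert's suggestion can be tried even outside Flink. The sketch below is a plain-Java stand-in (the class `WatermarkLog` is hypothetical, not a Flink API) that mirrors the two methods of a periodic assigner: it logs each extracted timestamp and, on every watermark call, the watermark together with the latest end of a 6-minute window sliding by 2 minutes that the watermark allows to fire. It assumes the simple "highest timestamp minus bound" watermark; in the real job the same log calls would go inside `extractTimestamp` and `getCurrentWatermark`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java stand-in (not a Flink class) for a periodic watermark
// assigner that logs what it does. All times are epoch milliseconds.
final class WatermarkLog {
    // Slide of the sliding windows in the job (Time.minutes(2)).
    static final long SLIDE_MS = 2 * 60 * 1000L;

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;
    final List<String> lines = new ArrayList<>();

    WatermarkLog(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Counterpart of extractTimestamp: track the highest timestamp and log both values.
    long extractTimestamp(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        lines.add("extracted ts=" + timestamp + " max=" + currentMaxTimestamp);
        return timestamp;
    }

    // Counterpart of getCurrentWatermark: log the watermark and the latest window end it has passed.
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            lines.add("watermark: no element seen yet");
            return Long.MIN_VALUE;
        }
        long watermark = currentMaxTimestamp - maxOutOfOrdernessMs;
        // Window ends are multiples of the slide; a window [start, end) fires once watermark >= end - 1.
        long lastFiredEnd = Math.floorDiv(watermark + 1, SLIDE_MS) * SLIDE_MS;
        lines.add("watermark=" + watermark + " windows ending at or before " + lastFiredEnd + " can fire");
        return watermark;
    }
}
```

Reading such a log side by side with the input usually shows quickly whether the watermark is stuck, lagging, or jumping ahead of the data.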

In reply to: Sujit Sakre, Tue, Jan 24, 2017 at 6:53 PM

Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

Sujit Sakre
Hi Robert, Aljoscha,

Many thanks for pointing this out. Watermark generation is the problem: our code, which tried to cover all records, inadvertently produced a very large watermark, with timestamps far ahead of the current year.
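To see how far ahead the doubled watermark lands, here is the arithmetic of `getCurrentWatermark()` from `BoundedOutOfOrdernessGenerator2`, checked with `java.time`. The sample time 24-01-2017 14:28:00 is illustrative and is assumed to be UTC (the original `SimpleDateFormat` uses the JVM default time zone); the subtracted bound is ignored because it is tiny by comparison. The helper `isDroppedAsLate` (hypothetical name) checks whether even the last 6-minute/2-minute sliding window containing a record has already been passed by the watermark.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Reproduces the arithmetic of BoundedOutOfOrdernessGenerator2.getCurrentWatermark()
// for sample records. Assumption: record times are interpreted as UTC.
final class DoubledWatermark {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");
    private static final long SLIDE_MS = 2 * 60 * 1000L;  // Time.minutes(2)
    private static final long SIZE_MS = 6 * 60 * 1000L;   // Time.minutes(6)

    // Epoch milliseconds for a record timestamp such as "24-01-2017 14:28:00".
    static long toEpochMillis(String text) {
        return LocalDateTime.parse(text, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Calendar year of currentMaxTimestamp * 2 (the out-of-orderness bound is ignored).
    static int doubledWatermarkYear(String text) {
        return Instant.ofEpochMilli(toEpochMillis(text) * 2).atZone(ZoneOffset.UTC).getYear();
    }

    // True when the last sliding window containing the record already ends at or before
    // the watermark, i.e. every window the record belongs to is already complete.
    static boolean isDroppedAsLate(long timestampMs, long watermark) {
        long lastWindowStart = timestampMs - Math.floorMod(timestampMs, SLIDE_MS);
        long lastWindowEnd = lastWindowStart + SIZE_MS;
        return lastWindowEnd - 1 <= watermark;
    }
}
```

For the sample record the doubled watermark falls in 2064. Because a watermark never moves backwards, every window in 2017 is then already complete, so records of a second batch only map to windows that have already fired. With Flink's default allowed lateness of zero, such records should be discarded rather than produce output, which matches the empty result for the second batch. First-batch records that happen to be read after the first periodic watermark would be discarded the same way, which may also account for the seemingly random behaviour described earlier.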

We have tried other ways of generating watermarks, but we have not found the right combination: we always end up not processing at least one record from our dataset.

Is there a formula or algorithm for generating the right watermark? Please could you suggest one.

Thanks again.


Sujit Sakre


In reply to: Robert Metzger, 26 January 2017 at 20:17

Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

Aljoscha Krettek
Hi,
what about using BoundedOutOfOrdernessGenerator? Why did that not work for your case?
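The `BoundedOutOfOrdernessGenerator` mentioned here is presumably the example assigner from the Flink event-time documentation, where the watermark is simply the highest timestamp seen so far minus a fixed bound, with no multiplication and both values in milliseconds. Since `MAX_EVENT_DELAY` in `BoundedOutOfOrdernessGenerator2` is described as a constant in seconds, it would also need converting. The sketch below is plain Java with hypothetical names so the arithmetic can be checked without a cluster; inside the job the same logic would live in `extractTimestamp` and `getCurrentWatermark`. Flink 1.1 also appears to ship a ready-made `BoundedOutOfOrdernessTimestampExtractor` implementing this formula, which is worth checking against the version in use.

```java
import java.util.concurrent.TimeUnit;

// Plain-Java sketch (not a Flink class) of the bounded-out-of-orderness watermark formula:
// watermark = highest timestamp seen so far - allowed delay, all in milliseconds.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxEventDelaySeconds) {
        // Timestamps and watermarks are epoch milliseconds, so a delay given in seconds is converted.
        this.maxOutOfOrdernessMs = TimeUnit.SECONDS.toMillis(maxEventDelaySeconds);
        // Start so that the first watermark is Long.MIN_VALUE without overflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Same role as extractTimestamp: remember the highest timestamp seen so far.
    long extractTimestamp(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
        return timestampMs;
    }

    // Same role as getCurrentWatermark: subtract the bound once.
    long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }
}
```

For example, with a 60-second bound, a record at minute 11 followed by one at minute 10 leaves the watermark at minute 10 (600,000 ms), so the out-of-order record is still on time. The 6-minute window ending at minute 12 fires only once the watermark reaches 719,999 ms, which here happens after a record stamped minute 13 arrives. With a finite test batch, the windows holding the last few records therefore stay open until newer data reaches the topic, which may be why one record always seems to be missing.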

Cheers,
Aljoscha

On Mon, 30 Jan 2017 at 17:20 Sujit Sakre <[hidden email]> wrote:
Hi Robert, Aljoscha,

Many thanks for pointing out. Watermark generation is the problem. It is generating timestamps far ahead of current year due to our code which tried to cover all records but inadvertently made a very large watermark.

We have tried fixing this with other combinations of generating watermarks, however, we are unable to find the right combination and end up not processing at least one record from our dataset.

Is there a formula or algorithm for generating the right watermark? Please could suggest.

Thanks again.


Sujit Sakre


On 26 January 2017 at 20:17, Robert Metzger <[hidden email]> wrote:
Hi,
I would guess that the watermark generation does not work as expected.
I would recommend to log the extracted timestamps + the watermarks to understand how time is progressing, and when watermarks are generated to trigger a window computation.

On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre <[hidden email]> wrote:
Hi Aljoscha,

Thanks.

Yes, we are using Event Time.
Yes, Flink program is kept running in the IDE, i.e. eclipse and not closed, after the first batch of events and when adding the second batch.
Yes, We do have acustom timestamp/watermark assigner, implemented as BoundedOutOfOrdernessGenerator2

Are we using the properties for Kafka correctly?
We are using Flink 1.1.1 and Flink Kafka connector: flink-connector-kafka-0.9_2.11

More about the behavior: 
I have noticed that sometimes even after the first writing to the Kafka queue,  and when the Flink program runs, sometimes it does process the queue immediately. We need to restart. This is quite random.

Following is the rough outline of our code.

public class SlidingWindow2 {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "demo");
        // read the topic from the start, but only if group "demo" has no committed offset yet
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer = new FlinkKafkaConsumer09<>(
                "test",             // Kafka topic name
                new dataSchema(),
                kafkaProps);

        DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
        DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
                stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());

        keyedStream.keyBy(4)
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction()); // sink omitted in this outline

        env.execute("Sliding Event Time Window Processing");
    }

    public static class CustomSlidingWindowFunction implements WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Tuple5<String, String, Float, Float, String>> input,
                Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
            ....
        }
    }

    // Custom periodic watermark assigner, implemented as below:
    public static class BoundedOutOfOrdernessGenerator2 implements AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {

        private static final long serialVersionUID = 1L;

        // constant set in seconds; note that Flink timestamps and watermarks are in milliseconds
        private final long maxOutOfOrderness = MAX_EVENT_DELAY;
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Tuple5<String, String, Float, Float, String> element, long previousElementTimestamp) {
            SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
            Date parseDate;
            try {
                parseDate = dateFormat.parse(element.f0);
            } catch (ParseException e) {
                // fail fast instead of continuing with a null date (which would throw a NullPointerException below)
                throw new IllegalArgumentException("Unparseable timestamp: " + element.f0, e);
            }
            long timestamp = parseDate.getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as twice the current highest timestamp minus the out-of-orderness bound
            // this is because it is not covering the lateness sufficiently; now it does
            // in future this may be multiple of 3 or more if necessary to cover the gap in records received
            return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
        }
    }
}
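The sketch below helps reason about when the SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)) windows above emit results. It is plain Java with hypothetical names, and it assumes the zero-offset assignment rule of Flink's sliding event-time assigner: the latest window containing a timestamp t starts at t - (t mod slide), and earlier windows start one slide apart for as long as they still contain t. Every element therefore belongs to size / slide = 3 windows. It also assumes the default event-time trigger, which fires a window [start, end) once the watermark reaches end - 1.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code) of sliding event-time window assignment and firing.
class SlidingWindowMath {

    // Returns {start, end} pairs for every window that contains eventTs (zero offset, eventTs >= 0).
    static List<long[]> assignWindows(long eventTs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = eventTs - (eventTs % slideMs);
        for (long start = lastStart; start > eventTs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    // A window [start, end) fires once the watermark reaches its last millisecond, end - 1.
    static boolean fires(long[] window, long watermark) {
        return watermark >= window[1] - 1;
    }
}
```

For an event at minute 7 this gives the windows [6, 12), [4, 10) and [2, 8) minutes, and the [2, 8) window fires only once the watermark reaches 8 minutes minus 1 ms. In a test where no newer events arrive, a watermark that trails the newest event may never get that far, so the last windows of a batch stay open.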





Sujit Sakre



On 24 January 2017 at 22:34, Aljoscha Krettek <[hidden email]> wrote:
Hi,
a bit more information would be useful. Are you using event time? Is the Flink program kept running after you add the first batch of events and while you add the second batch, or are these two separate invocations of your Flink program? Do you have a custom timestamp/watermark assigner?

Cheers,
Aljoscha

On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <[hidden email]> wrote:
Hi,

We are using a sliding window function to process data read from a Kafka stream. We are using FlinkKafkaConsumer09 to read the data. The window function and sink are running correctly.

To test the program, we are generating a stream of data from the command line.
This works when we add a set of records once. When we add more records, it does not work: Flink produces no result, even though the records are added to the same Kafka topic from the same command-line instance.

Please could you suggest what could be wrong.

Many thanks.


Sujit Sakre


This email is sent on behalf of Northgate Public Services (UK) Limited and its associated companies including Rave Technologies (India) Pvt Limited (together "Northgate Public Services") and is strictly confidential and intended solely for the addressee(s). 
If you are not the intended recipient of this email you must: (i) not disclose, copy or distribute its contents to any other person nor use its contents in any way or you may be acting unlawfully; (ii) contact Northgate Public Services immediately on +44(0)1908 264500 quoting the name of the sender and the addressee then delete it from your system.
Northgate Public Services has taken reasonable precautions to ensure that no viruses are contained in this email, but does not accept any responsibility once this email has been transmitted.  You should scan attachments (if any) for viruses.

Northgate Public Services (UK) Limited, registered in England and Wales under number 00968498 with a registered address of Peoplebuilding 2, Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2 4NN.  Rave Technologies (India) Pvt Limited, registered in India under number 117068 with a registered address of 2nd Floor, Ballard House, Adi Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.



Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

Sujit Sakre
Hi Aljoscha,

Thanks for your response.

We wanted to customize the periodic watermark calculation because we were not getting the desired results with the BoundedOutOfOrdernessGenerator class.

As for the current problem, we have identified why the records are not processed as expected: it is the watermark calculation. Our formula is to use the sliding window size (in milliseconds) as the maximum out-of-orderness parameter and to add it to the current maximum timestamp when generating a new watermark, i.e.:

   @Override
   public Watermark getCurrentWatermark() {
       Watermark watermark = new Watermark(currentMaxTimestamp + maxOutOfOrderness);
       return watermark;
   }
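For comparison, the BoundedOutOfOrdernessGenerator example in the Flink documentation computes the watermark as currentMaxTimestamp - maxOutOfOrderness, i.e. behind the newest event. Adding the window size instead puts the watermark ahead of every timestamp seen so far. All windows of a batch then fire without waiting for more data, which may be why the test batches now produce output. The catch is that an event arriving later with a timestamp slightly older than the newest one can land only in windows that have already fired, and then it is dropped as late. The plain-Java sketch below checks one such out-of-order event under both formulas. It uses hypothetical names and assumes the job's 6-minute windows sliding by 2 minutes, zero allowed lateness, and a one-minute bound for the comparison.

```java
// Plain-Java sketch (not Flink code) comparing two watermark formulas for sliding event-time windows.
class WatermarkFormulaComparison {

    static final long WINDOW_SIZE_MS = 6 * 60_000L;  // as in SlidingEventTimeWindows.of(6 min, 2 min)
    static final long WINDOW_SLIDE_MS = 2 * 60_000L;

    // Formula from the post above: watermark ahead of the newest event by one window size.
    static long aheadWatermark(long currentMaxTimestamp) {
        return currentMaxTimestamp + WINDOW_SIZE_MS;
    }

    // Bounded out-of-orderness: watermark behind the newest event by the bound.
    static long boundedWatermark(long currentMaxTimestamp, long maxOutOfOrdernessMs) {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    // Counts the windows containing eventTs that have not fired yet at this watermark; the event
    // contributes nothing to windows that already fired (zero allowed lateness, eventTs >= 0).
    static int openWindows(long eventTs, long watermark) {
        int open = 0;
        long lastStart = eventTs - (eventTs % WINDOW_SLIDE_MS);
        for (long start = lastStart; start > eventTs - WINDOW_SIZE_MS; start -= WINDOW_SLIDE_MS) {
            long maxTimestamp = start + WINDOW_SIZE_MS - 1; // last millisecond of [start, start + size)
            if (maxTimestamp > watermark) {
                open++;
            }
        }
        return open;
    }
}
```

With the newest event at minute 50 and a second event 90 seconds older arriving after it, the ahead-of-time watermark leaves the older event with no open window, while the one-minute bound leaves all three of its windows open.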


Sujit Sakre


On 2 February 2017 at 21:09, Aljoscha Krettek <[hidden email]> wrote:
Hi,
what about using BoundedOutOfOrdernessGenerator? Why did that not work for your case?

Cheers,
Aljoscha
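Aljoscha's suggestion amounts to keeping the timestamp extraction of BoundedOutOfOrdernessGenerator2 and changing only the watermark. Below is a minimal plain-Java sketch of that pattern, modeled on the BoundedOutOfOrdernessGenerator example in the Flink documentation, which returns currentMaxTimestamp - maxOutOfOrderness. BoundedGeneratorSketch is a hypothetical name and does not implement the Flink interface. The sketch also converts a delay given in seconds, like MAX_EVENT_DELAY, into milliseconds, since Flink timestamps and watermarks are in milliseconds. It assumes the dd-MM-yyyy HH:mm:ss field is in UTC, whereas the original SimpleDateFormat uses the JVM's default time zone.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

// Plain-Java mirror of the documentation's bounded-out-of-orderness pattern; not the Flink class.
class BoundedGeneratorSketch {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE; // no element seen yet

    // The delay is given in seconds (like MAX_EVENT_DELAY) and converted, because watermarks are in ms.
    BoundedGeneratorSketch(long maxEventDelaySeconds) {
        this.maxOutOfOrdernessMs = TimeUnit.SECONDS.toMillis(maxEventDelaySeconds);
    }

    // Parses a field such as "24-01-2017 14:00:00", assumed here to be UTC, and tracks the maximum.
    long extractTimestamp(String dateField) {
        long ts = LocalDateTime.parse(dateField, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    // The watermark trails the newest timestamp by the bound; before any element it stays at Long.MIN_VALUE.
    long getCurrentWatermark() {
        return currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentMaxTimestamp - maxOutOfOrdernessMs;
    }
}
```

Because the watermark trails the newest event, an event up to the bound older than the newest one still reaches open windows. The cost is that the final windows of a batch fire only after a later event advances the watermark.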


Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

Sujit Sakre

Implementing this formula seems to have solved our problem now. Thanks.



Re: Kafka data not read in FlinkKafkaConsumer09 second time from command line

Aljoscha Krettek
Ok, thanks for letting us know!

On Thu, 2 Feb 2017 at 17:40 Sujit Sakre <[hidden email]> wrote:

Implementing this formula seems to have solved our problem now. Thanks.


On 2 February 2017 at 21:21, Sujit Sakre <[hidden email]> wrote:
Hi Aljoscha,

Thanks for your response.

We wanted to customize the periodic watermark calculation because we were not getting the desired results with the BoundedOutOfOrdernessGenerator class.

As for the current problem, we have identified why the records are not processed as expected: it is related to the watermark calculation. The formula is to use the sliding window size (in milliseconds) as the maximum out-of-orderness parameter and add it to the current maximum timestamp when generating a new watermark, i.e.:

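    @Override
    public Watermark getCurrentWatermark() {
        // watermark = highest timestamp seen so far + maxOutOfOrderness,
        // where maxOutOfOrderness is set to the sliding window size in milliseconds
        Watermark watermark = new Watermark(currentMaxTimestamp + maxOutOfOrderness);
        return watermark;
    }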
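For comparison, the `BoundedOutOfOrdernessGenerator` example in the Flink documentation subtracts the bound (`currentMaxTimestamp - maxOutOfOrderness`), so the watermark trails the newest event, whereas the formula above places it ahead of the newest event. The plain-Java sketch below has no Flink dependency; the class name, method names and sample timestamp are hypothetical, and it only computes both values for the 6-minute window used in this job.

```java
// Plain-Java sketch (no Flink dependency) comparing two watermark formulas.
public class WatermarkFormulas {

    /** Formula described in this message: the watermark runs ahead of the newest event. */
    public static long aheadOfMax(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp + maxOutOfOrderness;
    }

    /** Formula from the Flink documentation's BoundedOutOfOrdernessGenerator example: it trails the newest event. */
    public static long behindMax(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long windowSizeMs = 6 * 60 * 1000L;  // 6-minute sliding window, as in the job
        long maxTs = 1_485_268_080_000L;     // hypothetical newest event: 2017-01-24T14:28:00Z
        System.out.println("ahead:  " + aheadOfMax(maxTs, windowSizeMs));  // 1485268440000
        System.out.println("behind: " + behindMax(maxTs, windowSizeMs));   // 1485267720000
    }
}
```

A watermark of `t` asserts that no element with a timestamp at or before `t` is still to come. With `+`, that assertion reaches one window length beyond anything actually observed; with `-`, it covers only events at least one bound older than the newest one.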


Sujit Sakre


On 2 February 2017 at 21:09, Aljoscha Krettek <[hidden email]> wrote:
Hi,
what about using BoundedOutOfOrdernessGenerator? Why did that not work for your case?

Cheers,
Aljoscha

On Mon, 30 Jan 2017 at 17:20 Sujit Sakre <[hidden email]> wrote:
Hi Robert, Aljoscha,

Many thanks for pointing this out. Watermark generation is the problem. Our code, which tried to cover all records, inadvertently produced a very large watermark, with timestamps far ahead of the current year.

We have tried fixing this with other ways of generating watermarks; however, we have not found the right combination and still end up not processing at least one record from our dataset.

Is there a formula or algorithm for generating the right watermark? Could you please suggest one?
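No single formula fits every dataset. The approach in the Flink documentation's `BoundedOutOfOrdernessGenerator` example is to emit `currentMaxTimestamp - maxOutOfOrderness`, and an event-time window `[start, end)` fires once the watermark reaches `end - 1`. The plain-Java sketch below models this for the 6-minute/2-minute sliding windows in this job. It mirrors, but is not, Flink's `SlidingEventTimeWindows` assignment logic (offset 0 and non-negative timestamps are assumed), and the class name, bound and event times are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sliding event-time windows and their firing condition (no Flink dependency).
public class SlidingWindowFiring {

    static final long SIZE = 6 * 60_000L;   // 6-minute windows, as in the job
    static final long SLIDE = 2 * 60_000L;  // slide of 2 minutes, as in the job

    /** Start times of every window [start, start + SIZE) that contains ts (offset 0, ts >= 0 assumed). */
    public static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % SLIDE);
        for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    /** A window [start, start + SIZE) fires once the watermark reaches its last millisecond. */
    public static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + SIZE - 1;
    }

    public static void main(String[] args) {
        long maxOutOfOrderness = 60_000L;                    // hypothetical bound: 1 minute
        long[] eventTimes = {200_000L, 330_000L, 430_000L};  // hypothetical event timestamps (ms)
        long currentMaxTimestamp = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            // bounded-out-of-orderness rule from the Flink documentation example
            long watermark = currentMaxTimestamp - maxOutOfOrderness;
            System.out.println("event " + ts + ", watermark " + watermark
                    + ", window [0, 360000) fired: " + fires(0L, watermark));
        }
    }
}
```

In this trace the window `[0, 360000)` only fires after the third event pushes the watermark to 370000. The records of the most recent window stay buffered until an event with a later timestamp arrives, which may explain why the last records of a batch appear not to be processed.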

Thanks again.


Sujit Sakre


On 26 January 2017 at 20:17, Robert Metzger <[hidden email]> wrote:
Hi,
I would guess that the watermark generation does not work as expected.
I would recommend logging the extracted timestamps and the watermarks to understand how event time is progressing and when watermarks are generated that trigger a window computation.
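A minimal sketch of that kind of logging, in plain Java without a Flink dependency: it records each extracted timestamp, updates the running maximum as a periodic assigner would, and prints both values as UTC instants so that a watermark in the wrong year stands out. In a real job the equivalent log lines would go in `extractTimestamp()` and `getCurrentWatermark()`; the class `WatermarkTrace`, the 1-minute bound and the sample timestamps are hypothetical.

```java
import java.time.Instant;
import java.util.function.LongUnaryOperator;

// Plain-Java sketch: log each extracted timestamp next to the watermark a periodic
// assigner would emit, formatted as UTC instants so unexpected jumps are easy to spot.
public class WatermarkTrace {

    private long currentMaxTimestamp = Long.MIN_VALUE;
    private final LongUnaryOperator watermarkFormula; // maps max timestamp -> watermark

    public WatermarkTrace(LongUnaryOperator watermarkFormula) {
        this.watermarkFormula = watermarkFormula;
    }

    /** Records one element's timestamp and returns the log line for it. */
    public String observe(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        long watermark = watermarkFormula.applyAsLong(currentMaxTimestamp);
        return "ts=" + Instant.ofEpochMilli(timestamp) + " watermark=" + Instant.ofEpochMilli(watermark);
    }

    public static void main(String[] args) {
        long bound = 60_000L; // hypothetical 1-minute out-of-orderness bound
        WatermarkTrace trace = new WatermarkTrace(max -> max - bound);
        System.out.println(trace.observe(1_485_268_080_000L)); // hypothetical event times
        System.out.println(trace.observe(1_485_268_200_000L));
    }
}
```

Passing a different lambda, for example `max -> max * 2 - bound`, shows how a candidate formula behaves before it is put into the job.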

On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre <[hidden email]> wrote:
Hi Aljoscha,

Thanks.

Yes, we are using event time.
Yes, the Flink program is kept running in the IDE (Eclipse) and is not closed after the first batch of events, including while the second batch is added.
Yes, we do have a custom timestamp/watermark assigner, implemented as BoundedOutOfOrdernessGenerator2.

Are we using the properties for Kafka correctly?
We are using Flink 1.1.1 and Flink Kafka connector: flink-connector-kafka-0.9_2.11

More about the behavior:
I have noticed that sometimes, even after the first write to the Kafka queue while the Flink program is running, it does not process the queue immediately and we need to restart it. This is quite random.

Following is the rough outline of our code.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;

public class SlidingWindow2 {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "demo");
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer = new FlinkKafkaConsumer09<>(
                "test",            // kafka topic name
                new dataSchema(),
                kafkaProps);
        DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
        DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
                stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());

        keyedStream.keyBy(4)
                .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
                .apply(new CustomSlidingWindowFunction());

        env.execute("Sliding Event Time Window Processing");
    }

    public static class CustomSlidingWindowFunction implements WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Tuple5<String, String, Float, Float, String>> input,
                Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
            // ....
        }
    }

    // Implemented custom periodic watermark assigner as below
    public static class BoundedOutOfOrdernessGenerator2 implements AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {
        private static final long serialVersionUID = 1L;
        private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Tuple5<String, String, Float, Float, String> element, long previousElementTimestamp) {
            //System.out.println("inside extractTimestamp");
            Date parseDate = null;
            SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
            try {
                parseDate = dateFormat.parse(element.f0);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            long timestamp = parseDate.getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as twice the current highest timestamp minus the out-of-orderness bound
            // this is because it is not covering the lateness sufficiently; now it does
            // in future this may be multiple of 3 or more if necessary to cover the gap in records received
            return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
        }
    }
}
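The thread later identifies this watermark formula as the cause. The quick plain-Java check below (no Flink dependency) shows where `currentMaxTimestamp * 2 - maxOutOfOrderness` lands for an event in January 2017, and applies a rough event-time rule: with no allowed lateness, a window whose last millisecond (`end - 1`) is at or before the watermark is already past. The class name and sample values are hypothetical, and `MAX_EVENT_DELAY` is given an arbitrary value because the original constant is not shown. Note also that the comment says `MAX_EVENT_DELAY` is in seconds, while the timestamps from `Date.getTime()` are in milliseconds.

```java
import java.time.Instant;
import java.time.ZoneOffset;

// Plain-Java check of the original getCurrentWatermark() arithmetic (no Flink dependency).
public class DoubledWatermark {

    /** Same arithmetic as BoundedOutOfOrdernessGenerator2.getCurrentWatermark(). */
    public static long doubledWatermark(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp * 2 - maxOutOfOrderness;
    }

    /** UTC calendar year of an epoch-millisecond timestamp. */
    public static int utcYear(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).getYear();
    }

    /** Rough event-time rule with zero allowed lateness: the window is past once end - 1 <= watermark. */
    public static boolean windowIsPast(long windowEnd, long watermark) {
        return windowEnd - 1 <= watermark;
    }

    public static void main(String[] args) {
        long maxTs = 1_485_268_080_000L;           // hypothetical event at 2017-01-24T14:28:00Z
        long maxEventDelay = 10L;                  // hypothetical MAX_EVENT_DELAY value
        long watermark = doubledWatermark(maxTs, maxEventDelay);
        long nextWindowEnd = maxTs + 6 * 60_000L;  // hypothetical end of a 6-minute window after maxTs
        System.out.println("watermark year: " + utcYear(watermark));                        // 2064
        System.out.println("window already past: " + windowIsPast(nextWindowEnd, watermark)); // true
    }
}
```

Once the watermark sits in 2064, every window for 2017 data counts as past. Elements added afterwards would then be treated as late and dropped, which would be consistent with the second batch producing no results.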





Sujit Sakre



On 24 January 2017 at 22:34, Aljoscha Krettek <[hidden email]> wrote:
Hi,
a bit more information would be useful. Are you using event time? Is the Flink program kept running after adding the first batch of events and while adding the second batch, or are these two invocations of your Flink program? Do you have a custom timestamp/watermark assigner?

Cheers,
Aljoscha

On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <[hidden email]> wrote:
Hi,

We are using a sliding window function to process data read from Kafka Stream. We are using FlinkKafkaConsumer09 to read the data. The window function and sink are running correctly.

To test the program, we are generating a stream of data from the command line.
This works when we add a set of records once. When we add records again, it does not work: Flink produces no result, even though the records are added to the same Kafka topic from the same command-line instance.

Please could you suggest what could be wrong.

Many thanks.


Sujit Sakre

