Window is not working in streaming

Window is not working in streaming

Abhijeet Kumar
Hello Team,

I'm new to Flink, and I don't know why my window is not working.

DataStream<Tuple7<String, String, String, String, String, String, Long>> window2 = stream2
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple7<String, String, String, String, String, String, Long>>() {
            @Override
            public long extractAscendingTimestamp(
                    Tuple7<String, String, String, String, String, String, Long> t) {
                return t.f6;
            }
        })
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
    .reduce(new Reducer2());


window2.print();

This is the code I've written for a 2-second window. I'm using event time for window processing.

My data looks like this:

13,234234,34,32445,3423fsdf,20191111111119

The last value in each CSV row is the time (YYYYMMDDHHmmss).

Definition of Reducer2:

public static final class Reducer2
        implements ReduceFunction<Tuple7<String, String, String, String, String, String, Long>> {
    @Override
    public Tuple7<String, String, String, String, String, String, Long> reduce(
            Tuple7<String, String, String, String, String, String, Long> t_new,
            Tuple7<String, String, String, String, String, String, Long> t_old) {
        // Copies the fields of the first argument; the second argument is ignored.
        return new Tuple7<String, String, String, String, String, String, Long>(t_new.f0, t_new.f1, t_new.f2,
                t_new.f3, t_new.f4, t_new.f5, t_new.f6);
    }
}

As I understand it, when data like the sample above arrives, the first window is created. An element with timestamp 20191111111120 is then added to the same window. Finally, when something like 20191111111122 arrives, the old window should close and this code should print the result to the console. The problem is that it doesn't work that way. Maybe my understanding is incorrect; please correct me if I'm wrong.
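
For reference, the expectation described above can be checked with plain arithmetic. The sketch below does not use Flink; it only mirrors two rules: tumbling event-time windows are aligned to the epoch, and a window fires once the watermark reaches the window's last millisecond. The class and method names are hypothetical. It assumes the timestamps are proper epoch milliseconds in UTC and that the watermark trails the latest timestamp by 1 ms, as an ascending-timestamp extractor does.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window (offset 0) containing the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Watermark assumed to trail the latest (ascending) timestamp by 1 ms.
    static long ascendingWatermark(long latestTimestampMillis) {
        return latestTimestampMillis - 1;
    }

    // A window [start, start + size) fires once the watermark reaches its last millisecond.
    static boolean fires(long windowStartMillis, long sizeMillis, long watermark) {
        return watermark >= windowStartMillis + sizeMillis - 1;
    }

    // Epoch milliseconds for 2019-11-11 at the given time of day, assuming UTC.
    static long millis(int hour, int minute, int second) {
        return LocalDateTime.of(2019, 11, 11, hour, minute, second)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long size = 2000L;
        long t19 = millis(11, 11, 19);
        long t20 = millis(11, 11, 20);
        long firstWindow = windowStart(t19, size);

        // 11:11:19 belongs to the window that starts at 11:11:18.
        System.out.println(firstWindow == millis(11, 11, 18));                    // true
        // 11:11:20 starts a new window instead of joining the first one.
        System.out.println(windowStart(t20, size) == t20);                        // true
        // The first window has not fired after 11:11:19, but fires after 11:11:20.
        System.out.println(fires(firstWindow, size, ascendingWatermark(t19)));    // false
        System.out.println(fires(firstWindow, size, ascendingWatermark(t20)));    // true
    }
}
```

Under these assumptions, 11:11:19 and 11:11:20 land in different windows, and the first window's result is already emitted when the 11:11:20 element arrives, not when 11:11:22 does.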

Thanks,


Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !


Re: Window is not working in streaming

Abhijeet Kumar
Hi Taher,

Thanks for the quick response, but it would be a great help if you could point out the problem in my code.

Thanks,


Abhijeet Kumar


On 26-Nov-2018, at 1:25 PM, Taher Koitawala <[hidden email]> wrote:

Hi Abhijeet,

Refer to this code:

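assignTimestampsAndWatermarks(
    new AssignerWithPeriodicWatermarks<Tuple4<String, String, String, String>>() {
        long currentTimestamp = 0L;

        @Override
        public long extractTimestamp(Tuple4<String, String, String, String> tuple4, long timestamp) {
            // Presumably the first 13 characters of f0 are an epoch timestamp in milliseconds.
            currentTimestamp = Long.parseLong(tuple4.f0.substring(0, 13));
            return currentTimestamp;
        }

        // The original message was cut off here. The interface also requires
        // getCurrentWatermark(); this body is an assumed completion.
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentTimestamp);
        }
    });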


Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


Re: Window is not working in streaming

Taher Koitawala
Which version of Flink are you using? The method used in your code is deprecated. With the new syntax, this is the correct way to extract timestamps with periodic watermarks.
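
One more thing worth checking, whichever assigner is used: Flink interprets event timestamps as milliseconds since the epoch. If f6 is filled by parsing the YYYYMMDDHHmmss column directly as a long (an assumption, since the parsing code is not shown in this thread), then two readings that are 3 seconds apart differ by only 3 "milliseconds" and stay in the same 2-second window. The following is a minimal sketch of the difference using only java.time; the class and method names are hypothetical and UTC is assumed.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class EventTimeParsingSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Converts a YYYYMMDDHHmmss string to epoch milliseconds, assuming UTC.
    static long toEpochMillis(String raw) {
        return LocalDateTime.parse(raw, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Start of the epoch-aligned tumbling window (offset 0) containing the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 2000L; // 2-second window, in milliseconds

        // Raw digits used as if they were milliseconds: both land in one window.
        long rawA = Long.parseLong("20191111111119");
        long rawB = Long.parseLong("20191111111122");
        System.out.println(windowStart(rawA, size) == windowStart(rawB, size)); // true

        // Converted to epoch milliseconds: 3000 ms apart, in different windows.
        long a = toEpochMillis("20191111111119");
        long b = toEpochMillis("20191111111122");
        System.out.println(b - a);                                              // 3000
        System.out.println(windowStart(a, size) == windowStart(b, size));       // false
    }
}
```

If this is what is happening in the job, returning the converted value from the timestamp extractor instead of the raw t.f6 would be the place to start.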
