Regarding ordering of events

Abdul Salam Shaikh
Hi, 

I am using a JSON file, sorted in ascending order of the field Umlaufsekunde, as the source for my stream. It contains events like the following:

{"event":[{"Umlaufsekunde":115}]}
{"event":[{"Umlaufsekunde":135}]}
{"event":[{"Umlaufsekunde":135}]}
{"event":[{"Umlaufsekunde":145}]}
{"event":[{"Umlaufsekunde":155}]}
{"event":[{"Umlaufsekunde":155}]}
{"event":[{"Umlaufsekunde":185}]}
{"event":[{"Umlaufsekunde":195}]}
{"event":[{"Umlaufsekunde":195}]}
{"event":[{"Umlaufsekunde":205}]}
{"event":[{"Umlaufsekunde":245}]}

However, when I print the stream, it comes out unordered (the first value of each tuple is Umlaufsekunde):
1> (115,null,1483517983252,1190)
2> (135,null,1483517984877,1190)
2> (155,null,1483517986861,1190)
4> (145,null,1483517985752,1190)
3> (135,null,1483517985424,1190)
4> (195,null,1483517990736,1190)
4> (255,null,1483517997424,1190)
2> (205,null,1483517991518,1190)
2> (275,null,1483517999330,1190)
2> (385,null,1483518865371,1190)
2> (395,null,1483518866840,1190)
1> (155,null,1483517986533,1190)
4> (285,null,1483518000189,1190)
4> (395,null,1483518866231,1190)

I have also tried assigning timestamps and watermarks, with no luck:

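import java.util.List;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimestampExtractor
        implements AssignerWithPeriodicWatermarks<Tuple5<String, Long, List<Lane>, Long, Long>> {

    // Highest timestamp seen so far; the emitted watermark must never move backwards.
    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp);
    }

    @Override
    public long extractTimestamp(Tuple5<String, Long, List<Lane>, Long, Long> element,
                                 long previousElementTimestamp) {
        // Field 1 holds the event timestamp (Lane is the application's own class).
        long timestamp = element.f1;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }
}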
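Note that timestamps and watermarks alone do not reorder a stream: a watermark only tells downstream operators how far event time has progressed. Getting events out in order requires buffering them and releasing them once the watermark has passed them. The plain-Java sketch below illustrates that idea under stated assumptions; it is not Flink API, and the class name EventTimeSorter and the out-of-orderness bound are hypothetical. Its watermark follows the same rule as Flink's BoundedOutOfOrdernessTimestampExtractor (highest timestamp seen minus a fixed bound).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java sketch (not Flink API): buffer events and release them in timestamp
// order once a bounded-out-of-orderness watermark has passed them.
class EventTimeSorter {
    private final long maxOutOfOrderness;              // same unit as the timestamps
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long maxTimestamp = Long.MIN_VALUE;

    EventTimeSorter(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Watermark: highest timestamp seen so far minus the allowed out-of-orderness.
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrderness;
    }

    // Accepts one event and returns every buffered event the watermark has now passed,
    // in ascending order. Events more than maxOutOfOrderness behind the highest timestamp
    // are "late": they are released immediately and can break the ordering, so a real
    // job would drop or side-output them instead.
    List<Long> add(long timestamp) {
        buffer.add(timestamp);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return drainUpTo(currentWatermark());
    }

    // At the end of a finite input, release everything that is still buffered.
    List<Long> flush() {
        return drainUpTo(Long.MAX_VALUE);
    }

    private List<Long> drainUpTo(long watermark) {
        List<Long> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            out.add(buffer.poll());
        }
        return out;
    }

    // Convenience helper: pushes a finite sequence through the buffer.
    static List<Long> sortStream(long[] timestamps, long maxOutOfOrderness) {
        EventTimeSorter sorter = new EventTimeSorter(maxOutOfOrderness);
        List<Long> out = new ArrayList<>();
        for (long ts : timestamps) {
            out.addAll(sorter.add(ts));
        }
        out.addAll(sorter.flush());
        return out;
    }
}
```

Fed with the Umlaufsekunde values in the order printed above and a bound of 250, it returns them fully sorted. With a bound of 150, the second 155 arrives after the watermark has reached 245 and is emitted right after 205, out of order. A larger bound gives more complete ordering at the cost of more buffering and latency.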

Could anyone suggest how I can get the events to arrive in order?

Thanks!


Re: Regarding ordering of events

Kostas Kloudas
Hi Abdul,

Flink provides no ordering guarantees on the elements within a window.
The only “order” it guarantees is that the results referring to window-1 are
going to be emitted before those of window-2 (assuming that window-1 precedes window-2).
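Since the order of elements inside a window is not guaranteed, a window function that depends on order has to sort the window contents itself. The plain-Java sketch below is not Flink API; the class name SortedTumblingWindows and the 60-unit window size used in the example are hypothetical. It groups timestamps into tumbling windows aligned to multiples of the window size, emits the windows in ascending order of their start, and sorts each window's elements explicitly. For brevity it processes a finite array, whereas a real job would emit each window once the watermark passes its end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink API): group timestamps into tumbling windows,
// then sort the elements inside each window.
class SortedTumblingWindows {
    // Start of the tumbling window containing ts (windows aligned to multiples of size).
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // One list per window, windows ordered by start time, elements sorted within each.
    static List<List<Long>> apply(long[] timestamps, long size) {
        // TreeMap iterates windows by ascending start: "window-1 before window-2".
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        List<List<Long>> result = new ArrayList<>();
        for (List<Long> contents : windows.values()) {
            // Arrival order inside a window is arbitrary, so sort explicitly (natural order).
            contents.sort(null);
            result.add(contents);
        }
        return result;
    }
}
```

For example, apply(new long[]{115, 135, 155, 145, 135, 195, 255, 205}, 60) yields [[115], [135, 135, 145, 155], [195, 205], [255]]. In a Flink job, the equivalent step would be sorting the Iterable of elements passed to your window function before processing it.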

Thanks,
Kostas

In reply to Abdul Salam Shaikh (Jan 5, 2017, 11:57 AM).

Re: Regarding ordering of events

Fabian Hueske
Flink is a distributed system and does not preserve order across partitions.
The number prefix (e.g., 1>, 2>, ...) tells you the parallel instance of the printing operator.

You can set the parallelism of the whole job (not only of the printing operator) to 1 to get the stream in order.
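A rough plain-Java model of what happens: an ordered stream is spread round-robin over N parallel instances, each instance keeps its share in the original relative order, but the instances print concurrently, so their lines interleave. The class ParallelPrintDemo and the strict round-robin assignment starting at instance 0 are assumptions for illustration only; Flink's actual partitioning depends on how the job is built.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink API) of spreading an ordered stream over parallel instances.
class ParallelPrintDemo {
    // Round-robin assignment: element i goes to instance i % parallelism.
    // Each instance receives its share in the original relative order.
    static List<List<Long>> distribute(long[] events, int parallelism) {
        List<List<Long>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new ArrayList<>());
        }
        for (int i = 0; i < events.length; i++) {
            instances.get(i % parallelism).add(events[i]);
        }
        return instances;
    }

    // True if the list is in non-decreasing order.
    static boolean isAscending(List<Long> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i - 1) > values.get(i)) {
                return false;
            }
        }
        return true;
    }
}
```

With the eleven Umlaufsekunde values from the question and a parallelism of 4, instance 0 gets [115, 155, 195], instance 1 [135, 155, 205], instance 2 [135, 185, 245] and instance 3 [145, 195]. Every list is ascending, yet the printed stream mixes them in whatever order the threads happen to run. With a parallelism of 1 there is a single list, identical to the input.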

Fabian

In reply to Kostas Kloudas (2017-01-05 12:16 GMT+01:00).

Re: Regarding ordering of events

Abdul Salam Shaikh
Thanks Fabian and Kostas, 

How can I make use of Flink's power as a distributed system, then?

When there are multiple windows, is a single window handled entirely by one partition, or is it spread across several partitions?

In reply to Fabian Hueske (Thu, Jan 5, 2017, 12:25 PM).

--
Thanks & Regards,

Abdul Salam Shaikh


Re: Regarding ordering of events

Kostas Kloudas
Hi Abdul,

Every window is handled by a single machine, if this is what you mean by “partition”.

Kostas

In reply to Abdul Salam Shaikh (Jan 5, 2017, 9:21 PM).

Re: Regarding ordering of events

Aljoscha Krettek
Hi,
To clarify what Kostas said: a "single window" in this case is the window for a given key and time period. So the window for "key1" from t1 to t2 can be processed on a different machine than the window for "key2" from t1 to t2.
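To make the distinction concrete, here is a plain-Java sketch of how a keyed, windowed stream can be routed. The routing rule "hashCode() mod parallelism" is a simplifying assumption: Flink actually assigns keys to key groups using its own hash function, but the property illustrated is the same, namely that the target instance depends only on the key. The class KeyedWindowRouting and its methods are hypothetical.

```java
import java.util.Objects;

// Plain-Java sketch (not Flink API): which parallel instance handles a (key, window) pair.
class KeyedWindowRouting {
    // Simplified assumption: hashCode() modulo parallelism stands in for Flink's
    // key-group assignment. The instance depends only on the key.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(Objects.hashCode(key), parallelism);
    }

    // Start of the tumbling window containing ts.
    static long windowStart(long ts, long windowSize) {
        return ts - Math.floorMod(ts, windowSize);
    }

    // A "single window" is identified by key AND time range, e.g. "key1@[120,180)".
    static String windowId(String key, long ts, long windowSize) {
        long start = windowStart(ts, windowSize);
        return key + "@[" + start + "," + (start + windowSize) + ")";
    }
}
```

With a parallelism of 4 and 60-unit windows, windowId("key1", 135, 60) and windowId("key1", 155, 60) are both key1@[120,180) and both go to instance 2 under this simplified hash, while the window key2@[120,180) covering the same time range goes to instance 3. The elements of one window are never split, but windows for different keys can be processed in parallel.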

Cheers,
Aljoscha

In reply to Kostas Kloudas (Thu, 5 Jan 2017, 21:56).

Re: Regarding ordering of events

Abdul Salam Shaikh
Thanks a lot, Aljoscha, this was a perfect answer to my vague question.

In reply to Aljoscha Krettek (09-Jan-2017, 4:52 PM).