Is it possible to leverage the sort order in DataStream Batch Execution Mode?

Marco Villalobos-2
Hello. I am using Flink 1.12.1 in EMR.

I am processing historical time-series data with the DataStream
API in Batch execution mode.

I must average the time-series data over fifteen-minute intervals
and forward fill missing values.

For example, this input:

name, timestamp, value
a,2019-06-23T00:07:30Z,10
b,2019-06-23T00:05:30Z,7
a,2019-06-23T00:09:30Z,10
a,2019-06-23T00:37:30Z,10

would yield this output:

name, timestamp, value, is_forward_fill
a,2019-06-23T00:15:00Z,20,false
b,2019-06-23T00:15:00Z,7,false
a,2019-06-23T00:30:00Z,20,true
b,2019-06-23T00:30:00Z,7,true
a,2019-06-23T00:45:00Z,5,false
b,2019-06-23T00:45:00Z,7,true
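
To make the requirement concrete, here is a minimal plain-Java sketch (no Flink involved) of the two steps for a single name: mapping a timestamp to the end of its fifteen-minute window, and forward filling windows that have no data. The class `ForwardFillSketch`, its `Row` type, and both method names are hypothetical, introduced only for illustration. It assumes windows are aligned to the epoch and labelled by their end time, as in the example output.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class ForwardFillSketch {
    static final Duration STEP = Duration.ofMinutes(15);

    /** One output row: the window end, the value, and whether it was carried forward. */
    static final class Row {
        final Instant windowEnd;
        final double value;
        final boolean forwardFilled;

        Row(Instant windowEnd, double value, boolean forwardFilled) {
            this.windowEnd = windowEnd;
            this.value = value;
            this.forwardFilled = forwardFilled;
        }
    }

    /** End of the epoch-aligned window [start, end) that contains ts. */
    static Instant windowEnd(Instant ts) {
        long stepMillis = STEP.toMillis();
        long start = Math.floorDiv(ts.toEpochMilli(), stepMillis) * stepMillis;
        return Instant.ofEpochMilli(start + stepMillis);
    }

    /**
     * Walks from the first window with data up to lastWindowEnd in 15-minute steps.
     * Windows without an average reuse the most recent value and are flagged.
     */
    static List<Row> fill(TreeMap<Instant, Double> averagesByWindowEnd, Instant lastWindowEnd) {
        List<Row> out = new ArrayList<>();
        if (averagesByWindowEnd.isEmpty()) {
            return out;
        }
        double last = 0.0; // overwritten on the first iteration, which always has data
        for (Instant t = averagesByWindowEnd.firstKey(); !t.isAfter(lastWindowEnd); t = t.plus(STEP)) {
            Double v = averagesByWindowEnd.get(t);
            if (v != null) {
                last = v;
                out.add(new Row(t, v, false));
            } else {
                out.add(new Row(t, last, true));
            }
        }
        return out;
    }
}
```

The sketch relies on a `TreeMap` to get the windows of one name in timestamp order, which is exactly the ordering I was hoping to obtain from the runtime instead.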

My stream code looks something like this:

STREAM PSEUDO CODE

stream<TimeSeries>.keyBy(Tuple with step,name)
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    .aggregate(new AggregateFunction(), new AggregateProcessWindowFunction())
    .name("aggregate")
    .keyBy(Tuple with name)
    .process(new FillKeyedProcessFunction())
    .name("fill");

The Flink documentation on batch execution mode suggests that the stream might be sorted by key.

If that's true, my fill function could be greatly simplified by leveraging that order.

I tried implementing a custom pojo key for the fill function like this:

public class FillKey implements Comparable<FillKey>, Serializable {
    String name;
    Instant timestamp;
    // equals(): only checks name
    // hashCode(): only hashes name
    // compareTo(): compares name, then timestamp
}

Notice that my key only checks equality on the name, and hashes only the name, but when
it performs comparisons it orders by name, timestamp.
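
Spelled out as compilable Java, the key described above might look like the following sketch. The constructor and the `serialVersionUID` field are additions for illustration. Note that this `compareTo` is deliberately inconsistent with `equals`: two keys with the same name but different timestamps are equal, yet compare as non-zero.

```java
import java.io.Serializable;
import java.time.Instant;

class FillKey implements Comparable<FillKey>, Serializable {
    private static final long serialVersionUID = 1L;

    final String name;
    final Instant timestamp;

    FillKey(String name, Instant timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }

    // Equality ignores the timestamp: keys with the same name are "the same key".
    @Override
    public boolean equals(Object o) {
        return o instanceof FillKey && name.equals(((FillKey) o).name);
    }

    // Hashing must agree with equals, so it also uses only the name.
    @Override
    public int hashCode() {
        return name.hashCode();
    }

    // Ordering uses name first, then timestamp, so it can return non-zero for equal keys.
    @Override
    public int compareTo(FillKey other) {
        int byName = name.compareTo(other.name);
        return byName != 0 ? byName : timestamp.compareTo(other.timestamp);
    }
}
```

The `Comparable` documentation strongly recommends that natural ordering be consistent with `equals`, so a key like this only works if every component that handles it uses `equals` and `hashCode` for grouping and `compareTo` for ordering.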

My stream now looks like this:

stream<TimeSeries>.keyBy(Tuple with step,name)
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    .aggregate(new AggregateFunction(), new AggregateProcessWindowFunction())
    .name("aggregate")
    .keyBy(new KeySelector<TimeSeries, FillKey>() {
        @Override
        public FillKey getKey(TimeSeries value) throws Exception {
            return new FillKey(value.name, value.timestamp);
        }
    }).process(new FillKeyedProcessFunction())
    .name("fill");

and it seemed to arrive sorted, but I am getting the wrong output because my keyed state no longer seems to work. I expected the stream to arrive ordered by name, then by step ascending. However, keyed state behaved as though each element that I thought would share the same name had a different key.

Is there an issue with POJO keys that breaks keyed state in batch execution mode?
Is it possible to take advantage of the sort order within the business logic as I am trying to do?
Re: Is it possible to leverage the sort order in DataStream Batch Execution Mode?

Dawid Wysakowicz-2

I am afraid it is not possible to leverage the sorting for business logic. The sorting is applied to the binary representation of the key, because what is needed is not sorting per se but rather grouping records with the same key together. You can find more information in the FLIP for this feature, e.g. here [1].
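
As a rough illustration of what "binary representation of the key" can imply, the sketch below serializes a (name, timestamp) pair with `DataOutputStream` and compares the resulting bytes. This is only a stand-in: the class `BinaryKeySketch` and its methods are hypothetical, and the byte layout is not Flink's actual serialization format. It shows two things under that assumption: keys that share a name but differ in timestamp produce different bytes, and byte order need not match the logical order of the fields.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;

class BinaryKeySketch {
    /** Stand-in serializer (NOT Flink's format): length-prefixed name, then epoch millis. */
    static byte[] serialize(String name, Instant timestamp) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
            out.writeLong(timestamp.toEpochMilli());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Lexicographic comparison of two byte arrays, treating each byte as unsigned. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        Instant t1 = Instant.parse("2019-06-23T00:15:00Z");
        Instant t2 = Instant.parse("2019-06-23T00:30:00Z");

        // Same name, different timestamp: the serialized keys are not identical.
        System.out.println(compareBytes(serialize("a", t1), serialize("a", t2)) == 0);

        // "aa" < "b" as strings, but the length prefix makes "b" sort first as bytes.
        System.out.println("aa".compareTo("b") < 0);
        System.out.println(compareBytes(serialize("aa", t1), serialize("b", t1)) < 0);
    }
}
```

Any component that groups by serialized bytes sees a key containing the timestamp as a new key for every step, regardless of how `equals` is written, and the order in which groups arrive carries no meaning the business logic can rely on.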

Best,

Dawid

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Howtosort/groupkeys

On 21/05/2021 09:58, Marco Villalobos wrote:
> [...]
