Hello. I am using Flink 1.12.1 on EMR. I am processing historical time-series data with the DataStream API in batch execution mode. I must average the time-series data over fifteen-minute intervals and forward-fill missing values.

For example, this input:

name, timestamp, value
a,2019-06-23T00:07:30Z,10
b,2019-06-23T00:05:30Z,7
a,2019-06-23T00:09:30Z,10
a,2019-06-23T00:37:30Z,10

would yield this output:

name, timestamp, value, is_forward_fill
a,2019-06-23T00:15:00Z,20,false
b,2019-06-23T00:15:00Z,7,false
a,2019-06-23T00:30:00Z,20,true
b,2019-06-23T00:30:00Z,7,true
a,2019-06-23T00:45:00Z,5,false
b,2019-06-23T00:30:00Z,7,true

My stream code looks something like this:

STREAM PSEUDO CODE

stream<TimeSeries>.keyBy(Tuple with step,name)
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    .aggregate(new AggregateFunction(), new AggregateProcessWindowFunction())
    .name("aggregate")
    .keyBy(Tuple with name)
    .process(new FillKeyedProcessFunction())
    .name("fill");

The documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html) suggests that the stream might be sorted by key. If that is true, my fill function could be greatly simplified if I were able to leverage that ordering somehow.

I tried implementing a custom POJO key for the fill function like this:

public class FillKey implements Comparable<FillKey>, Serializable {
    String name;
    Instant timestamp;

    equals     // only checks name
    hashCode   // only hashes name
    compareTo  // compares name, timestamp
}

Notice that my key checks equality on the name only and hashes only the name, but when it performs comparisons it orders by name, then timestamp.
My stream now looks like this:

stream<TimeSeries>.keyBy(Tuple with step,name)
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    .aggregate(new AggregateFunction(), new AggregateProcessWindowFunction())
    .name("aggregate")
    .keyBy(new KeySelector<TimeSeries, FillKey>() {
        @Override
        public FillKey getKey(TimeSeries value) throws Exception {
            return new FillKey(value.name, value.timestamp);
        }
    })
    .process(new FillKeyedProcessFunction())
    .name("fill");

and it seemed to arrive sorted, but I am getting the wrong output because my keyed state no longer seems to work. I expected the stream to arrive ordered by name, then step ascending. However, keyed state behaved as though each element that I thought would share the same name had a different key.

Is there an issue with POJO keys that breaks keyed state in batch execution mode? Is it possible to take advantage of the sort order within the business logic as I am trying to do?
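For readers following the requirement, the windowing and forward-fill logic described in the question can be sketched in plain Java, independent of Flink. This is only an illustration under stated assumptions: the class `ForwardFillSketch`, its `Row` type, and the `fillUntil` parameter are hypothetical names that do not appear in the original job, and the input rows are assumed to be already aggregated, to belong to a single name, and to be sorted by window end.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink or of the original job.
class ForwardFillSketch {
    static final long STEP_MILLIS = Duration.ofMinutes(15).toMillis();

    // One output row for a single series name.
    static final class Row {
        final Instant windowEnd;
        final double value;
        final boolean forwardFilled;

        Row(Instant windowEnd, double value, boolean forwardFilled) {
            this.windowEnd = windowEnd;
            this.value = value;
            this.forwardFilled = forwardFilled;
        }
    }

    // End of the tumbling 15-minute window [start, start + 15 min) that contains ts.
    static Instant windowEnd(Instant ts) {
        long millis = ts.toEpochMilli();
        long start = millis - Math.floorMod(millis, STEP_MILLIS);
        return Instant.ofEpochMilli(start + STEP_MILLIS);
    }

    // Assumes 'sorted' holds the aggregated rows of ONE name, ascending by windowEnd.
    // Missing windows are filled with the last seen value, up to fillUntil (inclusive).
    static List<Row> forwardFill(List<Row> sorted, Instant fillUntil) {
        List<Row> out = new ArrayList<>();
        Row previous = null;
        for (Row current : sorted) {
            if (previous != null) {
                // Fill every window strictly between the previous row and this one.
                Instant next = previous.windowEnd.plusMillis(STEP_MILLIS);
                while (next.isBefore(current.windowEnd)) {
                    out.add(new Row(next, previous.value, true));
                    next = next.plusMillis(STEP_MILLIS);
                }
            }
            out.add(current);
            previous = current;
        }
        if (previous != null) {
            // Trailing fill after the last observed window.
            Instant next = previous.windowEnd.plusMillis(STEP_MILLIS);
            while (!next.isAfter(fillUntil)) {
                out.add(new Row(next, previous.value, true));
                next = next.plusMillis(STEP_MILLIS);
            }
        }
        return out;
    }
}
```

With this sketch, a series that has rows for the windows ending at 00:15 and 00:45 gets one filled row at 00:30, which is the shape of the `is_forward_fill` rows in the example. The sketch only works if the rows of one name are seen together and in time order, which is the guarantee the question is asking about.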
I am afraid it is not possible to leverage the sorting for business logic. The sorting is applied to the binary representation of the key, because it is not really sorting per se but rather a way of grouping records that have the same key. You can find more information in the FLIP for this feature, e.g. here [1].

Best,
Dawid

On 21/05/2021 09:58, Marco Villalobos wrote:
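As a rough illustration of Dawid's point about the binary representation, the plain-Java sketch below shows how two keys can be equal according to `equals` and `hashCode` yet differ once every field is written out as bytes. It is a sketch under assumptions: `FillKeyBytesSketch` and `toBytes` are hypothetical, and the byte layout is a stand-in chosen for the example, not Flink's actual serializer format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;

// Hypothetical demo class; it does not use any Flink API.
class FillKeyBytesSketch {

    // Mirrors the key from the question: equality and hash use the name only.
    static final class FillKey {
        final String name;
        final Instant timestamp;

        FillKey(String name, Instant timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof FillKey && name.equals(((FillKey) o).name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    // Stand-in for a field-by-field serializer (an assumption, NOT Flink's wire format):
    // writes the name followed by the timestamp in epoch milliseconds.
    static byte[] toBytes(FillKey key) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(key.name);
            out.writeLong(key.timestamp.toEpochMilli());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }
}
```

Two `FillKey` instances for name `a` with different timestamps compare as equal in Java, but their byte arrays differ, so any grouping done on the serialized form would treat them as separate keys. That is consistent with the symptom in the question, where state was not shared between elements with the same name. The first version of the pipeline, keyed by name alone, has a single key per series and avoids this problem.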