{"keys":[{"name":"campaignId","type":"integer"},
{"name":"adId","type":"integer"},
{"name":"creativeId","type":"integer"},
{"name":"publisherId","type":"integer"},
{"name":"adOrderId","type":"integer"}],
"timeBuckets":["1h","1d"],
"values":
[{"name":"impressions","type":"integer","aggregators":["SUM"]}
,
{"name":"clicks","type":"integer","aggregators":["SUM"]},
{"name":"revenue","type":"integer"}],
"dimensions":
[{"combination":["campaignId","adId"]},
{"combination":["creativeId","campaignId"]},
{"combination":["campaignId"]},
{"combination":["publisherId","adOrderId","campaignId"],
"additionalValues":["revenue:SUM"]}]
}
I have been able to do this by the following and repeating this for every key + window combination. So in the above case there would be 8 blocks like below. (4 combinations and 2 window period for each combination)
modelDataStream.keyBy("campaiginId","addId")
.timeWindow(Time.minutes(1))
.trigger(CountTrigger.of(2))
.reduce(..)