Windowing isn't applied per key


Windowing isn't applied per key

mclendenin


I have a job that performs an aggregation over a time window. The windowing is supposed to happen per key, but the output I see looks like a single window over everything coming in. Is this happening because I map the data before calling keyBy? Here is a representation of what I am running:

 

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema

// env, kafkaConsumer, filterIncorrectJson, mapJsonToObject, aggregateData,
// windowSize and windowSlide are defined elsewhere in the job
val stream = env
  .addSource(kafkaConsumer)

// filter out bad JSON
val jsonDeserializer = new JSONDeserializationSchema()
val filteredStream = stream.filter(text => {
  try {
    jsonDeserializer.deserialize(text.getBytes)
    true
  } catch {
    case _: Exception => false
  }
})
val kafkaStream = filteredStream.map(text => jsonDeserializer.deserialize(text.getBytes))

// method used to filter JSON not meeting the expected requests
val filteredJsonStream = filterIncorrectJson(kafkaStream)

// method used to map JSON to input object
val mappedStream = mapJsonToObject(filteredJsonStream)

// pull key out of object
val keyedStream = mappedStream.keyBy(_.key)

// add window (windowSize = 30 minutes, windowSlide = 5 minutes)
val windowedStream = keyedStream.timeWindow(windowSize, windowSlide)

// reduce to aggregates
val reducedStream = windowedStream.reduce(aggregateData())

I pull the data from Kafka as a String, map it to my data model, extract the key, apply a sliding time window (30 minute size, 5 minute slide) and run an aggregation. I expect each key to get its own, independent time window, but instead the aggregation fires every 5 minutes for all keys at once.

 



Re: Windowing isn't applied per key

Timo Walther
Hi Marcus,

At first glance your pipeline looks correct. It should not be executed with a parallelism of one unless you specified that explicitly. Which time semantics are you using? If it is event time, I would check your timestamp and watermark assignment. You can also check in the web frontend which parallelism each operator runs with. By the way, according to the Javadoc of reduce(): "Sliding time windows will aggregate on the granularity of the slide interval", so it is called multiple times.
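To make the Javadoc remark concrete, here is a minimal plain-Scala sketch (no Flink dependency) of how a sliding assigner maps one timestamp to windows. It assumes offset 0, non-negative timestamps, and times expressed as minutes since midnight; `SlidingAssignment` and `slidingWindows` are hypothetical names, and the start computation only mirrors, as far as I know, what Flink's sliding assigners do. With a 30 minute size and a 5 minute slide, every element lands in 30 / 5 = 6 overlapping windows, so the reduce result is updated in six different windows.

```scala
object SlidingAssignment {
  // Hypothetical helper (not Flink API): all sliding windows [start, end) that contain `ts`.
  // Starts are aligned to multiples of `slide` (offset 0, non-negative ts),
  // not to the element's own timestamp.
  def slidingWindows(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = ts - (ts % slide)
    (lastStart until (ts - size) by -slide).map(s => (s, s + size)).toList
  }

  private def hhmm(m: Long): String = f"${m / 60}%02d:${m % 60}%02d"

  def main(args: Array[String]): Unit = {
    val alicia = 12L * 60 + 3 // 12:03 as minutes since midnight
    val windows = slidingWindows(alicia, size = 30, slide = 5)
    windows.foreach { case (s, e) => println(s"[${hhmm(s)}, ${hhmm(e)})") }
    println(s"${windows.size} windows") // size / slide = 30 / 5
  }
}
```

Every window start is a multiple of the slide, which is why results appear on the same 5 minute grid regardless of when an element arrived.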

Regards,
Timo


On 9/29/17 at 8:56 PM, Marcus Clendenin wrote:



Re: Windowing isn't applied per key

mclendenin
I am using processing time, so it uses the default timestamps and
watermarks. I am running the job with a parallelism of 3, and I can see each
operator running with a parallelism of 3 in the Web UI. I am pulling data from
a Kafka topic with 12 partitions.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Windowing isn't applied per key

Aljoscha Krettek
Hi,

Could you maybe give an example of what you expect as output and what you actually get?

Best,
Aljoscha

> On 9. Oct 2017, at 16:09, mclendenin <[hidden email]> wrote:

Re: Windowing isn't applied per key

mclendenin
Sure. In this example I'll use a name as the key and a plain number as the
aggregated value. This is the sample input data:

12:00
{"name": "Marcus", "value": 1}
12:01
{"name": "Suzy", "value": 2}
12:03
{"name": "Alicia", "value": 3}
12:04
{"name": "Ben", "value": 1}
12:06
{"name": "Alicia", "value": 1}
{"name": "Ben", "value": 5}

Expected output:
12:05
{"name": "Marcus", "value": 1}
12:06
{"name": "Suzy", "value": 2}
12:08
{"name": "Alicia", "value": 4}
12:09
{"name": "Ben", "value": 6}

Actual output:
12:05
{"name": "Marcus", "value": 1}
{"name": "Suzy", "value": 2}
{"name": "Alicia", "value": 3}
{"name": "Ben", "value": 1}
12:10
{"name": "Marcus", "value": 1}
{"name": "Suzy", "value": 2}
{"name": "Alicia", "value": 4}
{"name": "Ben", "value": 6}




Re: Windowing isn't applied per key

Tony Wei
Hi Marcus,

I think this is the expected result for a sliding window in Flink. You can see the example in the documentation for more details. [1]
For your use case, I would suggest using a ProcessFunction to implement the window behavior you expect. You can use keyed state to buffer elements and onTimer() to trigger each window, and all of this is done per key. [2]
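The idea of "keyed state plus a timer per key" can be illustrated without Flink. The sketch below is plain Scala, not Flink API code: the two mutable maps stand in for keyed state and registered timers, and `PerKeyWindows`, `Event`, `Result` and `run` are hypothetical names. It assumes the 5 minute window implied by the sample data and times as minutes since midnight. Each key opens its own window at its first element and fires at first element + size.

```scala
import scala.collection.mutable

object PerKeyWindows {
  final case class Event(minute: Long, name: String, value: Int)
  final case class Result(fireAt: Long, name: String, total: Int)

  // Simulates per key: on the first element, start a sum and set a timer at
  // first + size; add later elements to the sum; emit and clear when the timer fires.
  // The maps stand in for Flink keyed state and registered timers (assumption).
  def run(events: Seq[Event], size: Long): List[Result] = {
    val open = mutable.Map.empty[String, (Long, Int)] // key -> (timer time, running sum)
    val out = mutable.ListBuffer.empty[Result]

    // Fire every timer due at or before `now`, earliest first (ties broken by key).
    def fireUpTo(now: Long): Unit = {
      val due = open.toList
        .filter { case (_, (t, _)) => t <= now }
        .sortBy { case (k, (t, _)) => (t, k) }
      due.foreach { case (k, (t, sum)) =>
        out += Result(t, k, sum)
        open -= k
      }
    }

    for (e <- events.sortBy(_.minute)) {
      fireUpTo(e.minute) // window [first, first + size) excludes an element at first + size
      open.get(e.name) match {
        case Some((t, sum)) => open(e.name) = (t, sum + e.value)
        case None           => open(e.name) = (e.minute + size, e.value)
      }
    }
    fireUpTo(Long.MaxValue) // end of input: flush remaining timers
    out.toList
  }

  def main(args: Array[String]): Unit = {
    // Marcus's sample input, times as minutes since midnight (12:00 = 720)
    val input = Seq(
      Event(720, "Marcus", 1), Event(721, "Suzy", 2), Event(723, "Alicia", 3),
      Event(724, "Ben", 1), Event(726, "Alicia", 1), Event(726, "Ben", 5))
    for (r <- run(input, size = 5))
      println(f"${r.fireAt / 60}%02d:${r.fireAt % 60}%02d ${r.name} ${r.total}")
  }
}
```

On the sample input this prints Marcus 1 at 12:05, Suzy 2 at 12:06, Alicia 4 at 12:08 and Ben 6 at 12:09, matching the expected output above. Note that this is one window per key opened by its first element, not a sliding window; a sliding variant would need one timer per overlapping window.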

Best Regards,
Tony Wei


2017-10-11 1:52 GMT+08:00 mclendenin <[hidden email]>:

Re: Windowing isn't applied per key

mclendenin
Hi Tony,

In the documentation on keyed vs. non-keyed windows, it says that keyBy will
split the stream into parallel keyed streams, with windows being executed in
parallel across the keys. I would think this means that each key has its own
window, managed independently.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#keyed-vs-non-keyed-windows 





Re: Windowing isn't applied per key

Tony Wei
Hi Marcus,

Yes, each key has its own windows, so the window aggregation is the sum of the values per key, not the sum of all elements.
You can imagine that each key has its own sliding window assigner that decides which windows each element of that keyed stream belongs to, but all keyed streams use the same strategy.
That is how sliding windows are defined in the DataStream API.
The semantics you describe are not supported by Flink out of the box. One way to get them is to implement them yourself with a ProcessFunction.
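The "same strategy for every key" point can be seen with a tiny plain-Scala sketch. As far as I know, Flink aligns window starts to the epoch (plus an optional offset) rather than to each key's first element, so the end time of the first window containing an element depends only on the timestamp and the slide. `SharedBoundaries` and `firstFiring` are hypothetical names; the sketch assumes offset 0, a window size that is a multiple of the slide, and times in minutes since midnight.

```scala
object SharedBoundaries {
  // Hypothetical helper (not Flink API): end of the earliest-firing sliding window
  // that contains `ts`, assuming offset 0 and size a multiple of slide.
  // The key takes no part in this computation.
  def firstFiring(ts: Long, slide: Long): Long = ts - (ts % slide) + slide

  def main(args: Array[String]): Unit = {
    // first element per key from Marcus's sample input (12:00 = 720)
    val firstSeen = List("Marcus" -> 720L, "Suzy" -> 721L, "Alicia" -> 723L, "Ben" -> 724L)
    for ((name, ts) <- firstSeen) {
      val t = firstFiring(ts, slide = 5)
      println(f"$name%-6s first window fires at ${t / 60}%02d:${t % 60}%02d")
    }
  }
}
```

All four keys print 12:05, which is why the actual output shows every key emitted together at 12:05 and again on the next 5 minute boundary.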

Best Regards,
Tony Wei

On Wed, Oct 11, 2017 at 8:52 PM, mclendenin <[hidden email]> wrote: