Tumbling Windows with Processing Time


yutao sun
Hi Flink users,

I have a question about tumbling windows with processing time in Flink 0.10.1.

I want to measure the throughput of my application. The idea is that, at the last operator, I count the messages received in a tumbling processing-time window with a size of 1 second.

The problem: with a parallelism of 4, the number of window results should be 4 per second, but I get 7 per second. Is there an error in how the window is defined?

I copy my code below; thanks a lot for your help in advance.
[Kafka partitions: 4]

val env = StreamExecutionEnvironment.getExecutionEnvironment
val parallelism = 4

env.setParallelism(parallelism)
env.getConfig.enableObjectReuse()
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.getConfig.setAutoWatermarkInterval(-1L)
env.getConfig.disableTimestamps()

env.addSource(
    new FlinkKafkaConsumer082[String](
      "test_topic",
      new SimpleStringSchema,
      kafkaProperties // Properties with the Kafka connection settings
    )
  )
  .rebalance
  .map(doSomething) // placeholder for the actual transformation to Payload
  .map(payload => (payload, 1L))
  .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)
  .timeWindow(Time.of(1, TimeUnit.SECONDS))
  // sum the per-record counts within each window
  .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) => (tuple0._1, tuple0._2 + tuple1._2))
  .addSink(
    new FlinkKafkaProducer[(Payload, Long)](
      KafkaBootstrapServers,
      TARGET_TOPIC,
      new SerializationSchema[(Payload, Long), Array[Byte]] {
        override def serialize(element: (Payload, Long)): Array[Byte] =
          element._2.toString.getBytes
      }
    )
  )

env.execute("test")




Re: Tumbling Windows with Processing Time

Stephan Ewen
The definition looks correct.
Because the windows are per key, you should get one window result per key per second.

Can you turn off object reuse? It is a fairly experimental feature that works quite well with the batch operators, but not so well with the streaming windows yet.
I would only enable object reuse once the program works correctly without it.

Greetings,
Stephan
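Stephan's point can be illustrated outside Flink with a small, self-contained sketch (the keys and timestamps are hypothetical): a keyed 1-second tumbling window emits one result per distinct key per window, so the output rate tracks the number of keys, not the parallelism.

```scala
// Simulate keyed 1s tumbling windows: each (key, windowStart) pair
// produces exactly one aggregated result.
object KeyedWindowCount extends App {
  // (key, processingTimeMs) -- hypothetical events
  val events = Seq((0, 100L), (1, 250L), (2, 300L), (3, 900L), (0, 1100L), (1, 1500L))
  // group by key and by which 1-second window the timestamp falls into
  val results = events.groupBy { case (key, ts) => (key, ts / 1000) }
  println(results.size) // 6 results: 4 keys in window 0, 2 keys in window 1
}
```

With 4 distinct keys present in every window, this yields 4 results per second regardless of operator parallelism.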







Re: Tumbling Windows with Processing Time

yutao sun
Thanks for your help. I re-tested with object reuse disabled and got the same result (please see the attached picture).











Re: Tumbling Windows with Processing Time

Stephan Ewen
Do you have 7 distinct keys? You get as many result tuples as you have keys, because the window is per key.








Re: Tumbling Windows with Processing Time

Aljoscha Krettek
There should be 4 windows because there are only 4 distinct keys, if I understand this line correctly:

 .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)

> On 02 Feb 2016, at 19:31, yutao sun <[hidden email]> wrote:
>
>  .keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)


Re: Tumbling Windows with Processing Time

Aljoscha Krettek
How long did you run the job? Could it be an artifact of the timing that hasn’t yet averaged out?



Re: Tumbling Windows with Processing Time

yutao sun
Exactly, I have more than 4 keys because of the negative modulo. I changed this line from

.keyBy(mappedPayload => mappedPayload._1.id.hashCode % parallelism)

to

.keyBy(mappedPayload => Math.abs(mappedPayload._1.id.hashCode % parallelism))

(or one can simply use Flink's dataStream.partitionByHash(field)).



Thanks for your help!  Cheers :)
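
To see where the extra keys came from, here is a minimal sketch (with made-up hashCode values): Scala's `%` keeps the sign of the dividend, so the intended 4 partitions become up to 7 distinct keys.

```scala
// `hashCode % 4` ranges over -3..3 (up to 7 values) because % can return
// negative remainders; `math.abs(hashCode % 4)` ranges over 0..3 (4 values).
object NegativeModuloDemo extends App {
  val parallelism = 4
  val hashes = Seq(-7, -5, -2, 0, 3, 6, 9) // hypothetical id.hashCode values
  println(hashes.map(_ % parallelism).distinct.sorted)                // List(-3, -2, -1, 0, 1, 2, 3)
  println(hashes.map(h => math.abs(h % parallelism)).distinct.sorted) // List(0, 1, 2, 3)
}
```

Each distinct key gets its own window result, which is why the job produced 7 results per second instead of 4.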

