Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

Sameer Wadkar
Hi,

I am noticing the following behavior with event-time processing:

I have a Kafka topic with 10 partitions. Each event source sends data to one of the partitions. Say only 1 event source is active at the moment, which means only one partition is receiving data.

None of my windows fire, because the other 9 partitions (source function instances) are not sending any watermarks and Flink waits forever.
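To make the "waits forever" behavior concrete, here is a minimal model, assuming (as Flink does) that each parallel operator instance advances its event-time clock only to the minimum of the latest watermarks received on all of its input channels. The class and method names are hypothetical; this is an illustration, not Flink's internal code.

```java
import java.util.Arrays;

// Hypothetical, simplified model of how a parallel operator instance combines
// watermarks arriving on its input channels (one channel per upstream instance).
class MinWatermarkTracker {
    private final long[] channelWatermarks;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // A channel that has never emitted a watermark counts as "minus infinity".
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the combined watermark.
    long onWatermark(int channel, long watermark) {
        // Per channel, watermarks only move forward.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return combined();
    }

    // The operator's event time is the minimum over all channels, so a single
    // silent channel holds back every event-time window.
    long combined() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```

With 10 channels and only channel 0 active, `onWatermark(0, 60_000)` still returns `Long.MIN_VALUE`, so no window can fire; with a single channel the same call returns `60_000`. The same rule covers the idle-mapper case, because each downstream instance of a keyBy reads from every upstream instance.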

Next I switched to a topic with 1 partition but left the default parallelism intact. Only one mapper instance feeds the subsequent keyBy operation; the other 7 (assuming a default parallelism of 8) are idle. I assign watermarks after the map function. Again I see the same behavior, because the 7 other mappers are not sending watermarks.

How do I handle this? With this partitioning strategy, not all of my partitions will be receiving data at all times. Or do I have to use random partitioning, which does work?

Thanks,
Sameer

Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

Sameer Wadkar
And this is happening in my local environment. As soon as I set the parallelism to 1 it all works fine.

Sameer

(In reply to Sameer W, Wed, Aug 10, 2016 at 3:11 PM)

Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

Sameer Wadkar
Sorry for replying to my own messages, but this is super confusing and logical at the same time to me :-).

Say I have a Kafka topic with 10 partitions. If I partition by device ID when I write to the topic and use event time, my pipeline freezes (if fewer than 10 devices are active initially). Because some partitions are inactive (only a few devices are active at a time), they do not send watermarks, and my pipeline waits forever for those partitions to send in their watermarks, even though the keyBy is on the device ID, whose records all come from only one partition.

When I send records to Kafka randomly (to any partition), the pipeline works fine, as all partitions (and the sources connected to them) are sending watermarks.
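The trade-off between the two routing strategies can be illustrated by counting how many partitions actually receive records. This is a hypothetical sketch: `String.hashCode()` stands in for Kafka's real key-based partitioner (which hashes the serialized key with murmur2), and round-robin stands in for "random" so the result is deterministic.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical comparison of partition coverage under key-based and
// round-robin routing of the same stream of records.
class PartitionCoverage {
    // Key-based routing: all records of one device land in the same partition.
    // String.hashCode() is a stand-in; Kafka's real placements will differ.
    static int activePartitionsByKey(List<String> recordKeys, int numPartitions) {
        Set<Integer> active = new HashSet<>();
        for (String key : recordKeys) {
            active.add(Math.floorMod(key.hashCode(), numPartitions));
        }
        return active.size();
    }

    // Round-robin routing: records are spread over all partitions regardless of
    // which device sent them, so every partition (and its source) sees data.
    static int activePartitionsRoundRobin(List<String> recordKeys, int numPartitions) {
        Set<Integer> active = new HashSet<>();
        for (int i = 0; i < recordKeys.size(); i++) {
            active.add(i % numPartitions);
        }
        return active.size();
    }
}
```

With a single active device sending 50 records to a 10-partition topic, key-based routing keeps 9 partitions silent, while round-robin touches all 10. The price is that one device's events are scattered, so ordering per device is lost and the downstream keyBy has to regroup them.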

This gets even more confusing if I assign timestamps and watermarks downstream, after a keyBy operation that is in turn followed by another keyBy that does not receive events for a key from all the upstream operators. Again nothing fires, as Flink expects the other map operators (to which the watermark assignment is piped) to send in their watermarks as well.

My conclusion: only produce watermarks at the source function. Is this valid, or am I missing something? Only when I do that (together with random allocation of events to Kafka partitions) does the whole pipeline work reliably.

Is there a way to set a timeout, so that if watermarks from the source functions are not received within a certain time interval, the time windows fire?

Thanks,
Sameer

(In reply to Sameer W, Wed, Aug 10, 2016 at 3:27 PM)

Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

Maximilian Michels
Hi Sameer,

If you use Event Time, you should make sure to assign Watermarks and
Timestamps at the source. As you already observed, Flink may get stuck
otherwise because it waits for Watermarks to progress in time.

There is no timeout for windows. However, you can implement that logic
in your Watermark generation function.

You're already using
`DataStream#assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks assigner)`.

Your assigner has a `getCurrentWatermark()` method. This is called
every `ExecutionConfig#getAutoWatermarkInterval()` milliseconds. You can
set this interval via `ExecutionConfig#setAutoWatermarkInterval(long milliseconds)`.

In your assigner, simply create a field to keep track of the last time
you emitted a Watermark. If you haven't emitted a Watermark for some
time, treat that as a timeout and emit a Watermark anyway.
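A minimal sketch of this idea follows. It is framework-free so it can run on its own: the class mirrors the two methods of Flink's `AssignerWithPeriodicWatermarks` (`extractTimestamp` and `getCurrentWatermark`) but takes the wall-clock time as an argument. The class name, the constructor parameters, and the choice to let the watermark follow the wall clock once the timeout expires are assumptions for illustration, not Max's exact design; the last one only makes sense if event timestamps roughly track wall-clock time.

```java
// Hypothetical, framework-free sketch of a periodic watermark assigner with an
// idle timeout. The clock is passed in explicitly so the logic is testable.
class IdleTimeoutWatermarks {
    private final long maxOutOfOrdernessMs; // how far events may arrive out of order
    private final long idleTimeoutMs;       // silence after which we stop waiting for data

    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long lastActivityWallClockMs;   // when the last element arrived (or start time)
    private long lastEmittedWatermark = Long.MIN_VALUE;

    IdleTimeoutWatermarks(long maxOutOfOrdernessMs, long idleTimeoutMs, long startWallClockMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastActivityWallClockMs = startWallClockMs;
    }

    // Called for every element: remember the highest event timestamp and when it arrived.
    long extractTimestamp(long eventTimestampMs, long wallClockMs) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestampMs);
        lastActivityWallClockMs = wallClockMs;
        return eventTimestampMs;
    }

    // Called once per auto-watermark interval.
    long getCurrentWatermark(long wallClockMs) {
        long candidate;
        if (wallClockMs - lastActivityWallClockMs > idleTimeoutMs) {
            // Timed out (assumption): let the watermark follow the wall clock so
            // downstream windows can fire even though this partition is silent.
            candidate = wallClockMs - maxOutOfOrdernessMs;
        } else if (maxSeenTimestamp == Long.MIN_VALUE) {
            candidate = Long.MIN_VALUE; // no data yet and not timed out: hold back
        } else {
            candidate = maxSeenTimestamp - maxOutOfOrdernessMs; // bounded out-of-orderness
        }
        // Watermarks must never move backwards.
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }
}
```

In an actual job of that era, the same logic would sit inside an `AssignerWithPeriodicWatermarks<T>` implementation, reading the clock with `System.currentTimeMillis()` and returning `new Watermark(...)` from `getCurrentWatermark()`. The cost of this design is that any event arriving after the watermark has been pushed forward on wall-clock time is late. Later Flink releases (1.11 and newer) also offer `WatermarkStrategy#withIdleness(Duration)`, which marks an idle source as idle so it no longer holds back the downstream watermark.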

Cheers,
Max

(In reply to Sameer W, Thu, Aug 11, 2016 at 1:05 AM)

Re: Multiple Partitions (Source Functions) -> Event Time -> Watermarks -> Trigger

Sameer Wadkar
Thanks, Max.

I will advance watermarks when no event arrives for a while. But when using Kafka, is it good practice to assign events to partitions randomly instead of by, say, device ID or the ID of the region where the devices are located? What I noticed is that if the devices sending to one of the partitions stop sending information, the pipeline completely freezes unless I manually keep moving the watermark forward.

But the problem with sending events to random partitions is that when the devices come back online, the events they send are now registered as late events, and the windows fire one element at a time.
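Why late events make windows fire one element at a time can be seen from a simplified model of the event-time window rules. The sketch assumes tumbling windows, non-negative timestamps and no window offset, and follows Flink's `EventTimeTrigger` (which fires immediately when an element lands in a window whose last timestamp the watermark has already passed) and allowed lateness (a window is discarded once its last timestamp plus the allowed lateness is behind the watermark). The class and enum names are hypothetical.

```java
// Hypothetical, simplified model of what happens to an element in an
// event-time tumbling window, given the current watermark.
class LatenessCheck {
    enum Outcome { BUFFERED, FIRES_IMMEDIATELY, DROPPED }

    // Last timestamp of the tumbling window containing `timestamp`
    // (assumes non-negative timestamps and no window offset).
    static long windowMaxTimestamp(long timestamp, long windowSizeMs) {
        long start = timestamp - (timestamp % windowSizeMs);
        return start + windowSizeMs - 1;
    }

    static Outcome classify(long timestamp, long windowSizeMs, long allowedLatenessMs, long watermark) {
        long maxTs = windowMaxTimestamp(timestamp, windowSizeMs);
        if (maxTs + allowedLatenessMs <= watermark) {
            return Outcome.DROPPED;           // window state already cleaned up
        }
        if (maxTs <= watermark) {
            return Outcome.FIRES_IMMEDIATELY; // late but allowed: the window fires again for this element
        }
        return Outcome.BUFFERED;              // on time: waits for the watermark to pass
    }
}
```

Once the watermark has been pushed forward while a device was offline, each of its buffered events falls into an already-closed window, so with non-zero allowed lateness every such element triggers its own firing, and with zero allowed lateness it is dropped.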

Thanks,
Sameer

(In reply to Maximilian Michels, Fri, Aug 12, 2016 at 4:41 AM)