State TTL in Flink 1.6.0

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

State TTL in Flink 1.6.0

Juho Autio
First, I couldn't find anything about State TTL in Flink docs, is there anything like that? I can manage based on Javadocs & source code, but just wondering.

Then to main main question, why doesn't the TTL support event time, and is there any sensible use case for the TTL if the streaming charateristic of my job is event time?

I have a job that is cleaning up old entries from a keyed MapState by calling registerEventTimeTimer & implementing the onTimer method. This way I can keep the state for a certain time in _event time_.

That's more complicated code than it would have to be, so I wanted to convert by function to use Flink's own state TTL. I started writing this:

        MapStateDescriptor<String, String> stateDesc = new MapStateDescriptor<>(
                "deviceState", String.class, String.class);
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.milliseconds(stateRetentionMillis))
                // TODO EventTime is not supported?
                .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
                .build();
        stateDesc.enableTimeToLive(ttlConfig);

So, I realized that ProcessingTime is the only existing TimeCharacteristic in StateTtlConfig.

Based on some comments in Flink tickets it seems that it was a conscious choice, because supporting EventTime TTL would be much heavier:


So I can't exactly match the current behaviour that guarantees to keep the state available for 24 hours (or whatever is passed as --stateRetentionMillis).

However, if we accept the restriction and switch to processing time in state cleanup, what does it mean?

- As long as stream keeps up with the input rate (from kafka), there's no big difference, because 24 hours in processing time ~= 24 hours in even time.
- If the stream is lagging behind a lot, then it would be possible that the state is cleaned "too early". However we aim at not having a lot of lag, so this is not a real issue – job would be scaled up to catch up before it starts lagging too much to get misses because of cleared state. Still, if we fail to scale up quickly enough, the state might be cleared too early and cause real trouble.
- One problem is that if the stream is quickly processing a long backlog (say, start streaming 7 days back in event time), then the state size can temporarily grow bigger than usual – maybe this wouldn't be a big problem, but it could at least require extraneous upscaling of resources.
- After restoring from a savepoint, the processing time on the state is as much older than what was the time of downtime due to job restart. Even this is not a huge issue as long as the deployment downtime is short compared to the 24 hour TTL.

Any way, all these issues combined, I'm a bit confused on the whole TTL feature. Can it be used in event time based streaming in any sensible way? It seems like it would be more like a cache then, and can't be relied on well enough.

Thanks.

Juho
Reply | Threaded
Open this post in threaded view
|

Re: State TTL in Flink 1.6.0

Chesnay Schepler
Just a quick note for the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl

On 22.08.2018 10:53, Juho Autio wrote:

> First, I couldn't find anything about State TTL in Flink docs, is
> there anything like that? I can manage based on Javadocs & source
> code, but just wondering.
>
> Then to main main question, why doesn't the TTL support event time,
> and is there any sensible use case for the TTL if the streaming
> charateristic of my job is event time?
>
> I have a job that is cleaning up old entries from a keyed MapState by
> calling registerEventTimeTimer & implementing the onTimer method. This
> way I can keep the state for a certain time in _event time_.
>
> That's more complicated code than it would have to be, so I wanted to
> convert by function to use Flink's own state TTL. I started writing this:
>
>         MapStateDescriptor<String, String> stateDesc = new
> MapStateDescriptor<>(
>                 "deviceState", String.class, String.class);
>         StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.milliseconds(stateRetentionMillis))
>                 // TODO EventTime is not supported?
> .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
>                 .build();
>         stateDesc.enableTimeToLive(ttlConfig);
>
> So, I realized that ProcessingTime is the only existing
> TimeCharacteristic in StateTtlConfig.
>
> Based on some comments in Flink tickets it seems that it was a
> conscious choice, because supporting EventTime TTL would be much heavier:
>
> https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013
>
> So I can't exactly match the current behaviour that guarantees to keep
> the state available for 24 hours (or whatever is passed as
> --stateRetentionMillis).
>
> However, if we accept the restriction and switch to processing time in
> state cleanup, what does it mean?
>
> - As long as stream keeps up with the input rate (from kafka), there's
> no big difference, because 24 hours in processing time ~= 24 hours in
> even time.
> - If the stream is lagging behind a lot, then it would be possible
> that the state is cleaned "too early". However we aim at not having a
> lot of lag, so this is not a real issue – job would be scaled up to
> catch up before it starts lagging too much to get misses because of
> cleared state. Still, if we fail to scale up quickly enough, the state
> might be cleared too early and cause real trouble.
> - One problem is that if the stream is quickly processing a long
> backlog (say, start streaming 7 days back in event time), then the
> state size can temporarily grow bigger than usual – maybe this
> wouldn't be a big problem, but it could at least require extraneous
> upscaling of resources.
> - After restoring from a savepoint, the processing time on the state
> is as much older than what was the time of downtime due to job
> restart. Even this is not a huge issue as long as the deployment
> downtime is short compared to the 24 hour TTL.
>
> Any way, all these issues combined, I'm a bit confused on the whole
> TTL feature. Can it be used in event time based streaming in any
> sensible way? It seems like it would be more like a cache then, and
> can't be relied on well enough.
>
> Thanks.
>
> Juho


Reply | Threaded
Open this post in threaded view
|

Re: State TTL in Flink 1.6.0

Aljoscha Krettek
Hi Juho,

The main motivation for the initial implementation of TTL was compliance with new GDPR rules. I.e. data cannot be accessible and must be dropped according to time in the real world, i.e. processing time. The behaviour you describe, with data being dropped if you keep a savepoint for too long, is actually what is required for this use case.

I do see that also having this for event time can also be useful and it might get implemented in the future. Maybe Stefan can chime in here.

Best,
Aljoscha

> On 22. Aug 2018, at 11:01, Chesnay Schepler <[hidden email]> wrote:
>
> Just a quick note for the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>
> On 22.08.2018 10:53, Juho Autio wrote:
>> First, I couldn't find anything about State TTL in Flink docs, is there anything like that? I can manage based on Javadocs & source code, but just wondering.
>>
>> Then to main main question, why doesn't the TTL support event time, and is there any sensible use case for the TTL if the streaming charateristic of my job is event time?
>>
>> I have a job that is cleaning up old entries from a keyed MapState by calling registerEventTimeTimer & implementing the onTimer method. This way I can keep the state for a certain time in _event time_.
>>
>> That's more complicated code than it would have to be, so I wanted to convert by function to use Flink's own state TTL. I started writing this:
>>
>>        MapStateDescriptor<String, String> stateDesc = new MapStateDescriptor<>(
>>                "deviceState", String.class, String.class);
>>        StateTtlConfig ttlConfig = StateTtlConfig
>> .newBuilder(Time.milliseconds(stateRetentionMillis))
>>                // TODO EventTime is not supported?
>> .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
>>                .build();
>>        stateDesc.enableTimeToLive(ttlConfig);
>>
>> So, I realized that ProcessingTime is the only existing TimeCharacteristic in StateTtlConfig.
>>
>> Based on some comments in Flink tickets it seems that it was a conscious choice, because supporting EventTime TTL would be much heavier:
>>
>> https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013
>>
>> So I can't exactly match the current behaviour that guarantees to keep the state available for 24 hours (or whatever is passed as --stateRetentionMillis).
>>
>> However, if we accept the restriction and switch to processing time in state cleanup, what does it mean?
>>
>> - As long as stream keeps up with the input rate (from kafka), there's no big difference, because 24 hours in processing time ~= 24 hours in even time.
>> - If the stream is lagging behind a lot, then it would be possible that the state is cleaned "too early". However we aim at not having a lot of lag, so this is not a real issue – job would be scaled up to catch up before it starts lagging too much to get misses because of cleared state. Still, if we fail to scale up quickly enough, the state might be cleared too early and cause real trouble.
>> - One problem is that if the stream is quickly processing a long backlog (say, start streaming 7 days back in event time), then the state size can temporarily grow bigger than usual – maybe this wouldn't be a big problem, but it could at least require extraneous upscaling of resources.
>> - After restoring from a savepoint, the processing time on the state is as much older than what was the time of downtime due to job restart. Even this is not a huge issue as long as the deployment downtime is short compared to the 24 hour TTL.
>>
>> Any way, all these issues combined, I'm a bit confused on the whole TTL feature. Can it be used in event time based streaming in any sensible way? It seems like it would be more like a cache then, and can't be relied on well enough.
>>
>> Thanks.
>>
>> Juho
>
>

Reply | Threaded
Open this post in threaded view
|

Re: State TTL in Flink 1.6.0

Andrey Zagrebin
Hi Juho,

As Aljoscha mentioned the current TTL implementation was mostly targeted to data privacy applications
where only processing time matters.

I think the event time can be also useful for TTL and should address your concerns.
The event time extension is on the road map for the future Flink releases.

Cheers,
Andrey

> On 22 Aug 2018, at 11:57, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi Juho,
>
> The main motivation for the initial implementation of TTL was compliance with new GDPR rules. I.e. data cannot be accessible and must be dropped according to time in the real world, i.e. processing time. The behaviour you describe, with data being dropped if you keep a savepoint for too long, is actually what is required for this use case.
>
> I do see that also having this for event time can also be useful and it might get implemented in the future. Maybe Stefan can chime in here.
>
> Best,
> Aljoscha
>
>> On 22. Aug 2018, at 11:01, Chesnay Schepler <[hidden email]> wrote:
>>
>> Just a quick note for the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>>
>> On 22.08.2018 10:53, Juho Autio wrote:
>>> First, I couldn't find anything about State TTL in Flink docs, is there anything like that? I can manage based on Javadocs & source code, but just wondering.
>>>
>>> Then to main main question, why doesn't the TTL support event time, and is there any sensible use case for the TTL if the streaming charateristic of my job is event time?
>>>
>>> I have a job that is cleaning up old entries from a keyed MapState by calling registerEventTimeTimer & implementing the onTimer method. This way I can keep the state for a certain time in _event time_.
>>>
>>> That's more complicated code than it would have to be, so I wanted to convert by function to use Flink's own state TTL. I started writing this:
>>>
>>>       MapStateDescriptor<String, String> stateDesc = new MapStateDescriptor<>(
>>>               "deviceState", String.class, String.class);
>>>       StateTtlConfig ttlConfig = StateTtlConfig
>>> .newBuilder(Time.milliseconds(stateRetentionMillis))
>>>               // TODO EventTime is not supported?
>>> .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
>>>               .build();
>>>       stateDesc.enableTimeToLive(ttlConfig);
>>>
>>> So, I realized that ProcessingTime is the only existing TimeCharacteristic in StateTtlConfig.
>>>
>>> Based on some comments in Flink tickets it seems that it was a conscious choice, because supporting EventTime TTL would be much heavier:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013
>>>
>>> So I can't exactly match the current behaviour that guarantees to keep the state available for 24 hours (or whatever is passed as --stateRetentionMillis).
>>>
>>> However, if we accept the restriction and switch to processing time in state cleanup, what does it mean?
>>>
>>> - As long as stream keeps up with the input rate (from kafka), there's no big difference, because 24 hours in processing time ~= 24 hours in even time.
>>> - If the stream is lagging behind a lot, then it would be possible that the state is cleaned "too early". However we aim at not having a lot of lag, so this is not a real issue – job would be scaled up to catch up before it starts lagging too much to get misses because of cleared state. Still, if we fail to scale up quickly enough, the state might be cleared too early and cause real trouble.
>>> - One problem is that if the stream is quickly processing a long backlog (say, start streaming 7 days back in event time), then the state size can temporarily grow bigger than usual – maybe this wouldn't be a big problem, but it could at least require extraneous upscaling of resources.
>>> - After restoring from a savepoint, the processing time on the state is as much older than what was the time of downtime due to job restart. Even this is not a huge issue as long as the deployment downtime is short compared to the 24 hour TTL.
>>>
>>> Any way, all these issues combined, I'm a bit confused on the whole TTL feature. Can it be used in event time based streaming in any sensible way? It seems like it would be more like a cache then, and can't be relied on well enough.
>>>
>>> Thanks.
>>>
>>> Juho
>>
>>
>