Slow flink checkpoint

Slow flink checkpoint

林德强
Hi,
I'm running a job on Flink streaming. I found that as the number of 'InternalTimer' objects increases, checkpoints become slower and slower. Is there any way to solve this problem, such as making the "timeServiceManager" snapshot asynchronous?




Thanks

Re: Slow flink checkpoint

Fabian Hueske-2
Hi,

AFAIK, that's not possible.
The only "solution" is to reduce the number of timers. Whether that's possible or not, depends on the application.
For example, if you use timers to clean up state, you can work with an upper and lower bound and only register one timer for each (upper - lower) interval.
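
The bounded cleanup idea can be sketched in plain Java. This is only a minimal model, not Flink code: it assumes that each piece of state has an earliest allowed cleanup time (the lower bound), that cleanup may happen up to one interval later (the upper bound), and that registering the same timestamp twice for a key results in a single timer. The names `TimerCoalescing`, `coalesce` and `timersFor` are made up for this illustration.

```java
import java.util.TreeSet;

// Simplified model of timer coalescing; it does not use Flink's API.
final class TimerCoalescing {

    // Rounds a cleanup time up to the next multiple of intervalMs, so that all
    // cleanup times inside one interval share a single timer timestamp.
    static long coalesce(long cleanupTime, long intervalMs) {
        return Math.floorDiv(cleanupTime + intervalMs - 1, intervalMs) * intervalMs;
    }

    // Returns the distinct timer timestamps that would be registered for the
    // given cleanup times. The set models the assumption that registering the
    // same timestamp twice for a key yields only one timer.
    static TreeSet<Long> timersFor(long[] cleanupTimes, long intervalMs) {
        TreeSet<Long> timers = new TreeSet<>();
        for (long t : cleanupTimes) {
            timers.add(coalesce(t, intervalMs));
        }
        return timers;
    }

    public static void main(String[] args) {
        long[] cleanupTimes = {1_001L, 1_250L, 1_999L, 2_000L, 2_001L};
        // With a 1000 ms interval the five cleanup times collapse into two timers.
        System.out.println(timersFor(cleanupTimes, 1_000L)); // prints [2000, 3000]
    }
}
```

Rounding up rather than down means state is never cleaned before its lower bound; the price is that cleanup can be late by up to one interval.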

Best, Fabian

2018-03-16 3:11 GMT+01:00 林德强 <[hidden email]>:
Hi,
I'm running a job on Flink streaming. I found that as the number of 'InternalTimer' objects increases, checkpoints become slower and slower. Is there any way to solve this problem, such as making the "timeServiceManager" snapshot asynchronous?




Thanks

Re: Slow flink checkpoint

Stefan Richter
Hi,

Yes, that is correct. The timer service currently keeps its state only in main memory and supports only synchronous snapshots. This topic is on our TODO list for after the Flink 1.5 release.

Best,
Stefan

On 16.03.2018 at 09:03, Fabian Hueske <[hidden email]> wrote:

Hi,

AFAIK, that's not possible.
The only "solution" is to reduce the number of timers. Whether that's possible or not, depends on the application.
For example, if you use timers to clean up state, you can work with an upper and lower bound and only register one timer for each (upper - lower) interval.

Best, Fabian

Re: Slow flink checkpoint

林德强
In reply to this post by Fabian Hueske-2
Hi Fabian,
   Reducing the number of timers is a good idea.
   But in my application the timer is different for each key registered after the keyBy, so it may not work with an upper and lower bound.

   I tried modifying the Flink source and starting a thread to clean up the expired keyed state, but it doesn't work well because of concurrency issues.






Best,
Deqiang

Re: Slow flink checkpoint

Fabian Hueske-2
Hi,

Yes, you cannot start a separate thread to clean up the state.
State is managed by Flink and can only be accessed at certain points in time when the user code is called.

If you are using event time, another trick you could play is to register all timers only on (currentWatermark + 1).
That will cause the timer to fire whenever the watermark advances. You could store all relevant timestamps in a ListState and act on all timestamps that are less than the currentWatermark.
Also, since there is only a single timer per timestamp (currentWM + 1), there will be only one timer per key.
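
As a rough illustration of this trick, here is a single-key model in plain Java. It is a sketch under assumptions, not Flink code: a `List` stands in for the ListState, a nullable field stands in for the one registered timer, and the class and method names are hypothetical. The sketch treats a stored timestamp as due once the watermark has reached it (`<=`); whether the comparison should be strict depends on the application.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified single-key model of the "one timer at watermark + 1" trick.
// It imitates the idea only and does not use Flink's API.
final class SingleTimerPerKey {
    // Stand-in for a ListState holding the logical timer timestamps of one key.
    private final List<Long> pendingTimestamps = new ArrayList<>();
    // Stand-in for the one real timer of this key; null means none is registered.
    private Long registeredTimer = null;

    // Called where the application would otherwise register a timer at `timestamp`.
    void addLogicalTimer(long timestamp, long currentWatermark) {
        pendingTimestamps.add(timestamp);
        // Every call between two watermarks uses the same timestamp, so one timer results.
        registeredTimer = currentWatermark + 1;
    }

    // Called when the watermark advances. Returns the logical timers that are due.
    List<Long> onWatermark(long newWatermark) {
        List<Long> due = new ArrayList<>();
        if (registeredTimer == null || registeredTimer > newWatermark) {
            return due; // the single real timer has not fired
        }
        Iterator<Long> it = pendingTimestamps.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            if (ts <= newWatermark) { // due once the watermark has reached the timestamp
                due.add(ts);
                it.remove();
            }
        }
        // Re-register one timer only while logical timers remain.
        registeredTimer = pendingTimestamps.isEmpty() ? null : newWatermark + 1;
        return due;
    }

    int pendingCount() {
        return pendingTimestamps.size();
    }

    boolean hasRegisteredTimer() {
        return registeredTimer != null;
    }
}
```

The trade-off is that the per-key work moves from the timer service into the callback: every watermark advance scans the stored timestamps of each key that still has pending entries.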

Best, Fabian

2018-03-16 13:56 GMT+01:00 林德强 <[hidden email]>:
Hi Fabian,
   Reducing the number of timers is a good idea.
   But in my application the timer is different for each key registered after the keyBy, so it may not work with an upper and lower bound.

   I tried modifying the Flink source and starting a thread to clean up the expired keyed state, but it doesn't work well because of concurrency issues.






Best,
Deqiang

Re: Slow flink checkpoint

makeyang
I have put a lot of effort into this issue and tried to resolve it:
1. Let me first describe the current snapshot path for timers:
    a) For each key group, InternalTimeServiceManager.snapshotStateForKeyGroup is invoked.
    b) InternalTimeServiceManager creates an InternalTimerServiceSerializationProxy to write the snapshot.
    c) InternalTimerServiceSerializationProxy iterates over the <String, HeapInternalTimerService> tuples (the String is the service name). For each tuple it writes the service name, calls snapshotTimersForKeyGroup, and then gets an InternalTimersSnapshotWriter to call writeTimersSnapshot.
    d) In the writeTimersSnapshot method of InternalTimersSnapshotWriter, the key serializer and namespace serializer are written first. Then the eventTimers and processingTimers of the InternalTimersSnapshot, each a Set of InternalTimer, are fetched and serialized.

2. My first attempt was to shallow-copy the <String, HeapInternalTimerService> tuples, then shallow-copy the eventTimers and processingTimers, and then use another thread to snapshot them without blocking the event-processing thread. But it turned out that the shallow copy of eventTimers and processingTimers is itself time-consuming, so this solution failed.

3. Then I tried to borrow the idea of the CopyOnWriteStateTable data structure and manage timers with it. But after digging deeper, I found an easier way to snapshot timers asynchronously, based on one fact: InternalTimer is immutable. The approach is:
    a) Maintain a stateTableVersion and a snapshotVersions collection in InternalTimeServiceManager, both with exactly the same meaning as in CopyOnWriteStateTable. In addition, a read-write lock protects snapshotVersions and stateTableVersion.
    b) Add two more properties to each InternalTimer, create version and delete version, besides the three existing properties: timestamp, key and namespace. Each time a timer is registered in the timer service, it is created with the current stateTableVersion as its create version and -1 as its delete version. Each time a timer is deleted in the timer service, it is only marked as deleted by setting its delete version to the current stateTableVersion; it is not physically removed from the timer service.
    c) Each time a timer snapshot starts, InternalTimeServiceManager increments its stateTableVersion and adds the new value to snapshotVersions. These two operations are protected by the write lock of InternalTimeServiceManager. The new stateTableVersion is taken as the snapshot version of this snapshot.
    d) Shallow-copy the <String, HeapInternalTimerService> tuples.
    e) Then use another thread to snapshot everything asynchronously: key serializer, namespace serializer and timers. A timer that is not deleted (delete version is -1) and whose create version is less than the snapshot version is serialized. A timer whose delete version is not -1 and is greater than or equal to the snapshot version is also serialized. Any other timer is not serialized by this snapshot.
    f) When everything is serialized, remove the snapshot version from snapshotVersions. This still happens in the other thread and is guarded by the write lock.
    g) Last thing: physical deletion of timers. There are two places where timers are physically deleted. The first is when a timer is deleted in the timer service: after it is marked as deleted as described in b), check whether the size of snapshotVersions is 0 (which means no snapshot is running) and, if so, delete the timer physically. The second is in the snapshot's timer iteration: when a timer's delete version is less than the minimum value of snapshotVersions, the timer is deleted and no running snapshot needs to keep it.
    h) Some more changes: processingTimeTimers and eventTimeTimers for each key group used to be HashSets and are now ConcurrentHashMaps with key + namespace + timestamp as the hash key.
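
The version bookkeeping in steps a) to g) can be modelled in a few lines of plain Java. This is a simplified, single-threaded sketch and not the code from the patch: the class and method names are hypothetical, the read-write lock and the actual serialization are left out, and key and namespace are dropped so a timer is just a timestamp. Two details are assumptions of the sketch rather than statements from the description above: a deleted timer is included in a snapshot only if it was also created before that snapshot, and the sweep of logically deleted timers runs when a snapshot ends instead of during the snapshot's iteration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of a timer carrying a create version and a delete version.
final class VersionedTimer {
    static final int NOT_DELETED = -1;
    final long timestamp;
    final int createVersion;
    int deleteVersion = NOT_DELETED;

    VersionedTimer(long timestamp, int createVersion) {
        this.timestamp = timestamp;
        this.createVersion = createVersion;
    }

    // True if this timer belongs to the snapshot taken at snapshotVersion.
    boolean visibleIn(int snapshotVersion) {
        boolean existedAtSnapshot = createVersion < snapshotVersion;
        boolean liveAtSnapshot = deleteVersion == NOT_DELETED || deleteVersion >= snapshotVersion;
        return existedAtSnapshot && liveAtSnapshot;
    }
}

// Hypothetical stand-in for the timer bookkeeping; locking is omitted.
final class VersionedTimerStore {
    private int stateTableVersion = 0;
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();
    private final Set<VersionedTimer> timers = new HashSet<>();

    // Step b): a new timer records the current version as its create version.
    VersionedTimer register(long timestamp) {
        VersionedTimer t = new VersionedTimer(timestamp, stateTableVersion);
        timers.add(t);
        return t;
    }

    // Steps b) and g): mark as deleted; remove physically only if no snapshot runs.
    void delete(VersionedTimer t) {
        t.deleteVersion = stateTableVersion;
        if (snapshotVersions.isEmpty()) {
            timers.remove(t);
        }
    }

    // Step c): bump the version and remember it as a running snapshot.
    int beginSnapshot() {
        stateTableVersion++;
        snapshotVersions.add(stateTableVersion);
        return stateTableVersion;
    }

    // Step e): the timestamps this snapshot would serialize, sorted for readability.
    List<Long> timersIn(int snapshotVersion) {
        List<Long> result = new ArrayList<>();
        for (VersionedTimer t : timers) {
            if (t.visibleIn(snapshotVersion)) {
                result.add(t.timestamp);
            }
        }
        Collections.sort(result);
        return result;
    }

    // Step f), followed by a sweep of timers that no running snapshot can still see.
    void endSnapshot(int snapshotVersion) {
        snapshotVersions.remove(snapshotVersion);
        timers.removeIf(t -> t.deleteVersion != VersionedTimer.NOT_DELETED
                && (snapshotVersions.isEmpty() || t.deleteVersion < snapshotVersions.first()));
    }

    int size() {
        return timers.size();
    }
}
```

For example, if timers at 10 and 20 exist when a snapshot begins, and afterwards a timer at 30 is registered and the timer at 10 is deleted, `timersIn` for that snapshot still returns 10 and 20, while the next snapshot returns 20 and 30.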

The code is done and the test is still running. I am posting this not only to hear your opinions, but also to clear up some questions about the current timer snapshot code path. My questions are below:
1. The onProcessingTime method of HeapInternalTimerService is invoked by another thread of ProcessingTimeService, and in that thread it removes timers from HeapInternalTimerService. But in the current timer snapshot path, I haven't found any shallow copy of processingTimeTimers and eventTimeTimers. Why doesn't this cause a ConcurrentModificationException?
2. Since onProcessingTime is triggered in another thread, what happens if, after the timers are snapshotted in the working thread, a timer fires and the triggerTarget is processed, which may change state, and only then the asynchronous keyed state snapshot is triggered? Won't this make the state inconsistent? Let's imagine this case: all keyed state is changed only by timers. Add timer1, timer2, timer3, timer4 and timer5; since no timer has been processed, the keyed state is empty. Then timer1 and timer2 are processed and the keyed state is k2, leaving timer3, timer4 and timer5 in the timer service. Then timer3, timer4 and timer5 are snapshotted synchronously. Then the keyed state is snapshotted asynchronously while timer3 is processed and the keyed state becomes k3. The eventual snapshot is timer3, timer4, timer5 and k3. As far as I understand, it should be timer3, timer4, timer5 and k2. Please help me understand this.

Thanks very much.
By the way, if you don't mind, can one of you open a JIRA issue to track this? When the time is right, I'll contribute to this issue.

Re: Slow flink checkpoint

makeyang
The test results are very promising.
The synchronous part went from a couple of seconds to a couple of milliseconds,
a 1000x reduction (the overall time is not reduced, since the work just moved from sync to
async).
Are you interested in this change?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Slow flink checkpoint

林德强
In reply to this post by makeyang
Hi Stefan, Fabian,
Keyang is an engineer on our team, and he has put a lot of effort into making the timer snapshot asynchronous. What do you think of his idea?


Best,
Deqiang


On April 1, 2018, at 7:21 PM, makeyang <[hidden email]> wrote:

I have put a lot of effort into this issue and tried to resolve it:
[...]

Re: Slow flink checkpoint

makeyang
Now that Flink Forward SF is over,
could you take a few minutes to look at this issue and share some
thoughts on it? Could you review and comment on my design, or give us a design so
that I can help implement it?

Thanks a lot.



Re: Slow flink checkpoint

Fabian Hueske-2
Hi everybody,

Thanks so much for looking into this issue and posting the detailed description of your approach.
As said before, improving the checkpointing performance for timers is a very important improvement for Flink.

I'm not familiar with the internals of the timer service checkpointing, but adding create and delete version fields and performing asynchronous checkpoints based on these fields seems like a good approach to me.
IIRC, Aljoscha (in CC) implemented the timer service and its checkpointing. He might have more comments.

I'd suggest creating a JIRA issue (everybody can do that) and reposting the description of your approach there.
If you have the code ready, you can also open a PR and reference the JIRA.

Best, Fabian

2018-04-16 9:03 GMT+02:00 makeyang <[hidden email]>:
Now that Flink Forward SF is over,
could you take a few minutes to look at this issue and share some
thoughts on it? Could you review and comment on my design, or give us a design so
that I can help implement it?

Thanks a lot.

Re: Slow flink checkpoint

makeyang

Fabian:
    Thanks for your reply.
    I have created a JIRA issue:
https://issues.apache.org/jira/browse/FLINK-9182?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Improvement%20AND%20created%20%3E%3D%20-10m

   I'll open a pull request with the code ASAP.




MaKeyang
TIG.JD.COM



From: Fabian Hueske <[hidden email]>
Sent: April 16, 2018, 16:21
To: makeyang
Cc: user; Aljoscha Krettek
Subject: Re: Slow flink checkpoint
 
Hi everybody,

Thanks so much for looking into this issue and posting the detailed description of your approach.
As said before, improving the checkpointing performance for timers is a very important improvement for Flink.

I'm not familiar with the internals of the timer service checkpointing, but adding create and delete version fields and performing asynchronous checkpoints based on these fields seems like a good approach to me.
IIRC, Aljoscha (in CC) implemented the timer service and its checkpointing. He might have more comments.

I'd suggest creating a JIRA issue (everybody can do that) and reposting the description of your approach there.
If you have the code ready, you can also open a PR and reference the JIRA.

Best, Fabian

Re: Re: Slow flink checkpoint

Fabian Hueske-2
Thanks MaKeyang!

I've given you contributor permissions and assigned the issue to you.

Best, Fabian

2018-04-16 13:19 GMT+02:00 ma ky <[hidden email]>:

Fabian:
    Thanks for your reply.
    I have created a JIRA issue:
https://issues.apache.org/jira/browse/FLINK-9182?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Improvement%20AND%20created%20%3E%3D%20-10m

   I'll open a pull request with the code ASAP.




MaKeyang


