Hi,
Thanks
Hi, AFAIK, that's not possible. The only "solution" is to reduce the number of timers. Whether that's possible or not depends on the application. 2018-03-16 3:11 GMT+01:00 林德强 <[hidden email]>:
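One common way to reduce the number of timers, sketched below as an illustration only (the thread itself does not spell it out), is to coalesce them: round the target timestamp so that all events for a key within the same minute share a single timer. The class name is made up for the example, and it uses the KeyedProcessFunction API from later Flink releases.

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Registers at most one processing-time timer per key and per minute by
// rounding the target timestamp down; duplicate registrations are no-ops.
public class CoalescedTimers extends KeyedProcessFunction<String, String, String> {

    private static final long ONE_MINUTE = 60_000L;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        long target = ctx.timerService().currentProcessingTime() + ONE_MINUTE;
        long coalesced = target - (target % ONE_MINUTE); // round down to the full minute
        ctx.timerService().registerProcessingTimeTimer(coalesced);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Do the per-key work here once per coalesced timestamp instead of once per event.
    }
}
```

Whether such an upper/lower bound on timer granularity is acceptable depends on the application, which is exactly the caveat raised above.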
Hi, Best, Stefan
Hi Fabian,
Reducing the number of timers is a good idea, but in my application the timers are different for each key registered after the keyBy, so maybe it can't work with an upper and lower bound. I tried modifying the Flink source and starting a thread to clean up the expired keyed state, but it doesn't work well because of concurrency issues. Best, Deqiang 2018-03-16 16:03 GMT+08:00 Fabian Hueske <[hidden email]>:
Hi, Yes, you cannot start a separate thread to clean up the state. State is managed by Flink and can only be accessed at certain points in time, when the user code is called. 2018-03-16 13:56 GMT+01:00 林德强 <[hidden email]>:
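As a hedged illustration of that point (names invented, again using the later KeyedProcessFunction API, not anything proposed in this thread): expired keyed state is cleared from inside onTimer(), i.e. at a point where Flink calls into the user code on the task thread, never from a user-managed thread.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Clears per-key state one hour after the last activity, from inside onTimer().
public class StateCleanup extends KeyedProcessFunction<String, String, String> {

    private static final long ONE_HOUR = 60 * 60 * 1000L;
    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        ctx.timerService().registerProcessingTimeTimer(now + ONE_HOUR);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long seen = lastSeen.value();
        if (seen != null && timestamp >= seen + ONE_HOUR) {
            lastSeen.clear(); // safe here: Flink is calling the user code on the task thread
        }
    }
}
```

Note that this still registers one timer per element; in practice it would be combined with coalescing as in the earlier sketch to keep the timer count down, which is exactly the trade-off discussed in this thread.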
I have put a lot of effort into this issue and tried to resolve it:
1. Let me describe the current timer snapshot path first:
a) for each key group, InternalTimeServiceManager.snapshotStateForKeyGroup is invoked
b) InternalTimeServiceManager creates an InternalTimerServiceSerializationProxy to write the snapshot
c) InternalTimerServiceSerializationProxy iterates over the <String, HeapInternalTimerService> tuples (the String is the service name), writes the service name, calls snapshotTimersForKeyGroup, and then gets an InternalTimersSnapshotWriter to call writeTimersSnapshot
d) in writeTimersSnapshot of InternalTimersSnapshotWriter, the key serializer and namespace serializer are written first, then the eventTimers and processingTimers of the InternalTimersSnapshot (each a Set of InternalTimer) are serialized.
2. My first attempt was to shallow-copy the <String, HeapInternalTimerService> tuples and then shallow-copy the eventTimers and processingTimers, and let another thread snapshot them without blocking the event-processing thread. It turned out that shallow-copying eventTimers and processingTimers is time-consuming, so this solution failed.
3. I then tried to borrow the idea of the CopyOnWriteStateTable data structure and manage timers with it. After digging more I found an easier way to snapshot timers asynchronously, thanks to one fact: InternalTimer is immutable. Based on this fact we can achieve asynchrony as follows:
a) maintain in InternalTimeServiceManager a stateTableVersion and snapshotVersions, which are exactly the same things as in CopyOnWriteStateTable, plus one more thing: a read-write lock used to protect stateTableVersion and snapshotVersions
b) for each InternalTimer, add 2 more properties beside the 3 existing ones (timestamp, key and namespace): a create version and a delete version. When a timer is registered in the timer service, it is created with stateTableVersion as its create version and -1 as its delete version. When a timer is deleted in the timer service, it is marked deleted by setting its delete version to stateTableVersion, without physically removing it from the timer service.
c) each time we snapshot timers, InternalTimeServiceManager increments its stateTableVersion and adds it to snapshotVersions; these 2 operations are protected by the write lock of InternalTimeServiceManager. That stateTableVersion is taken as the snapshot version of this snapshot.
d) shallow-copy the <String, HeapInternalTimerService> tuples
e) then use another thread to snapshot everything asynchronously: the key serializer, the namespace serializer and the timers. A timer that is not deleted (delete version is -1) and whose create version is less than the snapshot version is serialized; a timer whose delete version is not -1 and is greater than or equal to the snapshot version is also serialized; otherwise the timer is not serialized by this snapshot.
f) when everything is serialized, remove the snapshot version from snapshotVersions; this also happens in the other thread and is guarded by the write lock.
g) last thing: physical timer deletion. There are 2 places where timers are physically deleted. The first is when a timer is deleted in the timer service: it is marked deleted with a delete version equal to stateTableVersion, and then we check whether snapshotVersions is empty (which means no snapshot is running); if so, the timer is deleted physically. The other place is during a snapshot's iteration over the timers: when a timer's delete version is less than the minimum value in snapshotVersions, the timer has been deleted and no running snapshot needs to keep it, so it can be removed.
h) one more change: processingTimeTimers and eventTimeTimers for each key group used to be HashSets and are now ConcurrentHashMaps with key + namespace + timestamp as the hash key. (A short code sketch of this versioning scheme follows after this message.)
The code is done and tests are still running. I post these comments not only to hear your opinions, but also to figure out some more questions related to the current timer snapshot code path. My questions are:
1. The onProcessingTime method of HeapInternalTimerService is invoked by another thread, the one of ProcessingTimeService, and that thread removes timers from HeapInternalTimerService. In the current timer snapshot path I haven't found any shallow copy of processingTimeTimers and eventTimeTimers, so how does this not cause a concurrent modification exception?
2. Since onProcessingTime is triggered in another thread: when the timers are snapshotted in the working thread, a timer could then fire and its triggerTarget be processed, which could change state, before the asynchronous keyed-state snapshot is triggered. Wouldn't this make the state inconsistent? Imagine this case: all keyed state is changed only by timers. We add timer1, timer2, timer3, timer4 and timer5; since no timer has been processed, the keyed state is empty. Then timer1 and timer2 are processed and the keyed state becomes k2, leaving timer3, timer4 and timer5 in the timer service. Then timer3, timer4 and timer5 are snapshotted synchronously. Then the keyed state is snapshotted asynchronously, but meanwhile timer3 is processed and the keyed state becomes k3. The eventual snapshot is timer3, timer4, timer5 and k3, whereas as far as I understand it should be timer3, timer4, timer5 and k2. Please help me out with this, thanks very much.
By the way, if you don't mind, could anyone of you open a JIRA issue to track this? When the time is right, I'll make a contribution on this issue.
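To make the versioning scheme above concrete, here is a minimal standalone sketch. The names (VersionedTimer, VersionedTimerStore) are hypothetical, not Flink's actual InternalTimer/InternalTimeServiceManager classes; it only illustrates the create/delete-version bookkeeping, the mark-delete on removal, and the filter the asynchronous thread would apply.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// A timer that carries its create/delete version, as in step 3b).
final class VersionedTimer {
    final String key;
    final String namespace;
    final long timestamp;
    final int createVersion;          // stateTableVersion at registration time
    volatile int deleteVersion = -1;  // -1 means "not deleted"

    VersionedTimer(String key, String namespace, long timestamp, int createVersion) {
        this.key = key;
        this.namespace = namespace;
        this.timestamp = timestamp;
        this.createVersion = createVersion;
    }
}

// Version bookkeeping for asynchronous timer snapshots, as in steps 3a)-3g).
final class VersionedTimerStore {
    // key + namespace + timestamp as the map key, as in step 3h)
    private final Map<String, VersionedTimer> timers = new ConcurrentHashMap<>();
    private final Set<Integer> runningSnapshots = new HashSet<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int stateTableVersion = 0;

    private static String id(String key, String namespace, long timestamp) {
        return key + "|" + namespace + "|" + timestamp;
    }

    void register(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            timers.putIfAbsent(id(key, namespace, timestamp),
                    new VersionedTimer(key, namespace, timestamp, stateTableVersion));
        } finally {
            lock.readLock().unlock();
        }
    }

    void delete(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            String id = id(key, namespace, timestamp);
            VersionedTimer t = timers.get(id);
            if (t == null) return;
            if (runningSnapshots.isEmpty()) {
                timers.remove(id);                    // no snapshot running: delete physically
            } else {
                t.deleteVersion = stateTableVersion;  // mark-delete only, a snapshot may still need it
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    // Synchronous part of a snapshot: bump the version under the write lock.
    int startSnapshot() {
        lock.writeLock().lock();
        try {
            stateTableVersion++;
            runningSnapshots.add(stateTableVersion);
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Runs in the asynchronous snapshot thread: pick the timers visible to this snapshot.
    List<VersionedTimer> timersForSnapshot(int snapshotVersion) {
        List<VersionedTimer> result = new ArrayList<>();
        for (VersionedTimer t : timers.values()) {
            boolean createdBefore = t.createVersion < snapshotVersion;
            boolean aliveAtSnapshot = t.deleteVersion == -1 || t.deleteVersion >= snapshotVersion;
            if (createdBefore && aliveAtSnapshot) {
                result.add(t);
            }
        }
        return result;
    }

    // Called when the asynchronous part has finished serializing.
    void releaseSnapshot(int snapshotVersion) {
        lock.writeLock().lock();
        try {
            runningSnapshots.remove(snapshotVersion);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Under these assumptions, the task thread calls startSnapshot() in the synchronous checkpoint phase, hands the returned version to a background thread that serializes the result of timersForSnapshot(version), and finally calls releaseSnapshot(version); physical cleanup of mark-deleted timers by the minimum running snapshot version (step 3g) is left out for brevity.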
The test is very promising.
The synchronous part of the timer snapshot drops from a couple of seconds to a couple of milliseconds, roughly a 1000x reduction (the overall time is not saved, since the work just moves from the synchronous to the asynchronous phase). Are you guys interested in this change?
Hi Stefan, Fabian,
Keyang is an engineer on our team; he has put a lot of effort into making the timers' snapshot asynchronous. What do you think of his idea? Best, Deqiang
Since Flink Forward SF is done,
could you guys spare a few minutes to take a look at this issue and share your thoughts on it? Could you help review/comment on my design, or give us a design that I can help to implement? Thanks a lot.
Hi everybody, Thanks so much for looking into this issue and posting the detailed description of your approach. As said before, improving the checkpointing performance for timers is a very important improvement for Flink. If you have the code ready, you can also open a PR and reference the JIRA. 2018-04-16 9:03 GMT+02:00 makeyang <[hidden email]>: Since Flink Forward SF is done.
Fabian:
Thanks for your reply. I have created a JIRA issue: https://issues.apache.org/jira/browse/FLINK-9182?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Improvement%20AND%20created%20%3E%3D%20-10m I'll push the code ASAP.
MaKeyang
TIG.JD.COM
From: Fabian Hueske <[hidden email]>
Sent: April 16, 2018 16:21 To: makeyang Cc: user; Aljoscha Krettek Subject: Re: Slow flink checkpoint Hi everybody,
Thanks so much for looking into this issue and posting the detailed description of your approach. As said before, improving the checkpointing performance for timers is a very important improvement for Flink.
If you have the code ready, you can also open a PR and reference the JIRA.
2018-04-16 9:03 GMT+02:00 makeyang <[hidden email]>:
Since Flink Forward SF is done.
Thanks MaKeyang! I've given you contributor permissions and assigned the issue to you. 2018-04-16 13:19 GMT+02:00 ma ky <[hidden email]>: