Re: Slow flink checkpoint
Posted by makeyang on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Slow-flink-checkpoint-tp18946p19258.html
I have put a lot of effort into this issue and tried to resolve it:
1. Let me first describe the current snapshot path for timers:
a) For each key group, InternalTimeServiceManager.snapshotStateForKeyGroup is invoked.
b) InternalTimeServiceManager creates an InternalTimerServiceSerializationProxy to write the snapshot.
c) InternalTimerServiceSerializationProxy iterates over the <String, HeapInternalTimerService> tuples (the String is the service name). For each tuple it writes the service name, takes the snapshot via snapshotTimersForKeyGroup, and then gets an InternalTimersSnapshotWriter to call writeTimersSnapshot.
d) In writeTimersSnapshot, InternalTimersSnapshotWriter first writes the key serializer and the namespace serializer, then gets the eventTimers and processingTimers of the InternalTimersSnapshot, each of which is a Set of InternalTimer, and serializes them.
2. My first attempt was to shallow copy the <String, HeapInternalTimerService> tuples, then shallow copy the eventTimers and processingTimers, and then use another thread to snapshot them without blocking the event processing thread. It turned out that the shallow copy of eventTimers and processingTimers is itself time-consuming, so this solution failed.
3. Then I tried to borrow the idea of the CopyOnWriteStateTable data structure and manage timers with it. After digging further, I found that there is an easier way to snapshot timers asynchronously, based on one fact: InternalTimer is immutable. The approach is:
a) In InternalTimeServiceManager, maintain a stateTableVersion and a snapshotVersions set, both with exactly the same meaning as in CopyOnWriteStateTable. In addition, add a read-write lock to protect snapshotVersions and stateTableVersion.
b) To each InternalTimer, add two more properties, create version and delete version, beside the three existing ones: timestamp, key and namespace. Each time a timer is registered in the timer service, it is created with the current stateTableVersion as its create version and -1 as its delete version. Each time a timer is deleted in the timer service, it is only marked as deleted by setting its delete version to the current stateTableVersion; it is not physically removed from the timer service.
c) Each time a snapshot of the timers is started, InternalTimeServiceManager increases its stateTableVersion and adds the new value to snapshotVersions. These two operations are protected by the write lock of InternalTimeServiceManager. The new stateTableVersion is taken as the snapshot version of this snapshot.
d) Shallow copy the <String, HeapInternalTimerService> tuples.
e) Then use another thread to snapshot everything asynchronously: the key serializer, the namespace serializer and the timers. A timer that is not deleted (delete version is -1) and whose create version is less than the snapshot version is serialized. A timer whose delete version is not -1 and is greater than or equal to the snapshot version is also serialized. Any other timer is not serialized by this snapshot.
f) When everything is serialized, remove the snapshot version from snapshotVersions. This still happens in the other thread and is guarded by the write lock.
g) Last thing: physical deletion of timers. There are two places where timers are physically deleted. The first is on deletion in the timer service: after the timer has been marked as deleted (as in b), check whether snapshotVersions is empty, which means there is no running snapshot, and if so, physically delete the timer. The second is while iterating over the timers during a snapshot: when a timer's delete version is less than the minimum value in snapshotVersions, the timer is deleted and no running snapshot needs to keep it, so it can be removed.
h) One more change: the processingTimeTimers and eventTimeTimers of each key group used to be a HashSet and are now a ConcurrentHashMap keyed by key + namespace + timestamp.
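To make steps a) to g) concrete, here is a minimal Java sketch of the version bookkeeping. It is not the actual patch: VersionedTimer, TimerVersions and all method names are hypothetical stand-ins for the changed InternalTimer and InternalTimeServiceManager, keys and namespaces are simplified to String, and taking the read lock when reading the versions is my assumption. The serialization rule follows step e) exactly as worded above.

```java
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for InternalTimer with the two extra fields from step b). */
final class VersionedTimer {
    static final int NOT_DELETED = -1;

    final long timestamp;
    final String key;
    final String namespace;
    final int createVersion;
    volatile int deleteVersion = NOT_DELETED;

    VersionedTimer(long timestamp, String key, String namespace, int createVersion) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.createVersion = createVersion;
    }

    /** Step e), as worded in the post. */
    boolean shouldSerialize(int snapshotVersion) {
        int deleted = deleteVersion; // read the volatile field once
        if (deleted == NOT_DELETED) {
            return createVersion < snapshotVersion;
        }
        return deleted >= snapshotVersion;
    }
}

/** Hypothetical holder for stateTableVersion and snapshotVersions (steps a, c, f, g). */
final class TimerVersions {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();
    private int stateTableVersion = 0;

    int currentVersion() {
        lock.readLock().lock();
        try {
            return stateTableVersion;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Step c): bump the version and register it as a running snapshot. */
    int beginSnapshot() {
        lock.writeLock().lock();
        try {
            stateTableVersion++;
            snapshotVersions.add(stateTableVersion);
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Step f): the snapshot is finished. */
    void endSnapshot(int snapshotVersion) {
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(Integer.valueOf(snapshotVersion));
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Step g), first place: mark as deleted; true means it may be physically removed now. */
    boolean markDeleted(VersionedTimer timer) {
        lock.readLock().lock();
        try {
            timer.deleteVersion = stateTableVersion;
            return snapshotVersions.isEmpty();
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Step g), second place: a deleted timer that no running snapshot needs. */
    boolean canPurge(VersionedTimer timer) {
        lock.readLock().lock();
        try {
            int deleted = timer.deleteVersion;
            return deleted != VersionedTimer.NOT_DELETED
                    && (snapshotVersions.isEmpty() || deleted < snapshotVersions.first());
        } finally {
            lock.readLock().unlock();
        }
    }
}

final class VersionedTimerDemo {
    public static void main(String[] args) {
        TimerVersions versions = new TimerVersions();
        VersionedTimer live = new VersionedTimer(100L, "k", "ns", versions.currentVersion());
        VersionedTimer deletedLater = new VersionedTimer(200L, "k", "ns", versions.currentVersion());

        int snapshot = versions.beginSnapshot();
        boolean removableNow = versions.markDeleted(deletedLater); // snapshot still running
        VersionedTimer late = new VersionedTimer(300L, "k", "ns", versions.currentVersion());

        System.out.println(live.shouldSerialize(snapshot));         // true
        System.out.println(deletedLater.shouldSerialize(snapshot)); // true
        System.out.println(late.shouldSerialize(snapshot));         // false
        System.out.println(removableNow);                           // false
        versions.endSnapshot(snapshot);
        System.out.println(versions.canPurge(deletedLater));        // true
    }
}
```

One thing the sketch makes visible: a timer that is both created and deleted after a snapshot has started matches the second clause of step e) as worded, so a real implementation may want to check the create version in that case as well.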
The code is done and the tests are still running. I am posting this not only to hear your opinions, but also to clear up some questions about the current timer snapshot code path. My questions are below:
1. The method onProcessingTime of HeapInternalTimerService is invoked by another thread, that of ProcessingTimeService, and in this thread it removes timers from HeapInternalTimerService. In the current timer snapshot path, however, I haven't found any shallow copy of processingTimeTimers and eventTimeTimers. Why doesn't this cause a ConcurrentModificationException?
2. Since onProcessingTime is triggered in another thread, what happens if, after the timers have been snapshotted in the working thread, a timer fires and the triggerTarget is processed, which could change state, and only then the asynchronous keyed state snapshot is triggered? Won't this make the state inconsistent? Imagine this case: all keyed state is changed only by timers. Add timer1, timer2, timer3, timer4 and timer5; since no timer has been processed, the keyed state is empty. Then timer1 and timer2 are processed, and the keyed state is k2; timer3, timer4 and timer5 are left in the timer service. Then timer3, timer4 and timer5 are snapshotted synchronously. Then the keyed state is snapshotted asynchronously while timer3 is processed, and the keyed state becomes k3. The resulting snapshot is timer3, timer4, timer5 and k3. As far as I understand, it should be timer3, timer4, timer5 and k2. Please help me out with this.
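The interleaving in question 2 can be replayed step by step. The sketch below is a single-threaded Java model of that scenario only; it uses no Flink classes, the class and method names are made up for illustration, and it assumes nothing prevents timer3 from firing between the two snapshot steps, which is exactly the point I am unsure about.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TimerStateRaceSketch {
    /** Returns {timers captured by the snapshot, keyed state captured by the snapshot}. */
    static String[] replay() {
        List<String> timers = new ArrayList<>(
                Arrays.asList("timer1", "timer2", "timer3", "timer4", "timer5"));
        String keyedState = ""; // no timer processed yet, keyed state is empty

        // timer1 and timer2 are processed; the keyed state is now k2
        timers.remove("timer1");
        timers.remove("timer2");
        keyedState = "k2";

        // synchronous part: the remaining timers are snapshotted
        List<String> timerSnapshot = new ArrayList<>(timers);

        // assumed interleaving: timer3 fires before the keyed state is captured
        timers.remove("timer3");
        keyedState = "k3";

        // the keyed state snapshot now sees k3 instead of k2
        String stateSnapshot = keyedState;
        return new String[] {timerSnapshot.toString(), stateSnapshot};
    }

    public static void main(String[] args) {
        String[] result = replay();
        System.out.println(result[0]); // [timer3, timer4, timer5]
        System.out.println(result[1]); // k3
    }
}
```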
Thanks very much.
By the way, if you don't mind, could one of you open a JIRA issue to track this? When the time is right, I'll contribute to this issue.