I am curious about how operator state is repartitioned to subtasks when a job is resumed from a checkpoint or savepoint. The reason is that I am having issues with the ContinuousFileReaderOperator when recovering
from a failure. I consume most of my data from files off S3. I have a custom file monitor that understands how to walk my directory structure and outputs TimestampedFileSplits downstream in chronological order to the stock
ContinuousFileReaderOperator. The reader consumes those splits and stores them a priority queue based on their last modified time ensuring that files are read in chronological order which is exactly what I want. The problem is when recovering, the unread splits
being partitioned out to each of the subtasks seem to be heavily skewed in terms of last modified time.
While each task may have a similar number of files I find then one or two will have a disproportionate number of old files. This in turn holds back my watermark (sometimes for several hours depending on the
number of unread splits) which keeps timers from firing, windows from purging, etc. I was hoping there were some way I could add a custom partitioner to ensure that splits are uniformly distributed in a temporal manner or if someone had other ideas of how I could mitigate the problem. Thank you, Seth Wiesman |
Hi Seth,
Upon restoring, splits will be re-shuffled among the new tasks, and I believe that state is repartitioned in a round robin way (although I am not 100% sure so I am also including Stefan and Aljoscha in this). The priority queues will be reconstructed based on the restored elements. So task managers may get a relatively equal number of splits, but “recent” ones may be concentrated on a few nodes. This may also have to do with how your monitor sends them to the reader (e.g. all splits of a recent file go to the same node). As far as I know, we do not have an option for custom state re-partitioner. To see what is restored, you can enable DEBUG logging and this will print upon restoring sth like: "ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored {restoredReaderState}" with the restoredReaderState containing the restored splits. And something similar upon checkpointing. This will give you a better look in what may be happening. Thanks, Kostas On May 4, 2017, at 3:45 PM, Seth Wiesman <[hidden email]> wrote: |
Hi,
the repartitoning happens indeed as some round-robin algorithm (see RoundRobinOperatorStateRepartitioner). This repartitioning happens at the level of the checkpoint coordinator in the master on restore, by redistrubution of state handles. The state that those handles are pointing to is a black box in this place, so all assumptions that we can make is that all partitions can be redistributed freely. If we want additional constraints to the repartitioning, the user has to apply those when handing over the state partitions, i.e. the partitioning into the list state must happen in a way that already groups together state partitions that should not end up on separate machines after a restore. Best, Stefan
|
Free forum by Nabble | Edit this page |