Hi mates, we have some Flink jobs that write data from Kafka into HDFS using the BucketingSink.
For some reason, these jobs run without checkpointing. For now that is not a big problem for us if some files remain open when a job is restarted. Periodically, though, these jobs fail with an OutOfMemory exception, and it seems I found a strange thing in the implementation of BucketingSink.

During the sink lifecycle we have a state object, implemented as a map, where the key is a bucket path and the value is a per-bucket state that contains information about the opened file and a list of pending files. After examining a heap dump, I found that this state holds information about ~1,000 buckets and their state, and all of it weighs ~120 MB. I looked through the code and found that buckets are only removed from the state in the notifyCheckpointComplete method:
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
    while (bucketStatesIt.hasNext()) {
        BucketState<T> bucketState = bucketStatesIt.next().getValue();
        // ... pending files of completed checkpoints are moved to their final location ...
        if (!bucketState.isWriterOpen &&
                bucketState.pendingFiles.isEmpty() &&
                bucketState.pendingFilesPerCheckpoint.isEmpty()) {
            // the bucket is inactive, so it can be dropped from the state
            bucketStatesIt.remove();
        }
    }
}

So this looks like an issue when you use this sink in a checkpointless environment, because data is always added to the state but never removed from it. Of course, we could enable checkpointing and use one of the available state backends, but to me this seems like unexpected behaviour: I have the option to run the job without checkpointing, but if I do so, I eventually get an exception in the sink component.

What do you think about this? Has anyone run into the same problem, and how did you solve it?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team
email: [hidden email]
mobile: +7 (925) 416-37-26
CleverDATA
make your data clever
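For context, the state described above looks roughly like the following simplified sketch, loosely based on the Flink 1.x BucketingSink sources; field names may vary between versions and some fields are omitted:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch, not the verbatim Flink source.
class State<T> {
    // one entry per bucket path -- this is the map that keeps growing
    // when notifyCheckpointComplete() is never invoked
    final Map<String, BucketState<T>> bucketStates = new HashMap<>();
}

class BucketState<T> {
    String currentFile;                 // the in-progress file of this bucket
    long currentFileValidLength;
    List<String> pendingFiles = new ArrayList<>();                        // closed, waiting to be committed
    Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();  // pending files keyed by checkpoint id
    transient boolean isWriterOpen;
}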
Hi,
BucketingSink is designed to provide exactly-once writes to the file system, which is inherently tied to checkpointing. As you just saw, without checkpointing BucketingSink is never notified that it can commit pending files. If you do not want to use checkpointing for some reason, you could always use, for example, org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat and write your own simple `OutputFormat`, or check whether one of the existing ones meets your needs.

Piotrek
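For illustration, a minimal sketch of the approach Piotrek mentions might look like the following. The class name, output path, and record type are made up for the example; it simply appends each record as a line to a per-subtask file and offers no exactly-once or at-least-once guarantees:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// A deliberately simple OutputFormat: no pending/in-progress files,
// just plain writes, which is all you can expect without checkpointing.
public class SimpleHdfsLineFormat implements OutputFormat<String> {

    private final String basePath;   // e.g. "hdfs:///data/events"
    private transient FSDataOutputStream stream;

    public SimpleHdfsLineFormat(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        Path file = new Path(basePath + "/part-" + taskNumber);
        FileSystem fs = file.getFileSystem();
        stream = fs.create(file, FileSystem.WriteMode.OVERWRITE);
    }

    @Override
    public void writeRecord(String record) throws IOException {
        stream.write((record + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws IOException {
        if (stream != null) {
            stream.close();
        }
    }
}

It would then be attached to the stream with something like stream.writeUsingOutputFormat(new SimpleHdfsLineFormat("hdfs:///data/events")).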
Piotr, thx for your reply, now everything is pretty clear. But from my point of view, it would be better to add some information to the BucketingSink documentation about this kind of state leak when checkpointing is disabled.
Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team
email: [hidden email]
mobile: +7 (925) 416-37-26
CleverDATA
make your data clever
I agree, if the sink doesn't work properly without checkpointing we should make sure that it fails early if it is used that way.

It would be great if you could open a JIRA.
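As an illustration only, such a fail-early guard could look roughly like the sketch below. This is not necessarily how the issue was eventually addressed in Flink, and the class name is made up:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

// Hypothetical base class that refuses to start when checkpointing is disabled.
public abstract class CheckpointRequiringSink<T> extends RichSinkFunction<T> {

    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext ctx = getRuntimeContext();
        if (ctx instanceof StreamingRuntimeContext
                && !((StreamingRuntimeContext) ctx).isCheckpointingEnabled()) {
            throw new IllegalStateException(
                    "This sink relies on checkpointing to commit pending files and to prune "
                    + "its bucket state; enable checkpointing or use a different sink.");
        }
    }
}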
Chesnay, thx for your reply, I've created one: https://issues.apache.org/jira/browse/FLINK-9558
Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team
email: [hidden email]
mobile: +7 (925) 416-37-26
CleverDATA
make your data clever