Hi,
I am trying to use Flink for data ingestion. Input is a Kafka topic with strings - paths to incoming archive files. The job is unpacking the archives, reads data in them, parses and stores data in another format. Everything works fine if the topic is empty at the beginning of execution and then archives income with regular intervals. But if the queue contains several thousands of paths when the job starts - the checkpont durations become too long and write transactions either fail or take hours to complete. As Kafka messages are very small (just paths) - Kafka source manages to read all of them almost instantly before backpressure is detected. And then it tries to process all these entries within a single checkpoint. As archives might be pretty large - it takes hours. Do you know a solution for this problem? Is it possible to ask Flink source to read data slowly before the correct processing speed is detected? I decreased "fetch.max.bytes" kafka source property to 1kb and set buffer timeout to 1ms. It seems to work for the current data set, but it does not look like a good solution... -- Best regards, Andrei Shumanski |
Andrei Shumanski:
which source are u using? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
In reply to this post by Andrei Shumanski
Hi, Right now it is a Kafka source, but I had the same issue when reading data from local FS.-- Best regards, Andrei Shumanski On Fri, May 18, 2018 at 11:44 AM, makeyang <[hidden email]> wrote: Andrei Shumanski: |
Hi Andrei, With the current version of Flink, there is no general solution to this problem.2018-05-18 11:59 GMT+02:00 Andrei Shumanski <[hidden email]>:
|
Hi,
Yes, Flink 1.5.0 will come with better tools to handle this problem. Namely you will be able to limit the “in flight” data, by controlling the number of assigned credits per channel/input gate. Even without any configuring Flink 1.5.0 will out of the box buffer less data, thus mitigating the problem. There are some tweaks that you could use to make 1.4.x work better. With small records that require heavy processing, generally speaking you do not need huge buffers sizes to keep max throughput. You can try to both reduce the buffer pool and reduce the memory segment sizes: • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1), • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB), • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB), and • taskmanager.memory.segment-size: Size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)). Reducing those values will reduce amount of in-flight data that will be caught between checkpoints. But keep in mind that smaller values can lead to smaller throughput, but as I said, with small number of heavy processing records this is not an issue. In an extreme example, if your records are lets say 8 bytes each and require 1 hour to process, there is almost no need for any buffering. Piotrek
|
Hi Piotr,
Thank you very much for your response. I will try the new feature of Flink 1.5 when it is released. But I am not sure minimising buffers sizes will work in all scenarios. If I understand correctly these settings are affecting the whole Flink instance. We might have a flow like this: Source: Read file paths --> Unpack and parse files --> Analyse parsed data -> …. So it will be a very small amount of data at first step but quite a lot of parsed data later. Changing buffer sizes globally will probably affect throughput of later steps, as you wrote.
|
Hi,
Yes if you have mixed workload in your pipeline, it is matter of finding a right balance. Situation will be better in Flink 1.5, but the underlying issue will remain as well - in 1.5.0 there also will be no way to change network buffers configuration between stages of the single job. Currently such explosion of records (one small records producing huge bunch of new records) is kind of anti pattern in Flink. Besides the problem that we were discussing, the other problem is that you can not checkpoint in the middle of processing the big record. I hope that this will change in future Flink releases, but currently those are the limitations. For your case, with initial records being file paths, it might be better to embed this logic within a data source, so your data source is already producing parsed records. For example FlinkKafkaConsumer is discovering topics/partitions on the fly, and the smallest transport unit is still “parsed record” and not a “topic” (“file path” in your case). With proper offsets implementation this also handles the problem of checkpointing in the middle of processing large file. Piotrek
|
One more remark. Currently there is unwritten assumption in Flink, that time to process records is proportional number of bytes. As you noted, this brakes in case of mixed workloads (especially with file paths sent as records).
There is interesting workaround this problem though. You could use custom serializer for the file paths to artificially blow the record size, for example to "segment-size” (32KB), or even more. This is easy to do - for example just pad the string with spaces. It would ensure that there is at most one file path to process per network buffer and would even out the imbalance of the assumption of record size being proportional to number of bytes.
Piotrek
|
Hi Piotr,
Thanks! I will try it. It is a bit ugly solution, but it may work :)
|
Free forum by Nabble | Edit this page |