Hi,

My team and I are trying to measure the total time spent in our Flink job, and we found that Flink takes 40 ms ~ 100 ms to proceed from one operator to another. I wonder how we can reduce this transition time. The following DAG represents our job:

at 19:37:04.605, the job is entering "Co-Flat Map"
at 19:37:04.605, the job is leaving "Co-Flat Map"
at 19:37:04.705, the job is entering "Co-Flat Map -> ...."
at 19:37:04.708, the job is leaving "Co-Flat Map -> ...."

Both "Co-Flat Map" operators finish nearly instantly, while most of the execution time is spent on the transition. Any idea?

James C.-C. Yu
+886988713275
+8615618429976
|
The previous email seems unable to display the embedded images, so let me post the links instead.
|
Hi,

you do not provide very much information for this question, e.g. what and how exactly you measure, whether this is a local or a distributed setting, etc. I assume that it is distributed and that the cause of your observation is the buffer timeout, i.e. the maximum time that Flink waits before sending a buffer with just one element, which happens to be 100 ms by default. You can decrease this value to some extent, at the cost of a potential loss in throughput, but I think even values around 5-10 ms are ok-ish. See [1] for more details.

If you want to reduce latency between chained operators, you can also try to disable object-reuse:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

Best,
Stefan
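[Editor's note: to make the buffer-timeout tradeoff concrete, here is a minimal pure-Java sketch of the buffering idea. This is an illustration only, not Flink's actual network stack; the class and method names are invented. The point is that a buffer is shipped downstream either when it fills up or when the timeout elapses, so a single record in an otherwise idle stream waits up to the full timeout (100 ms by default) before it is sent.]

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of an output buffer with a flush timeout.
// Names are illustrative; Flink's real logic lives in its network stack.
class TimedBuffer<T> {
    private final int capacity;
    private final long timeoutMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1;

    TimedBuffer(int capacity, long timeoutMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
    }

    // Returns the records to ship downstream, or an empty list if we keep buffering.
    List<T> add(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(record);
        if (buffer.size() >= capacity || nowMillis - firstElementTime >= timeoutMillis) {
            List<T> out = new ArrayList<>(buffer);
            buffer.clear();
            firstElementTime = -1;
            return out;
        }
        return new ArrayList<>();
    }
}

public class BufferTimeoutDemo {
    public static void main(String[] args) {
        TimedBuffer<String> buf = new TimedBuffer<>(100, 100); // 100 records or 100 ms
        // A single record does not fill the buffer, so it is held back...
        System.out.println(buf.add("event-1", 0).size());   // prints 0: still buffered
        // ...until the timeout forces a flush, adding up to 100 ms of latency.
        System.out.println(buf.add("event-2", 100).size()); // prints 2: flushed by timeout
    }
}
```

Lowering the timeout flushes sooner but ships smaller, less efficient buffers, which is exactly the latency/throughput tradeoff described above.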
|
Sorry, forgot the link for reference [1], which is https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency
|
Hi Stefan,

Do you mean enable object reuse?

> If you want to reduce latency between chained operators, you can also try to disable object-reuse:

On Thu, Sep 20, 2018 at 10:37 PM Stefan Richter <[hidden email]> wrote:
|
Oh yes exactly, enable is right.
|
Hi Stefan,

Sorry for jumping into the discussion. I’ve seen a blog post [1] by dataArtisans which says that object reuse does not have much influence on data streams:

> For Flink’s DataStream API, this setting does in fact not even result in reusing of objects, but only in avoiding additional object copying on the way, which happens by default as an additional safety net for users.

So I’m a bit confused here. Could you please give more details about object reuse in data streams? Thanks a lot!

Best,
Paul Lam
|
Hi Paul,

sure, what I mean is basically what this comment in the blog post says: "For Flink’s DataStream API, this setting does in fact not even result in reusing of objects, but only in *avoiding additional object copying* on the way, which happens by default as an additional safety net for users."

In the context of my previous response, this avoiding of copies happens in the case where Flink chains operators together. Operator chaining runs the chained operators in the same TM (JVM): instead of going through network connections, the output object of one operator is simply passed as the input object to the next. Without object reuse, Flink creates a deep copy of the output object before passing the copy down to the next operator as input, to avoid the problems that can arise from two operators sharing the same object. With object reuse, the same output object instance becomes the input, and no deep copy happens. Deep copies, at least for mutable objects, go through de/serialization, and this introduces latency. That is what my comment was about.

Best,
Stefan
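[Editor's note: the copy-vs-reuse behavior described above can be sketched in plain Java. This is an illustration of the mechanism only, not Flink's actual chaining code; the `Event` class and `passToNextOperator` method are invented for the example. Without object reuse the record is deep-copied through de/serialization before the next operator sees it; with reuse, the very same instance is handed over directly.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ChainingCopyDemo {
    // A mutable record, as it might flow between two chained operators.
    static class Event implements Serializable {
        String payload;
        Event(String payload) { this.payload = payload; }
    }

    // Deep copy via de/serialization: the default "safety net" behavior.
    static Event deepCopy(Event e) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(e);
        oos.flush();
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (Event) in.readObject();
    }

    // Hands one operator's output to the next operator in the chain.
    static Event passToNextOperator(Event out, boolean objectReuse) throws Exception {
        // With object reuse, the downstream operator receives the very same instance;
        // otherwise it receives a deep copy, paying the de/serialization cost.
        return objectReuse ? out : deepCopy(out);
    }

    public static void main(String[] args) throws Exception {
        Event out = new Event("hello");
        Event reused = passToNextOperator(out, true);
        Event copied = passToNextOperator(out, false);
        System.out.println(reused == out); // prints true: same instance, no copy cost
        System.out.println(copied == out); // prints false: a deep copy was made
    }
}
```

The de/serialization round trip in `deepCopy` is the latency source the discussion refers to; enabling object reuse skips it, at the cost of losing the safety net against two operators mutating a shared object.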
|
Hi Stefan,
Thanks for your detailed explanation! It helps a lot! I think I misunderstood the sentence. I thought “avoiding additional object copying” was the default behavior. Best, Paul Lam
|