Flink takes 40ms ~ 100ms to proceed from one operator to another?

Flink takes 40ms ~ 100ms to proceed from one operator to another?

James Yu
Hi,

My team and I tried to measure the total time spent in our Flink job and found that Flink takes 40ms ~ 100ms to proceed from one operator to the next. How can we reduce this transition time?

The following DAG represents our job:

[Image: job DAG]

And here is what our log shows:

at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
at 19:37:04.605, the job is entering "Co-Flat Map"
at 19:37:04.605, the job is leaving "Co-Flat Map"
at 19:37:04.705, the job is entering "Co-Flat Map -> ..."
at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."

Both "Co-Flat Map" operators finish almost instantly, while most of the execution time is spent on the transitions between operators. Any ideas?
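
To make the gaps explicit, the timestamps in the log can be subtracted pairwise. The following is only an illustrative sketch: the class and method names are made up for this example, the timestamps are copied from the log lines above, and nothing here is Flink code.

```java
import java.time.Duration;
import java.time.LocalTime;

// Illustrative helper (not part of Flink): gap between two log timestamps.
final class LogGaps {
    // Parses ISO local times such as "19:37:04.564" and returns the difference in milliseconds.
    static long gapMillis(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps copied from the log above.
        System.out.println("source -> first Co-Flat Map: " + gapMillis("19:37:04.564", "19:37:04.605") + " ms");
        System.out.println("inside first Co-Flat Map:    " + gapMillis("19:37:04.605", "19:37:04.605") + " ms");
        System.out.println("first -> second Co-Flat Map: " + gapMillis("19:37:04.605", "19:37:04.705") + " ms");
        System.out.println("inside second Co-Flat Map:   " + gapMillis("19:37:04.705", "19:37:04.708") + " ms");
    }
}
```

With these timestamps the two hand-offs account for 41 ms and 100 ms, while the operators themselves take 0 ms and 3 ms.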


James C.-C.Yu

Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

James Yu
The previous email does not seem to display the embedded images, so let me post links instead.

Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

Stefan Richter
Hi,

you have not provided much information for this question, e.g. what exactly you measure and how, whether this is a local or a distributed setting, etc. I assume that it is distributed and that the cause of your observation is the buffer timeout, i.e. the maximum time that Flink waits before sending a buffer that holds just one element, which happens to be 100ms by default. You can decrease this value to some extent, at the cost of a potential loss in throughput, but I think even values around 5-10ms are OK. See [1] for more details. If you want to reduce latency between chained operators, you can also try to disable object-reuse:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
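
As a rough illustration of why a lone record can sit for the full buffer timeout, here is a toy model in plain Java. It is not Flink code, and it assumes a simplified rule: a buffer is shipped when it is full or when the timeout has elapsed since its first record was written. Flink's actual flushing mechanism differs in detail, and all names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Flink code): an output buffer that is shipped when it is full
// or when timeoutMillis have passed since its first record was written.
final class BufferTimeoutModel {
    // For each record (arrival times sorted ascending), returns how long it
    // waited in the buffer before the buffer was shipped.
    static List<Long> waitTimes(long[] arrivalMillis, int capacity, long timeoutMillis) {
        List<Long> waits = new ArrayList<>();
        int start = 0; // index of the first record in the current buffer
        while (start < arrivalMillis.length) {
            long deadline = arrivalMillis[start] + timeoutMillis;
            int end = start; // index of the last record that goes into this buffer
            while (end + 1 < arrivalMillis.length
                    && end + 1 - start < capacity
                    && arrivalMillis[end + 1] <= deadline) {
                end++;
            }
            boolean full = end - start + 1 == capacity;
            // A full buffer ships immediately; otherwise it waits for the timeout.
            long flushAt = full ? arrivalMillis[end] : deadline;
            for (int i = start; i <= end; i++) {
                waits.add(flushAt - arrivalMillis[i]);
            }
            start = end + 1;
        }
        return waits;
    }

    public static void main(String[] args) {
        // A single record in a large buffer waits for the whole timeout.
        System.out.println(waitTimes(new long[] {0}, 1000, 100)); // [100]
        System.out.println(waitTimes(new long[] {0}, 1000, 5));   // [5]
        // A buffer that fills up is shipped without waiting for the timeout.
        System.out.println(waitTimes(new long[] {0, 1, 2}, 3, 100)); // [2, 1, 0]
    }
}
```

In this model a low-traffic stream pays the timeout on every record, which would match a gap of up to 100ms per network hop; a busy stream fills its buffers and rarely hits the timeout.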

Best,
Stefan


Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

Stefan Richter
Sorry, forgot the link for reference [1], which is https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency
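
For reference, the buffer timeout discussed in that section is set on the execution environment. The fragment below is a minimal sketch that assumes the Flink DataStream API is on the classpath; the value 5 is only an example taken from the 5-10ms range mentioned earlier, not a recommendation for any particular job.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Fragment: belongs inside the job's main method.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Flush output buffers after at most 5 ms instead of the default 100 ms.
// A value of -1 removes the timeout, so buffers are only flushed when full.
env.setBufferTimeout(5);
```

Lower values trade throughput for latency, so it is worth measuring both before and after changing the setting.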


Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

Hequn Cheng
Hi Stefan,

Do you mean enable object reuse?

> If you want to reduce latency between chained operators, you can also try to disable object-reuse:


Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

Stefan Richter
Oh yes exactly, enable is right.


Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

Paul Lam
Hi Stefan,

Sorry for jumping into the discussion.

I’ve seen a dataArtisans blog post [1] which says that object reuse does not have much influence on data streams.

>  For Flink’s DataStream API, this setting does in fact not even result in reusing of objects, but only in avoiding additional object copying on the way, which happens by default as an additional safety net for users.

So I’m a bit confused here. Could you please give more details about the object reuse in data streams? Thanks a lot!


Best,
Paul Lam



Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

Stefan Richter
Hi Paul,

sure, what I mean is basically what this comment in the blog post says: “For Flink’s DataStream API, this setting does in fact not even result in reusing of objects, but only in *avoiding additional object copying* on the way, which happens by default as an additional safety net for users.” In the context of my previous response, this avoiding of copies happens in the case where Flink chains operators together. Operator chaining runs the operators in the same TM (JVM); records do not go through network connections within the chain, and instead the output object of one operator is passed directly as the input object to the next.

Without object reuse, Flink creates a deep copy of the output object and passes the copy down to the next operator as input, to avoid problems that come from two operators sharing the same object. With object reuse, the same output object instance becomes the input and no deep copy happens. Deep copies, at least for mutable objects, go through de/serialization, and this introduces latency. That is what my comment was about.
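
The difference between the two hand-off modes can be sketched in plain Java. This is a toy model, not Flink code: the class names are hypothetical, and the copy uses standard Java serialization purely as a stand-in for whatever copy mechanism Flink applies to a given type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Toy model (not Flink code) of handing a record from one chained operator to the next.
final class ChainHandOff {
    // A mutable record type, standing in for a user-defined event.
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        String payload;
        Event(String payload) { this.payload = payload; }
    }

    // Deep copy via a serialization round trip (the costly step that object reuse avoids).
    static Event deepCopy(Event e) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(e);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Event) in.readObject();
            }
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException("copy failed", ex);
        }
    }

    // Returns the object that the downstream operator receives.
    static Event handOff(Event output, boolean objectReuse) {
        return objectReuse ? output : deepCopy(output);
    }

    public static void main(String[] args) {
        Event produced = new Event("a");
        Event copied = handOff(produced, false);
        Event shared = handOff(produced, true);

        // The upstream operator mutates its output object after emitting it.
        produced.payload = "b";

        System.out.println("without reuse: same instance = " + (copied == produced)
                + ", payload = " + copied.payload);
        System.out.println("with reuse:    same instance = " + (shared == produced)
                + ", payload = " + shared.payload);
    }
}
```

Without reuse the downstream side keeps seeing "a" because it holds its own copy; with reuse it sees "b", which is the sharing hazard the default copy protects against and the reason object reuse is only safe when operators do not mutate or hold on to records they have emitted or received.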

Best,
Stefan 


Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

Paul Lam
Hi Stefan,

Thanks for your detailed explanation! It helps a lot!

I think I misunderstood the sentence. I thought “avoiding additional object copying” was the default behavior. 

Best,
Paul Lam
