Implementation of setBufferTimeout(timeoutMillis)

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Implementation of setBufferTimeout(timeoutMillis)

Pankaj Chand
Hello,

The documentation gives the following two sample lines for setting the buffer timeout for the streaming environment or transformation.

env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

I have been trying to find where (file and method) in the Flink source code are the buffers being flushed by iteratively referring to the value of timeoutMillis (or the default value), but have been unsuccessful. Please help.

Thanks,

Pankaj
Reply | Threaded
Open this post in threaded view
|

Re: Implementation of setBufferTimeout(timeoutMillis)

Yun Gao
Hi Pankaj,

    I think it should be in org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.

Best,
 Yun



------------------------------------------------------------------
Sender:Pankaj Chand<[hidden email]>
Date:2020/08/31 02:40:15
Recipient:user<[hidden email]>
Theme:Implementation of setBufferTimeout(timeoutMillis)

Hello,

The documentation gives the following two sample lines for setting the buffer timeout for the streaming environment or transformation.

env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

I have been trying to find where (file and method) in the Flink source code are the buffers being flushed by iteratively referring to the value of timeoutMillis (or the default value), but have been unsuccessful. Please help.

Thanks,

Pankaj

Reply | Threaded
Open this post in threaded view
|

Re: Implementation of setBufferTimeout(timeoutMillis)

Pankaj Chand
Thank you so much, Yun! It is exactly what I needed.

On Mon, Aug 31, 2020 at 1:50 AM Yun Gao <[hidden email]> wrote:
Hi Pankaj,

    I think it should be in org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.

Best,
 Yun



------------------------------------------------------------------
Sender:Pankaj Chand<[hidden email]>
Date:2020/08/31 02:40:15
Recipient:user<[hidden email]>
Theme:Implementation of setBufferTimeout(timeoutMillis)

Hello,

The documentation gives the following two sample lines for setting the buffer timeout for the streaming environment or transformation.

env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

I have been trying to find where (file and method) in the Flink source code are the buffers being flushed by iteratively referring to the value of timeoutMillis (or the default value), but have been unsuccessful. Please help.

Thanks,

Pankaj