Deterministic rescale for test


Deterministic rescale for test

Martin Frank Hansen
Hi, 

I am trying to make a test suite for our Flink jobs, and am having problems making the input data deterministic. 

We are reading file input with parallelism 1 and want to rescale to a higher parallelism, such that the ordering of the data is the same every time. 

I have tried using rebalance and rescale, but they seem to distribute data randomly between partitions. We don't need something optimized; we just need the same distribution for every run. 
Is this possible? 

Some code:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(parallelism)

// Read the file with a single task, then fan out to the configured parallelism.
val rawStream: DataStream[String] = env.readTextFile(s3Path).setParallelism(1)
val rescaled: DataStream[String] = rawStream.rescale
...
best regards 

--

Martin Frank Hansen



Re: Deterministic rescale for test

Jaffe, Julian

Martin,

 

You can use `.partitionCustom` and provide a partitioner if you want to explicitly control how elements are distributed to downstream tasks.
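
For example, here is a rough, untested sketch (names made up, building on the `rawStream` and imports from your snippet): a partitioner that picks the target subtask purely from a stable hash of the record, so the same input is always distributed the same way.

import org.apache.flink.api.common.functions.Partitioner

// Illustrative partitioner: choose the downstream subtask from a stable hash
// of the record itself, so a given line always lands on the same subtask.
class StablePartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)
}

// Use the whole line as the "key" the partitioner sees.
val partitioned: DataStream[String] =
  rawStream.partitionCustom(new StablePartitioner, (line: String) => line)

Any deterministic function of the record (or of a real key) works here; the point is only that the routing decision does not depend on timing or a random seed.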

 


Re: Deterministic rescale for test

Martin Frank Hansen
Hi Jaffe, 

Thanks for your reply, I will try to use a custom partitioner. 




--

Martin Frank Hansen



Re: Deterministic rescale for test

Arvid Heise
Hi Martin,

Rebalance and rescale use round-robin distribution and are deterministic in your case (assuming the same task managers / slots). You just need to stay clear of the ShufflePartitioner.

If you are seeing something non-deterministic, could you please share an example?
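
As a rough, untested sketch for gathering one (assuming the `rawStream` and imports from your original snippet; the helper name is just illustrative), you could tag each record with the subtask index it lands on and diff the output of two runs:

import org.apache.flink.api.common.functions.RichMapFunction

// Illustrative helper: prefix every record with the index of the subtask that
// processed it, so the per-subtask assignment of two runs can be compared.
class TagWithSubtask extends RichMapFunction[String, String] {
  override def map(line: String): String =
    s"${getRuntimeContext.getIndexOfThisSubtask}: $line"
}

rawStream.rescale
  .map(new TagWithSubtask)
  .print()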





--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng