Testing CoFlatMap correctness


Testing CoFlatMap correctness

Sofer, Tovi

Hi group,

 

What is the best practice for testing CoFlatMap operator correctness?

We have two source functions, each emitting data to its own stream, and a connect between them. I want to make sure that when an element from streamA arrives before an element from the other stream, a certain behavior happens.

How can I test this case?

Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
and emitting a timestamp and a watermark per element didn't help; elements still arrive in an unexpected order.
 
Thanks in advance,
Tovi


Re: Testing CoFlatMap correctness

Kostas Kloudas
Hi Tovi,

What you need is the TwoInputStreamOperatorTestHarness. This will allow you to do something like:

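// Harness classes ship in the test-jar of Flink's flink-streaming-java module.
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;

// myoperator is the TwoInputStreamOperator under test; for a CoFlatMapFunction,
// wrap your function, e.g. new CoStreamFlatMap<>(myCoFlatMapFunction).
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
    new TwoInputStreamOperatorTestHarness<>(myoperator);

testHarness.setup();
testHarness.open();

// Advance both inputs to watermark 17, then send an element on input 1.
testHarness.processWatermark1(new Watermark(17));
testHarness.processWatermark2(new Watermark(17));
testHarness.processElement1(new StreamRecord<>(5, 12L));

// Advance both inputs to watermark 42, then send an element on input 2.
testHarness.processWatermark1(new Watermark(42));
testHarness.processWatermark2(new Watermark(42));
testHarness.processElement2(new StreamRecord<>("6", 13L));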

and then use testHarness.getOutput() to get the output and compare it against the expected one.

If you have access to the Flink source code, I would recommend having a look at CoProcessOperatorTest for an example.
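The harness lets the test, rather than thread scheduling, decide the order of processElement1 and processElement2 calls. For a function that does not rely on runtime context or state, the same idea can also be tried one level lower by calling flatMap1/flatMap2 directly. The sketch below runs without Flink on the classpath: Out and CoFlatMap are hypothetical stand-ins that only mirror the shape of Flink's Collector and CoFlatMapFunction, and BlockingFilter is a hypothetical function where input 2 is a control stream that blocks keys arriving on input 1.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CoFlatMapOrderSketch {
    // Stand-ins shaped like Flink's Collector and CoFlatMapFunction (not the real types).
    interface Out<T> {
        void collect(T record);
    }

    interface CoFlatMap<IN1, IN2, OUT> {
        void flatMap1(IN1 value, Out<OUT> out);
        void flatMap2(IN2 value, Out<OUT> out);
    }

    // Hypothetical function under test: input 1 carries keys from the main stream,
    // input 2 carries keys from the control stream that are blocked from then on.
    static final class BlockingFilter implements CoFlatMap<String, String, String> {
        private final Set<String> blocked = new HashSet<>();

        @Override
        public void flatMap1(String key, Out<String> out) {
            if (!blocked.contains(key)) {
                out.collect(key); // only unblocked keys are forwarded
            }
        }

        @Override
        public void flatMap2(String key, Out<String> out) {
            blocked.add(key); // control elements update state and emit nothing
        }
    }

    // The control element arrives first, so the main element is dropped.
    static List<String> controlFirst() {
        List<String> output = new ArrayList<>();
        BlockingFilter f = new BlockingFilter();
        f.flatMap2("x", output::add);
        f.flatMap1("x", output::add);
        return output;
    }

    // The main element arrives first, so it passes before the key is blocked.
    static List<String> mainFirst() {
        List<String> output = new ArrayList<>();
        BlockingFilter f = new BlockingFilter();
        f.flatMap1("x", output::add);
        f.flatMap2("x", output::add);
        return output;
    }

    public static void main(String[] args) {
        System.out.println(controlFirst()); // prints []
        System.out.println(mainFirst());    // prints [x]
    }
}
```

With Flink available, the real CoFlatMapFunction could be wrapped in CoStreamFlatMap and driven through the harness instead; functions that use keyed state or timers need the harness rather than direct calls.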


Hope this helps,
Kostas


On Dec 7, 2017, at 5:54 PM, Sofer, Tovi <[hidden email]> wrote:

Hi group,

 

What is the best practice for testing CoFlatMap operator correctness?

We have two source functions, each emits data to stream, and a connect between them, and I want to make sure that when streamA element arrive before stream element, a certain behavior happens.

How can I test this case?

Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
and emitting timestamp and watermark per element didn’t help, and still each element arrive in unexpected order.
 
Thanks in advance,
Tovi

Reply | Threaded
Open this post in threaded view
|

RE: Testing CoFlatMap correctness

Sofer, Tovi

Hi Kostas,

 

Thank you for the suggestion.

But in our case we want to write either a component test that involves several steps, where the CoFlatMap is one step in the middle, or an integration test that tests the whole flow, which also involves the CoFlatMap.

We are trying to understand how to test such a scenario so that the results are predictable, i.e., so that elements from the main stream arrive after elements from the control stream, or the other way around.

 

Thanks again,

Tovi

 

From: Kostas Kloudas [mailto:[hidden email]]
Sent: Thursday, 07 December 2017 19:11
To: Sofer, Tovi [ICG-IT] <[hidden email]>
Cc: [hidden email]
Subject: Re: Testing CoFlatMap correctness


Re: Testing CoFlatMap correctness

Fabian Hueske-2
Hi Tovi,

Testing the behavior of a data flow with respect to the order of records from different sources is tricky.
Source functions work independently of each other, and it is not easily possible to control the order in which records are shipped (and received) across source functions.

You could implement source functions that only emit records (and possibly watermarks) when triggered by your test code.
Having two of these sources in a program, you could choose in which order the sources emit records and watermarks.
However, you would need to ensure that a record is completely processed (i.e., the program has come to a halt) before you emit the next record (from the same or another source), to avoid race conditions.
You could do this with timeouts, but this is very fragile and I would not recommend it.
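The triggered-source idea can be modelled in plain Java (this is not Flink API, and all names are hypothetical) to show why the "wait until completely processed" step matters. Each source thread forwards only what the test puts into its trigger queue, and the operator thread releases a semaphore after handling each record; emitAndAwait blocks on that signal before the test triggers the next record. The model simply assumes such a completion signal exists; obtaining one from a running Flink job is exactly the hard part described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Plain-Java model: two test-triggered sources feeding one "operator" thread.
class TriggeredSources {
    interface Body { void run() throws InterruptedException; }

    final BlockingQueue<String> triggerA = new LinkedBlockingQueue<>(); // main source
    final BlockingQueue<String> triggerB = new LinkedBlockingQueue<>(); // control source
    private final BlockingQueue<String> operatorInput = new LinkedBlockingQueue<>();
    private final Semaphore processed = new Semaphore(0);
    final List<String> seen = Collections.synchronizedList(new ArrayList<>());

    private static Thread daemon(Body body) {
        Thread t = new Thread(() -> {
            try {
                body.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // thread is being shut down
            }
        });
        t.setDaemon(true); // do not keep the JVM alive after the test
        return t;
    }

    // A source loop: forward whatever the test triggers, tagged with the source name.
    private void forward(BlockingQueue<String> trigger, String tag) throws InterruptedException {
        while (true) {
            operatorInput.put(tag + trigger.take());
        }
    }

    void start() {
        daemon(() -> forward(triggerA, "A:")).start();
        daemon(() -> forward(triggerB, "B:")).start();
        daemon(() -> {
            while (true) {
                seen.add(operatorInput.take()); // the logic under test would run here
                processed.release();            // signal: record completely processed
            }
        }).start();
    }

    // Trigger one record and block until the operator has processed it.
    // The timeout only guards against a hung test; ordering comes from the wait.
    void emitAndAwait(BlockingQueue<String> trigger, String value) throws InterruptedException {
        trigger.put(value);
        if (!processed.tryAcquire(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("record was not processed in time");
        }
    }

    static List<String> demo() {
        TriggeredSources model = new TriggeredSources();
        model.start();
        try {
            model.emitAndAwait(model.triggerB, "block-x"); // control source first
            model.emitAndAwait(model.triggerA, "x");       // then the main source
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(model.seen);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [B:block-x, A:x]
    }
}
```

Without the wait, both trigger queues could hold a record at the same time and the two forwarding threads would race to the operator input.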

Btw, watermarks have no effect on the order in which records are processed. They only determine the event time of an operator.
In the case of a co-operator, this is the smaller of the watermarks of its two input streams. So, the input records of a co-operator are not aligned or held back based on their timestamps or watermarks.
Instead, an operator can put "early" records into state and process them when a later watermark arrives, which signals that all relevant records from both inputs have been received.
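The buffering approach can be sketched as a plain-Java model (not Flink API; the class and method names are hypothetical). It tracks one watermark per input, uses the smaller one as its event time, and holds records in a timestamp-ordered buffer until that combined watermark passes them, so feeding the same records in either arrival order produces the same output. In Flink itself, one possible way to get this behavior is a CoProcessFunction on connected keyed streams that keeps early records in state and registers event-time timers; the model leaves out keys, state backends and timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model, not Flink API: a two-input operator that buffers records
// and releases them once min(watermark1, watermark2) has passed their timestamps.
class BufferingCoOperatorModel {
    private static final class Rec {
        final long ts;
        final String value;
        Rec(long ts, String value) { this.ts = ts; this.value = value; }
    }

    private long wm1 = Long.MIN_VALUE; // latest watermark seen on input 1
    private long wm2 = Long.MIN_VALUE; // latest watermark seen on input 2
    // Ordered by timestamp; records with equal timestamps have no defined order.
    private final PriorityQueue<Rec> buffer =
            new PriorityQueue<>((a, b) -> Long.compare(a.ts, b.ts));
    final List<String> output = new ArrayList<>();

    // Elements are only buffered, so their arrival order does not affect the output.
    void processElement1(long ts, String v) { buffer.add(new Rec(ts, "in1:" + v)); }
    void processElement2(long ts, String v) { buffer.add(new Rec(ts, "in2:" + v)); }

    void processWatermark1(long wm) { wm1 = Math.max(wm1, wm); flush(); }
    void processWatermark2(long wm) { wm2 = Math.max(wm2, wm); flush(); }

    // The operator's event time is the smaller of the two input watermarks.
    long currentWatermark() { return Math.min(wm1, wm2); }

    // A watermark t means no more records with timestamp <= t are expected,
    // so every buffered record at or below the combined watermark is complete.
    private void flush() {
        long wm = currentWatermark();
        while (!buffer.isEmpty() && buffer.peek().ts <= wm) {
            output.add(buffer.poll().value);
        }
    }

    static List<String> run(boolean controlArrivesFirst) {
        BufferingCoOperatorModel op = new BufferingCoOperatorModel();
        if (controlArrivesFirst) {
            op.processElement2(10, "block-x");
            op.processElement1(12, "x");
        } else {
            op.processElement1(12, "x");
            op.processElement2(10, "block-x");
        }
        op.processWatermark1(20); // combined watermark is still Long.MIN_VALUE
        op.processWatermark2(20); // combined watermark becomes 20: both records flush
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [in2:block-x, in1:x]
        System.out.println(run(false)); // prints [in2:block-x, in1:x]
    }
}
```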

I hope this helps,
Fabian


2017-12-10 10:04 GMT+01:00 Sofer, Tovi <[hidden email]>:

Hi Kostas,

 

Thank you for the suggestion.

But in our case we want to do either a component test that involves several steps, where the CoFlatMap is one step in the middle, or integration test that test the whole flow, which involves also the CoFlatMap.

And we trying to understand how to test such scenario so that results are predictable, and that elements from main stream arrive after elements from control stream, or other way around.

 

Thanks again,

Tovi

 

From: Kostas Kloudas [mailto:[hidden email]]
Sent:
יום ה 07 דצמבר 2017 19:11
To: Sofer, Tovi [ICG-IT] <[hidden email]>
Cc: [hidden email]
Subject: Re: Testing CoFlatMap correctness

 

Hi Tovi,

 

What you need is the TwoInputStreamOperatorTestHarness. This will allow you to do something like:

 

TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
      new
TwoInputStreamOperatorTestHarness<>(myoperator);

testHarness.setup();
testHarness.open();

testHarness.processWatermark1(new Watermark(17));
testHarness.processWatermark2(new Watermark(17));
testHarness.processElement1(new StreamRecord<>(5, 12L));

testHarness.processWatermark1(new Watermark(42));
testHarness.processWatermark2(new Watermark(42));
testHarness.processElement2(new StreamRecord<>("6", 13L));

 

and then use testHarness.getOutput() to get the output and compare it against the expected one.

 

If you have access to the Flink source code, I would recommend you to have a look at the CoProcessOperatorTest for an example.

 

 

Hope this helps,

Kostas

 



On Dec 7, 2017, at 5:54 PM, Sofer, Tovi <[hidden email]> wrote:

 

Hi group,

 

What is the best practice for testing CoFlatMap operator correctness?

We have two source functions, each emits data to stream, and a connect between them, and I want to make sure that when streamA element arrive before stream element, a certain behavior happens.

How can I test this case?

Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
and emitting timestamp and watermark per element didn’t help, and still each element arrive in unexpected order.
 
Thanks in advance,
Tovi