Two operators consuming from same stream

Sofer, Tovi

Hi group,

 

We have the graph below, to which we added metrics for measuring latency.

We have two streams, each consumed by two operators:

· ordersStream and pricesStream are both consumed by two operators, CoMapperA and CoMapperB, each using connect.

Initially we thought that when a stream is consumed by two operators, we need to duplicate it into two separate streams, so we did that using split, as shown below.

Then we understood that this is not required and that two operators can consume the same stream, so we removed the duplication.

However, when checking latency, we found that latency with the duplicated streams was much better than without duplication (by about a factor of two).
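
When comparing the two topologies, it can help to look at latency percentiles rather than a single average, since a few slow records can dominate a mean. The snippet below is only a sketch in plain Java. It assumes per-record latencies in milliseconds have already been collected (for example as processing time minus an event-creation timestamp, which is an assumption and not something stated in this thread). The class name LatencyStats and the sample values are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: summarises collected latency samples.
class LatencyStats {

    /** Nearest-rank percentile over latency samples; p must be in (0, 100]. */
    static long percentile(long[] latenciesMs, double p) {
        if (latenciesMs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        if (p <= 0 || p > 100) {
            throw new IllegalArgumentException("p must be in (0, 100]");
        }
        long[] sorted = latenciesMs.clone();   // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.max(rank, 1) - 1];  // nearest-rank is 1-based
    }

    public static void main(String[] args) {
        // Synthetic placeholder samples, NOT measurements from the job in this thread.
        long[] samples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println("p50  = " + percentile(samples, 50));
        System.out.println("p100 = " + percentile(samples, 100));
    }
}
```

Running the same summary over samples from both job variants, taken under the same load, would show whether the difference holds across the distribution or only in the tail.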

 

My questions:

· Is the improved latency related to checkpointing being done separately on those streams?

· What are the downsides of using the duplication if it gives better latency? Are we harming state correctness in any way?

 

Additional Info:

The two graph configurations appear exactly the same in the execution plan and web UI:

[Figure: execution graph. Legible labels: "sourceOrders.keyBy, CoMapperA, OrdersStreams" and "sourcePrices.keyBy, CoMapperB, pricesStreams"]

Code without duplication looks something like:

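// keyBy(String) yields a KeyedStream whose key type is Tuple
KeyedStream<Order, Tuple> orderKeyedStream = ordersStream.keyBy(field);
KeyedStream<Price, Tuple> pricesKeyedStream = pricesStream.keyBy(field);

// both co-mappers consume the same two keyed streams
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);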

 

 

Code used for duplication:

(We duplicate the streams, then connect pricesStreamA with ordersStreamA and pricesStreamB with ordersStreamB, applying keyBy as part of the connect, similar to the above.)

// duplicate prices stream: every record is emitted under both output names
        SplitStream<Price> pricesSplitStream = pricesStream
                .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));

        DataStream<Price> pricesStreamA = pricesSplitStream.select("pricesStreamA");
        DataStream<Price> pricesStreamB = pricesSplitStream.select("pricesStreamB");

 

 

Thanks,

Tovi

 

 

 

 

 



Re: Two operators consuming from same stream

Timo Walther
Hi Tovi,

I think your code without duplication performs two separate shuffle operations whereas the other code only performs one shuffle.

Further latency impacts might be due to the overhead involved in maintaining the partitioning for a keyed stream/key groups and switching key contexts in the operator.
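
For readers unfamiliar with key groups, here is a rough plain-Java sketch of the general idea: a key is hashed into one of maxParallelism key groups, and each key group is then mapped to one parallel subtask. This is a simplified model only. The class KeyGroupSketch, its method names, and the bit mixer are hypothetical stand-ins and are not Flink's actual implementation or API.

```java
// Simplified, hypothetical model of key -> key group -> subtask routing.
class KeyGroupSketch {

    // Stand-in bit mixer (NOT Flink's real hash); just spreads hashCode() bits.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Maps a key to a key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    /** Maps a key group to a subtask index in [0, parallelism). */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration
        int parallelism = 4;      // assumed value for illustration
        for (String key : new String[] {"order-1", "order-2", "price-7"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int subtask = subtaskFor(keyGroup, maxParallelism, parallelism);
            System.out.println(key + " -> key group " + keyGroup + " -> subtask " + subtask);
        }
    }
}
```

In this model every record passing through a keyed exchange pays for a hash and a routing decision, and the receiving operator has to set the current key before touching keyed state, which is the kind of per-record cost referred to above.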

Did you check the latency of the following?

DataStream<> ds = orderKeyedStream.connect(pricesKeyedStream).flatMap(identityMapper);
ds.flatMap(mapperA);
ds.flatMap(mapperB);

Regards,
Timo


On 1/1/18 at 2:50 PM, Sofer, Tovi wrote:


RE: Two operators consuming from same stream

Sofer, Tovi

Hi Timo,

 

Actually, I do keyBy in both cases, and in the split/duplicate case I do it on both split streams.

I did call connect twice rather than once, but connect only invokes the ConnectedStreams constructor and doesn’t perform any real operation.

So I don’t see how it would make a difference.

I can try it if you see a reason to.

 

 

More detailed code including all keyBy:

 

Code without duplication looks something like:  

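// keyBy(String) yields a KeyedStream whose key type is Tuple
KeyedStream<Order, Tuple> orderKeyedStream = ordersStream.keyBy(field);
KeyedStream<Price, Tuple> pricesKeyedStream = pricesStream.keyBy(field);

// both co-mappers consume the same two keyed streams
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperA);
orderKeyedStream.connect(pricesKeyedStream).flatMap(mapperB);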

 

 

Code with duplication (better latency):  

(We duplicate the streams, then connect pricesStreamA with orderStreamA and pricesStreamB with orderStreamB, applying keyBy as part of the connect.)

// duplicate prices stream: every record is emitted under both output names
SplitStream<Price> pricesSplitStream = pricesStream
        .split(price -> ImmutableList.of("pricesStreamA", "pricesStreamB"));
DataStream<Price> pricesStreamA = pricesSplitStream.select("pricesStreamA");
DataStream<Price> pricesStreamB = pricesSplitStream.select("pricesStreamB");

// duplicate orders stream in the same way
SplitStream<Order> ordersSplitStream = ordersStream
        .split(order -> ImmutableList.of("orderStreamA", "orderStreamB"));
DataStream<Order> orderStreamA = ordersSplitStream.select("orderStreamA");
DataStream<Order> orderStreamB = ordersSplitStream.select("orderStreamB");

// keyBy is applied on the connected streams, once per co-mapper
DataStream<Order> priceOrdersConnectedStream = orderStreamA.connect(pricesStreamA)
        .keyBy("priceId", "priceId")
        .flatMap(mapperA);
DataStream<Price> orderPricesConnectedStream = orderStreamB.connect(pricesStreamB)
        .keyBy("orderId", "orderId")
        .flatMap(mapperB);

 

 

 

From: Timo Walther [mailto:[hidden email]]
Sent: Wednesday, 03 January 2018 11:02
To: [hidden email]
Subject: Re: Two operators consuming from same stream

 


RE: Two operators consuming from same stream

Newport, Billy
In reply to this post by Sofer, Tovi

We’ve seen the same thing here. We read files twice for the same reason: it’s simply faster to do that than to connect the two pipes to the same input.

 

 

From: Sofer, Tovi [mailto:[hidden email]]
Sent: Monday, January 01, 2018 8:51 AM
To: [hidden email]
Subject: Two operators consuming from same stream

 
