Reverse of KeyBy

Aggarwal, Ajay

I am new to Flink and am trying to figure out whether there is an operator that does the reverse of keyBy. With keyBy you can split a stream into disjoint partitions. Is there a way to bring those partitions back into a single stream?

Let me explain using my use case below.

My input stream contains messages with the following fields:

{
    MyKey
    LargeMessageId
    LargeMessageFragment
    LargeMessageTimestamp // yes, the same timestamp is repeated with each fragment

    (… there are other fields, but I am leaving them out as they are not important for this discussion)
}

Each LargeMessage is split into fragments at the source. I have two main requirements:

  1. Reassemble these fragments back into LargeMessages.
  2. For each MyKey value, process the LargeMessages in order of the timestamp associated with them.

So I am thinking of something like this:

InputStream
  .keyBy(MyKey)
  .keyBy(LargeMessageId)
  .flatMap(new MyReassemblyFunction())
  .???

At this point I need to bring all reassembled LargeMessages for a given MyKey back into a common partition so that I can process them in order. This is where I am stuck. Any help from the experts will be much appreciated.

Ajay

Re: Reverse of KeyBy

Congxian Qiu
Hi Aggarwal,

How about keyBy(LargeMessageId) first, then reassembling the fragments into LargeMessages, and then keyBy(MyKey)?


Best,
Congxian


In reply to Aggarwal, Ajay <[hidden email]>, Saturday, February 2, 2019 at 5:42 AM


Re: Reverse of KeyBy

Aggarwal, Ajay

Thank you for your suggestion. But as I understand it, if I keyBy(LargeMessageId) first, I can’t guarantee the order of LargeMessages per MyKey, because the messages for a given MyKey will be spread over multiple partitions by LargeMessageId. Am I correct?

In reply to Congxian Qiu <[hidden email]>, Sunday, February 3, 2019 at 6:40 AM


Re: Reverse of KeyBy

Fabian Hueske-2
Hi,

Calling keyBy twice will not work, because the second call overrides the first.
You can keyBy on a composite key (MyKey, LargeMessageId) instead.

You can do the following:

InputStream
  .keyBy((MyKey, LargeMessageId)) // use composite key
  .flatMap(new MyReassemblyFunction())
  .keyBy(MyKey)
  .???

If LargeMessageId is unique across MyKey values (that is, no two large messages share a LargeMessageId but have different MyKey values), you don't need a composite key and can use keyBy(LargeMessageId).

Best, Fabian
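
To make the point about re-keying concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes a deliberately simplified routing rule (hash of the key modulo the parallelism); Flink's real key-to-subtask assignment is more involved, so treat `subtaskFor`, `Msg`, and `routeByMyKey` as hypothetical names for illustration only. What it shows is that routing depends only on the key used in the most recent keyBy: messages that were spread out by LargeMessageId end up together again once they are routed by MyKey.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyRoutingSketch {

    /** Hypothetical reassembled message; only the two key fields matter here. */
    static final class Msg {
        final String myKey;
        final String largeMessageId;

        Msg(String myKey, String largeMessageId) {
            this.myKey = myKey;
            this.largeMessageId = largeMessageId;
        }
    }

    /** Simplified stand-in for key-based routing (an assumption, not Flink's algorithm). */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Groups messages by the subtask that their MyKey routes to. */
    static Map<Integer, List<Msg>> routeByMyKey(List<Msg> msgs, int parallelism) {
        Map<Integer, List<Msg>> bySubtask = new TreeMap<>();
        for (Msg m : msgs) {
            int subtask = subtaskFor(m.myKey, parallelism);
            bySubtask.computeIfAbsent(subtask, k -> new ArrayList<>()).add(m);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<Msg> msgs = Arrays.asList(
            new Msg("A", "m1"), new Msg("B", "m2"), new Msg("A", "m3"));
        int parallelism = 4;
        // Compare where each message would go under the two different keys.
        for (Msg m : msgs) {
            System.out.println(m.largeMessageId
                + ": subtask by LargeMessageId=" + subtaskFor(m.largeMessageId, parallelism)
                + ", subtask by MyKey=" + subtaskFor(m.myKey, parallelism));
        }
    }
}
```

Whatever subtasks the first keying chose, every message with the same MyKey maps to the same subtask under the second keying, which is the "bring them back together" behaviour asked about in the original question.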


In reply to Aggarwal, Ajay <[hidden email]>, Monday, February 4, 2019 at 15:05


Re: Reverse of KeyBy

Aggarwal, Ajay

Yes, LargeMessageId is globally unique, so I shouldn’t need a composite key.

So both of you are suggesting I do the following:

InputStream
  (1)   .keyBy(LargeMessageId)
  (2)   .flatMap(new MyReassemblyFunction())
  (3)   .keyBy(MyKey)
  (4)   .???

Let me explain my doubt (perhaps due to my lack of understanding). By the way, I expect to run this job with parallelism > 1. My understanding of the above is as follows:

First operator (1): The first keyBy(LargeMessageId) will partition the input stream by LargeMessageId. At this point, messages with the same MyKey value will already be spread across these partitions. Is that not a problem already?

Second operator (2): Run flatMap(new MyReassemblyFunction()) on these partitions. Here each one will produce exactly one LargeMessage.

Third operator (3): At this point I don’t understand the purpose of the second keyBy(MyKey). My understanding is that it will further partition the already partitioned stream (from (1) above) and will not help me, as I need to process all LargeMessages for a given MyKey in order.

Is there an implicit assumption here that the flatMap operation (2) will automatically join the partitioned sub-streams from the first keyBy into a single stream?

 

Ajay

 

 

In reply to Fabian Hueske <[hidden email]>, Monday, February 4, 2019 at 9:17 AM

 


Re: Reverse of KeyBy

Fabian Hueske-2
Hi,

Subpartitions are just a logical concept. When you keyBy a stream, the next operator is applied in a keyed context. After that operator, the data might still be partitioned, but the keyed context is gone.
Is this what you mean by automatic "joining of partitioned sub-streams"?

With the program that you shared before, the following happens:

(1) The records are partitioned on LargeMessageId, i.e., all records with the same LargeMessageId are sent to the same task.
(2) The task collects all fragments of a large message in keyed state. The state is always scoped to the key (LargeMessageId). Once it has collected all fragments, it emits a complete message.
(3) The completed messages are partitioned on MyKey, i.e., all messages with the same MyKey are sent to the same task.
(4) A function can collect and sort the messages to process them in order.

Since you shuffle the records twice, you cannot (in general) expect the records to still be in order.

Best, Fabian
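
The four steps above can be sketched in plain Java, without Flink, to show the data flow. This is only an illustration under stated assumptions: the thread does not say how a task knows that all fragments have arrived, so the `index` and `total` fields on `Fragment` are hypothetical, as are all class and method names. The in-memory maps stand in for keyed state, and the sort in `groupAndSort` runs over a finished batch, whereas a real streaming job would also have to decide when it is safe to emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReassembleThenSortSketch {

    /** Hypothetical fragment; index and total are assumed fields, not from the thread. */
    static final class Fragment {
        final String myKey, largeMessageId, payload;
        final long timestamp;
        final int index, total;

        Fragment(String myKey, String largeMessageId, long timestamp,
                 int index, int total, String payload) {
            this.myKey = myKey;
            this.largeMessageId = largeMessageId;
            this.timestamp = timestamp;
            this.index = index;
            this.total = total;
            this.payload = payload;
        }
    }

    /** Hypothetical reassembled message. */
    static final class LargeMessage {
        final String myKey, largeMessageId, body;
        final long timestamp;

        LargeMessage(String myKey, String largeMessageId, long timestamp, String body) {
            this.myKey = myKey;
            this.largeMessageId = largeMessageId;
            this.timestamp = timestamp;
            this.body = body;
        }
    }

    /** Steps (1) and (2): buffer fragments per LargeMessageId, emit once all have arrived. */
    static List<LargeMessage> reassemble(List<Fragment> fragments) {
        Map<String, String[]> buffers = new HashMap<>();  // stands in for keyed state
        Map<String, Integer> received = new HashMap<>();  // fragments seen per message
        List<LargeMessage> out = new ArrayList<>();
        for (Fragment f : fragments) {
            String[] parts = buffers.computeIfAbsent(f.largeMessageId, id -> new String[f.total]);
            if (parts[f.index] == null) {                 // ignore duplicate fragments
                parts[f.index] = f.payload;
                received.merge(f.largeMessageId, 1, Integer::sum);
            }
            if (received.get(f.largeMessageId) == f.total) {
                out.add(new LargeMessage(f.myKey, f.largeMessageId, f.timestamp,
                                         String.join("", parts)));
                buffers.remove(f.largeMessageId);         // clear state for this key
                received.remove(f.largeMessageId);
            }
        }
        return out;
    }

    /** Steps (3) and (4): group complete messages by MyKey, then sort each group by timestamp. */
    static Map<String, List<LargeMessage>> groupAndSort(List<LargeMessage> messages) {
        Map<String, List<LargeMessage>> byKey = new TreeMap<>();
        for (LargeMessage m : messages) {
            byKey.computeIfAbsent(m.myKey, k -> new ArrayList<>()).add(m);
        }
        for (List<LargeMessage> perKey : byKey.values()) {
            perKey.sort(Comparator.comparingLong((LargeMessage m) -> m.timestamp));
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Fragments arrive interleaved and out of order.
        List<Fragment> input = Arrays.asList(
            new Fragment("A", "m2", 200L, 0, 2, "wor"),
            new Fragment("A", "m1", 100L, 1, 2, "llo"),
            new Fragment("B", "m3", 150L, 0, 1, "x"),
            new Fragment("A", "m1", 100L, 0, 2, "he"),
            new Fragment("A", "m2", 200L, 1, 2, "ld"));
        groupAndSort(reassemble(input)).forEach((key, msgs) -> {
            for (LargeMessage m : msgs) {
                System.out.println(key + " " + m.timestamp + " " + m.body);
            }
        });
    }
}
```

The explicit sort in step (4) is there because, as noted above, order cannot be assumed after two shuffles: the complete messages for one MyKey may reach the second task in any order.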


In reply to Aggarwal, Ajay <[hidden email]>, Monday, February 4, 2019 at 16:08


Re: Reverse of KeyBy

Aggarwal, Ajay

Thanks, Fabian, for the explanation. Let me do some more reading so that what you said can sink in a little more.

 

In reply to Fabian Hueske <[hidden email]>, Monday, February 4, 2019 at 10:22 AM

