Joining data in Streaming

Joining data in Streaming

Marchant, Hayden
We have a use case with two data sets - one reasonably large (a few million entities) and one smaller. We want to join these data sets on a common key, and we will only run the join once both data sets are available. In the world of batch processing this is pretty straightforward - we'd load both data sets into an application and execute a join operator on them through the common key. Is it possible to do such a join using the DataStream API? I assume I'd use the connect operator, but I'm not sure exactly how to do the join - do I need the 'smaller' set to be completely loaded into state before I start flowing the large set? My concern is that if I read both data sets from streaming sources, I have no guarantee of the order in which the data is loaded, so I might lose many potential join matches because their partners have not been read yet.


Thanks,
Hayden Marchant


Re: Joining data in Streaming

Stefan Richter
Hi,

as far as I know, this is not easily possible. What would be required is something like a CoFlatMap function in which one input stream blocks until the second stream is fully consumed, so that the state to join against is built up first. Maybe Aljoscha (in CC) can comment on future plans to support this.
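For illustration only, here is a minimal sketch of that shape, assuming both inputs are keyed by the join key and carry simple tuples (the class name, record types and field layout are placeholders, not anything from Flink). Since one input cannot actually block on the other, large-side records that arrive before their small-side partner are buffered in keyed state and flushed when the partner shows up:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Large side: (key, value); small side: (key, referenceValue); output: (key, value, referenceValue).
public class BufferingCoFlatMapJoin extends
        RichCoFlatMapFunction<Tuple2<String, Long>, Tuple2<String, String>, Tuple3<String, Long, String>> {

    private transient ValueState<String> smallSide;             // small-side value for the current key
    private transient ListState<Tuple2<String, Long>> pending;  // large-side records that arrived too early

    @Override
    public void open(Configuration parameters) {
        smallSide = getRuntimeContext().getState(new ValueStateDescriptor<>("small", String.class));
        pending = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "pending", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
    }

    @Override
    public void flatMap1(Tuple2<String, Long> large, Collector<Tuple3<String, Long, String>> out) throws Exception {
        String small = smallSide.value();
        if (small != null) {
            out.collect(Tuple3.of(large.f0, large.f1, small));  // partner already known
        } else {
            pending.add(large);                                 // buffer until the small side arrives
        }
    }

    @Override
    public void flatMap2(Tuple2<String, String> small, Collector<Tuple3<String, Long, String>> out) throws Exception {
        smallSide.update(small.f1);
        Iterable<Tuple2<String, Long>> buffered = pending.get();
        if (buffered != null) {
            for (Tuple2<String, Long> large : buffered) {       // flush everything that was waiting
                out.collect(Tuple3.of(large.f0, large.f1, small.f1));
            }
        }
        pending.clear();
    }
}

It would be wired up roughly as largeStream.keyBy(r -> r.f0).connect(smallStream.keyBy(r -> r.f0)).flatMap(new BufferingCoFlatMapJoin()).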

Best,
Stefan

Re: Joining data in Streaming

Xingcan Cui
In reply to this post by Marchant, Hayden
Hi Hayden,

Performing a full-history join on two streams is not natively supported yet.

As a workaround, you may implement a CoProcessFunction and cache the records from both sides in state until the stream with less data has been fully cached. At that point you can safely clear the cache for the "larger" stream, since those records will already have produced their complete results, and continue with a nested-loop join (i.e., whenever a new record arrives, join it against the fully cached set).
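As an illustration only, a minimal sketch of that idea, assuming keyed (key, value) tuples on both inputs. The class name and types are placeholders, and Flink gives no built-in signal that the smaller stream has been fully consumed, so the point at which the larger cache could be cleared is only marked with a comment (it would need an application-level end-of-input marker):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Both inputs are (key, value); the output is (key, largeValue, smallValue).
public class CachingNestedLoopJoin extends
        CoProcessFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient ListState<String> largeCache;   // cached values from the larger stream
    private transient ListState<String> smallCache;   // cached values from the smaller stream

    @Override
    public void open(Configuration parameters) {
        largeCache = getRuntimeContext().getListState(new ListStateDescriptor<>("large", String.class));
        smallCache = getRuntimeContext().getListState(new ListStateDescriptor<>("small", String.class));
    }

    // Records from the larger stream.
    @Override
    public void processElement1(Tuple2<String, String> large, Context ctx,
                                Collector<Tuple3<String, String, String>> out) throws Exception {
        largeCache.add(large.f1);                      // cache until the smaller side is complete
        Iterable<String> smalls = smallCache.get();
        if (smalls != null) {
            for (String small : smalls) {              // join against everything cached so far
                out.collect(Tuple3.of(large.f0, large.f1, small));
            }
        }
    }

    // Records from the smaller stream.
    @Override
    public void processElement2(Tuple2<String, String> small, Context ctx,
                                Collector<Tuple3<String, String, String>> out) throws Exception {
        smallCache.add(small.f1);
        Iterable<String> larges = largeCache.get();
        if (larges != null) {
            for (String large : larges) {
                out.collect(Tuple3.of(small.f0, large, small.f1));
            }
        }
        // Once the smaller stream is known to be fully consumed (e.g. via an application-level
        // marker record), largeCache.clear() could be called: those records are complete.
    }
}

It would be applied per key with largeStream.keyBy(r -> r.f0).connect(smallStream.keyBy(r -> r.f0)).process(new CachingNestedLoopJoin()). Each pair is emitted exactly once, because whichever record arrives second finds its partner in the opposite cache.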

Hope this helps.

Best,
Xingcan

RE: Joining data in Streaming

Marchant, Hayden
In reply to this post by Stefan Richter
Stefan,

So are we essentially saying that in this case, for now, I should stick to DataSet / Batch Table API?

Thanks,
Hayden

Re: Joining data in Streaming

Stefan Richter
Hi,

if the workarounds that Xingcan and I mentioned are not options for your use case, then I think that might currently be the better choice. But I would expect better support for stream joins in the near future.

Best,
Stefan

Re: Joining data in Streaming

Steven Wu
There is also an ongoing discussion about adding side inputs to Flink, which would cover this kind of enrichment use case.

I would load the smaller data set as a static reference data set. Then you can just do single-source streaming of the larger data set.
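For example, a minimal sketch of that approach, assuming the smaller set fits in memory and can be read from a simple "key,value" file (the class name, path and file format below are placeholders): the reference data is loaded once per parallel instance in open(), and the larger set is the only actual stream.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Enriches (key, value) records from the large stream with static reference data keyed by the same key.
public class StaticReferenceJoin
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple3<String, Long, String>> {

    private final String referencePath;               // e.g. a "key,referenceValue" CSV; path is a placeholder
    private transient Map<String, String> reference;

    public StaticReferenceJoin(String referencePath) {
        this.referencePath = referencePath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the whole smaller data set once per parallel instance.
        reference = new HashMap<>();
        for (String line : Files.readAllLines(Paths.get(referencePath))) {
            String[] parts = line.split(",", 2);
            if (parts.length == 2) {
                reference.put(parts[0], parts[1]);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, Long> large, Collector<Tuple3<String, Long, String>> out) {
        String ref = reference.get(large.f0);
        if (ref != null) {                            // inner join: drop records without a match
            out.collect(Tuple3.of(large.f0, large.f1, ref));
        }
    }
}

Applied as largeStream.flatMap(new StaticReferenceJoin("/path/to/reference.csv")), with the path again just a placeholder.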

RE: Joining data in Streaming

Marchant, Hayden

Thanks for all the ideas!!