Join DataStream with dimension tables?

Join DataStream with dimension tables?

Srikanth
Hello,

I have a fairly typical streaming use case, but I'm not able to figure out how best to implement it in Flink.
I want to join records read from a Kafka stream with one (or more) dimension tables that are saved as flat files.

As per this JIRA, it's not possible to join a DataStream with a DataSet.
These tables are too big to do a collect() and join.

It would be good to read these files during startup, do a partitionByHash, and keep them cached.
On the DataStream, maybe do a keyBy and join.
Is something like this possible?

Srikanth
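
(A hedged sketch of the simplest variant, assuming each table fits in one task's memory: load the file in open() of a rich function and join per record. The record shapes, CSV layout, and path below are made up.)

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    import scala.io.Source

    // Loads the dimension file once per parallel task at startup and
    // inner-joins each incoming fact record against the in-memory table.
    class FileBackedJoin(dimPath: String)
        extends RichFlatMapFunction[(String, Int), (String, String, Int)] {

      private var dimTable: Map[String, String] = _

      override def open(parameters: Configuration): Unit = {
        // assumes the file is readable from every worker (local copy/NFS);
        // for S3 a file-system client would be needed instead
        dimTable = Source.fromFile(dimPath).getLines()
          .map(_.split(','))
          .map(cols => cols(0) -> cols(1))
          .toMap
      }

      override def flatMap(fact: (String, Int),
                           out: Collector[(String, String, Int)]): Unit = {
        // inner join: facts without a matching entry are dropped
        dimTable.get(fact._1).foreach(name => out.collect((fact._1, name, fact._2)))
      }
    }

    // usage: kafkaStream.flatMap(new FileBackedJoin("/path/to/dim.csv"))

The drawback is that every task replicates the full table, which is what the partitionByHash/keyBy idea above avoids.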

Re: Join DataStream with dimension tables?

Aljoscha Krettek
Hi Srikanth,
that's an interesting use case. It's not possible to do something like this out of the box, but I'm actually working on an API for such cases.

In the meantime, I put together a short example that shows how something like this can be built with the API that is currently available. It requires writing a custom operator, but it is still somewhat succinct.
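
(A hedged stand-in, not the original example: the same idea sketched with a keyed CoFlatMapFunction instead of a custom operator. Both streams are keyed by the join key, the dimension entry lives in keyed state, and facts that arrive early are parked in a list state. All record types are hypothetical.)

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
    import org.apache.flink.util.Collector

    import scala.collection.JavaConverters._

    // Inner-joins a fact stream against a dimension stream, both keyed
    // by the join key. Facts that arrive before their dimension entry
    // are parked in keyed list state and flushed once the entry shows up.
    class StreamDimensionJoin
        extends RichCoFlatMapFunction[(String, Int), (String, String), (String, String, Int)] {

      private var dimEntry: ValueState[String] = _
      private var pendingFacts: ListState[(String, Int)] = _

      override def open(parameters: Configuration): Unit = {
        dimEntry = getRuntimeContext.getState(
          new ValueStateDescriptor[String]("dim", classOf[String]))
        pendingFacts = getRuntimeContext.getListState(
          new ListStateDescriptor[(String, Int)]("pending", classOf[(String, Int)]))
      }

      // fact side
      override def flatMap1(fact: (String, Int),
                            out: Collector[(String, String, Int)]): Unit = {
        val dim = dimEntry.value()
        if (dim != null) out.collect((fact._1, dim, fact._2))
        else pendingFacts.add(fact) // dimension entry not seen yet: buffer
      }

      // dimension side
      override def flatMap2(dim: (String, String),
                            out: Collector[(String, String, Int)]): Unit = {
        dimEntry.update(dim._2)
        pendingFacts.get().asScala.foreach(f => out.collect((f._1, dim._2, f._2)))
        pendingFacts.clear()
      }
    }

Wired up, it would look roughly like facts.connect(dims).keyBy(_._1, _._1).flatMap(new StreamDimensionJoin). One caveat of this variant: facts whose key never appears on the dimension side stay buffered forever.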

Please let me know if you have any questions.

Cheers,
Aljoscha

Re: Join DataStream with dimension tables?

Lohith Samaga M

Hi,
Cassandra could be used as a distributed cache.

Lohith.
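
(For concreteness, a hedged sketch of what a per-record Cassandra lookup could look like with the DataStax Java driver; the contact point, keyspace, table, and record shapes are all made up.)

    import com.datastax.driver.core.{Cluster, Session}
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    // Enriches each fact record with an attribute looked up in Cassandra.
    class CassandraLookup extends RichMapFunction[(String, Int), (String, String, Int)] {

      @transient private var cluster: Cluster = _
      @transient private var session: Session = _

      override def open(parameters: Configuration): Unit = {
        cluster = Cluster.builder().addContactPoint("cassandra-host").build()
        session = cluster.connect("dimensions")
      }

      override def map(fact: (String, Int)): (String, String, Int) = {
        // one synchronous round trip per record; a local cache (or async
        // lookups) would be needed at higher throughputs
        val row = session.execute(
          "SELECT name FROM dim_table WHERE id = ?", fact._1).one()
        (fact._1, if (row != null) row.getString("name") else null, fact._2)
      }

      override def close(): Unit = {
        if (session != null) session.close()
        if (cluster != null) cluster.close()
      }
    }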


Re: Join DataStream with dimension tables?

Srikanth
Aljoscha,

Looks like a potential solution. Feels a bit hacky though.

I didn't quite understand why a list-backed store is used for the static input buffer. An inner join should emit only one record if there is a key match.

Is it a property of the system to emit a Long.MAX_VALUE watermark when a finite stream source ends?
If so, can I do something like this to read the static file in parallel?
    val meta = env.readTextFile("s3://path/to/file").map(...).keyBy(...)

Shouldn't we also override the checkpoint handling of the custom operator? If so, should checkpoints wait/fail during the initial read phase?
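
(As far as I know, a finite Flink source does emit a final Long.MAX_VALUE watermark when it is exhausted, and readTextFile does run as a parallel source. Below is a hypothetical end-to-end wiring of the keyed CoFlatMap sketch from earlier in the thread, with a socket source standing in for Kafka and made-up paths and record layouts.)

    import org.apache.flink.streaming.api.scala._

    object DimensionJoinJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // finite, parallel file source; emits a final Long.MaxValue
        // watermark once the file has been fully read
        val dims: DataStream[(String, String)] = env
          .readTextFile("s3://bucket/path/to/dims.csv") // made-up path
          .map { line => val c = line.split(','); (c(0), c(1)) }

        // socket source standing in for the real Kafka consumer
        val facts: DataStream[(String, Int)] = env
          .socketTextStream("localhost", 9999)
          .map { line => val c = line.split(','); (c(0), c(1).toInt) }

        // keyed state inside StreamDimensionJoin is covered by Flink's
        // checkpoints, unlike ad-hoc buffers in a custom operator
        val joined = facts
          .connect(dims)
          .keyBy(_._1, _._1)
          .flatMap(new StreamDimensionJoin)

        joined.print()
        env.execute("stream-dimension join")
      }
    }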

Lohith,
Adding a component like Cassandra just for this feels like overkill. But if I can't find a suitable way to do this, I might use it (or probably Redis).

Srikanth

Re: Join DataStream with dimension tables?

Srikanth
Aljoscha,

Your thoughts on this?

Srikanth
