static/dynamic lookups in flink streaming

10 messages

static/dynamic lookups in flink streaming

sandeep6
Hi there,

I know that there are various state backends to persist state. Is there a similar way to persist static/dynamic lookups and use them while streaming data in Flink?

Thanks,
Sandeep


Re: static/dynamic lookups in flink streaming

Fabian Hueske-2
Hi Sandeep,

I'm sorry, but I don't think I understand your question.
What do you mean by static or dynamic lookups? Do you want to access an external data store and cache its data?

Can you give a bit more detail about your use case?

Best, Fabian


Re: static/dynamic lookups in flink streaming

sandeep6
Hi Fabian,

I meant lookups like IDs to names. For example, if IDs are coming through the stream, I want to replace them with the corresponding names stored in a cache or somewhere within Flink.

Thanks,
Sandeep


Re: static/dynamic lookups in flink streaming

Fabian Hueske-2
OK, I see. Yes, you can do that with Flink. It's actually a very common use case.

You can store the names in operator state and Flink takes care of checkpointing the state and restoring it in case of a failure.
In fact, the operator state is persisted in the state backends you mentioned before.
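
As a small illustration of the checkpointing side, a job could enable periodic checkpoints and point Flink at a persistent state backend roughly as follows; the interval, checkpoint path, and job name here are assumptions, not values from this thread:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedLookupJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot all operator state (e.g. an id -> name map held in state)
        // every 10 seconds; the configured backend persists the snapshots.
        env.enableCheckpointing(10_000);
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // ... define sources, the lookup operator, and sinks here ...

        env.execute("lookup-enrichment");
    }
}

Any state registered by the operators of such a job is snapshotted to that backend and restored from the latest checkpoint after a failure.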

Best, Fabian


Re: static/dynamic lookups in flink streaming

sandeep6
As a follow-up question, can we populate the operator state from an external source?

My use case is as follows: I have a Flink streaming job with Kafka as a source, and the Kafka messages only contain IDs. My lookups (<id, name>), which form a static map, come from a different source. I would like to use those lookups while applying operators to the stream from Kafka.

Thanks,
Sandeep


Re: static/dynamic lookups in flink streaming

Fabian Hueske-2
You could read the map from a file in the open() method of a RichMapFunction.
The open() method is called before the first record is processed and can be used to populate the operator's lookup data.

The downside of this approach is that the data is replicated, i.e., each parallel instance of the operator holds a full copy of the map.
On the other hand, you do not need to shuffle the data because each parallel task can do the lookup locally.
If your <id, name> map is small, this would be the preferred approach.
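
A rough sketch of that approach, assuming the lookup data is available to every task as a local CSV file of id,name lines; the file path and the Long/String types are assumptions:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class IdToNameMapper extends RichMapFunction<Long, Tuple2<Long, String>> {

    private transient Map<Long, String> idToName;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per parallel task before the first record is processed,
        // so every task ends up holding its own full copy of the map.
        idToName = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader("/data/id_to_name.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",", 2);
                idToName.put(Long.parseLong(parts[0]), parts[1]);
            }
        }
    }

    @Override
    public Tuple2<Long, String> map(Long id) {
        // Purely local lookup, no shuffle needed.
        return Tuple2.of(id, idToName.getOrDefault(id, "unknown"));
    }
}

It would be applied as idStream.map(new IdToNameMapper()) on a DataStream<Long> of IDs.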

Best, Fabian


Re: static/dynamic lookups in flink streaming

sandeep6
Thanks for the solution, Fabian. My map would be substantially large, so I wouldn't want to replicate it in each operator. I will probably add a Redis cache layer and use it in the streaming process. Do you foresee any problems with that?

Thanks,
Sandeep


Re: static/dynamic lookups in flink streaming

Fabian Hueske-2
That approach should work as well.
The upcoming Flink 1.2.0 release will feature a function for asynchronous operations, i.e., you can have multiple concurrent Redis requests in flight without losing the fault-tolerance guarantees.
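
For illustration, a sketch of such an asynchronous lookup using Flink's async I/O API; it is written against the ResultFuture callback of later releases (the 1.2 API named the callback type AsyncCollector), and the Redis call is only a placeholder:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncNameLookup extends RichAsyncFunction<Long, Tuple2<Long, String>> {

    @Override
    public void asyncInvoke(Long id, ResultFuture<Tuple2<Long, String>> resultFuture) {
        // Run the lookup on another thread so the operator is not blocked;
        // several lookups can be in flight at the same time.
        CompletableFuture
                .supplyAsync(() -> lookupNameInRedis(id))
                .thenAccept(name -> resultFuture.complete(
                        Collections.singleton(Tuple2.of(id, name))));
    }

    // Placeholder: a real job would call a Redis client here instead.
    private String lookupNameInRedis(Long id) {
        return "name-" + id;
    }
}

It would be wired in with something like AsyncDataStream.unorderedWait(idStream, new AsyncNameLookup(), 1, TimeUnit.SECONDS, 100), which allows up to 100 lookups in flight with a one-second timeout each.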

Another alternative is to store the map in the key-partitioned operator state of a CoFlatMapFunction. One input is the stream with the IDs; the other stream carries the entries of the map, which are inserted into the operator state.
This approach would require some initial buffering of the ID stream, though, until the corresponding map entries have arrived.
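
Roughly, such a CoFlatMapFunction could look like the sketch below; the Long/String types and the field layout of the map entries are assumptions. Both inputs are keyed by the ID: the map-entry stream writes into keyed state, and the ID stream reads from it.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Input 1: IDs to enrich. Input 2: (id, name) entries of the lookup map.
public class LookupCoFlatMap
        extends RichCoFlatMapFunction<Long, Tuple2<Long, String>, Tuple2<Long, String>> {

    private transient ValueState<String> nameState;

    @Override
    public void open(Configuration parameters) {
        // Key-partitioned state: one name per key (ID), checkpointed by Flink.
        nameState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("name", String.class));
    }

    @Override
    public void flatMap1(Long id, Collector<Tuple2<Long, String>> out) throws Exception {
        String name = nameState.value();
        if (name != null) {
            out.collect(Tuple2.of(id, name));
        }
        // else: the entry for this ID has not arrived yet; this is where the
        // initial buffering mentioned above would be needed in a real job.
    }

    @Override
    public void flatMap2(Tuple2<Long, String> entry, Collector<Tuple2<Long, String>> out) throws Exception {
        nameState.update(entry.f1);
    }
}

It would be used as idStream.connect(mapEntries).keyBy(...).flatMap(new LookupCoFlatMap()), with both key selectors extracting the ID.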

Best, Fabian


Re: static/dynamic lookups in flink streaming

ghostmickey
In reply to this post by Fabian Hueske-2
If your <id, name> map is small, this would be the preferred approach.
============================================
If it is very big (more than 100,000,000 records), do you have any suggestions?

Re: static/dynamic lookups in flink streaming

Jain, Ankit
In reply to this post by sandeep6
What if we copy the big data set to HDFS when the cluster starts (e.g. on EMR if using AWS) and then use that to build distributed operator state in Flink, instead of calling the external store?

How do the Flink contributors feel about that?

Thanks
Ankit

On 5/14/17, 8:17 PM, "yunfan123" <[hidden email]> wrote:

    Flink 1.2.0 has been released. Can you give an example of the asynchronous operations feature?
   
   
   