Keep Model in Operator instance up to date

tambunanw
Hi All, 

We have a streaming computation that needs to validate the data stream against a model provided by the user.

Right now what I have done is to load the model into a Flink operator and validate against it. However, the model can be updated and changed frequently. Fortunately, we always publish a change event to RabbitMQ whenever that happens.

I think we can:

  1. Create a RabbitMQ listener for the model-changed event inside the operator, and update the model whenever an event arrives. But I think this will create a race condition if not handled correctly, and it seems odd to keep this logic inside the operator.

  2. Move the model into an external in-memory cache and keep it up to date using Flink, so the operator retrieves the model from that cache.

  3. Create two streams and use a co-operator to manage the shared state.

What is your suggestion for keeping the state up to date from external events? Is there some kind of best practice for keeping a model up to date in a streaming operator?

Thanks a lot


Cheers


Re: Keep Model in Operator instance up to date

Gyula Fóra
Hey!

I think it is safe to say that the best approach in this case is to create a co-flatmap that receives the model updates on one input. The update events should probably be broadcast so that every parallel instance can check incoming data against the model.

This approach also plays well with Flink's checkpointing mechanism, whereas external model updates would be tricky to keep consistent.
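
For illustration, a minimal sketch of such a co-flatmap (not from the thread: events are plain Strings and the "model" is just a set of allowed values, purely to keep the example self-contained; to benefit from checkpointing you would additionally register the model as operator state):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Set;

public class ModelCheckFlatMap
        implements CoFlatMapFunction<String, Set<String>, String> {

    // Latest model seen by this parallel instance. Because the update
    // stream is broadcast, every parallel instance sees every update.
    private Set<String> model = new HashSet<>();

    @Override
    public void flatMap1(String event, Collector<String> out) {
        // First input: the event stream, checked against the current model.
        if (model.contains(event)) {
            out.collect(event);
        }
    }

    @Override
    public void flatMap2(Set<String> update, Collector<String> out) {
        // Second input: a broadcast model update replaces the local copy.
        model = update;
    }
}

Wired up as: events.connect(updates.broadcast()).flatMap(new ModelCheckFlatMap())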

Cheers,
Gyula




Re: Keep Model in Operator instance up to date

tambunanw
Hi Gyula, 

Thanks for your response. 

However, the model can receive update events from multiple streams. How can we do that with a co-flatmap, given that as far as I can see the connect API only accepts a single DataStream?

... while external model updates would be tricky to keep consistent.

Is that still the case if the operator treats the external model as read-only? We would create another stream that updates the external model separately.

Cheers


Re: Keep Model in Operator instance up to date

Gyula Fóra
Hey,

One input of your co-flatmap would be the model updates and the other input would be the events to check against the model, if I understand correctly.

This means that if your model updates come from more than one stream, you need to union them into a single stream before connecting it with the event stream and applying the co-flatmap.

DataStream updates1 = ...;
DataStream updates2 = ...;
DataStream events = ...;

events.connect(updates1.union(updates2).broadcast()).flatMap(...);

Does this answer your question?

Gyula



Re: Keep Model in Operator instance up to date

tambunanw
Hi Gyula, 

Thanks.

However, updates1 and updates2 have different types. Based on my understanding, I don't think we can use union in that case. How can we handle this?

We like to keep our events strongly typed so that the domain language is captured.


Cheers


Re: Keep Model in Operator instance up to date

Gyula Fóra
In that case I would apply a map that wraps the updates in some common type, like an Either<T1, T2>, before the union.

Then in the co-flatmap you can unwrap it.
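
A minimal sketch of that wrapping (Flink did not ship a built-in Either type at the time, so this hand-rolls one; the concrete update types String and Integer are placeholders, not from the thread):

import org.apache.flink.api.common.functions.MapFunction;

// Hand-rolled Either: exactly one of the two fields is non-null.
public class UpdateEither {
    public String left;   // first update type (placeholder)
    public Integer right; // second update type (placeholder)

    public static UpdateEither left(String u) {
        UpdateEither e = new UpdateEither();
        e.left = u;
        return e;
    }

    public static UpdateEither right(Integer u) {
        UpdateEither e = new UpdateEither();
        e.right = u;
        return e;
    }
}

// Map both update streams to the common type, then union and broadcast:
DataStream<UpdateEither> updates =
    updates1.map(new MapFunction<String, UpdateEither>() {
        @Override
        public UpdateEither map(String u) { return UpdateEither.left(u); }
    }).union(
    updates2.map(new MapFunction<Integer, UpdateEither>() {
        @Override
        public UpdateEither map(Integer u) { return UpdateEither.right(u); }
    }));

events.connect(updates.broadcast()).flatMap(...);

In the co-flatmap, flatMap2 can then check which field is non-null and apply the corresponding kind of update.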

Re: Keep Model in Operator instance up to date

tambunanw
Thanks Gyula, 

Another question I have, about this remark:

... while external model updates would be tricky to keep consistent.

Is that still the case if the operator treats the external model as read-only? We would create another stream that updates the external model separately.

Could you please elaborate on this?

Cheers


Re: Keep Model in Operator instance up to date

Gyula Fóra
Hey,

If it is always better to check the events against a more up-to-date model (even if the events being checked arrived before the update), then it is fine to keep the model outside of the system.

In that case we need to make sure that the updates are pushed to the external system consistently. If you are using the PersistentKafkaSource, for instance, some messages may be replayed after a failure, so you need to either remove duplicate updates or make the updates idempotent.

You can read about the checkpointing mechanism on the Flink website: https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
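
As an illustration of idempotent handling (the versioning scheme and names here are assumptions, not from the thread): give every update a monotonically increasing version per model and ignore anything at or below the last applied version, so a replay after a failure becomes a no-op.

import java.util.HashMap;
import java.util.Map;

public class IdempotentModelStore {
    private final Map<String, Long> lastAppliedVersion = new HashMap<>();
    private final Map<String, String> models = new HashMap<>();

    // Returns false when the update was already applied (e.g. a replay).
    public boolean apply(String modelKey, long version, String newModel) {
        Long last = lastAppliedVersion.get(modelKey);
        if (last != null && version <= last) {
            return false; // duplicate caused by replayed messages
        }
        models.put(modelKey, newModel);
        lastAppliedVersion.put(modelKey, version);
        return true;
    }

    public String currentModel(String modelKey) {
        return models.get(modelKey);
    }
}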

Cheers,
Gyula

Re: Keep Model in Operator instance up to date

tambunanw
Hi Gyula, 

That's really helpful. The docs have improved a lot since the last time I looked (0.9).

Thanks a lot!

Cheers


Re: Keep Model in Operator instance up to date

tambunanw
Hi Gyula, 

I have another question. If I cache something in an operator, will I always need to add and connect another stream of changes to that operator to keep the cache up to date?

Is this the right approach in every case?

Cheers


Re: Keep Model in Operator instance up to date

Gyula Fóra
Hi,

I don't think I fully understand your question; could you please try to be a little more specific?

I assume that by caching you mean keeping the current model as operator state. Why would you need to add new streams in this case?

I might be slow to answer as I am currently on vacation without a stable internet connection.

Cheers,
Gyula

Re: Keep Model in Operator instance up to date

tambunanw
Hi Gyula, 

I have a couple of operators in the pipeline (filter, mapper, flatMap), and each of these operators holds some cached data.

So I think that means that for every such operator in the pipeline, I will need to add and connect a new stream to update its cached data.


Cheers


Re: Keep Model in Operator instance up to date

Gyula Fóra
Hi

You are right: if all operators need continuous updates, then the most straightforward way is to push all the updates to the operators as you described.

If the cached data is the same for all operators and is small enough, you can instead centralize the updates in a dedicated operator and push the updated data to the other operators every once in a while, as in the sketch below.
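
A rough sketch of that consolidating operator (the types, the interval, and the name CacheSnapshotter are assumptions, not from the thread). It folds raw key/value updates into a cache and periodically emits a full snapshot; run it with parallelism 1 and broadcast its output to the operators that need the cache:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.HashMap;

public class CacheSnapshotter
        implements FlatMapFunction<Tuple2<String, String>, HashMap<String, String>> {

    private final HashMap<String, String> cache = new HashMap<>();
    private long lastEmit = 0L;
    private static final long INTERVAL_MS = 10000; // assumed push interval

    @Override
    public void flatMap(Tuple2<String, String> update,
                        Collector<HashMap<String, String>> out) {
        // Fold every raw update into the consolidated cache.
        cache.put(update.f0, update.f1);

        long now = System.currentTimeMillis();
        if (now - lastEmit >= INTERVAL_MS) {
            // Every once in a while, push a copy of the whole (small) cache.
            out.collect(new HashMap<String, String>(cache));
            lastEmit = now;
        }
    }
}

// rawUpdates.flatMap(new CacheSnapshotter()).setParallelism(1).broadcast()
// can then be connect()-ed to each operator that needs the cache.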

Cheers
Gyula



Re: Keep Model in Operator instance up to date

tambunanw
Hi Gyula, 

Thanks a lot. That really helps!

Have a great vacation!

Cheers
