Combining streams with static data and using REST API as a sink

Josh

Hi all,

I am new to Flink and have a couple of questions which I've had trouble finding answers to online. Any advice would be much appreciated!

  1. What's a typical way of handling the scenario where you want to join streaming data with a (relatively) static data source? For example, if I have a stream 'orders' where each order has an 'item_id', and I want to join this stream with my database of 'items'. The database of items is mostly static (with perhaps a few new items added every day). The database can be retrieved either directly from a standard SQL database (postgres) or via a REST call. I guess one way to handle this would be to distribute the database of items with the Flink tasks, and to redeploy the entire job if the items database changes. But I think there's probably a better way to do it?
  2. I'd like my Flink job to output state to a REST API. (i.e. using the REST API as a sink). Updates would be incremental, e.g. the job would output tumbling window counts which need to be added to some property on a REST resource, so I'd probably implement this as a PATCH. I haven't found much evidence that anyone else has used a REST API as a Flink sink - is there a reason why this might be a bad idea?

Thanks for any advice on these,

Josh

Re: Combining streams with static data and using REST API as a sink

Rami Al-Isawi
Hi Josh,

I am no expert in Flink yet, but here are my thoughts on this:

1. How about streaming an event to Flink every time the items DB is updated? Then, in a background thread, you fetch the new data from the DB, for example through REST (if there are only a few updates a day), load the results into memory, and there is your updated static data.

2. REST APIs work over HTTP, so how could one be a sink? Serving HTTP requests does not sound like a job for Flink at all. Simply sink the results to some DB and have another component read from the DB and serve them through a REST API.

-Rami

Disclaimer: This message and any attachments thereto are intended solely for the addressed recipient(s) and may contain confidential information. If you are not the intended recipient, please notify the sender by reply e-mail and delete the e-mail (including any attachments thereto) without producing, distributing or retaining any copies thereof. Any review, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient(s) is prohibited. Thank you.
Re: Combining streams with static data and using REST API as a sink

Josh

Hi Rami,

Thanks for the fast reply.

  1. In your solution, would I need to create a new stream for 'item updates', and add it as a source of my Flink job? Then I would need to ensure item updates get broadcast to all nodes that are running my job and use them to update the in-memory items database? This sounds like it might be a good solution, but I'm not sure how the broadcast would work - it sounds like I'd need Flink broadcast variables, but it looks like there's no support for changing datasets at the moment: https://issues.apache.org/jira/browse/FLINK-3514 
  2. I don't understand why an HTTP sink isn't possible. Say the output of my job is 'number of items ordered per customer', then for each output I want to update a 'customer' in my database, incrementing their 'item_order_count'. What's wrong with doing that update in the Flink job via an HTTP REST call (updating the customer resource), rather than writing directly to a database? The reason I'd like to do it this way is to decouple the underlying database from Flink.

Josh


Re: Combining streams with static data and using REST API as a sink

Rami Al-Isawi
Hi,

1. I have no experience with broadcast variables; I suggest you give them a try.

2. I misunderstood you. I thought you wanted Flink to serve the results and become a REST API provider that others can call. What you are saying now is that you want a sink that makes HTTP calls to a REST API hosted somewhere else. Well, if the number of results is not in the order of thousands/sec, then it is feasible to make HTTP calls. If you ever end up writing such a sink, I would be happy to review and discuss it.
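For illustration, here is a minimal sketch of the request such a sink might send for each window result, using the JDK's `java.net.http` API (Java 11+). The class name, base URL, resource path, and JSON field are all assumptions made for the example; none of them come from this thread.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical helper: builds the PATCH request an HTTP sink could send per result.
final class CustomerCountPatch {
    // Assumed endpoint layout; replace with the real REST resource.
    static final String BASE_URL = "http://localhost:8080/customers/";

    // Assumed JSON payload carrying the increment for 'item_order_count'.
    static String body(long increment) {
        return "{\"item_order_count_increment\":" + increment + "}";
    }

    static HttpRequest build(String customerId, long increment) {
        return HttpRequest.newBuilder()
                .uri(URI.create(BASE_URL + customerId))
                .timeout(Duration.ofSeconds(5))
                .header("Content-Type", "application/json")
                // PATCH has no dedicated builder method, so use the generic one.
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body(increment)))
                .build();
    }
}
```

In a job, the sink's per-element method would presumably pass this request to `HttpClient.send(...)` and decide what to do on a non-2xx response. Because each call is a blocking round trip, throughput is bounded by the REST service, which matches the caveat about thousands of results per second.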

-Rami

Re: Combining streams with static data and using REST API as a sink

Maximilian Michels
In reply to this post by Josh
Hi Josh,

1) Use a RichFunction which has an `open()` method to load data (e.g. from a database) at runtime before the processing starts.
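A rough sketch of that pattern, written as plain Java so it runs without Flink on the classpath. In a real job the same logic would sit in a rich function such as `RichMapFunction`, with the load placed in its `open()` method; the `ItemEnricher` class, the loader, and the string-based records are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java stand-in for an enrichment function with an open()-style hook.
final class ItemEnricher {
    // Hypothetical loader wrapping the database query or REST call.
    private final Supplier<Map<String, String>> itemLoader;
    private Map<String, String> itemsById;

    ItemEnricher(Supplier<Map<String, String>> itemLoader) {
        this.itemLoader = itemLoader;
    }

    // Runs once per parallel instance, before any order is processed.
    void open() {
        itemsById = new HashMap<>(itemLoader.get());
    }

    // Runs per order: joins the order with the in-memory items table.
    String map(String orderId, String itemId) {
        String itemName = itemsById.getOrDefault(itemId, "unknown");
        return orderId + ":" + itemName;
    }
}
```

Loading in `open()` rather than in the constructor matters in Flink because function objects are serialized and shipped to the workers; the table should be built where the function actually runs.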

2) No, that's fine. If you want your REST API sink to interplay with checkpointing (for fault tolerance), this is a bit tricky, though, depending on the guarantees you want to have. Typically, you would have "at least once" or "exactly once" semantics on the state. In Flink this is easy to achieve; it's a bit harder for outside systems.

"At Least Once"

For example, if you increment a counter in a database, this count will be off if you recover your job in the case of a failure. You can checkpoint the current value of the counter and restore this value on a failure (using the Checkpointed interface). However, your counter might decrease temporarily when you resume from a checkpoint (until the counter has caught up again).
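The snapshot-and-restore idea can be sketched in a few lines of plain Java. The method names only mirror the idea of the Checkpointed interface; the signatures are simplified and the class is hypothetical.

```java
// Hypothetical counter whose value is saved at checkpoints and reset on recovery.
final class CheckpointedCounter {
    private long count;

    void add(long n) {
        count += n;
    }

    long value() {
        return count;
    }

    // Called when a checkpoint is taken: returns the state to persist.
    long snapshotState() {
        return count;
    }

    // Called on recovery: resets the counter to the checkpointed value.
    void restoreState(long snapshot) {
        count = snapshot;
    }
}
```

If a checkpoint is taken at 3, the counter then reaches 8, and the job fails, restoring brings it back to 3. It returns to 8 only once the elements after the checkpoint have been replayed, which is the temporary decrease described above.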

"Exactly Once"

If you want "exactly once" semantics on outside systems (e.g. a REST API), you'll need idempotent updates. An idempotent variant of this would be a count with a checkpoint id (cid) in your database.

| cid | count |
|-----+-------|
|   0 |     3 |
|   1 |    11 |
|   2 |    20 |
|   3 |   120 |
|   4 |   137 |
|   5 |   158 |


You would then always read the newest cid value for presentation. You would only write to the database once you know you have completed the checkpoint (CheckpointListener). You can still fail while doing that, so you need to keep the confirmation around in the checkpoint such that you can confirm again after restore. It is important that confirmation can be done multiple times without affecting the result (idempotent). On recovery from a checkpoint, you want to delete all rows with a cid higher than the one you resume from. For example, if you fail after checkpoint 3 has been created, you'll confirm 3 (because you might have failed before you could confirm) and then delete 4 and 5 before starting the computation again.
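The confirm-and-rollback protocol above can be sketched with an in-memory table standing in for the database. The class and method names are hypothetical; a real version would issue an upsert keyed on cid and a `DELETE ... WHERE cid > ?` against the actual store.

```java
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for the (cid, count) table.
final class CheckpointCountTable {
    private final TreeMap<Long, Long> rows = new TreeMap<>(); // cid -> count

    // Idempotent: confirming the same (cid, count) twice leaves one identical row.
    void confirm(long cid, long count) {
        rows.put(cid, count);
    }

    // On recovery: delete every row with a cid higher than the restored one.
    void rollbackTo(long restoredCid) {
        rows.tailMap(restoredCid, false).clear();
    }

    // Readers always use the row with the newest cid (0 if the table is empty).
    long latestCount() {
        Map.Entry<Long, Long> last = rows.lastEntry();
        return last == null ? 0L : last.getValue();
    }

    int size() {
        return rows.size();
    }
}
```

With the table from the example, failing after checkpoint 3 means calling `confirm(3, 120)` again and then `rollbackTo(3)`; readers then see 120 until checkpoint 4 is recomputed and confirmed.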

You see that strong consistency guarantees can be a bit tricky. If you don't need strong guarantees and undercounting is ok for you, implement simple checkpointing for "at least once" using the Checkpointed interface or the KeyValue state if your counter is scoped by key.

Cheers,
Max

Re: Combining streams with static data and using REST API as a sink

Josh
Hi Max,

Thanks, that's very helpful re the REST API sink. For now I don't need exactly once guarantees for the sink, so I'll just write a simple HTTP sink implementation. But may need to move to the idempotent version in future!

For 1), that sounds like a simple/easy solution, but how would I handle occasional updates in that case, since I guess the open() function is only called once? Do I need to periodically restart the job, or periodically trigger tasks to restart and refresh their data? Ideally I would want this job to be running constantly.

Josh

Re: Combining streams with static data and using REST API as a sink

Maximilian Michels
Hi Josh,

You can trigger an occasional refresh, e.g. on every 100 elements received. Or, you could start a thread that does that every 100 seconds (possibly with a lock involved to prevent processing in the meantime).
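A minimal sketch of the count-based variant, again in plain Java with hypothetical names. The lookup table is reloaded on every Nth element; in a Flink function the initial load would go in `open()`.

```java
import java.util.Map;
import java.util.function.Supplier;

// Lookup table that reloads itself after a fixed number of lookups.
final class RefreshingLookup {
    // Hypothetical loader wrapping the database query or REST call.
    private final Supplier<Map<String, String>> loader;
    private final int refreshEvery;
    private Map<String, String> items;
    private int seenSinceRefresh;

    RefreshingLookup(Supplier<Map<String, String>> loader, int refreshEvery) {
        this.loader = loader;
        this.refreshEvery = refreshEvery;
    }

    // Initial load, before any element is processed.
    void open() {
        items = loader.get();
        seenSinceRefresh = 0;
    }

    // Reloads on every refreshEvery-th element, then answers from the new table.
    String lookup(String itemId) {
        if (++seenSinceRefresh >= refreshEvery) {
            items = loader.get();
            seenSinceRefresh = 0;
        }
        return items.get(itemId);
    }
}
```

The count-based check runs on the processing thread, so no lock is needed. The timer-based variant would reload on a separate thread and would therefore need the lock mentioned above, or a swap of a `volatile` reference to a freshly built map.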

Cheers,
Max

Re: Combining streams with static data and using REST API as a sink

Aljoscha Krettek
Hi Josh,
for the first part of your question you might be interested in our ongoing work of adding side inputs to Flink. I started this design doc: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing

It's still somewhat rough around the edges but could you see this being useful for your case? I also have some more stuff that I will shortly add to the document.

Cheers,
Aljoscha

Re: Combining streams with static data and using REST API as a sink

Josh
Hi Aljoscha, 

That sounds exactly like the kind of feature I was looking for, since my use-case fits the "Join stream with slowly evolving data" example. For now, I will do an implementation similar to Max's suggestion. Of course it's not as nice as the proposed feature, as there will be a delay in receiving updates since the updates aren't being continuously ingested by Flink. But it certainly sounds like it would be a nice feature to have!

Thanks,
Josh

On Tue, May 24, 2016 at 1:48 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi Josh,
for the first part of your question you might be interested in our ongoing work of adding side inputs to Flink. I started this design doc: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing

It's still somewhat rough around the edges but could you see this being useful for your case? I also have some more stuff that I will shortly add to the document.

Cheers,
Aljoscha

On Tue, 24 May 2016 at 14:34 Maximilian Michels <[hidden email]> wrote:
Hi Josh,

You can trigger an occasional refresh, e.g. on every 100 elements
received. Or, you could start a thread that does that every 100
seconds (possibly with a lock involved to prevent processing in the
meantime).
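
A minimal sketch of the element-count variant described above, written as plain Java rather than against the Flink API. `RefreshingLookup`, `loader` and `refreshEvery` are hypothetical names; `loader` stands in for whatever SQL query or REST call fetches the items. In a Flink job the initial load would presumably happen in `open()` and `lookup` would be called from the per-element method of the RichFunction. The timer-thread variant would additionally need the lock mentioned above and is not shown.

```java
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java sketch, not a Flink class: a lookup table that reloads itself
// after a fixed number of lookups.
final class RefreshingLookup<K, V> {
    private final Supplier<Map<K, V>> loader; // hypothetical: wraps the SQL query or REST call
    private final int refreshEvery;           // reload once this many lookups have been served
    private Map<K, V> cache;
    private int seenSinceRefresh;

    RefreshingLookup(Supplier<Map<K, V>> loader, int refreshEvery) {
        this.loader = loader;
        this.refreshEvery = refreshEvery;
        this.cache = loader.get();            // initial load, as open() would do
    }

    // Called once per stream element.
    V lookup(K key) {
        if (seenSinceRefresh >= refreshEvery) {
            cache = loader.get();             // occasional refresh
            seenSinceRefresh = 0;
        }
        seenSinceRefresh++;
        return cache.get(key);
    }
}
```

With `refreshEvery = 100` this matches the "every 100 elements" suggestion; updates to the items database become visible after at most 100 further elements on each parallel instance.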

Cheers,
Max

On Mon, May 23, 2016 at 7:36 PM, Josh <[hidden email]> wrote:
>
> Hi Max,
>
> Thanks, that's very helpful re the REST API sink. For now I don't need exactly once guarantees for the sink, so I'll just write a simple HTTP sink implementation. But may need to move to the idempotent version in future!
>
> For 1), that sounds like a simple/easy solution, but how would I handle occasional updates in that case, since I guess the open() function is only called once? Do I need to periodically restart the job, or periodically trigger tasks to restart and refresh their data? Ideally I would want this job to be running constantly.
>
> Josh
>
> On Mon, May 23, 2016 at 5:56 PM, Maximilian Michels <[hidden email]> wrote:
>>
>> Hi Josh,
>>
>> 1) Use a RichFunction which has an `open()` method to load data (e.g. from a database) at runtime before the processing starts.
>>
>> 2) No that's fine. If you want your Rest API Sink to interplay with checkpointing (for fault-tolerance), this is a bit tricky though depending on the guarantees you want to have. Typically, you would have "at least once" or "exactly once" semantics on the state. In Flink, this is easy to achieve, it's a bit harder for outside systems.
>>
>> "At Least Once"
>>
>> For example, if you increment a counter in a database, this count will be off if you recover your job in the case of a failure. You can checkpoint the current value of the counter and restore this value on a failure (using the Checkpointed interface). However, your counter might decrease temporarily when you resume from a checkpoint (until the counter has caught up again).
>>
>> "Exactly Once"
>>
>> If you want "exactly once" semantics on outside systems (e.g. Rest API), you'll need idempotent updates. An idempotent variant of this would be a count with a checkpoint id (cid) in your database.
>>
>> | cid | count |
>> |-----+-------|
>> |   0 |     3 |
>> |   1 |    11 |
>> |   2 |    20 |
>> |   3 |   120 |
>> |   4 |   137 |
>> |   5 |   158 |
>>
>> You would then always read the newest cid value for presentation. You would only write to the database once you know you have completed the checkpoint (CheckpointListener). You can still fail while doing that, so you need to keep the confirmation around in the checkpoint such that you can confirm again after restore. It is important that confirmation can be done multiple times without affecting the result (idempotent). On recovery from a checkpoint, you want to delete all rows with a cid higher than the one you resume from. For example, if you fail after checkpoint 3 has been created, you'll confirm 3 (because you might have failed before you could confirm) and then delete 4 and 5 before starting the computation again.
>>
>> As you can see, strong consistency guarantees can be a bit tricky. If you don't need strong guarantees and undercounting is ok for you, implement simple checkpointing for "at least once" using the Checkpointed interface or the KeyValue state if your counter is scoped by key.
>>
>> Cheers,
>> Max
>>
>>
>> On Mon, May 23, 2016 at 3:22 PM, Josh <[hidden email]> wrote:
>> > Hi all,
>> >
>> > I am new to Flink and have a couple of questions which I've had trouble
>> > finding answers to online. Any advice would be much appreciated!
>> >
>> > What's a typical way of handling the scenario where you want to join
>> > streaming data with a (relatively) static data source? For example, if I
>> > have a stream 'orders' where each order has an 'item_id', and I want to join
>> > this stream with my database of 'items'. The database of items is mostly
>> > static (with perhaps a few new items added every day). The database can be
>> > retrieved either directly from a standard SQL database (postgres) or via a
>> > REST call. I guess one way to handle this would be to distribute the
>> > database of items with the Flink tasks, and to redeploy the entire job if
>> > the items database changes. But I think there's probably a better way to do
>> > it?
>> > I'd like my Flink job to output state to a REST API. (i.e. using the REST
>> > API as a sink). Updates would be incremental, e.g. the job would output
>> > tumbling window counts which need to be added to some property on a REST
>> > resource, so I'd probably implement this as a PATCH. I haven't found much
>> > evidence that anyone else has used a REST API as a Flink sink - is there a
>> > reason why this might be a bad idea?
>> >
>> > Thanks for any advice on these,
>> >
>> > Josh
>>
>
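
The checkpoint-id scheme from Max's quoted reply can be sketched roughly as follows. This is an in-memory model only, not Flink code: `CheckpointedCountStore` and its methods are hypothetical names, and the `TreeMap` stands in for the database table (or REST resource) holding the `cid` and `count` columns.

```java
import java.util.TreeMap;

// In-memory stand-in for the (cid, count) table; all names are hypothetical.
final class CheckpointedCountStore {
    private final TreeMap<Long, Long> rows = new TreeMap<>(); // cid -> count

    // Idempotent write: confirming the same checkpoint again leaves the same row.
    void confirm(long cid, long count) {
        rows.put(cid, count);
    }

    // On recovery, delete every row with a cid higher than the one resumed from.
    void rollbackTo(long cid) {
        rows.tailMap(cid, false).clear();
    }

    // Readers present the count stored under the newest cid (0 if there are no rows).
    long latestCount() {
        return rows.isEmpty() ? 0L : rows.lastEntry().getValue();
    }

    int size() {
        return rows.size();
    }
}
```

Recovering from checkpoint 3 in the example table would then amount to calling `confirm(3, 120)` once more, followed by `rollbackTo(3)`, which removes rows 4 and 5.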


Re: Combining streams with static data and using REST API as a sink

Nancy Estrada
Hi Josh,

I have a use-case similar to yours. I need to join a stream with data from a database to which I have access via a REST API. Since the side inputs API is still ongoing work, I am wondering how you approached it. Did you use the rich function, updating it periodically?

Thank you in advance!
Nancy


Re: Combining streams with static data and using REST API as a sink

Aljoscha Krettek
A quick note on this: the side-input API is still ongoing work and it turns out it’s more complicated (obviously … 😳) and we will need quite a bit more work on other parts of Flink before we can provide a good built-in solution.

In the meantime, you can check out the Async I/O operator [1]. I think this fits your use case of accessing an external system quite well because it allows firing off several requests to external systems at the same time.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
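
To illustrate what firing off several requests at the same time means, here is a plain-Java sketch using `CompletableFuture`. It is not the Async I/O operator's API; see [1] for that. `ConcurrentLookups`, `lookupAll` and `client` are hypothetical names, with `client` standing in for a non-blocking REST or database client.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java illustration of concurrent lookups; not a Flink class.
final class ConcurrentLookups {
    // Starts one asynchronous lookup per key, then gathers the results in input order.
    static <K, V> List<V> lookupAll(List<K> keys, Function<K, CompletableFuture<V>> client) {
        // Collecting to a list first ensures every request is in flight
        // before we start waiting on any response.
        List<CompletableFuture<V>> inFlight = keys.stream()
                .map(client)
                .collect(Collectors.toList());
        return inFlight.stream()
                .map(CompletableFuture::join) // blocks until that response has arrived
                .collect(Collectors.toList());
    }
}
```

The total wait is then roughly the latency of the slowest request rather than the sum of all of them, which is the benefit over issuing one blocking call per element.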

> On 21. Jun 2017, at 18:44, Nancy Estrada <[hidden email]> wrote:
>
> Hi Josh,
>
> I have a use-case similar to yours. I need to join a stream with data from a
> database to which I have access via a REST API. Since the side inputs API
> is still ongoing work, I am wondering how you approached it.
> Did you use the rich function, updating it periodically?
>
> Thank you in advance!
> Nancy
>