How to stream intermediate data that is stored in external storage?

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

How to stream intermediate data that is stored in external storage?

kant kodali
Hi All,

I want to do a full outer join on two streaming data sources and store the state of full outer join in some external storage like rocksdb or something else. And then want to use this intermediate state as a streaming source again, do some transformation and write it to some external store. is that possible with Flink 1.9?

Also what storage systems support push mechanism for the intermediate data? For example, In the use case above does rocksdb support push/emit events in a streaming fashion?

Thanks!
Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

Piotr Nowojski-3
Hi,

I’m not sure what are you trying to achieve. What do you mean by “state of full outer join”? The result of it? Or it’s internal state? Also keep in mind, that internal state of the operators in Flink is already snapshoted/written down to an external storage during checkpointing mechanism.

The result should be simple, just write it to some Sink.

For the internal state, it sounds like you are doing something not the way it was intended… having said that, you can try one of the following options:
a) Implement your own outer join operator (might not be as easy if you are using Table API/SQL) and just create a side output for the state changes.
b) Use state processor API to read the content of a savepoint/checkpoint [1][2]
c) Use queryable state [3] (I’m not sure about this, I have never used queryable state)

Piotrek

[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html

On 29 Oct 2019, at 16:42, kant kodali <[hidden email]> wrote:

Hi All,

I want to do a full outer join on two streaming data sources and store the state of full outer join in some external storage like rocksdb or something else. And then want to use this intermediate state as a streaming source again, do some transformation and write it to some external store. is that possible with Flink 1.9?

Also what storage systems support push mechanism for the intermediate data? For example, In the use case above does rocksdb support push/emit events in a streaming fashion?

Thanks!

Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

kant kodali
Hi Piotr,

I am talking about the internal state. How often this state gets checkpointed? if it is every few seconds then it may not meet our real-time requirement(sub second).

The question really is can I read this internal state in a streaming fashion in an update mode? The state processor API seems to expose DataSet but not DataStream so I am not sure how to read internal state in streaming fashion in an update made?

Thanks!

On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure what are you trying to achieve. What do you mean by “state of full outer join”? The result of it? Or it’s internal state? Also keep in mind, that internal state of the operators in Flink is already snapshoted/written down to an external storage during checkpointing mechanism.

The result should be simple, just write it to some Sink.

For the internal state, it sounds like you are doing something not the way it was intended… having said that, you can try one of the following options:
a) Implement your own outer join operator (might not be as easy if you are using Table API/SQL) and just create a side output for the state changes.
b) Use state processor API to read the content of a savepoint/checkpoint [1][2]
c) Use queryable state [3] (I’m not sure about this, I have never used queryable state)

Piotrek

[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html

On 29 Oct 2019, at 16:42, kant kodali <[hidden email]> wrote:

Hi All,

I want to do a full outer join on two streaming data sources and store the state of full outer join in some external storage like rocksdb or something else. And then want to use this intermediate state as a streaming source again, do some transformation and write it to some external store. is that possible with Flink 1.9?

Also what storage systems support push mechanism for the intermediate data? For example, In the use case above does rocksdb support push/emit events in a streaming fashion?

Thanks!

Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

Piotr Nowojski-3
Hi Kant,

Checkpointing interval is configurable, but I wouldn’t count on it working well with even 10s intervals. 
 
I think what you are this is not supported by Flink generically. Maybe Queryable state I mentioned before? But I have never used it.

I think you would have to implement your own custom operator that would output changes to it’s internal state as a side output.

Piotrek

On 30 Oct 2019, at 16:14, kant kodali <[hidden email]> wrote:

Hi Piotr,

I am talking about the internal state. How often this state gets checkpointed? if it is every few seconds then it may not meet our real-time requirement(sub second).

The question really is can I read this internal state in a streaming fashion in an update mode? The state processor API seems to expose DataSet but not DataStream so I am not sure how to read internal state in streaming fashion in an update made?

Thanks!

On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure what are you trying to achieve. What do you mean by “state of full outer join”? The result of it? Or it’s internal state? Also keep in mind, that internal state of the operators in Flink is already snapshoted/written down to an external storage during checkpointing mechanism.

The result should be simple, just write it to some Sink.

For the internal state, it sounds like you are doing something not the way it was intended… having said that, you can try one of the following options:
a) Implement your own outer join operator (might not be as easy if you are using Table API/SQL) and just create a side output for the state changes.
b) Use state processor API to read the content of a savepoint/checkpoint [1][2]
c) Use queryable state [3] (I’m not sure about this, I have never used queryable state)

Piotrek

[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html

On 29 Oct 2019, at 16:42, kant kodali <[hidden email]> wrote:

Hi All,

I want to do a full outer join on two streaming data sources and store the state of full outer join in some external storage like rocksdb or something else. And then want to use this intermediate state as a streaming source again, do some transformation and write it to some external store. is that possible with Flink 1.9?

Also what storage systems support push mechanism for the intermediate data? For example, In the use case above does rocksdb support push/emit events in a streaming fashion?

Thanks!


Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

kant kodali
"I think you would have to implement your own custom operator that would output changes to it’s internal state as a side output"

Yes, I am looking for this but I am not sure how to do this? Should I use the processFunction(like the event-driven applications) ?

On Wed, Oct 30, 2019 at 8:53 AM Piotr Nowojski <[hidden email]> wrote:
Hi Kant,

Checkpointing interval is configurable, but I wouldn’t count on it working well with even 10s intervals. 
 
I think what you are this is not supported by Flink generically. Maybe Queryable state I mentioned before? But I have never used it.

I think you would have to implement your own custom operator that would output changes to it’s internal state as a side output.

Piotrek

On 30 Oct 2019, at 16:14, kant kodali <[hidden email]> wrote:

Hi Piotr,

I am talking about the internal state. How often this state gets checkpointed? if it is every few seconds then it may not meet our real-time requirement(sub second).

The question really is can I read this internal state in a streaming fashion in an update mode? The state processor API seems to expose DataSet but not DataStream so I am not sure how to read internal state in streaming fashion in an update made?

Thanks!

On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure what are you trying to achieve. What do you mean by “state of full outer join”? The result of it? Or it’s internal state? Also keep in mind, that internal state of the operators in Flink is already snapshoted/written down to an external storage during checkpointing mechanism.

The result should be simple, just write it to some Sink.

For the internal state, it sounds like you are doing something not the way it was intended… having said that, you can try one of the following options:
a) Implement your own outer join operator (might not be as easy if you are using Table API/SQL) and just create a side output for the state changes.
b) Use state processor API to read the content of a savepoint/checkpoint [1][2]
c) Use queryable state [3] (I’m not sure about this, I have never used queryable state)

Piotrek

[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html

On 29 Oct 2019, at 16:42, kant kodali <[hidden email]> wrote:

Hi All,

I want to do a full outer join on two streaming data sources and store the state of full outer join in some external storage like rocksdb or something else. And then want to use this intermediate state as a streaming source again, do some transformation and write it to some external store. is that possible with Flink 1.9?

Also what storage systems support push mechanism for the intermediate data? For example, In the use case above does rocksdb support push/emit events in a streaming fashion?

Thanks!


Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

Averell
Hi Kant,

I wonder why you need to "source" your intermediate state from files? Why
not "source" it from the previous operator? I.e. instead of (A join B) ->
State -> files -> (C), why not do (A join B) -> State -> (files + C)?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

kant kodali
Hi Averell,

I want to write intermediate results (A join B) incrementally and in real-time to some external storage so I can query it using SQL.
I am new to Flink so I am trying to find out if 1) such mechanism exists? 2) If not, what are the alternatives?

Thanks

On Thu, Oct 31, 2019 at 1:42 AM Averell <[hidden email]> wrote:
Hi Kant,

I wonder why you need to "source" your intermediate state from files? Why
not "source" it from the previous operator? I.e. instead of (A join B) ->
State -> files -> (C), why not do (A join B) -> State -> (files + C)?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

Averell
Hi Kant,

Not sure about what you meant in "query it using SQL"? Do you mean running
ad-hoc SQL queries on that joined data? If that's what you meant, then
you'll need some SQL server first, then write the joined data to that SQL
server. ElasticSearch and Cassandra are ready-to-use options. Writing a
custom sink function to write to your own SQL server is also a
not-so-difficult solution.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

kant kodali
Hi Averell,

yes,  I want to run ad-hoc SQL queries on the joined data as well as data that may join in the future. For example, let's say if you take datasets A and B in streaming mode a row in A can join with a row B in some time in future let's say but meanwhile if I query the intermediate state using SQL I want the row in A that have not yet joined with B to also be available to Query. so not just joined results alone but also data that might be join in the future as well since its all streaming.

Thanks!
Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

Piotr Nowojski-3
In reply to this post by kant kodali
Hi,

Generally speaking it’s a good question, why do you need to do this? What information do you need from the outer join’s internal state? Can not you just process the result to obtain the same information in another way?


> Yes, I am looking for this but I am not sure how to do this? Should I use the processFunction(like the event-driven applications) ?

Another basic question, are you using DataStream API, TableAPI or SQL?

Assuming TableAPI or SQL, you would have to split your query into three:

1. Left side of the join
2. Right side of the join
3. Downstream of the join (if any)

Next you would have to write your own DataStream API outer join operator (implement your own or copy/paste or inherit from the SQL’s/Table API operator), which has an additional side output [0] of the state changes that you want. To do this, you probably can go with two different approaches:

a) define CoProcessFunction 
b) define TwoInputStreamOperator

After that, you have to convert the queries from 1. And 2. Into two separate DataStream’s [1], connect them [2] and process [3] with yours CoProcessFunction (a) or transform [4] with yours TwoInputStreamOperator, and convert the result back from a DataStream to a Table [5]

[1] for example StreamTableEnvironment#toRetractStream() or #toAppendStream()
[3] ConnectedStreams#process()
[4] ConnectedStreams#transform()
[5] StreamTableEnvironment#fromDataStream(org.apache.flink.streaming.api.datastream.DataStream<T>)

However keep in mind that in [5], there is probably no way to convert a DataStream with retraction/updates back into a Table, so your join operator would have to produce append only output.

Piotrek

On 30 Oct 2019, at 20:45, kant kodali <[hidden email]> wrote:

"I think you would have to implement your own custom operator that would output changes to it’s internal state as a side output"

Yes, I am looking for this but I am not sure how to do this? Should I use the processFunction(like the event-driven applications) ?

On Wed, Oct 30, 2019 at 8:53 AM Piotr Nowojski <[hidden email]> wrote:
Hi Kant,

Checkpointing interval is configurable, but I wouldn’t count on it working well with even 10s intervals. 
 
I think what you are this is not supported by Flink generically. Maybe Queryable state I mentioned before? But I have never used it.

I think you would have to implement your own custom operator that would output changes to it’s internal state as a side output.

Piotrek

On 30 Oct 2019, at 16:14, kant kodali <[hidden email]> wrote:

Hi Piotr,

I am talking about the internal state. How often this state gets checkpointed? if it is every few seconds then it may not meet our real-time requirement(sub second).

The question really is can I read this internal state in a streaming fashion in an update mode? The state processor API seems to expose DataSet but not DataStream so I am not sure how to read internal state in streaming fashion in an update made?

Thanks!

On Wed, Oct 30, 2019 at 7:25 AM Piotr Nowojski <[hidden email]> wrote:
Hi,

I’m not sure what are you trying to achieve. What do you mean by “state of full outer join”? The result of it? Or it’s internal state? Also keep in mind, that internal state of the operators in Flink is already snapshoted/written down to an external storage during checkpointing mechanism.

The result should be simple, just write it to some Sink.

For the internal state, it sounds like you are doing something not the way it was intended… having said that, you can try one of the following options:
a) Implement your own outer join operator (might not be as easy if you are using Table API/SQL) and just create a side output for the state changes.
b) Use state processor API to read the content of a savepoint/checkpoint [1][2]
c) Use queryable state [3] (I’m not sure about this, I have never used queryable state)

Piotrek

[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html

On 29 Oct 2019, at 16:42, kant kodali <[hidden email]> wrote:

Hi All,

I want to do a full outer join on two streaming data sources and store the state of full outer join in some external storage like rocksdb or something else. And then want to use this intermediate state as a streaming source again, do some transformation and write it to some external store. is that possible with Flink 1.9?

Also what storage systems support push mechanism for the intermediate data? For example, In the use case above does rocksdb support push/emit events in a streaming fashion?

Thanks!



Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

Averell
In reply to this post by kant kodali
Hi Kant,

So your problem statement is "ingest 2 streams into a data warehouse". The main component of the solution, from my view, is that SQL server. You can have a sink function to insert records in your two streams into two different tables (A and B), or upsert into one single table C. That upsert action itself serves as a join function, there's no need to join in Flink at all.

There are many tools out there can be used for that ingestion. Flink, of course, can be used for that purpose. But for me, it's an overkill.

Regards,
Averell

On Thu, 31 Oct. 2019, 8:19 pm kant kodali, <[hidden email]> wrote:
Hi Averell,

yes,  I want to run ad-hoc SQL queries on the joined data as well as data that may join in the future. For example, let's say if you take datasets A and B in streaming mode a row in A can join with a row B in some time in future let's say but meanwhile if I query the intermediate state using SQL I want the row in A that have not yet joined with B to also be available to Query. so not just joined results alone but also data that might be join in the future as well since its all streaming.

Thanks!
Reply | Threaded
Open this post in threaded view
|

Re: How to stream intermediate data that is stored in external storage?

kant kodali
Hi Huyen,

That is not my problem statement.  If it is just ingesting from A to B I am sure there are enough tutorials for me to get it done. I also feel like the more I elaborate the more confusing it gets and I am not sure why.

I want to join two streams and I want to see/query the results of that join in real-time but I also have some constraints.

I come from Spark world and in Spark, if I do an inner join of two streams A & B  then I can see results only when there are matching rows between A & B (By definition of inner join this makes sense) but if I do an outer join of two streams A & B I need to specify the time constraint and only when the time elapses fully I can see the rows that did not match. This means if my time constraint is one hour I cannot query the intermediate results until one hour and this is not the behavior I want. I want to be able to do all sorts of SQL queries on intermediate results.

I like Flink's idea of externalizing the state that way I don't have to worry about the memory but I am also trying to avoid writing a separate microservice that needs to poll and display the intermediate results of the join in real-time. Instead, I am trying to see if there is a way to treat that constantly evolving intermediate results as a streaming source, and maybe do some more transformations and push out to another sink.

Hope that makes sense.

Thanks,
Kant



On Thu, Oct 31, 2019 at 2:43 AM Huyen Levan <[hidden email]> wrote:
Hi Kant,

So your problem statement is "ingest 2 streams into a data warehouse". The main component of the solution, from my view, is that SQL server. You can have a sink function to insert records in your two streams into two different tables (A and B), or upsert into one single table C. That upsert action itself serves as a join function, there's no need to join in Flink at all.

There are many tools out there can be used for that ingestion. Flink, of course, can be used for that purpose. But for me, it's an overkill.

Regards,
Averell

On Thu, 31 Oct. 2019, 8:19 pm kant kodali, <[hidden email]> wrote:
Hi Averell,

yes,  I want to run ad-hoc SQL queries on the joined data as well as data that may join in the future. For example, let's say if you take datasets A and B in streaming mode a row in A can join with a row B in some time in future let's say but meanwhile if I query the intermediate state using SQL I want the row in A that have not yet joined with B to also be available to Query. so not just joined results alone but also data that might be join in the future as well since its all streaming.

Thanks!