Join a datastream with tables stored in Hive

Krzysztof Zarzycki
Hello dear Flinkers, 
If this kind of question was asked on the groups, I'm sorry for a duplicate. Feel free to just point me to the thread.
I have to solve a probably pretty common case of joining a datastream to a dataset. 
Let's say I have the following setup:
* I have a high-pace stream of events coming in from Kafka. 
* I have some dimension tables stored in Hive. These tables are changed daily. I can keep a snapshot for each day. 

Now, conceptually, I would like to join the stream of incoming events to the dimension tables (a simple hash join). We can consider two cases: 
1) simpler, where I join the stream with the most recent version of the dictionaries. (So the result is accepted as nondeterministic if the job is retried.) 
2) more advanced, where I would like to do a temporal join of the stream with the dictionary snapshots that were valid at the time of the event. (This result should be deterministic.) 

The end goal is to aggregate that joined stream and store the results in Hive or a more real-time analytical store (Druid). 

Now, could you please help me understand whether any of these cases is implementable with the declarative Table/SQL API? With the use of temporal joins, catalogs, Hive integration, JDBC connectors, or whatever beta features there are now. (I've read quite a lot of the Flink docs about each of those, but I have trouble compiling this information into a final design.)
Could you please help me understand how these components should cooperate? 
If that is impossible with the Table API, can we come up with the easiest implementation using the DataStream API? 

Thanks a lot for any help!
Krzysztof

Re: Join a datastream with tables stored in Hive

Kurt Young
Hi Krzysztof,

What you raised is something we are also very interested in achieving in Flink. Unfortunately, there
is no in-place solution in the Table/SQL API yet, but you have 2 options which both come
close to this and thus need some modifications. 

1. The first one is to use a temporal table function [1]. It requires you to write the logic of 
reading the Hive tables and doing the daily update inside the table function. 
2. The second choice is to use a temporal table join [2], which only works with processing
time for now (just like the simple solution you mentioned) and needs the table source to have
lookup capability (like HBase). Currently, the Hive connector doesn't support lookup, so to
make this work you would need to sync the content to another storage that supports lookup,
like HBase. (A rough SQL sketch of this follows below.) 
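
A minimal sketch of what option 2 could look like, assuming invented tables "events" (the Kafka stream with a processing-time attribute proctime) and "dim" (a lookup-capable source, e.g. backed by HBase) are already registered in the catalog; the API shown is the Blink-planner style of the Flink 1.9/1.10 era:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ProcessingTimeLookupJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Blink planner is needed for the lookup-based temporal join.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Enrich each incoming event with the dimension row looked up at processing time.
        Table enriched = tEnv.sqlQuery(
                "SELECT e.event_id, e.dim_id, d.name " +
                "FROM events AS e " +
                "JOIN dim FOR SYSTEM_TIME AS OF e.proctime AS d " +
                "ON e.dim_id = d.id");
    }
}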

Neither solution is ideal right now, and we also aim to improve this, maybe in the following
release. 

Best,
Kurt



Re: Join a datastream with tables stored in Hive

Krzysztof Zarzycki
Very interesting, Kurt! Yes, I also imagined it's a rather common case. In my company we currently have 3 clients wanting this functionality. 
I also just realized the slight difference between a Temporal Join and a Temporal Table Function Join, i.e. that there are actually two methods :)

Regarding option 1: 
So I would need to:
* write a DataStream API source that pulls the Hive dictionary table every, let's say, day, assigns an event-time column to the rows and creates a stream out of it. It does that and only that. 
* create a table (from the Table API) out of it, assigning one of the columns as the event-time column.
* then use table.createTemporalTableFunction(<all columns, including the time column>)
* finally join my main data stream with the temporal table function (let me use the short name TTF from now on) from my dictionary, using Flink SQL and the LATERAL TABLE (Rates(o.rowtime)) AS r construct (sketched below). 
And so I should achieve my temporal, event-time based join with versioned dictionaries!
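
As a hedged sketch of those steps (all names such as dictStream, events and dim_id are invented for illustration, and the exact signatures depend on the Flink version), the Table API part could look roughly like this:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class TemporalTableFunctionJoinSketch {
    // dictStream carries (dim_id, name) pairs from the daily-pulling source described above,
    // with event timestamps already assigned; "events" is the Kafka stream table with rowtime
    // attribute event_time, registered elsewhere.
    public static Table joinWithDictionary(StreamTableEnvironment tEnv,
                                           DataStream<Tuple2<String, String>> dictStream) {
        // The appended .rowtime attribute picks up the timestamps assigned by the source.
        Table dictTable = tEnv.fromDataStream(dictStream, "dim_id, name, snapshot_time.rowtime");

        // Versioned view of the dictionary: primary key = dim_id, version time = snapshot_time.
        TemporalTableFunction dictTtf =
                dictTable.createTemporalTableFunction("snapshot_time", "dim_id");
        tEnv.registerFunction("Dict", dictTtf);

        // Event-time temporal join via LATERAL TABLE, analogous to the Rates example in the docs.
        return tEnv.sqlQuery(
                "SELECT e.*, d.name AS dim_name " +
                "FROM events AS e, LATERAL TABLE (Dict(e.event_time)) AS d " +
                "WHERE e.dim_id = d.dim_id");
    }
}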
Question 1: do I need to write that Hive source or can I use something ready, like the Hive catalog integration? Or maybe reuse e.g. the HiveTableSource class?

Question/worry 2: One thing that worried me is this comment in the docs:
Note: State retention defined in a query configuration is not yet implemented for temporal joins. This means that the required state to compute the query result might grow infinitely depending on the number of distinct primary keys for the history table.  

On the other hand, I found this comment: By definition of event time, watermarks allow the join operation to move forward in time and discard versions of the build table that are no longer necessary because no incoming row with lower or equal timestamp is expected.
So I believe that the state would grow infinitely only if I had an infinite number of keys, not merely an infinite number of versions of the keys. Which is fine. Do you confirm?

Question 3: I need to be able to cover also reprocessing or backfilling of historical data. Let's say I would need to join a data stream with (versioned/snapshotted) dictionaries stored on HDFS. Do you imagine that I could use the same logic for both stream processing and reprocessing just by replacing sources and sinks? Maybe after some slight modifications?


Regarding option 2:
Here I understand the current limitation (which will stay for some time) is that the join can happen only on processing time, which means joining only with the latest version of the dictionaries.
Accepting that, I understand I would need to either:
a) load the Hive table to e.g. HBase and then use HBaseTableSource on it, OR
b) maybe implement a Hive/JDBC-based LookupableTableSource that pulls the whole dictionary into memory (or even into Flink state, if it is possible to use it from a TableFunction); see the sketch below.
Then use this table and my Kafka stream table in a temporal join expressed with Flink SQL. 
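
A hedged sketch of option b) (all names hypothetical; a LookupableTableSource would return such a function from getLookupFunction(), and the actual Hive/JDBC reading is left as a placeholder):

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;

// Synchronous lookup function that keeps the whole (small) dictionary in memory per task.
public class InMemoryDictLookupFunction extends TableFunction<Row> {

    private transient Map<String, Row> cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        cache = new HashMap<>();
        loadDictionary(cache);   // placeholder for Hive/JDBC reading code
    }

    // Called with the join key of each incoming event; emits the matching row, if any.
    public void eval(String dimId) {
        Row match = cache.get(dimId);
        if (match != null) {
            collect(match);
        }
    }

    private void loadDictionary(Map<String, Row> target) {
        // placeholder: read the dictionary rows and put them into `target`
    }
}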
What do you think, is that feasible?  
Do I understand correctly that this option is available only with the Blink planner and also only with the use of Flink SQL, not the Table API?

Same question comes up regarding reprocessing: do you think it would be possible to use the same logic / SQL for reprocessing?

Thank you for continuing the discussion with me. I believe we're touching on a really important design subject for the community. 
Krzysztof


Re: Join a datastream with tables stored in Hive

Kurt Young
Hi Krzysztof, thanks for the discussion, you raised lots of good questions, and I will try to reply to them
one by one. 

Re option 1:

> Question 1: do I need to write that Hive source or can I use something ready, like the Hive catalog integration? Or maybe reuse e.g. the HiveTableSource class?

I'm not sure if you can reuse the logic of `HiveTableSource`. Currently `HiveTableSource` works
in batch mode: it reads all data at once and stops. But what you need is to wait until the next day after
finishing. What you can try is to reuse the logic of `HiveTableInputFormat` and wrap the "monitoring"
logic around it.
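
A hedged sketch of that wrapper (readCurrentSnapshot() stands in for code built on HiveTableInputFormat or similar; the daily schedule is simplified to a fixed sleep):

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.types.Row;

import java.util.Collections;
import java.util.List;

// Re-reads the dictionary snapshot once per day and emits each row with the snapshot
// time as its event time, so the temporal table function sees daily versions.
public class DailyDictionarySource extends RichSourceFunction<Row> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (running) {
            long snapshotTime = System.currentTimeMillis();
            List<Row> snapshot = readCurrentSnapshot();   // placeholder
            synchronized (ctx.getCheckpointLock()) {
                for (Row row : snapshot) {
                    ctx.collectWithTimestamp(row, snapshotTime);
                }
                ctx.emitWatermark(new Watermark(snapshotTime));
            }
            Thread.sleep(24 * 60 * 60 * 1000L);           // wait until the next daily pull
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private List<Row> readCurrentSnapshot() {
        return Collections.emptyList();                   // placeholder
    }
}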

> Question/worry 2: the state would grow infinitely only if I had an infinite number of keys, not merely an infinite number of versions of the keys.

The temporal table function doesn't support watermark-based state cleanup yet, but what you can
try is idle state retention [1]. So even if you have an infinite number of keys (for example, say you have
different join keys every day), the old keys will not be touched in the following days; they will become idle
and will be deleted by the framework.
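
For example (a hedged snippet; the exact API differs slightly between Flink versions, e.g. StreamQueryConfig vs. TableConfig):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionSketch {
    // Keys not touched for roughly 36-48 hours become idle and their state is dropped.
    public static void configure(StreamTableEnvironment tEnv) {
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(36), Time.hours(48));
    }
}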

> Question 3: Do you imagine that I could use the same logic for both stream processing and reprocessing just by replacing sources and sinks?

Generally speaking, yes, I think so. With an event-time based join, we should be able to reuse the logic
between normal stream processing and reprocessing of historical data, although there will definitely be some
details that should be addressed, like event time and watermarks.  

Re option 2:

> maybe implement a Hive/JDBC-based LookupableTableSource that pulls the whole dictionary to memory 

You can do this manually, but I would recommend you go with the first choice, which loads the Hive table
into HBase periodically. It's much easier and more efficient. The approach you mentioned also
seems to overlap a bit with the temporal table function solution. 

> this option is available only with the Blink planner and only through Flink SQL, not the Table API?

I'm afraid yes, you can only use it with SQL for now.

> do you think it would be possible to use the same logic / SQL for reprocessing?

Given that this solution is based on processing time, I don't think it can cover the use case of
reprocessing, unless you can accept always joining with the latest day, even during backfilling. But we
are also aiming to resolve this shortcoming, maybe in 1 or 2 releases. 

Best,
Kurt



Re: Join a datastream with tables stored in Hive

Krzysztof Zarzycki
Thanks Kurt for your answers. 

Summing up, I feel like option 1 (i.e. the join with a temporal table function) requires some coding around a source that needs to pull data once a day. But otherwise it brings the following benefits:
* I don't have to put the dicts in another store like HBase. Everything stays in Hive + Flink.
* I'll be able to make a true temporal join - event-time based. 
* I believe I will be able to build a history reprocessing program based on the same logic (i.e. same SQL). At least for a particular day - processing multiple days would be tricky, because I will need to pull multiple versions of the dictionary. 
Plus, looking up dict values will be much faster and more resource-efficient when the dict is stored in state instead of an uncached HBase. That's especially important when we want to reprocess a historical, archived stream at a speed of millions of events/sec.

I understand that option 2 is easier to implement. I may do a PoC of it as well. 
OK, I believe I know enough to get my hands dirty with the code. I can share later on what I was able to accomplish. And probably more questions will show up when I finally start the implementation. 

Thanks
Krzysztof


Re: Join a datastream with tables stored in Hive

Kurt Young
Great, looking forward to hearing from you again. 

Best,
Kurt



Re: Join a datastream with tables stored in Hive

maverick
Hi Kurt,
Is there any Jira task for tracking progress of adding event time support to
temporal joins?

Regards,
Maciek




Re: Join a datastream with tables stored in Hive

Chesnay Schepler
According to the documentation, this is already implemented.


Re: Join a datastream with tables stored in Hive

maverick
Hi,
There is an implementation only for temporal tables, which needs some Java/Scala coding (there is no SQL-only implementation).
On the same page there is an annotation:
Attention Flink does not support event time temporal table joins currently.

So this is the reason I'm asking this question.
My use case: 
I want to join the Kafka stream with a table from a JDBC source.
Every record in Kafka has an event time. Also, the records in JDBC are versioned.
I didn't find a SQL solution to this problem.

Regards,
Maciek

--
Maciek Bryński

Re: Join a datastream with tables stored in Hive

Leonard Xu
Hi, Maciej


> I didn't find a SQL solution to this problem.


Flink now provides a SQL solution; you can see the doc [1]. The Flink 1.12 documentation link posted by Chesnay should have been updated but has not been... I'll check the 1.12 documentation.
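
For the Kafka-plus-versioned-JDBC case above, a hedged sketch of the 1.12-style event-time temporal join (table and column names are invented; the versioned "rates" table needs a primary key and an event-time attribute with a watermark, and is typically fed by a changelog such as CDC rather than the plain JDBC connector):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EventTimeTemporalJoinSketch {
    // "orders" is the Kafka stream table with event-time attribute order_time;
    // "rates" is the versioned dimension table; both are assumed to be registered elsewhere.
    public static Table join(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
                "SELECT o.order_id, o.amount * r.rate AS converted_amount " +
                "FROM orders AS o " +
                "JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r " +
                "ON o.currency = r.currency");
    }
}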

Best,
Leonard

Re: Join a datastream with tables stored in Hive

Leonard Xu
In reply to this post by Krzysztof Zarzycki
Hi, Krzysztof   

> * I have a high pace stream of events coming in Kafka. 
> * I have some dimension tables stored in Hive. These tables are changed daily. I can keep a snapshot for each day. 

For this use case, Flink now supports a temporal join with the latest Hive partition as a temporal table; you can refer to the example in the doc [1]. This feature will come soon with the upcoming Flink 1.12 release.
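
A rough sketch based on that doc page (the option keys and names below are illustrative and may change between versions): the Hive dimension table is configured to continuously expose only its latest partition, and the stream joins it on processing time:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LatestHivePartitionJoinSketch {
    // The Hive table "dim_in_hive" is assumed to carry properties roughly like
    //   'streaming-source.enable' = 'true',
    //   'streaming-source.partition.include' = 'latest',
    //   'streaming-source.monitor-interval' = '12 h'
    // (see the linked doc [1]); "orders" is the stream table with a proctime attribute.
    public static Table join(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
                "SELECT o.order_id, dim.name " +
                "FROM orders AS o " +
                "JOIN dim_in_hive FOR SYSTEM_TIME AS OF o.proctime AS dim " +
                "ON o.dim_id = dim.id");
    }
}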

Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_read_write.html#temporal-join-the-latest-partition