Flink + Hive + Compaction + Parquet?

Theo
Hi there,

Currently, I have a Flink 1.11 job that writes Parquet files to HDFS via the StreamingFileSink (plain DataStream API). I commit roughly every 3 minutes and therefore end up with many small files in HDFS. Downstream, the generated table is consumed by Spark jobs and Impala queries. HDFS does not cope well with many small files, and wanting to write Parquet quickly while still ending up with large files is a rather common problem. Solutions have been suggested recently on the mailing list [1] and in Flink Forward talks [2]. Cloudera also described two possible approaches in their blog posts [3], [4]. Mostly, it comes down to asynchronously compacting the many small files into larger ones, ideally without blocking readers and in an occasionally running batch job.
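
To make the "compact many small files into larger ones" idea concrete, here is a minimal sketch of the planning step only. It is an illustration under assumptions, not what the linked posts implement: the function name `plan_compaction`, the file names, and the sizes are all hypothetical, and the sketch neither reads nor writes Parquet. It just decides which small files of one partition a batch job could merge together.

```python
from typing import List, Tuple


def plan_compaction(files: List[Tuple[str, int]], target_bytes: int) -> List[List[str]]:
    """Greedily group small files so that each group stays within target_bytes.

    files: (path, size_in_bytes) pairs belonging to a single partition.
    Files that are already at least target_bytes large are left alone.
    Groups with a single file are dropped, since there is nothing to merge.
    """
    groups: List[List[str]] = []
    current: List[str] = []
    current_size = 0
    for path, size in sorted(files):
        if size >= target_bytes:
            continue  # already large enough, do not rewrite it
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        groups.append(current)
    return [group for group in groups if len(group) > 1]


# Hypothetical partition listing: (file name, size in bytes).
example = [("part-0", 40), ("part-1", 40), ("part-2", 40), ("part-3", 200), ("part-4", 10)]
plan = plan_compaction(example, target_bytes=100)
```

Each returned group would then be rewritten into one larger file, and the small inputs would be swapped out afterwards. That swap is the part that has to be non-blocking for readers, and it is deliberately left out here.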

I am now about to implement something like what the Cloudera blog [4] suggests, but from Parquet to Parquet. To me it does not look straightforward but rather involved, especially as my data is partitioned by event time and I need the compaction to be non-blocking (my users query Impala and expect near-real-time performance for each query). When I started working on this, I noticed that Hive already includes a compaction mechanism and that the Flink community has put a lot of work into Hive integration in the latest releases. Some of my questions are not directly related to Flink, but I guess many of you also have experience with Hive, and writing from Flink to Hive is rather common nowadays.

I read online that Spark should integrate nicely with Hive tables, i.e. querying a Hive table performs the same as querying the HDFS files directly [5]. We also all know that Impala integrates nicely with Hive, so overall I expect that writing to Hive internal tables instead of plain Parquet files on HDFS has no disadvantages for me.

My questions:
1. Can I use Flink to "streaming write" to Hive?
2. For compaction, I need "transactional tables", and according to the Hive docs, transactional tables must be fully managed by Hive (i.e., they cannot be external). Does Flink support writing to those out of the box? (I only have Hive 2 available.)
3. Does Flink use the "Hive Streaming Data Ingest" APIs?
4. Do you see any downsides to writing to Hive compared to writing Parquet directly? (Especially in my use case, which only has Impala and Spark consumers.)
5. Not Flink related: Have you ever experienced performance issues when using Hive transactional tables compared to writing Parquet directly? I guess there must be a reason why "transactional" is off by default in Hive. I won't use any features except compaction, i.e. there are only streaming inserts, no updates, no deletes. (Deletes happen only after a given retention period and always remove entire partitions.)


Best regards
Theo

Re: Flink + Hive + Compaction + Parquet?

Dawid Wysakowicz-2

Hi,

I know Jingsong worked on Flink/Hive filesystem integration in the Table/SQL API. Maybe he can shed some light on your questions.

Best,

Dawid

On 02/03/2021 21:03, Theo Diefenthal wrote:
[...]

Re: Flink + Hive + Compaction + Parquet?

Flavio Pompermaier
What about using Apache Hudi or Apache Iceberg?

On Thu, Mar 4, 2021 at 10:15 AM Dawid Wysakowicz <[hidden email]> wrote:

[...]

Re: Flink + Hive + Compaction + Parquet?

Kurt Young
Hi Theo, 

Regarding your first two questions: yes, Flink supports streaming writes to Hive.
Flink also supports automatically compacting small files during streaming writes [1].
(Hive and the filesystem connector share the same compaction mechanism; we forgot to add dedicated documentation for Hive.)

You also don't need a Hive transactional table for this, because Flink compacts all the small files
_before_ committing the files or the partition to Hive. From Hive's perspective, the written files are already
large files.
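
As a rough illustration of how this could be switched on, the sketch below renders a Hive-style `ALTER TABLE ... SET TBLPROPERTIES` statement carrying the filesystem connector's compaction options. Several things here are assumptions rather than something stated in this thread: the table name `events` and the `128MB` target are made up, the helper `compaction_properties` is hypothetical, and the option keys `auto-compaction` and `compaction.file-size` are, as far as I know, only available from Flink 1.12 on, so please check them against the documentation of your version.

```python
def compaction_properties(table: str, target_size: str = "128MB") -> str:
    """Render a statement that sets the compaction options on a table.

    The option keys are assumed to match the filesystem connector's
    file compaction settings; table name and target size are placeholders.
    """
    props = {
        "auto-compaction": "true",           # merge small files before committing
        "compaction.file-size": target_size,  # desired size of the merged files
    }
    rendered = ",\n  ".join(f"'{key}' = '{value}'" for key, value in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES (\n  {rendered}\n)"


statement = compaction_properties("events")
print(statement)
```

The generated statement would be executed against the Hive table before starting the streaming job. Since the files are merged before they are committed, the data is likely to become visible a bit later than without compaction.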

I think this should address most of your questions. Let me know if you have further ones.


Best,
Kurt


On Mon, Mar 15, 2021 at 5:05 PM Flavio Pompermaier <[hidden email]> wrote:
[...]