Hi there, Currently, I have a Flink 1.11 job which writes parquet files via the StreamingFileSink to HDFS (simply using DataStream API). I commit like every 3 minutes and thus have many small files in HDFS. Downstream, the generated table is consumed from Spark Jobs and Impala queries. HDFS doesn't like to have too many small files and writing to parquet fast but also desiring large files is a rather common problem and solutions were suggested like recently in the mailing list [1] or in flink forward talks [2]. Cloudera also posted two possible scenarios in their blog posts [3], [4]. Mostly, it comes down to asynchronously compact the many small files into larger ones, at best non blocking and in an occasionally running batch job. I am now about to implement something like suggested in the cloudera blog [4] but from parquet to parquet. For me, it seems to be not straight forward but rather involved, especially as my data is partitioned in eventtime and I need the compaction to be non blocking (my users query impala and expect near real time performance in each query). When starting the work on that, I noticed that Hive already has a compaction mechanism included and the Flink community works a lot in terms of integrating with hive in the latest releases. Some of my questions are not directly related to Flink, but I guess many of you have also experience with hive and writing from Flink to Hive is rather common nowadays. I read online that Spark should integrate nicely with Hive tables, i.e. instead of querying HDFS files, querying a hive table has the same performance [5]. We also all know that Impala integrates nicely with Hive so that overall, I can expect writing to Hive internal tables instead of HDFS parquet doesn't have any disadvantages for me. My questions: 1. Can I use Flink to "streaming write" to Hive? 2. For compaction, I need "transactional tables" and according to the hive docs, transactional tables must be fully managed by hive (i.e., they are not external). Does Flink support writing to those out of the box? (I only have Hive 2 available) 3. Does Flink use the "Hive Streaming Data Ingest" APIs? 4. Do you see any downsides in writing to hive compared to writing to parquet directly? (Especially in my usecase only having impala and spark consumers) 5. Not Flink related: Have you ever experienced performance issues when using hive transactional tables over writing parquet directly? I guess there must be a reason why "transactional" is off by default in Hive? I won't use any features except for compaction, i.e. there are only streaming inserts, no updates, no deletes. (Delete only after given retention and always delete entire partitions) Best regards Theo |
Hi, I know Jingsong worked on Flink/Hive filesystem integration in the Table/SQL API. Maybe he can shed some light on your questions. Best, Dawid On 02/03/2021 21:03, Theo Diefenthal
wrote:
OpenPGP_signature (855 bytes) Download Attachment |
What about using Apache Hudi o Apache Iceberg? On Thu, Mar 4, 2021 at 10:15 AM Dawid Wysakowicz <[hidden email]> wrote:
|
Hi Theo, Regarding your first 2 questions, the answer is yes Flink supports streaming write to Hive. And Flink also supports automatically compacting small files during streaming write [1]. (Hive and Filesystem shared the same mechanism to do compaction, we forgot to add a dedicated document for hive.) And you don't need the hive transaction table for this because Flink will compact all the small files _before_ commit the files or partition to hive. From hive's perspective, the written files are already large files. I think this might address most of your confusions and let me know if you have further questions. Best, Kurt On Mon, Mar 15, 2021 at 5:05 PM Flavio Pompermaier <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |