Hi all,
Along with the community's effort, inside Alibaba we have explored Flink's potential as an execution engine not just for stream processing but also for batch processing. We are encouraged by our findings and have initiated an effort to make Flink's SQL capabilities full-fledged. When comparing what's available in Flink to the offerings of competing data processing engines, we identified a major gap in Flink: good integration with the Hive ecosystem. This is crucial to the success of Flink SQL and batch processing because of the well-established data ecosystem around Hive. We have done some initial work in this direction, but a lot of effort is still needed.
We have two strategies in mind. The first is to make Flink SQL full-fledged and well integrated with the Hive ecosystem. This is similar to the approach Spark SQL adopted. The second strategy is to make Hive itself work with Flink, similar to the proposal in [1]. Each approach has its pros and cons, but they need not be mutually exclusive, as each targets different users and use cases. We believe that both will promote much greater adoption of Flink beyond stream processing.
We have been focused on the first approach and would like to showcase Flink's batch and SQL capabilities with Flink SQL. However, we also plan to start strategy #2 as a follow-up effort.
I'm completely new to Flink (a short bio is in [2] below), though many of my colleagues here at Alibaba are long-time contributors. Nevertheless, I'd like to share our thoughts and invite your early feedback. At the same time, I am working on a detailed proposal on Flink SQL's integration with the Hive ecosystem, which will also be shared when ready.
While the ideas are simple, each approach will demand significant effort, more than what we can afford. Thus, input and contributions from the communities are greatly welcome and appreciated.
Regards, Xuefu
References:
[1] https://issues.apache.org/jira/browse/HIVE-10712
[2] Xuefu Zhang is a long-time open source veteran who has worked on many projects under the Apache Software Foundation, of which he is also an honored member. About 10 years ago he worked in the Hadoop team at Yahoo, when those projects had just gotten started. Later he worked at Cloudera, initiating and leading the development of the Hive on Spark project in the community and across many organizations. Prior to joining Alibaba, he worked at Uber, where he promoted Hive on Spark for all of Uber's SQL-on-Hadoop workloads and significantly improved Uber's cluster efficiency. |
Hi Xuefu, Welcome to the Flink community and thanks for starting this discussion! Better Hive integration would be really great! Can you go into the details of what you are proposing? I can think of a couple of ways to improve Flink in that regard:
* Support for Hive UDFs
* Support for Hive metadata catalog
* Support for HiveQL syntax
* ???
Best, Fabian On Tue, 9 Oct 2018 at 19:22, Zhang, Xuefu <[hidden email]> wrote: Hi all, |
Hi Xuefu, I appreciate this proposal, and like Fabian, I think it would be better if you could give more details of the plan. Thanks, vino. On Wed, 10 Oct 2018 at 5:27 PM, Fabian Hueske <[hidden email]> wrote:
|
Hi Fabian/Vino, Thank you very much for your encouragement and inquiry. Sorry that I didn't see Fabian's email until I read Vino's response just now. (Somehow Fabian's email went to my spam folder.) My proposal contains long-term and short-term goals. Overall, the effort will focus on the following areas, including those on Fabian's list:
1. Hive metastore connectivity - This covers both read and write access, which means Flink can make full use of Hive's metastore as its catalog (at least for batch, but this can be extended to streaming as well).
2. Metadata compatibility - Objects (databases, tables, partitions, etc.) created by Hive can be understood by Flink, and vice versa.
3. Data compatibility - Similar to #2, data produced by Hive can be consumed by Flink, and vice versa.
4. Support for Hive UDFs - For all of Hive's built-in UDFs, Flink either provides its own implementation or makes Hive's implementation work in Flink. Further, for user-created UDFs in Hive, Flink SQL should provide a mechanism that allows users to import them into Flink without any code change.
5. Data types - Flink SQL should support all data types that are available in Hive.
6. SQL language - Flink SQL should support the SQL standard (such as SQL:2003) with extensions to support Hive's syntax and language features around DDL, DML, and SELECT queries.
7. SQL CLI - This is currently under development in Flink, but more effort is needed.
8. Server - Provide a server that is compatible with Hive's HiveServer2 Thrift APIs, so that HiveServer2 users can reuse their existing clients (such as Beeline) but connect to Flink's Thrift server instead.
9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other applications to use to connect to its Thrift server.
10. Support for other user customizations in Hive, such as Hive SerDes, storage handlers, etc.
11. Better task failure tolerance and task scheduling in the Flink runtime.
As you can see, achieving all of this requires significant effort across all layers of Flink. However, a short-term goal could include only the core areas (such as 1, 2, 4, 5, 6, 7) or start with a smaller scope (such as #3, #6). Please share your further thoughts. If we generally agree that this is the right direction, I can come up with a formal proposal quickly, and then we can follow up with broader discussions. Thanks, Xuefu
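To make items 1 and 2 more concrete, here is a minimal Java sketch, under stated assumptions, of how table lookups could go through a pluggable catalog abstraction so that a Hive-metastore-backed catalog is just one implementation among several. Everything here (`ReadOnlyCatalog`, `InMemoryCatalog`, `CatalogResolver`, `TableMeta`, and the `catalog.database.table` naming) is hypothetical and is not Flink's or Hive's actual API; the in-memory class merely stands in for a real metastore client.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified table metadata: a name plus ordered column names. */
final class TableMeta {
    final String name;
    final List<String> columns;

    TableMeta(String name, List<String> columns) {
        this.name = name;
        this.columns = List.copyOf(columns); // defensive, immutable copy
    }
}

/** Hypothetical catalog abstraction; a Hive-metastore-backed catalog would be one implementation. */
interface ReadOnlyCatalog {
    Optional<TableMeta> getTable(String database, String table);
}

/** In-memory stand-in for a metastore-backed catalog, for illustration only. */
final class InMemoryCatalog implements ReadOnlyCatalog {
    private final Map<String, TableMeta> tables = new HashMap<>();

    void register(String database, TableMeta meta) {
        tables.put(database + "." + meta.name, meta);
    }

    @Override
    public Optional<TableMeta> getTable(String database, String table) {
        return Optional.ofNullable(tables.get(database + "." + table));
    }
}

/** Resolves "catalog.database.table" names against the registered catalogs. */
final class CatalogResolver {
    private final Map<String, ReadOnlyCatalog> catalogs = new HashMap<>();

    void registerCatalog(String name, ReadOnlyCatalog catalog) {
        catalogs.put(name, catalog);
    }

    Optional<TableMeta> resolve(String qualifiedName) {
        String[] parts = qualifiedName.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected catalog.database.table: " + qualifiedName);
        }
        ReadOnlyCatalog catalog = catalogs.get(parts[0]);
        return catalog == null ? Optional.empty() : catalog.getTable(parts[1], parts[2]);
    }
}
```

For example, after registering an `InMemoryCatalog` under the name `hive` that holds `default.orders`, `resolve("hive.default.orders")` returns that table's metadata, while an unknown catalog or table yields `Optional.empty()`. Keeping Hive-specific code behind such an interface is one way to address the loose-coupling concern raised in the replies.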
|
Would it maybe make sense to provide Flink as an engine for Hive („flink-on-Hive“), e.g. to address 4, 5, 6, 8, 9, 10? This could be more loosely coupled than integrating Hive into all possible Flink core modules and thus introducing a very tight dependency on Hive in the core. 1, 2, 3 could be achieved via a connector based on the Flink Table API. Just as a proposal: start this endeavour as independent projects (Hive engine, connector) to avoid too tight a coupling with Flink. Maybe in the more distant future, if Hive integration is heavily demanded, one could integrate it more tightly if needed. What is meant by 11?
|
Hi Xuefu,
Thanks for your proposal; it is a nice summary. Here are my thoughts on your list:
1. I think this is also on our current mid-term roadmap. Flink has lacked proper catalog support for a very long time. Before we can connect catalogs, we need to define how to map all the information from a catalog to Flink's representation. This is why work on the unified connector API [1] has been going on for quite some time: it is the first attempt to discuss and represent the pure characteristics of connectors.
2. It would be helpful to figure out what is missing in [1] to ensure this point. I guess we will need a new design document just for a proper Hive catalog integration.
3. This is already work in progress. ORC has been merged; Parquet is on its way [2].
4. This should be easy. There was a PR in the past that I reviewed, but it is no longer maintained.
5. The type system of Flink SQL is very flexible. Only the UNION type is missing.
6. A Flink SQL DDL is on the roadmap and will come soon after we are done with [1]. Support for Hive syntax also needs cooperation with Apache Calcite.
7-11. Long-term goals.
I would also propose to start with a smaller scope from which current Flink SQL users can also profit: 1, 2, 5, 3. This would allow us to grow the Flink SQL ecosystem. After that we can aim to be fully compatible, including syntax and UDFs (4, 6, etc.). Once the core is ready, we can work on the tooling (7, 8, 9) and performance (10, 11).
@Jörn: Yes, we should not have a tight dependency on Hive. It should be treated as one "connector" system out of many.
Thanks, Timo
[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#
[2] https://github.com/apache/flink/pull/6483
On 11.10.18 at 07:54, Jörn Franke wrote: |
In reply to this post by Jörn Franke
Hi Jörn, Thanks for your feedback. Yes, I think Hive on Flink makes sense, and in fact it is one of the two approaches that I named at the beginning of the thread. As also pointed out there, it isn't mutually exclusive with the work we proposed inside Flink, and the two target different user groups and use cases. Further, what we proposed to do in Flink should be a good showcase that demonstrates Flink's capabilities in batch processing and convinces the Hive community of the value of a new engine. As you might know, the idea of Hive on Flink has encountered some doubt and resistance there. Nevertheless, we do have a solid plan for Hive on Flink, which we will execute once Flink SQL is in good shape. I also agree with you that Flink SQL shouldn't be closely coupled with Hive. While we mentioned Hive in many of the proposed items, most of them are coupled only in concepts and functionality rather than in code or libraries. We are taking advantage of the connector framework in Flink. The only possible exception is support for Hive built-in UDFs, which we may not make work out of the box, in order to avoid the coupling. We could, for example, require users to bring in the Hive library and register the UDFs themselves. This is subject to further discussion. #11 is about a Flink runtime enhancement that is meant to make task failures more tolerable (so that the job doesn't have to restart from the beginning in case of task failures) and to make task scheduling more resource-efficient. Flink's current design in those two aspects leans more toward stream processing, which may not be good enough for batch processing. We will provide a more detailed design when we get to them. Please let me know if you have further thoughts or feedback. Thanks, Xuefu
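The option of letting users bring in the Hive library and register Hive UDFs themselves can be sketched with a reflective adapter. Hive's classic "simple" UDFs expose one or more `evaluate` methods that Hive resolves by reflection, so an adapter of this kind only needs the UDF object at runtime, not a compile-time Hive dependency. The classes below (`UpperUdf`, `ReflectiveUdfAdapter`) are hypothetical illustrations, not Flink or Hive APIs: the stand-in UDF does not extend Hive's real base class, and overload resolution is simplified to matching the argument count.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Locale;

/** Stand-in for a user's Hive-style simple UDF: behavior lives in a public evaluate(...) method. */
class UpperUdf {
    public String evaluate(String s) {
        return s == null ? null : s.toUpperCase(Locale.ROOT);
    }
}

/**
 * Hypothetical adapter that calls a UDF's evaluate(...) method via reflection,
 * so the calling code needs no compile-time dependency on the UDF's library.
 */
final class ReflectiveUdfAdapter {
    private final Object udf;
    private final Method evaluate;

    ReflectiveUdfAdapter(Object udf, int arity) {
        this.udf = udf;
        this.evaluate = findEvaluate(udf.getClass(), arity);
    }

    // Simplified overload resolution: pick the first public evaluate(...) with the given arity.
    private static Method findEvaluate(Class<?> cls, int arity) {
        for (Method m : cls.getMethods()) {
            if (m.getName().equals("evaluate") && m.getParameterCount() == arity) {
                return m;
            }
        }
        throw new IllegalArgumentException(
                cls.getName() + " has no evaluate method with " + arity + " argument(s)");
    }

    Object call(Object... args) {
        try {
            return evaluate.invoke(udf, args);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("UDF invocation failed", e);
        }
    }
}
```

For instance, `new ReflectiveUdfAdapter(new UpperUdf(), 1).call("flink")` would return `"FLINK"`. A real bridge would also have to convert between Hive's and Flink's value representations, which this sketch ignores.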
|
In reply to this post by Timo Walther
Hi Xuefu, Thanks for putting together the overview. I would like to add some more on top of Timo's comments.
1,2. I agree with Timo that proper catalog support should also address the metadata compatibility issues. I was actually wondering whether you are referring to something like utilizing table stats for plan optimization?
4. If the key is to let users integrate Hive UDFs without code changes into Flink UDFs, it shouldn't be a problem, as Timo mentioned. Is your concern mostly about the support of Hive UDFs that should be implemented natively in flink-table?
7,8. Correct me if I am wrong, but I feel like some of the related components might have already been discussed in the longer-term roadmap of FLIP-24 [1]?
9. Per Jörn's comment to stay clear of a tight dependency on Hive and treat it as one "connector" system: should we also consider treating the JDBC/ODBC driver as a component of the connector system instead of having Flink provide it?
Rong On Thu, Oct 11, 2018 at 12:46 AM Timo Walther <[hidden email]> wrote: Hi Xuefu, |
In reply to this post by Timo Walther
Hi Timo, Thank you for your input. It's exciting to see that the community has already initiated some of these topics. We'd certainly like to leverage the current and previous work and make progress in phases. Here I'd like to comment on a few things on top of your feedback.
1. Regarding #1 and #2, I think there are two aspects to the Hive metastore: a) as a backend storage for Flink's metadata (currently in memory), and b) as an external catalog (just like a JDBC catalog) that Flink can interact with. While it may be possible, and would be nice, to achieve both in a single design, our focus has been on the latter. We will consider both cases in our design.
2. Re #5, I agree that Flink seems to have the majority of the data types. However, supporting some of them (such as STRUCT) at the SQL layer needs work on the parser (Calcite).
3. Similarly for #6, work needs to be done on the parsing side. We can certainly ask the Calcite community to provide Hive dialect parsing. This can be challenging and time-consuming. At the same time, we can also explore possibilities for solving the problem in Flink, such as using Calcite's official extension mechanism. We will open the discussion when we get there.
Yes, I agree with you that we should start with a small scope while staying forward-looking. Specifically, we will first look at metadata and data compatibility, data types, DDL/DML, queries, UDFs, and so on. I think we align well on this. Please let me know if you have further thoughts or comments. Thanks, Xuefu
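For #5, a type-mapping layer is where the gaps discussed here (UNION missing, STRUCT needing parser work) would surface. The Java sketch below maps a few Hive primitive type names to SQL type names and reports Hive's `uniontype` as unsupported. The mapping table and the class name `HiveTypeMapping` are assumptions for illustration; the target names are generic SQL type names rather than a statement of Flink's exact type system, and complex types (`struct`, `array`, `map`) as well as parameterized types such as `decimal(p,s)` or `varchar(n)` are deliberately left out and rejected with an exception.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: maps a few Hive primitive type names to SQL type names. */
final class HiveTypeMapping {
    private static final Map<String, String> PRIMITIVES = Map.ofEntries(
            Map.entry("tinyint", "TINYINT"),
            Map.entry("smallint", "SMALLINT"),
            Map.entry("int", "INT"),
            Map.entry("bigint", "BIGINT"),
            Map.entry("boolean", "BOOLEAN"),
            Map.entry("float", "FLOAT"),
            Map.entry("double", "DOUBLE"),
            Map.entry("string", "VARCHAR"),
            Map.entry("date", "DATE"),
            Map.entry("timestamp", "TIMESTAMP"),
            Map.entry("binary", "VARBINARY"));

    private HiveTypeMapping() {}

    /** Returns the mapped name, or empty for a Hive type with no counterpart (uniontype). */
    static Optional<String> toFlink(String hiveType) {
        String t = hiveType.trim().toLowerCase(Locale.ROOT);
        if (t.startsWith("uniontype")) {
            return Optional.empty(); // the one type reported as missing in the discussion
        }
        String mapped = PRIMITIVES.get(t);
        if (mapped == null) {
            throw new IllegalArgumentException("not covered by this sketch: " + hiveType);
        }
        return Optional.of(mapped);
    }
}
```

Separating "known to be unsupported" (an empty result) from "not handled by this sketch" (an exception) keeps the two kinds of gaps distinguishable when checking a Hive schema for compatibility.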
|
In reply to this post by Rong Rong
Hi Rong, Thanks for your feedback. Some of my earlier comments might have addressed some of your points, so here I'd like to cover some specifics.
1. Yes, I expect that table stats stored in Hive will be used in Flink's plan optimization, but that is not part of the compatibility concern (yet).
2. Both implementing Hive UDFs natively in Flink and making Hive UDFs work in Flink are being considered.
3. I am aware of FLIP-24, but here the proposal is to make the remote server compatible with HiveServer2. They are not mutually exclusive either.
4. The JDBC/ODBC driver in question is for the remote server that Flink provides. It's usually the service owner who provides drivers for their services. We weren't talking about JDBC/ODBC drivers for external DB systems.
Let me know if you have further questions. Thanks, Xuefu
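As an illustration of the first point (Hive table stats feeding Flink's plan optimization), a planner could use a table's recorded size to choose between broadcasting the smaller join input and repartitioning both sides. The sketch below is hypothetical: the 10 MiB threshold, the `JoinStrategy` enum, and the `JoinStrategyChooser` class are assumptions, and real optimizers weigh many more factors. When no statistics are available for either side, it falls back to repartitioning, which does not rely on either input being small.

```java
import java.util.OptionalLong;

/** Join strategies a planner might pick between. */
enum JoinStrategy { BROADCAST_HASH, REPARTITION_HASH }

/** Hypothetical stats-driven chooser; sizes would come from metastore table statistics. */
final class JoinStrategyChooser {
    // Assumed threshold for illustration; real planners make this configurable.
    static final long BROADCAST_THRESHOLD_BYTES = 10L * 1024 * 1024;

    private JoinStrategyChooser() {}

    static JoinStrategy choose(OptionalLong leftBytes, OptionalLong rightBytes) {
        long smallest = Long.MAX_VALUE; // an unknown size is treated as too large to broadcast
        if (leftBytes.isPresent()) {
            smallest = Math.min(smallest, leftBytes.getAsLong());
        }
        if (rightBytes.isPresent()) {
            smallest = Math.min(smallest, rightBytes.getAsLong());
        }
        return smallest <= BROADCAST_THRESHOLD_BYTES
                ? JoinStrategy.BROADCAST_HASH
                : JoinStrategy.REPARTITION_HASH;
    }
}
```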
|
I think integrating Flink with Hive would be an amazing option, and getting Flink SQL up to speed would be amazing as well.
The current Flink SQL syntax for preparing and processing a table is too verbose: users need to retype table definitions manually, and that's a pain. Hive metastore integration should be done thoroughly; many users are fine with defining their table schemas in Hive, as they are easy to maintain, change, or even migrate there. Also, we could simply choose between batch and stream processing with something like a "process as" clause:
select count(*) from flink_mailing_list process as stream;
select count(*) from flink_mailing_list process as batch;
Thanks, Taher Koitawala Integrating On Fri 12 Oct, 2018, 2:35 AM Zhang, Xuefu, <[hidden email]> wrote:
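The "process as" clause above is a proposal, not existing Flink SQL syntax. To show what information the clause would carry, here is a hypothetical Java sketch that splits a trailing `process as stream|batch` off a query string and returns the requested mode. The names `ProcessAsClause` and `ExecutionMode` are invented for this illustration. A regular expression is used only for brevity; an actual implementation would more likely extend the SQL grammar (for example through Calcite's parser extension mechanisms discussed in this thread), since a regex cannot reliably tell the clause apart from identifiers or string literals.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Execution modes the proposed clause would select between. */
enum ExecutionMode { STREAM, BATCH }

/** Hypothetical parse result: the plain query plus the requested execution mode. */
final class ProcessAsClause {
    // Whole-string match: "<query> process as stream|batch" with an optional trailing semicolon.
    private static final Pattern CLAUSE = Pattern.compile(
            "(.*?)\\s+process\\s+as\\s+(stream|batch)\\s*;?",
            Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    final String query;
    final ExecutionMode mode;

    private ProcessAsClause(String query, ExecutionMode mode) {
        this.query = query;
        this.mode = mode;
    }

    static ProcessAsClause parse(String sql) {
        Matcher m = CLAUSE.matcher(sql.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("no trailing 'process as stream|batch' clause");
        }
        ExecutionMode mode = ExecutionMode.valueOf(m.group(2).toUpperCase(Locale.ROOT));
        return new ProcessAsClause(m.group(1).trim(), mode);
    }
}
```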
|
One other thought along the same lines was to use Hive tables to store Kafka information for processing streaming tables. Something like:
create table streaming_table (bootstrapServers string, topic string, keySerialiser string, valueSerialiser string);
insert into streaming_table values ("10.17.1.1:9092,10.17.2.2:9092,10.17.3.3:9092", "KafkaTopicName", "SimpleStringSchema", "SimpleStringSchema");
create table processingtable ( /* fields matching the Kafka records' schema */ );
Now we add a custom clause called something like "using". The way we use it is:
using streaming_table as configuration select count(*) from processingtable as streaming;
This way users can pass Flink SQL information easily and get rid of the Flink SQL configuration file altogether. This is simple and easy to understand, and I think most users would follow this. Thanks, Taher Koitawala On Fri 12 Oct, 2018, 7:24 AM Taher Koitawala, <[hidden email]> wrote:
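This idea stores Kafka connection details as rows of a Hive table. A minimal sketch of the consuming side, assuming a row shaped like the proposed `streaming_table`, turns the row into Kafka consumer properties. `bootstrap.servers` is a standard Kafka client setting; the `StreamingTableRow` and `KafkaConfigFromRow` names are hypothetical, and the topic and (de)serialization schema names would be handed to the Flink Kafka source separately rather than placed in the properties.

```java
import java.util.Properties;

/** Hypothetical in-memory form of one row of the proposed streaming_table. */
final class StreamingTableRow {
    final String bootstrapServers; // comma-separated host:port list
    final String topic;
    final String keySchema;        // e.g. "SimpleStringSchema"
    final String valueSchema;

    StreamingTableRow(String bootstrapServers, String topic, String keySchema, String valueSchema) {
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
    }
}

/** Hypothetical helper: converts a stored configuration row into Kafka consumer properties. */
final class KafkaConfigFromRow {
    private KafkaConfigFromRow() {}

    static Properties toConsumerProperties(StreamingTableRow row) {
        // Reject broker entries that do not look like host:port before using them.
        for (String broker : row.bootstrapServers.split(",")) {
            String b = broker.trim();
            int colon = b.lastIndexOf(':');
            if (colon <= 0 || colon == b.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + b + "'");
            }
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", row.bootstrapServers);
        return props;
    }
}
```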
|
Hi Taher, Thank you for your input. I think you emphasized two important points: 1. The Hive metastore could be used for storing Flink metadata. 2. There are some usability issues around Flink SQL configuration. I think we all agree on #1. #2 may well be true, and the usability should be improved. However, I'm afraid this is orthogonal to Hive integration, and the proposed solution might be just one of several possible solutions. On the surface, the extensions you proposed seem to go beyond the syntax and semantics of the SQL language in general. I don't dispute the value of your proposal. I think it's better to solve #1 first and leave #2 for follow-up discussions. How does this sound to you? Thanks, Xuefu
|
Sounds smashing; I think the initial integration will help 60% or so of Flink SQL users, and a lot of other use cases will emerge once we solve the first one.
Thanks, Taher Koitawala On Fri 12 Oct, 2018, 10:13 AM Zhang, Xuefu, <[hidden email]> wrote:
|
In reply to this post by Zhang, Xuefu
Thank you, very nice. I fully agree with that.
|
Thank you, Xuefu, for bringing up this awesome, detailed proposal! It will resolve a lot of existing pain for users like me. In general, I totally agree that improving Flink SQL's completeness would be a much better starting point than building 'Hive on Flink', as the Hive community is concerned about Flink SQL's incompleteness and lack of proven batch performance, as shown in https://issues.apache.org/jira/browse/HIVE-10712. Improving Flink SQL seems a more natural direction to start with in order to achieve the integration. Xuefu and Timo have laid out a quite clear path of what to tackle next. Given that there are already some efforts going on, for items 1, 2, 5, 3, 4, 6 in Xuefu's list, shall we:
It's going to be a great and influential project, and I'd love to participate in it to move Flink SQL's adoption and ecosystem even further. Thanks, Bowen
|
In reply to this post by Zhang, Xuefu
Welcome to the community, and thanks for the great proposal, Xuefu! I think the proposal can be divided into 2 stages: making Flink support Hive features, and making Hive work with Flink. I agree with Timo on starting with a smaller scope, so we can make progress faster. As for #6, a proposal for DDL is already in progress and will come after the unified SQL connector API is done. For supporting Hive syntax, we might need to work with the Calcite community, and a recent effort in Calcite called Babel (https://issues.apache.org/jira/browse/CALCITE-2280) might help here. Thanks Shuyi On Wed, Oct 10, 2018 at 8:02 PM Zhang, Xuefu <[hidden email]> wrote:
"So you have to trust that the dots will somehow connect in your future."
|
In reply to this post by phoenixjiangnan
Hi Bowen, Thank you for your feedback and interest in the project. Your contribution is certainly welcome. Per your suggestion, I have created an umbrella JIRA (https://issues.apache.org/jira/browse/FLINK-10556) to track our overall effort on this. For each subtask, we'd like to see a short description of the status quo and what is planned to be added or changed. A design doc should be provided when it's deemed necessary. I'm looking forward to seeing your contributions! Thanks, Xuefu
|
In reply to this post by Shuyi Chen
Hi Shuyi, Thank you for your input. Yes, I agree with a phased approach and would like to move forward fast. :) We did some work internally on DDL utilizing the Babel parser in Calcite. While Babel makes Calcite's grammar extensible, at first impression it still seems too cumbersome for a project when too many extensions are made. It's even challenging to find where an extension is needed! It would certainly be better if Calcite could magically support HiveQL by just turning on a flag, such as the one for MYSQL_5. I can also see that this could mean a lot of work on Calcite. Nevertheless, I will bring up the discussion over there and see what their community thinks. Would you mind sharing more info about the DDL proposal that you mentioned? We can certainly collaborate on this. Thanks, Xuefu
|
Hi all, To wrap up the discussion, I have attached a PDF describing the proposal, which is also attached to FLINK-10556 [1]. Please feel free to watch that JIRA to track the progress. Please also let me know if you have additional comments or questions. Thanks, Xuefu [1] https://issues.apache.org/jira/browse/FLINK-10556
|