Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API


Philip Lee
Hi Flink people, I have a question about translating Hive queries into Flink functions using the Table API. In short, I am working on a benchmark for Flink.

I am Philip Lee, a master's student in Computer Science at TUB. I am working on translating the Hive queries of a benchmark into Flink code.

While studying this, a few questions came up.

First of all, for people who do not know the Hive clauses, let me briefly explain them.
  • ORDER BY: guarantees a total order in the output.
  • SORT BY: only guarantees the ordering of the rows within a reducer.
  • GROUP BY: this is just the GROUP BY clause from SQL.
  • DISTRIBUTE BY: all rows with the same DISTRIBUTE BY columns go to the same reducer.
  • CLUSTER BY: this is DISTRIBUTE BY on a column plus SORT BY on the same column.
I just want to check that the Flink functions I use are equivalent to the Hive ones.
< Hive SQL Query = Flink functions >
  • ORDER BY = sortPartition(,)
  • SORT BY = groupBy(`col).sortPartition(,)
  • GROUP BY: this is just the groupBy function.
  • DISTRIBUTE BY = groupBy(`col)
  • CLUSTER BY = groupBy(`col).sortPartition(,)
I do not see much difference between groupBy and DISTRIBUTE BY when I map them to Flink functions.
On Hadoop, we could say that the mapper does the DISTRIBUTE BY. However, I am not sure what DISTRIBUTE BY corresponds to in Flink. My guess was that groupBy in Flink is the function that distributes the rows by the specified key.

Please feel free to correct what I suggested.


Secondly, I want to make sure I understand the difference between the reduce function and reduceGroup. I guess there must be a trade-off between the two functions. I know reduceGroup is invoked with an Iterator, but in which cases is it more appropriate and beneficial to use reduceGroup rather than reduce?

Best Regards,
Philip

--

==========================================================

Hae Joon Lee


Now, in Germany,

M.S. Candidate, Interested in Distributed System, Iterative Processing

Dept. of Computer Science, Informatik in German, TUB

Technical University of Berlin


In Korea,

M.S. Candidate, Computer Architecture Laboratory

Dept. of Computer Science, KAIST 


Rm# 4414 CS Dept. KAIST

373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701) 


Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea

==========================================================


Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

Maximilian Michels
Hi Philip,

Thank you for your questions. I think you have mapped the HIVE
functions to the Flink ones correctly. Just a remark on the ORDER BY.
You wrote that it produces a total order of all the records. In this
case, you'd have to do a SortPartition operation with parallelism set to
1. This is necessary because we need to have all records in one place
to perform a sort on them.
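
To illustrate why a per-partition sort alone does not give a total order, here is a minimal Python sketch. It models partitions as plain lists and is not Flink code; the helper names `sort_partitions` and `flatten`, the round-robin split, and the sample records are all assumptions made for illustration.

```python
def sort_partitions(partitions):
    """Sort each partition locally, the way a per-partition sort would."""
    return [sorted(p) for p in partitions]


def flatten(partitions):
    """Concatenate the partitions in order, as if writing them out one after another."""
    return [x for p in partitions for x in p]


records = [5, 1, 4, 2, 6, 3]

# "Parallelism 2": the records are spread over two partitions (round-robin here).
two = sort_partitions([records[0::2], records[1::2]])

# "Parallelism 1": all records sit in a single partition.
one = sort_partitions([records])

print(flatten(two))  # each half is sorted, the whole output is not
print(flatten(one))  # fully sorted
```

With two partitions each partition is sorted on its own, but the concatenated output is not; with a single partition the local sort is also the global sort.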

Considering your reduce question: There is no fundamental
advantage/disadvantage of using GroupReduce over Reduce. It depends on
your use case which one is more convenient or efficient. For the
regular reduce, you just get two elements and produce one. You can't
easily keep state between the reduces other than in the value itself.
The GroupReduce, on the other hand, may produce none, one, or multiple
elements per grouping and keep state in between emitting values. Thus,
GroupReduce is a more powerful operator and can be seen as a superset
of the Reduce operator. I would advise you to use the one you find
easiest to use.
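
The two function shapes can be contrasted with a small Python sketch. This is a plain-collections model, not the Flink API; `reduce_fn`, `group_reduce_fn`, the sample records, and the "emit once the running sum exceeds 4" rule are hypothetical and only serve to show the difference.

```python
from functools import reduce
from itertools import groupby

# Records are (key, value) pairs; the data is made up for illustration.
records = [("a", 1), ("a", 5), ("b", 2), ("a", 3), ("b", 7)]


def reduce_fn(left, right):
    """Reduce style: combine two records of the same key into one record."""
    return (left[0], left[1] + right[1])


def group_reduce_fn(key, values):
    """GroupReduce style: iterate over the whole group, emit zero or more records."""
    running = 0  # state kept while iterating over the group
    for v in values:
        running += v
        if running > 4:  # emit only once the running sum exceeds 4
            yield (key, running)


# Build the groups; sorted() is stable, so input order is kept within each key.
by_key = sorted(records, key=lambda r: r[0])
groups = {k: list(g) for k, g in groupby(by_key, key=lambda r: r[0])}

reduced = [reduce(reduce_fn, g) for g in groups.values()]
group_reduced = [
    out
    for k, g in groups.items()
    for out in group_reduce_fn(k, (v for _, v in g))
]

print(reduced)
print(group_reduced)
```

The reduce-style function always yields exactly one record per key, while the group-style function can emit several records per group (or none) and can carry state across the elements it iterates over.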

Best regards,
Max

On Sun, Oct 18, 2015 at 9:16 PM, Philip Lee <[hidden email]> wrote:


Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

Maximilian Michels
Hi Philip,

You're welcome. Just a small correction: Hive's SORT BY should be
DataSet.groupBy(key).sortGroup(key) in Flink. This ensures sorted
grouped records within the reducer that follows. No need to set the
parallelism to 1.

Best,
Max

On Mon, Oct 19, 2015 at 1:28 PM, Philip Lee <[hidden email]> wrote:

> Thanks, Max!
>
> I really appreciate the way you answered.
> As you remarked on ORDER BY: to use this function in Flink, do I
> have to set the parallelism to 1 in sortPartition to get a total ordering in one
> place?
>
> I just want to make sure about SORT BY as well.
> As a reminder, this clause in Hive only sorts the rows within a
> reducer.
> If I apply [groupBy] before [sortPartition],
> will it work the same way as SORT BY? It does not need the parallelism set to 1,
> right?
>
> Best Regards,
> Philip
>
>
> On Mon, Oct 19, 2015 at 1:17 PM, Philip Lee <[hidden email]> wrote:
>>
>> Dear all,
>>
>> Actually, last night I sent an email to the Flink committers about the proper
>> translation from Hive to Flink.
>> I got an answer about it from the people on the Flink mailing list.
>> I am pretty sure these two mails will really help you.
>>
>> I will write up a note based on this content in our Google Docs.
>> This note will also help the big-benchmark people later.
>>
>> Regards,
>> Philip
>>
>>
>>
>> ---------- Forwarded message ----------
>> From: Maximilian Michels <[hidden email]>
>> Date: Mon, Oct 19, 2015 at 1:01 PM
>> Subject: Re: Hi, Flink people, a question about translation from HIVE
>> Query to Flink fucntioin by using Table API
>> To: "[hidden email]" <[hidden email]>
>>
>>

Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

Fabian Hueske-2
In reply to this post by Maximilian Michels
Hi Philip,

Here are a few additions to what Max said:
- ORDER BY: As Max said, Flink's sortPartition() only sorts within a partition and does not produce a total order. You can either set the parallelism to 1, as Max suggested, or use a custom partitioner to range-partition the data.
- SORT BY: From your description, the semantics are not 100% clear. If SORT BY refers to the order of tuples WITHIN a reduce function call, it should be groupBy().sortGroup() in Flink instead of sortPartition().
- DISTRIBUTE BY: This should be partitionByHash() instead of groupBy(). groupBy() will also sort the data, which is not required for DISTRIBUTE BY.
- CLUSTER BY: This should be partitionByHash().sortPartition().
- Reduce vs. GroupReduce: A ReduceFunction is always combinable. This is optional for GroupReduceFunctions.
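
The range-partitioning route to a total order can be sketched in plain Python. This models the idea only and is not a Flink partitioner; `range_partition`, the split points in `boundaries`, and the sample records are assumptions chosen for illustration.

```python
import bisect


def range_partition(records, boundaries):
    """Send each record to the partition whose value range contains it."""
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for r in records:
        # bisect_right returns the index of the first boundary greater than r,
        # which is the index of the partition the record belongs to.
        partitions[bisect.bisect_right(boundaries, r)].append(r)
    return partitions


records = [42, 7, 19, 88, 3, 55, 23, 61]
boundaries = [20, 60]  # hypothetical split points: <20, 20..59, >=60

# Sort every partition locally, then read the partitions in range order.
partitions = [sorted(p) for p in range_partition(records, boundaries)]
total_order = [x for p in partitions for x in p]

print(partitions)
print(total_order)
```

Because every value in one partition is smaller than every value in the next, sorting each partition locally is enough to make the concatenated output totally ordered, without reducing the parallelism to 1.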

Cheers, Fabian




Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

Philip Lee
Thanks, Fabian.

I just want to check one thing again.
As you said, [Distribute By] is partitionByHash() and [Sort By] should be sortGroup in Flink. However, [Cluster By] consists of partitionByHash().sortPartition().

As far as I know, [Cluster By] is the same as the combination of [Distribute By] + [Sort By]. Therefore, following your suggestion, should it be partitionByHash() + sortGroup() instead of sortPartition()?

Or maybe I still have not understood the difference between a partition and the scope within a reduce.

Regards,
Philip

On Mon, Oct 19, 2015 at 2:17 PM, Fabian Hueske <[hidden email]> wrote:

Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

Fabian Hueske-2
The difference between a partition and a group is the following:
- A partition refers to all records that are processed by a task instance or sub task. If you use hash partitioning, all elements that share the same key will be in one partition, but usually there will be more than one key in a partition. For example: 1,3,3,3,1,1,1,4,1,1,3,4 would be a partition with keys 1,3, and 4.
- A group is the set of all records that have the same key. The partition in the example above would have three groups, one for each key. Since the records can have more fields than just the key field, you can also sort the records within a group on a different field than the grouping field.

It is not possible to call partitionByHash().sortGroup() because sortGroup() requires groups, which are created by groupBy().
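
The partition-versus-group distinction can be made concrete with a short Python sketch that reuses the key sequence from the example. It is a plain-collections model rather than Flink code, and the second field attached to each record (its position in the list) is a made-up payload.

```python
from itertools import groupby

# Keys from the example above, each paired with a made-up second field.
keys = [1, 3, 3, 3, 1, 1, 1, 4, 1, 1, 3, 4]
partition = [(k, i) for i, k in enumerate(keys)]  # i is a hypothetical payload

# Grouping: collect the records of each key (groupby needs key-sorted input).
by_key = sorted(partition, key=lambda r: r[0])
groups = {k: list(g) for k, g in groupby(by_key, key=lambda r: r[0])}

# Sorting within each group on the second field, descending.
# This is a different field than the grouping key.
sorted_groups = {
    k: sorted(g, key=lambda r: r[1], reverse=True) for k, g in groups.items()
}

print(sorted(groups))    # the keys present in this single partition
print(sorted_groups[4])  # the group for key 4, ordered by the second field
```

One partition holds twelve records and three groups; the sort inside each group only needs the records of that group, which is why it presupposes a grouping step.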

Best, Fabian

2015-10-19 14:31 GMT+02:00 Philip Lee <[hidden email]>:

Re: Hi, Flink people, a question about translation from HIVE Query to Flink function by using Table API

Philip Lee
Hi, one more simple question about ORDER BY count, item1, item2 in Hive SQL for Flink.

1)
In SQL, when ordering by three columns as in the example above, it orders by 'count' first, then by 'item1' among rows with the same 'count', and then by 'item2', right?

In Flink, when using sortPartition() for this, we could not call sortPartition(count).sortPartition(item1).sortPartition(item2),
because only the last sortPartition would be applied in the end. Do you think I have to put groupBy() between the sort functions to do the above example?
Is there another function for this?
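
The ordering that ORDER BY count, item1, item2 asks for is a single sort on a composite key. Here is a minimal Python sketch of that semantics; it is not Flink code and the rows are made up. As far as I recall, the Flink DataSet documentation describes chaining sortPartition() calls as the way to sort on several fields, but please verify that against the version you use.

```python
# Rows are (count, item1, item2); the values are made up for illustration.
rows = [
    (2, "b", "x"),
    (1, "c", "z"),
    (2, "a", "y"),
    (1, "c", "a"),
    (2, "a", "b"),
]

# One sort with a composite key: count first, then item1, then item2.
# Later columns only break ties among rows that agree on the earlier ones.
composite = sorted(rows, key=lambda r: (r[0], r[1], r[2]))

for row in composite:
    print(row)
```

Rows are compared on count first; item1 is only consulted when the counts are equal, and item2 only when both count and item1 are equal.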

2)
Plus, I heard via the forwarded mailing-list thread that [DISTRIBUTE BY] in Hive could be partitionByHash() in Flink.
But [DISTRIBUTE BY] spreads the rows out so that all rows with the same DISTRIBUTE BY columns go to the same reducer, whereas partitionByHash() spreads the rows out by a hash of the columns.
Do you think the result could be the same?
I tried to find out how [DISTRIBUTE BY] spreads out the rows in Hive, but I still have not found detailed information.
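
A small Python sketch can show what hash partitioning guarantees: rows with equal keys always land in the same partition. This is a model only; `hash_partition` is a hypothetical helper, `zlib.crc32` is a stand-in hash, and neither Hive's nor Flink's actual hash function is represented here.

```python
import zlib


def hash_partition(rows, key_index, num_partitions):
    """Assign each row to partition hash(key) mod num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        # crc32 is a deterministic stand-in for the engine's hash function.
        h = zlib.crc32(str(row[key_index]).encode("utf-8"))
        partitions[h % num_partitions].append(row)
    return partitions


rows = [("berlin", 1), ("seoul", 2), ("berlin", 3), ("daejeon", 4), ("seoul", 5)]
partitions = hash_partition(rows, key_index=0, num_partitions=3)

# Record, for every key, the set of partitions it was sent to.
homes = {}
for i, part in enumerate(partitions):
    for key, _ in part:
        homes.setdefault(key, set()).add(i)

print(homes)
```

Every key maps to exactly one partition, which is the property described for DISTRIBUTE BY above. Which partition a given key lands in depends on the hash function, so two systems can place the same key differently while still giving the same guarantee.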








On Mon, Oct 19, 2015 at 2:40 PM, Fabian Hueske <[hidden email]> wrote:
The difference between a partition and a group is the following:
- A partition refers to all records that are processed by a task instance or sub task. If you use hash partitioning, all elements that share the same key will be in one partition, but usually there will be more than one key in a partition. For example: 1,3,3,3,1,1,1,4,1,1,3,4 would be a partition with keys 1,3, and 4.
- A group is the set of all records that have the same key. The partition in the example above would have three groups, one for each key. Since records can have more fields than just the key field, you can also sort the records within a group on a different field than the grouping field.

It is not possible to call partitionByHash().sortGroup() because sortGroup() requires groups, which are created by groupBy().
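
The partition/group distinction can be sketched in plain Python using the example partition above. This is not Flink code; the second field (the arrival position of each record) is a hypothetical addition so that there is something other than the key to sort on inside a group.

```python
from itertools import groupby

# The partition from the example above: one partition, keys 1, 3 and 4
partition = [1, 3, 3, 3, 1, 1, 1, 4, 1, 1, 3, 4]

# Hypothetical second field: the arrival position of each record
records = [(key, pos) for pos, key in enumerate(partition)]

# Grouping: bring equal keys together (itertools.groupby needs sorted input)
records_by_key = sorted(records, key=lambda r: r[0])
groups = {k: list(g) for k, g in groupby(records_by_key, key=lambda r: r[0])}

# Sorting within each group on the non-key field, here descending position
sorted_groups = {
    k: sorted(g, key=lambda r: r[1], reverse=True) for k, g in groups.items()
}
```

The single partition yields three groups, and the in-group order is defined only once the groups exist.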

Best, Fabian

2015-10-19 14:31 GMT+02:00 Philip Lee <[hidden email]>:
Thanks, Fabian.

I just want to check one thing again.
As you said, [Distribute By] is partitionByHash() and [Sort By] should be sortGroup() in Flink. However, [Cluster By] consists of partitionByHash().sortPartition().

As far as I know, [Cluster By] is the same as the combination of [Distribute By] + [Sort By]. Therefore, according to your suggestion, should it be partitionByHash() + sortGroup() instead of sortPartition()?

Or perhaps I still have not understood the difference between a partition and the scope within a reduce.

Regards,
Philip

On Mon, Oct 19, 2015 at 2:17 PM, Fabian Hueske <[hidden email]> wrote:
Hi Philip,

here are a few additions to what Max said:
- ORDER BY: As Max said, Flink's sortPartition() only sorts within a partition and does not produce a total order. You can either set the parallelism to 1 as Max suggested or use a custom partitioner to range partition the data.
- SORT BY: From your description, the semantics are not 100% clear. If SORT BY refers to the order of tuples WITHIN a reduce function call, it should be groupBy().sortGroup() in Flink instead of sortPartition().
- DISTRIBUTE BY: This should be partitionByHash() instead of groupBy(). groupBy() will also sort the data, which is not required for DISTRIBUTE BY.
- CLUSTER BY: This should be partitionByHash().sortPartition().
- Reduce vs. GroupReduce: A ReduceFunction is always combinable. This is optional for GroupReduceFunctions.
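
The two ORDER BY options can be compared in a small plain-Python model (not Flink code). The helper names, the round-robin split, and the range boundary at 5 are assumptions chosen for illustration.

```python
def sort_partitions(partitions):
    """Sort each partition on its own; no ordering across partitions."""
    return [sorted(p) for p in partitions]

def range_partition(values, boundaries):
    """Place each value in the range delimited by the sorted boundaries."""
    parts = [[] for _ in range(len(boundaries) + 1)]
    for v in values:
        idx = sum(1 for b in boundaries if v >= b)
        parts[idx].append(v)
    return parts

values = [7, 2, 9, 4, 1, 8, 3, 6]

# Arbitrary (round-robin) partitioning, then a local sort per partition
round_robin = [values[0::2], values[1::2]]
local_only = [x for p in sort_partitions(round_robin) for x in p]

# Range partitioning with a hypothetical boundary at 5, then a local sort
ranged = [x for p in sort_partitions(range_partition(values, [5])) for x in p]

# Parallelism 1: everything sits in a single partition
single = sort_partitions([values])[0]
```

Only the range-partitioned and single-partition variants give a total order when the partitions are read in sequence. The locally sorted round-robin output is ordered inside each partition but not across them.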

Cheers, Fabian



2015-10-19 13:01 GMT+02:00 Maximilian Michels <[hidden email]>:
Hi Philip,

Thank you for your questions. I think you have mapped the HIVE
functions to the Flink ones correctly. Just a remark on the ORDER BY.
You wrote that it produces a total order of all the records. In this
case, you'd have to do a SortPartition operation with parallelism set to
1. This is necessary because we need to have all records in one place
to perform a sort on them.

Considering your reduce question: There is no fundamental
advantage/disadvantage of using GroupReduce over Reduce. It depends on
your use case which one is more convenient or efficient. For the
regular reduce, you just get two elements and produce one. You can't
easily keep state between the reduces other than in the value itself.
The GroupReduce, on the other hand, may produce none, one, or multiple
elements per grouping and keep state in between emitting values. Thus,
GroupReduce is a more powerful operator and can be seen as a superset
of the Reduce operator. I would advise you to use the one you find
easiest to use.
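
The difference can be sketched in plain Python (not the Flink API). The name reduce_group and the running-maximum logic are hypothetical; they stand in for any group-wise function that keeps state and emits a variable number of results.

```python
from functools import reduce

def reduce_group(values):
    """Group-style function: sees the whole group, may emit 0..n results."""
    seen_max = None  # state kept across the iteration
    for v in values:
        if seen_max is None or v > seen_max:
            seen_max = v
            yield v  # emit every new running maximum

group = [3, 1, 4, 1, 5]

# Reduce style: combine two elements into one, repeatedly
total = reduce(lambda a, b: a + b, group)

# Because addition is associative, partial results can be pre-aggregated
# (combined) on chunks of the group and merged afterwards
partial = [reduce(lambda a, b: a + b, chunk) for chunk in (group[:2], group[2:])]
combined_total = reduce(lambda a, b: a + b, partial)

running_maxima = list(reduce_group(group))
no_output = list(reduce_group([]))
```

The pairwise version always yields exactly one value per non-empty group and can be pre-aggregated, whereas the group-wise version here emits three values for one group and none for an empty input.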

Best regards,
Max




