Scala Code Generation

Scala Code Generation

schultze
Hello,

I am currently working on a compilation unit that translates AsterixDB's AQL
into runnable Scala code for Flink's Scala API. During code generation I
discovered some things that are quite hard to work around. I am still
working with Flink version 0.8, so some of the problems I describe might
already be fixed in 0.9; if so, please tell me.

First, whenever a record gets projected down to a single field (e.g. by a
map or reduce function), it is no longer considered a record but a value of
that field's type. If I then want to apply further functions like .sum(0),
I get an error message like

"Aggregating on field positions is only possible on tuple data types."

The same holds for all other functions (like write or join), as the "record"
is no longer treated as a tuple dataset.
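
For illustration, a minimal sketch of the pattern (the file path and names
are made up):

val items = env.readCsvFile[(Int, Double)]("/path/to/input.csv")
val singles = items.map(x => x._2) // now a DataSet[Double], no longer a tuple type
singles.sum(0)                     // fails with the error message above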

Second, I found that records with more than 22 fields are not supported.
Whenever I have a record longer than that, I receive a build error such as

object Tuple23 is not a member of package scala

Since I am currently translating some TPC-H queries, the records might very
well grow longer than that, although it is possible to work around this by
mapping each dataset down to the required fields before joining.

Thanks a lot for any help and best regards,
Max Schultze


Re: Scala Code Generation

Ufuk Celebi

> On 13 Oct 2015, at 16:06, [hidden email] wrote:
>
> Hello,
>
> I am currently working on a compilation unit that translates AsterixDB's AQL
> into runnable Scala code for Flink's Scala API. During code generation I
> discovered some things that are quite hard to work around. I am still
> working with Flink version 0.8, so some of the problems I describe might
> already be fixed in 0.9; if so, please tell me.
>
> First, whenever a record gets projected down to a single field (e.g. by a
> map or reduce function), it is no longer considered a record but a value of
> that field's type. If I then want to apply further functions like .sum(0),
> I get an error message like

A workaround is to return a Tuple1<X> for this. Then you can run the aggregation. I think the Tuple0 class was only added after 0.8, though.
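
In the Scala API that could look something like this (a sketch with made-up
names):

val singles = items.map(x => Tuple1(x._2)) // DataSet[Tuple1[Double]] is a tuple type again
val total = singles.sum(0)                 // aggregating on field position 0 now works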

> "Aggregating on field positions is only possible on tuple data types."
>
> The same holds for all other functions (like write or join), as the "record"
> is no longer treated as a tuple dataset.

What do you mean? At least in the current versions, the join projections return a Tuple type as well.
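
For example (a sketch with made-up data, assuming the usual
import org.apache.flink.api.scala._):

val left = env.fromElements((1, "a"), (2, "b"))
val right = env.fromElements((1, 3.0), (2, 4.0))
val joined = left.join(right).where(0).equalTo(0) // DataSet[((Int, String), (Int, Double))], a tuple type
val projected = joined.map { case (l, r) => (l._1, l._2, r._2) } // projecting yields a tuple again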

> Second, I found that records with more than 22 fields are not supported.
> Whenever I have a record longer than that, I receive a build error such as

Flink’s Tuple classes go up to Tuple25. You can work around this by using a custom POJO type, e.g.

class TPCHRecord {
    public int f0;
    ...
    public int f99;
}

If possible, I would suggest updating to the latest 0.9 or the upcoming 0.10 release. A lot has been fixed since 0.8; I think it will be worth it. If you encounter any problems while doing this, feel free to ask here. :)

– Ufuk

Re: Scala Code Generation

Till Rohrmann

If you're using Scala, then you're bound to a maximum of 22 fields in a tuple, because the Scala library does not provide larger tuples. You could generate your own case classes with more than 22 fields, though.
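
For illustration, such a generated class could look like this (a sketch;
field names and types are made up, and note that, as pointed out later in
the thread, case classes with more than 22 fields require Scala 2.11 or
later):

case class WideRecord(
  f0: Int, f1: Int, f2: Int, f3: Int, f4: Int, f5: Int, f6: Int, f7: Int,
  f8: Double, f9: Double, f10: Double, f11: Double, f12: String, f13: String,
  f14: String, f15: String, f16: String, f17: String, f18: String,
  f19: String, f20: String, f21: String, f22: String, f23: String
)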


Re: Scala Code Generation

schultze
Thanks a lot for the help.

I was able to apply the Tuple1 workaround to fix my problem. I have also
moved up to Flink 0.9.

However, I now have another problem executing generated Scala programs. It
seems like a Scala program executed with a Flink 0.9 job manager only
supports a limited number of operators. I use the Flink quickstart package
to generate executable .jar files (using mvn clean package; the exact
commands are shown after the program below). The following is a simple
example program generated by my compiler from a rewritten AQL version of
TPC-H query Q6. Whenever I pack it into a .jar file and try to execute it
using a local job manager, I get a "Class not found" error; however, when I
remove any one of the operators, it works just fine. I also ran the example
within Eclipse using the old Flink 0.8 quickstart package; interestingly,
it worked fine there too, no matter how many operators I used. Does the
Scala environment in Flink 0.9 really only support a limited number of
operators? Is this a configuration issue, and is it possible to increase
that number?

This is the query I ran:

import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation

object Job {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val $l = env.readCsvFile[(Int, Int, Int, Int, Double, Double, Double, Double,
        String, String, String, String, String, String, String, String)](
      "/home/mcs1408/TPCH_data/lineitem.tbl", "\n", "|")
    val val0 = $l.filter(x => x._11 >= "1994-01-01")
    val val1 = val0.filter(x => x._11 < "1995-01-01")
    val val2 = val1.filter(x => x._7 >= -0.01)
    val val3 = val2.filter(x => x._7 < 0.01)
    val val4 = val3.filter(x => x._5 < 24)
    val val5 = val4.map { x =>
        (x._1, x._2, x._3, x._4, x._5, x._8, x._9, x._10, x._11, x._12,
         x._13, x._14, x._15, x._16, x._6 * (1 - x._7))
      }
      .sum(14)
    val val6 = val5.map { x => Tuple1(x._15) }
      .writeAsCsv("/home/mcs1408/TPCH_data/result", "\n", "|")

    env.execute("Flink Scala API parsed AQL Query")
  }
}
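
For reference, these are the commands I use to build and submit the job
(the jar name is just a placeholder):

mvn clean package
./bin/flink run -c Job target/quickstart-0.1.jar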

Thanks a lot for any help!
Best regards,
Max Schultze




Re: Scala Code Generation

schultze
By now I have been able to reproduce the error with some more queries.
However, it seems to be a problem only in Flink's local mode; during
cluster execution everything works just fine.

Regards, Max



Re: Scala Code Generation

Theodore Vasiloudis
In reply to this post by Till Rohrmann
> You could generate your own case classes with more than 22 fields, though.

Actually, that is not possible with case classes in Scala 2.10; you would have to use a normal class if you have more than 22 fields. This constraint was removed in Scala 2.11.
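
For example, a plain class with public var fields should still work as a
Flink POJO on 2.10, something along these lines (a sketch; names are made
up):

class WideRecord {
  var f0: Int = 0
  var f1: Double = 0.0
  var f2: String = ""
  // ... continue past the 22-field limit as needed
}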
