Bug while using Table API

7 messages

Bug while using Table API

Simone Robutti
Hello,

While writing my first job with the Table API, I was blocked by this error:

Exception in thread "main" java.lang.NoSuchFieldError: RULES
at org.apache.flink.api.table.plan.rules.FlinkRuleSets$.<init>(FlinkRuleSets.scala:148)
at org.apache.flink.api.table.plan.rules.FlinkRuleSets$.<clinit>(FlinkRuleSets.scala)
at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:212)
at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
at org.example.Job$.main(Job.scala:64)
at org.example.Job.main(Job.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)


This happens with every job. Right now I'm trying the WordCount example from the documentation, using the SBT quickstart project with version 1.1-SNAPSHOT. I can actually access the "StreamRules.RULES" field directly, so the field is there, but if I try to access DATASTREAM_OPT_RULES, the same error as reported above is raised.

I tried both with Scala 2.11 and Scala 2.10.
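For context, a `NoSuchFieldError` is raised at link time when code was compiled against one version of a class but a different version of that class, missing the field, is found on the runtime classpath (here the field came from Calcite's stream rules, as the fix later in this thread shows). One way to see which static fields a loaded class really exposes is plain reflection; a generic diagnostic sketch, not specific to Flink:

```scala
object FieldCheck {
  // List the static fields a loaded class actually declares, to compare
  // against what the calling code was compiled against. For a Scala
  // object such as FlinkRuleSets, the runtime class name ends in `$`.
  def staticFields(className: String): Seq[String] = {
    val cls = Class.forName(className)
    cls.getDeclaredFields.toSeq
      .filter(f => java.lang.reflect.Modifier.isStatic(f.getModifiers))
      .map(_.getName)
  }

  def main(args: Array[String]): Unit = {
    // Example with a JDK class; substitute the class from the stack trace.
    println(staticFields("java.lang.Integer"))
  }
}
```

Running this against the class named in the stack trace shows whether the field the error complains about exists in the version actually loaded.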



Re: Bug while using Table API

Vasiliki Kalavri
Hi Simone,

I tried to reproduce your problem, with no luck: I ran the WordCountTable example using the sbt quickstart with Flink 1.1-SNAPSHOT and Scala 2.10, and it worked fine.
Could you post the code you tried?

Thanks,
-Vasia.




Re: Bug while using Table API

Simone Robutti
Here is the code:

package org.example

import org.apache.flink.api.scala._
import org.apache.flink.api.table.TableEnvironment

object Job {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)


    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
    val expr = tEnv.fromDataSet(input)
    val result = expr
      .groupBy("word")
      .select("word , count.sum as count")
    tEnv.toDataSet[WC](result).print()

    env.execute("Flink Scala API Skeleton")
  }
}

case class WC(word: String, count: Int)
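For reference, the Table API query above groups by `word` and sums `count`. The same aggregation written with plain Scala collections shows the expected result, independently of Flink (a sketch, not the Table API's execution path):

```scala
object WordCountSketch {
  case class WC(word: String, count: Int)

  // Group by word and sum the counts, mirroring
  // .groupBy("word").select("word, count.sum as count") above.
  def aggregate(input: Seq[WC]): Map[String, Int] =
    input.groupBy(_.word).map { case (w, ws) => w -> ws.map(_.count).sum }

  def main(args: Array[String]): Unit = {
    val input = Seq(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
    println(aggregate(input))  // expected: hello -> 2, ciao -> 1
  }
}
```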



Re: Bug while using Table API

Vasiliki Kalavri

Thanks Simone! I've managed to reproduce the error. I'll try to figure out what's wrong and I'll keep you updated.

-Vasia.




Re: Bug while using Table API

Vasiliki Kalavri
Hi Simone,

Fabian has pushed a fix for the streaming TableSources that removed the Calcite stream rules [1].
The reported error no longer appears with the current master. Could you give it a try as well and verify that it works for you?

Thanks,
-Vasia.
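To test against the current master from the sbt quickstart, the build needs to resolve 1.1-SNAPSHOT from the Apache snapshot repository. A sketch of the relevant build.sbt fragment (repository URL and artifact names are assumptions, not taken from this thread):

```scala
// build.sbt fragment: resolve Flink 1.1-SNAPSHOT artifacts
resolvers += "apache-snapshots" at
  "https://repository.apache.org/content/repositories/snapshots/"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.1-SNAPSHOT",
  "org.apache.flink" %% "flink-table" % "1.1-SNAPSHOT"
)
```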





Re: Bug while using Table API

Simone Robutti
Ok, I tested it and it works on the same example. :)






Re: Bug while using Table API

Vasiliki Kalavri
Good to know :)
