Need some help to understand the cause of the error

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Need some help to understand the cause of the error

nsengupta
Hello Flinksters,

I am trying to use Flinkspector in a Scala code snippet of mine and Flink is complaining. The code is here:

---------------------------------------------------------------------------------------------------------------

case class Reading(field1:String,field2:String,field3:Int)

object MultiWindowing {

  def main(args: Array[String]) {}

  //  WindowFunction<IN,OUT,KEY,W extends Window>

  class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {

      //  .....
    }
  }

  val env = DataStreamTestEnvironment.createTestEnvironment(1)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val input: EventTimeInput[Reading]  =
    EventTimeInputBuilder
    .startWith(Reading("hans", "elephant", 15))
    .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
    .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))

  //acquire data source from input
  val stream = env.fromInput(input)

  //apply transformation
  val k = stream.keyBy(new KeySelector [Reading,String] {
    def getKey(r:Reading) =  r.field2
  })
    .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))

    k.sum(3)
    .print()

  env.execute()

}

---------------------------------------------------------------------------------------------------------------

And at runtime, I get this error:

----------------------------------------------------------------------------------------------------------------

Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37)
at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63)
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala)
... 6 more


---------------------------------------------------------------------------------------------------------------

Can someone help me by pointing out the mistake I am making?

-- Nirmalya

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be.
Now put the foundation under them."
Reply | Threaded
Open this post in threaded view
|

Re: Need some help to understand the cause of the error

Aljoscha Krettek
Hi,
as far as I can see it the problem is in this line:
k.sum(3)

using field indices is only valid for Tuple Types. In your case you should be able to use this:
k.sum(“field3”)

because this is a field of your Reading type.

Cheers,
Aljoscha

> On 26 Feb 2016, at 02:44, Nirmalya Sengupta <[hidden email]> wrote:
>
> Hello Flinksters,
>
> I am trying to use Flinkspector in a Scala code snippet of mine and Flink is complaining. The code is here:
>
> ---------------------------------------------------------------------------------------------------------------
>
> case class Reading(field1:String,field2:String,field3:Int)
>
> object MultiWindowing {
>
>   def main(args: Array[String]) {}
>
>   //  WindowFunction<IN,OUT,KEY,W extends Window>
>
>   class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {
>
>       //  .....
>     }
>   }
>
>   val env = DataStreamTestEnvironment.createTestEnvironment(1)
>
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>   val input: EventTimeInput[Reading]  =
>     EventTimeInputBuilder
>     .startWith(Reading("hans", "elephant", 15))
>     .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
>     .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))
>
>   //acquire data source from input
>   val stream = env.fromInput(input)
>
>   //apply transformation
>   val k = stream.keyBy(new KeySelector [Reading,String] {
>     def getKey(r:Reading) =  r.field2
>   })
>     .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))
>
>     k.sum(3)
>     .print()
>
>   env.execute()
>
> }
>
> ---------------------------------------------------------------------------------------------------------------
>
> And at runtime, I get this error:
>
> ----------------------------------------------------------------------------------------------------------------
>
> Exception in thread "main" java.lang.ExceptionInInitializerError
> at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
> at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
> at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37)
> at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
> at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63)
> at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala)
> ... 6 more
>
>
> ---------------------------------------------------------------------------------------------------------------
>
> Can someone help me by pointing out the mistake I am making?
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is where they should be.
> Now put the foundation under them."