Hello Flinksters,
I am trying to use Flinkspector in a Scala code snippet of mine and Flink is complaining. The code is here:

---------------------------------------------------------------------------------------------------------------

case class Reading(field1:String,field2:String,field3:Int)

object MultiWindowing {

  def main(args: Array[String]) {}

  // WindowFunction<IN,OUT,KEY,W extends Window>

  class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {

    // .....
  }
}

val env = DataStreamTestEnvironment.createTestEnvironment(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input: EventTimeInput[Reading] =
  EventTimeInputBuilder
    .startWith(Reading("hans", "elephant", 15))
    .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
    .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))

//acquire data source from input
val stream = env.fromInput(input)

//apply transformation
val k = stream.keyBy(new KeySelector [Reading,String] {
    def getKey(r:Reading) = r.field2
  })
  .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))

k.sum(3)
  .print()

env.execute()

}

---------------------------------------------------------------------------------------------------------------

And at runtime, I get this error:

---------------------------------------------------------------------------------------------------------------

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
    at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
    at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37)
    at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
    at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63)
    at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala)
    ... 6 more

---------------------------------------------------------------------------------------------------------------

Can someone help me by pointing out the mistake I am making?

-- Nirmalya

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be.
Now put the foundation under them."
Hi,
as far as I can see, the problem is in this line:

k.sum(3)

Using field indices is only valid for tuple types. In your case you should be able to use this:

k.sum("field3")

because field3 is a field of your Reading type.

Cheers,
Aljoscha

> On 26 Feb 2016, at 02:44, Nirmalya Sengupta <[hidden email]> wrote:
>
> k.sum(3)
> .print()
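
For readers who want to see what summing field3 per key amounts to, here is a minimal sketch in plain Scala, with no Flink or Flinkspector dependency. It reuses the Reading case class and the three sample readings from the question; the names SumByFieldSketch, addField3 and sumPerKey are made up for this illustration, and the sketch ignores windowing and event time entirely. It only shows the per-key reduction, not how Flink executes it.

```scala
// Plain Scala sketch (no Flink): what "sum field3 per key" computes.
// SumByFieldSketch, addField3 and sumPerKey are hypothetical names for illustration.
case class Reading(field1: String, field2: String, field3: Int)

object SumByFieldSketch {
  // Combines two readings with the same key by adding their field3 values.
  // field1 and field2 are taken from the first reading.
  def addField3(a: Reading, b: Reading): Reading =
    a.copy(field3 = a.field3 + b.field3)

  // Groups by field2 (the key used in the question's keyBy) and reduces each group.
  def sumPerKey(readings: Seq[Reading]): Map[String, Reading] =
    readings.groupBy(_.field2).map { case (key, group) =>
      key -> group.reduce(addField3)
    }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      Reading("hans", "elephant", 15),
      Reading("susi", "arctic", 20),
      Reading("pete", "elephant", 40)
    )
    // Sort by key so the printed order is stable.
    sumPerKey(readings).toSeq.sortBy(_._1).foreach { case (key, r) =>
      println(s"$key -> ${r.field3}")
    }
    // prints:
    // arctic -> 20
    // elephant -> 55
  }
}
```

Should the string-based sum turn out not to work for a particular type, a function like addField3 could probably be wrapped in a ReduceFunction[Reading] and passed to reduce on the windowed stream instead of calling sum. That is an untested assumption for this Flinkspector setup.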