Re: Compilation Error in WindowStream.fold()

Posted by nsengupta on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830p11910.html

Hello Aljoscha,

Many thanks for taking this up.

This is the modified code:
----------------------------------------------------------------------------------
val uniqueVehicles = envDefault
      .fromCollection(readings)
      .map(e => MITSIMUtils.preparePositionReport(e))
      .assignAscendingTimestamps(e => e.timeOfReport)
      .keyBy(e => (
        e.eWayCoordinates.eWayID,
        e.eWayCoordinates.eWayDir,
        e.eWayCoordinates.eWaySegment,
        e.vehicleDetails.vehicleID))
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
      .fold(
        // Seed
        Map[EWayCoordinates,Set[VehicleID]](),

        // FoldFunction
        folder,

        // WindowFunction
        windower

        // I have taken the TupleTypeInfo arguments out, to see what the compiler says.
        // Previously, these were added to satisfy the compiler:

        /*new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
        new TupleTypeInfo[(EWayCoordinates,Int)]*/
      )
----------------------------------------------------------------------------------

And this is what the compiler says:
----------------------------------------------------------------------------------
[INFO] Compiling 3 source files to /home/nirmalya/Workspace-Flink/LinearRoad/target/test-classes at 1487991829901
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:137: error: overloaded method value fold with alternatives:
[ERROR]   [ACC, R](initialValue: ACC, preAggregator: (ACC, org.nirmalya.exercise.Elements.PositionReport) => ACC, windowFunction: (org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[ACC], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR]   [ACC, R](initialValue: ACC, preAggregator: org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,ACC], windowFunction: org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC,R,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR]  cannot be applied to (scala.collection.immutable.Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]], org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[Int]]], org.apache.flink.streaming.api.functions.windowing.AllWindowFunction[Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]],(org.nirmalya.exercise.Elements.EWayCoordinates, Int),org.apache.flink.streaming.api.windowing.windows.Window])
[ERROR]       .fold(
[ERROR]        ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------

----------------------------------------------------------------------------------