For reasons I cannot grasp, I am unable to move ahead.
Here's the code:

--------------------------------------------------------------------------------
import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector
import org.nirmalya.exercise.Elements.{EWayCoordinates, PositionReport, RawMITSIMTuple, VehicleID}

case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
case class PositionReport(
  // tupletype: Int,
  timeOfReport: Int,
  eWayCoordinates: EWayCoordinates,
  vehicleDetails: VehicleDetails
)

// ....

val envDefault = StreamExecutionEnvironment.createLocalEnvironment(4)
envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val readings = IndexedSeq[RawMITSIMTuple](
  RawMITSIMTuple(0,0,107,32,0,0,0,10, 53320,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,0,109,20,0,0,0,10,100644,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,0,106,28,0,0,0,10,137745,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,107,32,0,0,0,67,354281,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,105,30,0,0,1,94,501089,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,110,26,0,0,1,95,495898,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,102,30,0,0,1,85,453562,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,104,32,0,0,1,80,427144,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,102,20,0,0,1,73,390383,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,111,27,1,0,1,69,369135,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,112,28,1,0,0, 1,  5757,-1,-1,-1,-1,-1,-1)
)

val folder = new FoldFunction[PositionReport, Map[EWayCoordinates,Set[Int]]] {
  override def fold(
      t: Map[EWayCoordinates, Set[VehicleID]],
      o: PositionReport
  ): Map[EWayCoordinates, Set[VehicleID]] = {
    t + (o.eWayCoordinates -> (t.getOrElse(o.eWayCoordinates,Set.empty) + (o.vehicleDetails.vehicleID)))
  }
}

val windower = new AllWindowFunction[Map[EWayCoordinates, Set[VehicleID]],(EWayCoordinates,Int),Window] {
  override def apply(
      w: Window,
      bunch: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
      collector: Collector[(EWayCoordinates, VehicleID)]): Unit = {
    val allVehiclesInLast30Mins = bunch.iterator().next.mapValues(e => e.size)
    allVehiclesInLast30Mins.foreach(e => println(e))
    collector.collect((EWayCoordinates(-1,-1,-1,-1),0))
  }
}

val uniqueVehicles = envDefault
  .fromCollection(readings)
  .map(e => MITSIMUtils.preparePositionReport(e))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .keyBy(e => (
    e.eWayCoordinates.eWayID,
    e.eWayCoordinates.eWayDir,
    e.eWayCoordinates.eWaySegment,
    e.vehicleDetails.vehicleID))
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    // Seed
    Map[EWayCoordinates,Set[VehicleID]](),
    // FoldFunction
    folder,
    // WindowFunction
    windower,
    // Satisfying the compiler
    new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
    new TupleTypeInfo[(EWayCoordinates,Int)]
  )
--------------------------------------------------------------------------------

The compiler is unhappy:

--------------------------------------------------------------------------------
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:136: error: missing argument list for method fold in class AllWindowedStream
[ERROR] Unapplied methods are only converted to functions when a function type is expected.
[ERROR] You can make this conversion explicit by writing `fold _` or `fold(_)(_)(_)` instead of `fold`.
[ERROR] .fold(
[ERROR] ^
[ERROR] one error found
--------------------------------------------------------------------------------

I understand why the compiler is unhappy, but I am unsure whether I have to go through all this devilry. I don't see anything like this prescribed in any Flink example. Then again, perhaps I am missing an important point.

I went through this comment by Yassine Marzougui before I added those type hints. However, I am using Flink 1.2.0.

I know this sounds silly, but I am simply failing to get out of this. All help appreciated.

-- Nirmalya
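For readers following along, the accumulation that the folder above is meant to perform can be tried without Flink at all. The following is only a minimal plain-Scala sketch. It assumes VehicleID is an alias for Int (the alias definition is not shown in the post), and the three reports are made-up sample data, not the readings above.

```scala
// Plain-Scala sketch of the fold step; no Flink involved.
// Assumption: VehicleID is a plain alias for Int (its definition is not shown in the post).
object FoldSketch {
  type VehicleID = Int

  case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
  case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
  case class PositionReport(
    timeOfReport: Int,
    eWayCoordinates: EWayCoordinates,
    vehicleDetails: VehicleDetails
  )

  // Same step as the FoldFunction body: add the vehicle to the set kept for its coordinates.
  def step(
      acc: Map[EWayCoordinates, Set[VehicleID]],
      r: PositionReport
  ): Map[EWayCoordinates, Set[VehicleID]] =
    acc + (r.eWayCoordinates ->
      (acc.getOrElse(r.eWayCoordinates, Set.empty[VehicleID]) + r.vehicleDetails.vehicleID))

  // Hypothetical sample data, for illustration only.
  val here  = EWayCoordinates(0, 0, 0, 10)
  val there = EWayCoordinates(0, 1, 0, 94)
  val reports = Seq(
    PositionReport(0, here,  VehicleDetails(107, 32, 53320)),
    PositionReport(1, here,  VehicleDetails(107, 32, 53400)), // same vehicle reported again
    PositionReport(1, there, VehicleDetails(105, 30, 501089))
  )

  // foldLeft plays the role that the windowed fold plays inside Flink.
  val uniquePerCoordinates: Map[EWayCoordinates, Set[VehicleID]] =
    reports.foldLeft(Map.empty[EWayCoordinates, Set[VehicleID]])(step)

  def main(args: Array[String]): Unit =
    uniquePerCoordinates.foreach { case (coords, ids) => println(s"$coords -> ${ids.size}") }
}
```

The seed and the step function use one and the same accumulator type, which is also what the windowed fold needs.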
Hi Nirmalya,
what does the compiler say if you use the variant without explicit TypeInfo? Like this:

.fold(
  // Seed
  Map[EWayCoordinates,Set[VehicleID]](),
  // FoldFunction
  folder,
  // WindowFunction
  windower
)

Best,
Aljoscha

On Thu, 23 Feb 2017 at 14:41 nsengupta <[hidden email]> wrote:
For reasons I cannot grasp, I am unable to move ahead.
Hello Aljoscha,
Many thanks for taking this up. This is the modified code:

--------------------------------------------------------------------------------
val uniqueVehicles = envDefault
  .fromCollection(readings)
  .map(e => MITSIMUtils.preparePositionReport(e))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .keyBy(e => (
    e.eWayCoordinates.eWayID,
    e.eWayCoordinates.eWayDir,
    e.eWayCoordinates.eWaySegment,
    e.vehicleDetails.vehicleID))
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    // Seed
    Map[EWayCoordinates,Set[VehicleID]](),
    // FoldFunction
    folder,
    // WindowFunction
    windower
    // I have taken the TupleTypeInfo out, to see what the compiler says!
    // Satisfying the compiler:
    /*new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
    new TupleTypeInfo[(EWayCoordinates,Int)]*/
  )
--------------------------------------------------------------------------------

And, this is what the compiler says:

--------------------------------------------------------------------------------
[INFO] Compiling 3 source files to /home/nirmalya/Workspace-Flink/LinearRoad/target/test-classes at 1487991829901
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:137: error: overloaded method value fold with alternatives:
[ERROR] [ACC, R](initialValue: ACC, preAggregator: (ACC, org.nirmalya.exercise.Elements.PositionReport) => ACC, windowFunction: (org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[ACC], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR] [ACC, R](initialValue: ACC, preAggregator: org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,ACC], windowFunction: org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC,R,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR] cannot be applied to (scala.collection.immutable.Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]], org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[Int]]], org.apache.flink.streaming.api.functions.windowing.AllWindowFunction[Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]],(org.nirmalya.exercise.Elements.EWayCoordinates, Int),org.apache.flink.streaming.api.windowing.windows.Window])
[ERROR] .fold(
[ERROR] ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
--------------------------------------------------------------------------------
It seems the type of your initial accumulator, which is Map[EWayCoordinates,Set[VehicleID]], does not match the accumulator type on your FoldFunction, which is Map[EWayCoordinates,Set[Int]]. Could you change that?

On Sat, 25 Feb 2017 at 04:09 nsengupta <[hidden email]> wrote:
Hello Aljoscha,
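Reading the second compiler message closely, two further differences show up besides the spelling of the accumulator type. The overload that takes a FoldFunction expects org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC, R, TimeWindow], while the code passes the Java org.apache.flink.streaming.api.functions.windowing.AllWindowFunction with Window as the window type. A variant that lines up with that overload might look roughly like the sketch below. It assumes Flink 1.2.0's Scala API, spells the vehicle ID as Int throughout, uses made-up reports, leaves out the keyBy step for brevity, and relies on the implicit TypeInformation that the scala._ import provides instead of explicit TupleTypeInfo arguments. The class and object names are hypothetical, and the sketch has not been run against the original project.

```scala
import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
// Scala variant of AllWindowFunction, the one named in the second fold overload.
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object UniqueVehiclesSketch {
  case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
  case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
  case class PositionReport(
    timeOfReport: Int,
    eWayCoordinates: EWayCoordinates,
    vehicleDetails: VehicleDetails
  )

  // Accumulator type written identically here, in the seed, and in the window function.
  class UniqueVehicleFolder extends FoldFunction[PositionReport, Map[EWayCoordinates, Set[Int]]] {
    override def fold(
        acc: Map[EWayCoordinates, Set[Int]],
        r: PositionReport
    ): Map[EWayCoordinates, Set[Int]] =
      acc + (r.eWayCoordinates ->
        (acc.getOrElse(r.eWayCoordinates, Set.empty[Int]) + r.vehicleDetails.vehicleID))
  }

  // Window type is TimeWindow, matching TumblingEventTimeWindows.
  class CountEmitter
      extends AllWindowFunction[Map[EWayCoordinates, Set[Int]], (EWayCoordinates, Int), TimeWindow] {
    override def apply(
        window: TimeWindow,
        input: Iterable[Map[EWayCoordinates, Set[Int]]],
        out: Collector[(EWayCoordinates, Int)]
    ): Unit =
      // Emit one (coordinates, unique vehicle count) pair per map entry.
      input.foreach(_.foreach { case (coords, ids) => out.collect((coords, ids.size)) })
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Hypothetical sample reports, for illustration only.
    val here = EWayCoordinates(0, 0, 0, 10)
    env
      .fromElements(
        PositionReport(0, here, VehicleDetails(107, 32, 53320)),
        PositionReport(1, here, VehicleDetails(109, 20, 100644)))
      .assignAscendingTimestamps(r => r.timeOfReport.toLong)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
      // Three arguments only; the TypeInformation parameters are implicit.
      .fold(Map.empty[EWayCoordinates, Set[Int]], new UniqueVehicleFolder, new CountEmitter)
      .print()

    env.execute("unique-vehicles-sketch")
  }
}
```

The first compiler error in this thread is consistent with this shape: the two TypeInformation parameters of fold sit in a separate implicit parameter list, so they cannot be passed as a fourth and fifth argument in the first list.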