Posted by
nsengupta on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830.html
For reasons I cannot grasp, I am unable to move ahead.
Here's the code:
---------------------------------------------------------------------------------------------------------------------------------------------
import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector
import org.nirmalya.exercise.Elements.{EWayCoordinates, PositionReport, RawMITSIMTuple, VehicleID}
case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
case class PositionReport(
// tupletype: Int,
timeOfReport: Int,
eWayCoordinates: EWayCoordinates,
vehicleDetails: VehicleDetails
)
// ....
val envDefault = StreamExecutionEnvironment.createLocalEnvironment(4)
envDefault
.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val readings = IndexedSeq [RawMITSIMTuple] (
RawMITSIMTuple(0,0,107,32,0,0,0,10, 53320,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,0,109,20,0,0,0,10,100644,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,0,106,28,0,0,0,10,137745,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,1,107,32,0,0,0,67,354281,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,1,105,30,0,0,1,94,501089,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,1,110,26,0,0,1,95,495898,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,1,102,30,0,0,1,85,453562,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,2,104,32,0,0,1,80,427144,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,2,102,20,0,0,1,73,390383,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,2,111,27,1,0,1,69,369135,-1,-1,-1,-1,-1,-1),
RawMITSIMTuple(0,2,112,28,1,0,0, 1, 5757,-1,-1,-1,-1,-1,-1)
)
val folder = new FoldFunction[PositionReport, Map[EWayCoordinates,Set[Int]]] {
override
def fold(
t: Map[EWayCoordinates, Set[VehicleID]], o: PositionReport
): Map[EWayCoordinates, Set[VehicleID]] = {
t + (o.eWayCoordinates -> (t.getOrElse(o.eWayCoordinates,Set.empty) + (o.vehicleDetails.vehicleID)))
}
}
val windower = new AllWindowFunction[Map[EWayCoordinates, Set[VehicleID]],(EWayCoordinates,Int),Window] {
override
def apply(
w: Window,
bunch: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
collector: Collector[(EWayCoordinates, VehicleID)]): Unit = {
val allVehiclesInLast30Mins = bunch.iterator().next.mapValues(e => e.size)
allVehiclesInLast30Mins.foreach(e => println(e))
collector.collect((EWayCoordinates(-1,-1,-1,-1),0))
}
}
val uniqueVehicles = envDefault
.fromCollection(readings)
.map(e => MITSIMUtils.preparePositionReport(e))
.assignAscendingTimestamps(e => e.timeOfReport)
.keyBy(e => (
e.eWayCoordinates.eWayID,
e.eWayCoordinates.eWayDir,
e.eWayCoordinates.eWaySegment,
e.vehicleDetails.vehicleID))
.windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
.fold(
// Seed
Map[EWayCoordinates,Set[VehicleID]](),
// FoldFunction
folder,
// WindowFunction
windower,
// Satisfying the compiler
new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
new TupleTypeInfo[(EWayCoordinates,Int)]
)
-----------------------------------------------------------------------------------------
The compiler is unhappy:
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:136: error: missing argument list for method fold in class AllWindowedStream
[ERROR] Unapplied methods are only converted to functions when a function type is expected.
[ERROR] You can make this conversion explicit by writing `fold _` or `fold(_)(_)(_)` instead of `fold`.
[ERROR] .fold(
[ERROR] ^
[ERROR] one error found
----------------------------------------------------------------------------------------
I understand why is the compiler unhappy, but I am unsure if I have to go through all the
devilry. In no Flink example, I see some such thing being prescribed. But, then, perhaps I am missing an important point.
I have been through this
comment by
Yassine Marzougui, before I added those type hints. But, I am using
Flink 1.2.0.
I know this sounds silly, but I am simply failing to get out of this.
All help appreciated.
-- Nirmalya