Compilation Error in WindowStream.fold()

nsengupta
For reasons I cannot grasp, I am unable to move ahead.

Here's the code:
---------------------------------------------------------------------------------------------------------------------------------------------


import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector
import org.nirmalya.exercise.Elements.{EWayCoordinates, PositionReport, RawMITSIMTuple, VehicleID}

case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)

case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)

case class PositionReport(
                              // tupletype: Int,
                              timeOfReport: Int,
                              eWayCoordinates: EWayCoordinates,
                              vehicleDetails: VehicleDetails
                       )


// ....


val envDefault = StreamExecutionEnvironment.createLocalEnvironment(4)
  envDefault
    .setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
  val readings = IndexedSeq [RawMITSIMTuple] (
    RawMITSIMTuple(0,0,107,32,0,0,0,10, 53320,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,0,109,20,0,0,0,10,100644,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,0,106,28,0,0,0,10,137745,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,1,107,32,0,0,0,67,354281,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,1,105,30,0,0,1,94,501089,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,1,110,26,0,0,1,95,495898,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,1,102,30,0,0,1,85,453562,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,2,104,32,0,0,1,80,427144,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,2,102,20,0,0,1,73,390383,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,2,111,27,1,0,1,69,369135,-1,-1,-1,-1,-1,-1),
    RawMITSIMTuple(0,2,112,28,1,0,0, 1,  5757,-1,-1,-1,-1,-1,-1)
  )

val folder = new FoldFunction[PositionReport, Map[EWayCoordinates,Set[Int]]] {
      override
        def fold(
              t: Map[EWayCoordinates, Set[VehicleID]], o: PositionReport
        ): Map[EWayCoordinates, Set[VehicleID]] = {
        t + (o.eWayCoordinates -> (t.getOrElse(o.eWayCoordinates,Set.empty) + (o.vehicleDetails.vehicleID)))
      }
    }

    val windower = new AllWindowFunction[Map[EWayCoordinates, Set[VehicleID]],(EWayCoordinates,Int),Window] {
      override
      def apply(
           w: Window,
           bunch: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
           collector: Collector[(EWayCoordinates, VehicleID)]): Unit = {

        val allVehiclesInLast30Mins = bunch.iterator().next.mapValues(e => e.size)

        allVehiclesInLast30Mins.foreach(e => println(e))

        collector.collect((EWayCoordinates(-1,-1,-1,-1),0))

      }
    }

    val uniqueVehicles = envDefault
      .fromCollection(readings)
      .map(e => MITSIMUtils.preparePositionReport(e))
      .assignAscendingTimestamps(e => e.timeOfReport)
      .keyBy(e => (
        e.eWayCoordinates.eWayID,
        e.eWayCoordinates.eWayDir,
        e.eWayCoordinates.eWaySegment,
        e.vehicleDetails.vehicleID))
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
      .fold(
           // Seed
           Map[EWayCoordinates,Set[VehicleID]](),
       
           // FoldFunction
           folder,
       
           // WindowFunction
           windower,
       
           // Satisfying the compiler
           new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
           new TupleTypeInfo[(EWayCoordinates,Int)]
      )

-----------------------------------------------------------------------------------------

The compiler is unhappy:

[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:136: error: missing argument list for method fold in class AllWindowedStream
[ERROR] Unapplied methods are only converted to functions when a function type is expected.
[ERROR] You can make this conversion explicit by writing `fold _` or `fold(_)(_)(_)` instead of `fold`.
[ERROR]       .fold(
[ERROR]            ^
[ERROR] one error found

----------------------------------------------------------------------------------------

I understand why the compiler is unhappy, but I am unsure whether I really have to go through all this devilry. I do not see anything like this prescribed in any Flink example. But then, perhaps I am missing an important point.

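I went through this comment by Yassine Marzougui (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Incremental-aggregations-Example-not-working-td10581.html#a10585) before I added those type hints. However, I am using Flink 1.2.0.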

I know this sounds silly, but I am simply failing to get out of this.

All help appreciated.

-- Nirmalya


Re: Compilation Error in WindowStream.fold()

Aljoscha Krettek
Hi Nirmalya,
what does the compiler say if you use the variant without explicit TypeInfo? Like this:

 .fold(
           // Seed
           Map[EWayCoordinates,Set[VehicleID]](),

           // FoldFunction
           folder,

           // WindowFunction
           windower
  )

Best,
Aljoscha
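
Some background on why the variant without explicit TypeInfo is worth trying: the `fold` alternatives that the compiler prints later in this thread take three explicit parameters, followed by a separate implicit parameter list for the two TypeInformation instances. The sketch below reproduces that shape in plain Scala. It is only an illustration: `TypeInfo` and this `fold` are hypothetical stand-ins, not Flink's classes.

```scala
// Hypothetical stand-in for Flink's TypeInformation; NOT the real class.
trait TypeInfo[T] { def name: String }

object TypeInfo {
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] { val name = "Int" }
  implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] { val name = "String" }
}

object ImplicitListSketch {
  // The context bounds desugar to a second, implicit parameter list:
  //   def fold[ACC, R](init, step, finish)(implicit a: TypeInfo[ACC], b: TypeInfo[R])
  def fold[ACC: TypeInfo, R: TypeInfo](init: ACC, step: (ACC, Int) => ACC, finish: ACC => R): String = {
    val result = finish(Seq(1, 2, 3).foldLeft(init)(step))
    s"${implicitly[TypeInfo[ACC]].name} -> ${implicitly[TypeInfo[R]].name}: $result"
  }

  def main(args: Array[String]): Unit = {
    // Usual call: the compiler looks up both TypeInfo instances by itself.
    println(fold(0, (acc: Int, x: Int) => acc + x, (acc: Int) => acc.toString))

    // Explicit evidence goes into its own argument list,
    // not as a fourth and fifth argument of the first list.
    println(fold(0, (acc: Int, x: Int) => acc + x, (acc: Int) => acc.toString)(TypeInfo.intInfo, TypeInfo.stringInfo))
  }
}
```

With a signature of this shape, five arguments in a single list cannot match either alternative. In the Flink Scala API, the wildcard import of `org.apache.flink.streaming.api.scala._` is normally what provides the implicit TypeInformation, so the explicit arguments can usually be left out.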

(In reply to nsengupta's post of Thu, 23 Feb 2017 at 14:41, sent from the Apache Flink User Mailing List archive at Nabble.com: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830.html)

Re: Compilation Error in WindowStream.fold()

nsengupta
Hello Aljoscha,

Many thanks for taking this up.

This is the modified code:
----------------------------------------------------------------------------------
val uniqueVehicles = envDefault
      .fromCollection(readings)
      .map(e => MITSIMUtils.preparePositionReport(e))
      .assignAscendingTimestamps(e => e.timeOfReport)
      .keyBy(e => (
        e.eWayCoordinates.eWayID,
        e.eWayCoordinates.eWayDir,
        e.eWayCoordinates.eWaySegment,
        e.vehicleDetails.vehicleID))
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
      .fold(
           // Seed
           Map[EWayCoordinates,Set[VehicleID]](),

           // FoldFunction
           folder,

           // WindowFunction
           windower

           // I have taken the TupleTypeInfo out, to see what the compiler says!
           // Satisfying the compiler:

           /*new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
           new TupleTypeInfo[(EWayCoordinates,Int)]*/
      )
----------------------------------------------------------------------------------

And, this is what the compiler says:
----------------------------------------------------------------------------------
[INFO] Compiling 3 source files to /home/nirmalya/Workspace-Flink/LinearRoad/target/test-classes at 1487991829901
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:137: error: overloaded method value fold with alternatives:
[ERROR]   [ACC, R](initialValue: ACC, preAggregator: (ACC, org.nirmalya.exercise.Elements.PositionReport) => ACC, windowFunction: (org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[ACC], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR]   [ACC, R](initialValue: ACC, preAggregator: org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,ACC], windowFunction: org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC,R,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR]  cannot be applied to (scala.collection.immutable.Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]], org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[Int]]], org.apache.flink.streaming.api.functions.windowing.AllWindowFunction[Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]],(org.nirmalya.exercise.Elements.EWayCoordinates, Int),org.apache.flink.streaming.api.windowing.windows.Window])
[ERROR]       .fold(
[ERROR]        ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------

----------------------------------------------------------------------------------

Re: Compilation Error in WindowStream.fold()

Aljoscha Krettek
It seems the type of your initial accumulator, which is Map[EWayCoordinates,Set[VehicleID]], does not match the accumulator type on your FoldFunction, which is Map[EWayCoordinates,Set[Int]]. Could you change that?
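
One way to keep the accumulator type consistent is to give it a single name and use that name for the seed, the fold step, and the window function input. The following is a minimal plain-Scala sketch of that idea, using `foldLeft` in place of Flink's windowed `fold`. It assumes that `VehicleID` is an alias for `Int`, which the thread does not show; the names `Acc`, `seed`, `step`, and `uniqueVehicleCounts` are made up for illustration.

```scala
object AccumulatorTypeSketch {
  // Assumption: VehicleID is a plain alias for Int (not shown in the thread).
  type VehicleID = Int

  case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
  case class VehicleDetails(vehicleID: VehicleID, vehicleSpeed: Int, vehiclePos: Int)
  case class PositionReport(timeOfReport: Int, eWayCoordinates: EWayCoordinates, vehicleDetails: VehicleDetails)

  // One name for the accumulator type, shared by the seed, the step, and the result.
  type Acc = Map[EWayCoordinates, Set[VehicleID]]

  val seed: Acc = Map.empty

  // Same logic as the thread's FoldFunction: add the vehicle to the set for its coordinates.
  def step(acc: Acc, report: PositionReport): Acc = {
    val seen = acc.getOrElse(report.eWayCoordinates, Set.empty[VehicleID])
    acc + (report.eWayCoordinates -> (seen + report.vehicleDetails.vehicleID))
  }

  // Counts distinct vehicles per coordinate, as the window function does with mapValues.
  def uniqueVehicleCounts(reports: Seq[PositionReport]): Map[EWayCoordinates, Int] =
    reports.foldLeft(seed)(step).map { case (coords, ids) => coords -> ids.size }

  def main(args: Array[String]): Unit = {
    val segment = EWayCoordinates(0, 0, 1, 10)
    val reports = Seq(
      PositionReport(0, segment, VehicleDetails(107, 32, 53320)),
      PositionReport(1, segment, VehicleDetails(107, 32, 54000)), // same vehicle again
      PositionReport(1, segment, VehicleDetails(109, 20, 100644))
    )
    println(uniqueVehicleCounts(reports))
  }
}
```

The compiler output above also shows two further differences between the expected and the supplied arguments that may be worth checking: the expected window function is `org.apache.flink.streaming.api.scala.function.AllWindowFunction` with window type `TimeWindow`, while the supplied one is `org.apache.flink.streaming.api.functions.windowing.AllWindowFunction` with window type `Window`.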

(In reply to nsengupta's post of Sat, 25 Feb 2017 at 04:09: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830p11910.html)