I am trying to understand if the AllWindowedStream.apply() function can be used for creating a DataStream of new types.
Here is a portion of the code: ------------------------------------------------------------------------------------------------------------------------ case class RawMITSIMTuple( tupletype: Int, timeOfReport: Int, vehicleID: Int, vehicleSpeed: Int, expressWayID: Int, vehicleLane: Int, vehicleDir: Int, vehicleSegment: Int, vehiclePos: Int, queyID: Int, segmentInit: Int, segmentEnd: Int , dayOfWeek: Int, timeOfDay: Int, dayID: Int ) case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int) case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int) case class PositionReport( tupletype: Int, timeOfReport: Int, eWayCoordinates: EWayCoordinates, vehicleDetails: VehicleDetails ) val envDefault = StreamExecutionEnvironment.getExecutionEnvironment envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // ... val positionReportStream = this .readRawMITSIMTuplesInjected(envDefault,args(0)) .assignAscendingTimestamps(e => e.timeOfReport) .windowAll(TumblingEventTimeWindows.of(Time.seconds(30))) ------------------------------------------------------------------------------------------------------------------------ positionReportStream above is of type AllWindowedStream. As such, I cannot use it as a DataStream[PositionReport]: I cannot segregate it by some kind of KeySelection and use it further down. I have been thinking of using a FoldFunction on it, but that gives a collection of PositionReport. So, I get a DataStream[Vector[PositionReport]] (Vector is just an example). The other alternative is to use an AllWindowedStream.apply() function, where I can emit a DataStream[PositionReport]. But, that will mean that I am using the apply function more as a *mapper*. Is that the right way to use it? Could someone please push me to the correct way to deal with it? -- Nirmalya |
I have gone through this post, where Aljoscha explains that mapping on WindowedStream is not allowed.
So, I think I haven't asked the question properly. Here is (hopefully) a better and easier version: 1. I begin with records of type RawMITSIMTuple. 2. When I group them using a Window, I get an AllWindowedStream[RawMITSIMTuple]. 3. I fold the tuples obtained in the Window, which gives me a DataStream[Vector[RawMITSIMTuple]. 4. What I need is a DataStream[PositionReport]. So, I need to flatMap the output of previous step, where I first get hold of each of the RawMITSIMTuple and map that to PositionReport. val positionReportStream = this .readRawMITSIMTuplesInjected(envDefault,args(0)) .assignAscendingTimestamps(e => e.timeOfReport) .windowAll(TumblingEventTimeWindows.of(Time.seconds(30))) .fold(Vector[RawMITSIMTuple]())((collectorBin,rawRecord) => { collectorBin :+ rawRecord) }) .flatMap(r => r.map(e => this.preparePositionReport(e))) This gives me what I want, but I feel this is verbose and inefficient. Am I thinking correctly? If so, what is a better idiom to use in such cases? -- Nirmalya |
Hi, you would indeed use apply(), or better fold(<initial_value>, <fold_function>, <window_function>) to map the result of folding your window to some other data type. If you will, a WindowFunction allows "mapping" the result of your windowing to a different type. Best, Aljoscha On Wed, 15 Feb 2017 at 06:14 nsengupta <[hidden email]> wrote: I have gone through this post |
Thanks, Aljoscha for the clarification. I understand that instead of using a flatMap() in the way I am using, I am better off using : * a fold (init, fold_func, window_func) first and then * map to a different type of my choice, inside the window_func, parameterised above I hope I am correct. If so, you don't need to spend time to comment; ☺otherwise, please give a hint. -- Nirmalya - On Thu, Feb 16, 2017 at 4:12 PM, Aljoscha Krettek [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta "If you have built castles in the air, your work need not be lost. That is where they should be. Now put the foundation under them." |
Yes, you're correct. :-) On Thu, 16 Feb 2017 at 14:24 nsengupta <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |