Clarification: use of AllWindowedStream.apply() function


nsengupta
I am trying to understand whether the AllWindowedStream.apply() function can be used to create a DataStream of a new type.

Here is a portion of the code:
------------------------------------------------------------------------------------------------------------------------

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class RawMITSIMTuple(
  tupletype: Int,      timeOfReport: Int, vehicleID: Int,   vehicleSpeed: Int,
  expressWayID: Int,   vehicleLane: Int,  vehicleDir: Int,
  vehicleSegment: Int, vehiclePos: Int,   queryID: Int,
  segmentInit: Int,    segmentEnd: Int,
  dayOfWeek: Int,      timeOfDay: Int,    dayID: Int
)

case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)

case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)

case class PositionReport(
  tupletype: Int, timeOfReport: Int,
  eWayCoordinates: EWayCoordinates,
  vehicleDetails: VehicleDetails
)

val envDefault = StreamExecutionEnvironment.getExecutionEnvironment
envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// ...

// The result is an AllWindowedStream[RawMITSIMTuple, TimeWindow], not a DataStream
val positionReportStream = this
  .readRawMITSIMTuplesInjected(envDefault, args(0))
  // Flink timestamps are in milliseconds; if timeOfReport is in seconds, it needs scaling (e.g. * 1000L)
  .assignAscendingTimestamps(e => e.timeOfReport)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))

------------------------------------------------------------------------------------------------------------------------
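The pipeline in the follow-up post calls this.preparePositionReport(e), whose body is never shown in the thread. A minimal sketch of such a conversion, assuming the raw fields map across by name (expressWayID to eWayID, vehicleDir to eWayDir, vehicleLane to eWayLane, vehicleSegment to eWaySegment); that mapping is a guess, and the helper is placed in a hypothetical Conversions object:

```scala
// Case classes as declared in the post (with queyID spelled queryID).
case class RawMITSIMTuple(
    tupletype: Int, timeOfReport: Int, vehicleID: Int, vehicleSpeed: Int,
    expressWayID: Int, vehicleLane: Int, vehicleDir: Int,
    vehicleSegment: Int, vehiclePos: Int, queryID: Int,
    segmentInit: Int, segmentEnd: Int,
    dayOfWeek: Int, timeOfDay: Int, dayID: Int)

case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
case class PositionReport(tupletype: Int, timeOfReport: Int,
                          eWayCoordinates: EWayCoordinates,
                          vehicleDetails: VehicleDetails)

// Hypothetical helper: a pure, per-record conversion from the raw tuple to the nested report.
object Conversions {
  def preparePositionReport(r: RawMITSIMTuple): PositionReport =
    PositionReport(
      r.tupletype,
      r.timeOfReport,
      EWayCoordinates(r.expressWayID, r.vehicleDir, r.vehicleLane, r.vehicleSegment),
      VehicleDetails(r.vehicleID, r.vehicleSpeed, r.vehiclePos))
}
```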

positionReportStream above is of type AllWindowedStream (more precisely, AllWindowedStream[RawMITSIMTuple, TimeWindow]), not DataStream[PositionReport], so I cannot partition it with some kind of key selector (keyBy) and process it further downstream.

I have been thinking of using a FoldFunction on it, but that gives me a collection of PositionReports per window, i.e. a DataStream[Vector[PositionReport]] (Vector is just an example).

The other alternative is AllWindowedStream.apply(), in which I can emit individual PositionReports and so obtain a DataStream[PositionReport]. But that means I am using apply() more as a *mapper*. Is that the right way to use it?

Could someone please point me to the correct way to handle this?
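To make "using apply() as a mapper" concrete, here is a plain-Scala model of the data flow with no Flink dependency; it is not Flink's API. It assumes millisecond timestamps and 30-second tumbling windows. Raw and Report are hypothetical, simplified stand-ins for RawMITSIMTuple and PositionReport, and a ListBuffer plays the role of Flink's Collector. The window function receives every element of one window and emits one Report per element:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified stand-ins for RawMITSIMTuple and PositionReport.
case class Raw(timeOfReport: Long, vehicleID: Int, vehicleSpeed: Int)
case class Report(timeOfReport: Long, vehicleID: Int, speed: Int)

object ApplyAsMapper {
  val windowSizeMs = 30000L // 30-second tumbling windows; timestamps assumed in milliseconds

  // Tumbling-window assignment: round the timestamp down to a multiple of the window size.
  def windowStart(ts: Long): Long = ts - ts % windowSizeMs

  // Stand-in for an all-window function: it sees every element of one window and may emit
  // any number of results into `out` (the Collector's role). Used as a "mapper", it emits
  // exactly one Report per input element.
  def applyFn(start: Long, elements: Iterable[Raw], out: ListBuffer[Report]): Unit =
    elements.foreach(r => out += Report(r.timeOfReport, r.vehicleID, r.vehicleSpeed))

  // Assign elements to windows, then fire the windows in time order.
  def run(input: Seq[Raw]): List[Report] = {
    val out = ListBuffer.empty[Report]
    input
      .groupBy(r => windowStart(r.timeOfReport))
      .toSeq
      .sortBy(_._1)
      .foreach { case (start, elems) => applyFn(start, elems, out) }
    out.toList
  }
}
```

The output order follows window order, not arrival order: elements are only emitted once their window fires.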

-- Nirmalya
 

Re: Clarification: use of AllWindowedStream.apply() function

nsengupta
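I have gone through this post (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowedStream-operation-questions-td6006.html), where Aljoscha explains that mapping on a WindowedStream is not allowed.

So I think I did not ask the question properly. Here is a (hopefully) better and simpler version: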

1.    I begin with records of type RawMITSIMTuple.
2.    When I group them into a window with windowAll(), I get an AllWindowedStream[RawMITSIMTuple, TimeWindow].
3.    I fold the tuples in each window, which gives me a DataStream[Vector[RawMITSIMTuple]].
4.    What I need is a DataStream[PositionReport]. So I flatMap the output of the previous step: I take each RawMITSIMTuple out of the Vector and map it to a PositionReport.

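val positionReportStream = this
  .readRawMITSIMTuplesInjected(envDefault, args(0))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  // Step 3: collect each window's tuples into a Vector -> DataStream[Vector[RawMITSIMTuple]]
  .fold(Vector[RawMITSIMTuple]())((collectorBin, rawRecord) => collectorBin :+ rawRecord)
  // Step 4: unpack each Vector and convert every tuple -> DataStream[PositionReport]
  .flatMap(r => r.map(e => this.preparePositionReport(e)))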

This gives me what I want, but I feel this is verbose and inefficient. Am I thinking correctly? If so, what is a better idiom to use in such cases?
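A plain-Scala model of steps 3 and 4 (no Flink dependency; Raw, Report and toReport are hypothetical, simplified stand-ins, and timestamps are assumed to be in milliseconds). It makes the intermediate result visible: one Vector per window, which the flatMap then unpacks again:

```scala
// Hypothetical, simplified stand-ins for RawMITSIMTuple and PositionReport.
case class Raw(timeOfReport: Long, vehicleID: Int, vehicleSpeed: Int)
case class Report(timeOfReport: Long, vehicleID: Int, speed: Int)

object FoldThenFlatMap {
  val windowSizeMs = 30000L // 30-second tumbling windows

  // Stand-in for preparePositionReport: converts a single record.
  def toReport(r: Raw): Report = Report(r.timeOfReport, r.vehicleID, r.vehicleSpeed)

  // Step 3: fold each tumbling window into one Vector (the intermediate DataStream[Vector[...]]).
  def foldWindows(input: Seq[Raw]): Seq[Vector[Raw]] =
    input
      .groupBy(r => r.timeOfReport - r.timeOfReport % windowSizeMs)
      .toSeq
      .sortBy(_._1)
      .map { case (_, elems) => elems.foldLeft(Vector.empty[Raw])((bin, r) => bin :+ r) }

  // Step 4: flatMap each Vector back into individual Reports.
  def run(input: Seq[Raw]): Seq[Report] = foldWindows(input).flatMap(_.map(toReport))
}
```

The Vector built in step 3 exists only to be taken apart in step 4, which is the extra stage the question is about.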

-- Nirmalya

Re: Clarification: use of AllWindowedStream.apply() function

Aljoscha Krettek
Hi,
you would indeed use apply(), or better, fold(<initial_value>, <fold_function>, <window_function>), to map the result of folding your window to some other data type. If you like, a WindowFunction lets you "map" the result of your windowing to a different type.
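A plain-Scala model of the shape of fold(<initial_value>, <fold_function>, <window_function>) over tumbling windows. It sketches the semantics only and is not Flink's API: foldAll, Raw, Report and toReports are hypothetical names, a ListBuffer stands in for the Collector, and timestamps are assumed to be in milliseconds. The fold function accumulates each window incrementally; the window function then receives that window's single accumulator (wrapped in an Iterable) and may emit any number of outputs, which is where the "mapping" to a new type happens:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified stand-ins for RawMITSIMTuple and PositionReport.
case class Raw(timeOfReport: Long, vehicleID: Int, vehicleSpeed: Int)
case class Report(timeOfReport: Long, vehicleID: Int, speed: Int)

object FoldWithWindowFunction {
  // Fold each tumbling window of sizeMs into an ACC, then hand that single accumulator
  // to windowFn, which may emit zero or more R values into `out`.
  def foldAll[T, ACC, R](input: Seq[T], timestamp: T => Long, sizeMs: Long)(
      initial: ACC,
      foldFn: (ACC, T) => ACC,
      windowFn: (Long, Iterable[ACC], ListBuffer[R]) => Unit): List[R] = {
    val out = ListBuffer.empty[R] // stand-in for Flink's Collector[R]
    input
      .groupBy { t => val ts = timestamp(t); ts - ts % sizeMs }
      .toSeq
      .sortBy(_._1)
      .foreach { case (start, elems) =>
        windowFn(start, List(elems.foldLeft(initial)(foldFn)), out)
      }
    out.toList
  }

  // The pattern discussed here: buffer the window in a Vector, then "map" inside the window function.
  def toReports(input: Seq[Raw]): List[Report] =
    foldAll[Raw, Vector[Raw], Report](input, _.timeOfReport, 30000L)(
      Vector.empty[Raw],
      (bin, r) => bin :+ r,
      (_, accs, out) =>
        for (bin <- accs; r <- bin) out += Report(r.timeOfReport, r.vehicleID, r.vehicleSpeed)
    )
}
```

Because the window function decides how many values to emit, the same operator shape also covers one summary value per window, for example a count, by changing only the fold and window functions.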

Best,
Aljoscha

On Wed, 15 Feb 2017 at 06:14 nsengupta <[hidden email]> wrote:

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-use-of-AllWindowedStream-apply-function-tp11627p11630.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Clarification: use of AllWindowedStream.apply() function

nsengupta
Thanks, Aljoscha, for the clarification.

I understand that, instead of using flatMap() the way I am, I am better off:
* first using fold(init, fold_func, window_func), and then
* mapping to a type of my choice inside window_func, as parameterised above.

I hope I am correct. If so, you need not spend time commenting ☺; otherwise, please give me a hint.

-- Nirmalya


On Thu, Feb 16, 2017 at 4:12 PM, Aljoscha Krettek [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:

--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is where they should be.
Now put the foundation under them."

Re: Clarification: use of AllWindowedStream.apply() function

Aljoscha Krettek
Yes, you're correct. :-)

On Thu, 16 Feb 2017 at 14:24 nsengupta <[hidden email]> wrote: