TextFile source && KeyedWindow triggers --> Unexpected execution order


TextFile source && KeyedWindow triggers --> Unexpected execution order

ANON Marta

Hello!

 

I have a datastream like this:

env.readTextFile("events.log")
  .map(event => StopFactory(event)) // I have defined a Stop class and this creates an instance from each file line
  .assignTimestampsAndWatermarks(stopEventTimeExtractor) // extracts the timestamp from a field of each instance
  .keyBy("mediaResource.contentId")
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS)))
  .trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.MINUTES)))
  .reduce(
    (eventA: Stop, _: Stop) => {
      eventA.addVisualization()
      println("********** REDUCING **********")
      eventA
    },
    (key: Tuple, window: TimeWindow, input: Iterable[Stop], out: Collector[Stop]) => {
      val stop = input.iterator.next()
      println("------------- PROCESSING ----------------")
      if (stop.changed) {
        stop.changed = false
        out.collect(stop)
      }
    })
  .print()

 

If I execute this, I see that all of the reduce calls are executed first, and only at the end does the window-function (processing) part start to execute.

 

This is a problem for me because I want to add a sink function that outputs results every 5 minutes from the whole daily aggregation, and the goal is to support either a Kafka source (I had no problems with Kafka) or a bunch of large files.
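To make it concrete, something like the sketch below is what I have in mind; MyStopSink is only a placeholder for the real Kafka or file sink, not code I have written yet:

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical sink that would receive the partial daily aggregation every 5 minutes
// and forward it to Kafka or to a file
class MyStopSink extends SinkFunction[Stop] {
  override def invoke(value: Stop): Unit = {
    // write `value` wherever the partial results should go
  }
}

// ...same pipeline as above, but ending in the sink instead of print():
//   .reduce(...)
//   .addSink(new MyStopSink())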

 

If I replace the ContinuousEventTimeTrigger with a CountTrigger, I see that it does sink and process every "x" events, but only at the end of the file, after every reduce has been executed.
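Concretely, the only change was the trigger line, for example with "x" = 1000 (just an example value):

import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

  // fires for every 1000 elements per key instead of every 5 minutes of event time
  .trigger(CountTrigger.of(1000))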

 

How could I manage this daily aggregation with a 5-minute trigger for the partial aggregation when the source is a file?

 

Thank you!!

 

Marta


Re: TextFile source && KeyedWindow triggers --> Unexpected execution order

Dawid Wysakowicz-2

Hi Marta,

Do you mean you want to emit results every 5 minutes based on wall-clock (processing) time? If so, you should use the ContinuousProcessingTimeTrigger instead of the ContinuousEventTimeTrigger, which emits results based on event time.
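For example (just a sketch), keeping the rest of your pipeline unchanged and only swapping the trigger line:

import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger

  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.DAYS)))
  // fires every 5 minutes of wall-clock (processing) time, independent of the watermarks
  .trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.MINUTES)))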

Does that solve your problem?

Best,

Dawid

