Parallelism issue

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Parallelism issue

Sung Gon Yi
Hello.

I wrote the code below.

It behaves unexpectedly.
Processing happens only after the SourceFunction has generated all of its data and finished.
If the SourceFunction runs indefinitely, processing never happens.

However, it works as expected when parallelismForTimestamp is set to a value other than 4 (e.g. 3).

I would like to understand how the code below is processed.

Sincerely,
Sung Gon

—————
package org.skon.flink

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

/**
 * Reproduction job: event-time tumbling windows that only fire once the source has finished
 * when `parallelismForTimestamp` equals `maxParallelism`.
 *
 * Pipeline:
 *  1. A parallelism-1 source emits `(count, 1L, 2L)` for `count` in 0..250000 and uses `count`
 *     as the event timestamp (zero allowed out-of-orderness).
 *  2. `keyBy(_._2)` into a GlobalWindow that fires per element (`CountTrigger.of(1)`), evicts
 *     elements older than 20 s (`TimeEvictor`) and emits the last remaining element.
 *  3. Timestamps/watermarks are re-assigned with parallelism `parallelismForTimestamp`.
 *  4. `keyBy(_._3)` into 5 s tumbling event-time windows that keep the last element.
 *  5. Each window result is printed and recorded by [[CollectionSink]].
 */
object ParallelismWithGlobalWindow {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Periodic assigners are polled for a new watermark every 50 ms.
    env.getConfig.setAutoWatermarkInterval(50L)
    // Default parallelism for every operator without an explicit setParallelism (source, process, sink).
    env.setParallelism(1)

    // NOTE: despite its name this is used as an operator parallelism, not Flink's max parallelism setting.
    val maxParallelism = 4
    val parallelismForTimestamp = 4

    val stream = env
      .addSource(new SourceFunction[(Long, Long, Long)] {
        // Finite source: when run() returns the source finishes and Flink emits a final watermark.
        // NOTE(review): an unbounded variant should loop on a @volatile running flag that cancel() clears.
        override def run(ctx: SourceFunction.SourceContext[(Long, Long, Long)]): Unit = {
          (0 to 250000).foreach(count => ctx.collect((count, 1L, 2L)))
        }
        override def cancel(): Unit = {}
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Long, Long, Long)](Time.seconds(0L)) {
        override def extractTimestamp(element: (Long, Long, Long)): Long = element._1
      })

    // NOTE(review): likely explanation of the observed stall (confirm for the Flink version in use):
    //  - Every record has _._2 == 1L, so keyBy(_._2) routes ALL data to a single one of the
    //    `maxParallelism` window subtasks.
    //  - When the following watermark assigner has the same parallelism, Flink connects the two
    //    operators with forward partitioning, so only one assigner instance ever receives records.
    //    An idle BoundedOutOfOrdernessTimestampExtractor never advances past Long.MinValue, and the
    //    assigner does not forward upstream watermarks (other than the final Long.MaxValue).
    //  - The downstream event-time window uses the minimum watermark of all its input channels, so
    //    it cannot fire until the source ends and the final Long.MaxValue watermark flows through.
    //  - With a different parallelism (e.g. 3) the connection becomes a rebalance, every assigner
    //    instance receives records, and the watermark advances.
    //  Possible remedies: insert .rebalance before the second assigner, keep the upstream
    //  watermarks instead of re-assigning them, or mark idle inputs (WatermarkStrategy.withIdleness).
    stream
      .keyBy(_._2)
      .window(GlobalWindows.create)
      .evictor(TimeEvictor.of(Time.seconds(20L)))
      .trigger(CountTrigger.of(1L))
      .apply[(Long, Long, Long)](
        (_: Long, _: GlobalWindow, elements: Iterable[(Long, Long, Long)], out: Collector[(Long, Long, Long)]) =>
          out.collect(elements.last))
      .setParallelism(maxParallelism)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Long, Long, Long)](Time.seconds(0L)) {
        override def extractTimestamp(element: (Long, Long, Long)): Long = element._1
      })
      .setParallelism(parallelismForTimestamp)
      .keyBy(_._3)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
      .reduce((_, v2) => v2)
      .setParallelism(maxParallelism)
      .process[(Long, Long, Long)]((value, _, out) => {
        Console.println(value)
        out.collect(value)
      })
      .addSink(new CollectionSink[(Long, Long, Long)])

    env.execute("Parallelism Test with Global Window")
  }

  /**
   * Sink that records `value.toString` for every element it receives into [[CollectionSink.values]].
   *
   * Flink serializes the sink and invokes copies of it inside tasks, so an instance field would not
   * be visible to the code that submitted the job. The values are therefore kept in the companion
   * object.
   *
   * @tparam T element type; only its `toString` is stored
   */
  class CollectionSink[T] extends SinkFunction[T] {
    override def invoke(value: T): Unit = {
      CollectionSink.values.add(value.toString)
    }
  }

  object CollectionSink {
    /**
     * Shared record of every value written by any [[CollectionSink]] instance.
     *
     * The list is synchronized so that it stays safe if the sink runs with parallelism > 1.
     * NOTE: it is only observable from the submitting code when the job runs in the same JVM
     * (local execution / mini cluster).
     */
    val values: java.util.List[String] =
      java.util.Collections.synchronizedList(new java.util.ArrayList[String]())
  }
}
—————