Sung Gon Yi

I wrote the code below.

It behaves strangely.
Processing only takes place after the SourceFunction has generated all of its data and returned.
If the SourceFunction runs indefinitely, processing never takes place.

However, it works as expected when parallelismForTimestamp is given another value (e.g. 3).

I would like to understand the mechanism by which the code below is processed.

Sung Gon

package org.skon.flink

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.util.Collector

/**
 * Flink streaming job used to demonstrate the behaviour described in the post above.
 *
 * The visible pipeline is: a custom source, a timestamp/watermark assigner, a window
 * `apply` step, a second timestamp/watermark assigner, a `reduce`, a `process` step and
 * a `CollectionSink`. The job is then executed under the name
 * "Parallelism Test with Global Window".
 *
 * NOTE(review): this listing is incomplete. Closing braces are missing, the calls that
 * key and window the stream before `.apply` and before `.reduce` are not shown, and the
 * body of the `.process` lambda is absent. Restore them from the original program before
 * compiling or drawing conclusions about the job's behaviour.
 */
object ParallelismWithGlobalWindow {
// Entry point: builds the pipeline on the execution environment and runs it.
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment


// Both values are declared here, but no use of either is visible in this listing.
// NOTE(review): the post says changing parallelismForTimestamp changes the behaviour,
// so the call that applies it is presumably among the missing lines. Confirm.
val maxParallelism = 4
val parallelismForTimestamp = 4

val stream = env
// Finite source: emits one tuple (count, 1L, 2L) for each count in 0..250000
// (inclusive), then `run` returns.
.addSource(new SourceFunction[(Long, Long, Long)] {
override def run(ctx: SourceFunction.SourceContext[(Long, Long, Long)]): Unit = {
(0 to 250000).foreach(count => ctx.collect((count, 1L, 2L)))
// No-op: `run` has no cancellation flag to check, so cancel does not interrupt it.
override def cancel(): Unit = {}
// First assigner: takes the first tuple field as the element timestamp, with an
// out-of-orderness bound of zero seconds.
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Long, Long, Long)](Time.seconds(0L)) {
override def extractTimestamp(element: (Long, Long, Long)): Long = element._1

// Window function over a Long key and a GlobalWindow: emits only the last element of
// the window's contents. The keying, window assigner and trigger calls are not shown.
.apply[(Long, Long, Long)]((_: Long, _: GlobalWindow, elements: Iterable[(Long, Long, Long)], out: Collector[(Long, Long, Long)]) => out.collect(elements.last))
// Second assigner, applied to the window output: same extractor as above (first tuple
// field, zero-second bound).
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Long, Long, Long)](Time.seconds(0L)) {
override def extractTimestamp(element: (Long, Long, Long)): Long = element._1
// Reduce that always keeps the second (newer) of the two values it is given.
.reduce((_, v2) => v2)
// NOTE(review): the body of this process lambda is missing from the listing.
.process[(Long, Long, Long)]((value, _, out) => {
// Terminal step: hands every resulting tuple to a CollectionSink.
.addSink(new CollectionSink[(Long, Long, Long)])

// Submits the job; nothing above runs until this call.
env.execute("Parallelism Test with Global Window")
  class CollectionSink[T] extends SinkFunction[T] {
    // String forms of the records received so far, newest first (each record is
    // prepended, which is O(1) on an immutable List). Declared as a `var` because the
    // list is immutable and has to be replaced on every record.
    // NOTE(review): presumably each parallel sink instance keeps its own copy of this
    // list, so it is only useful for inspection inside that instance. Confirm how the
    // collected values are meant to be read back.
    private var values: List[String] = List.empty[String]

    /**
     * Stores the string form of `value` at the head of `values`.
     *
     * The previous body was `values + value.toString`. On a `List[String]` that is
     * string concatenation: it built a temporary String and discarded it, so the
     * sink never stored anything.
     *
     * @param value the record handed to the sink
     */
    override def invoke(value: T): Unit = {
      values = value.toString :: values
    }