DataStream EventTime last data cannot be output ?
In the verification of EventTime plus watermark processing, I found that the data sent to the socket cannot be output in time or output.
). The verification found that only the timestamp of the current send data of getCurrentWatermark() > TimeWindow + maxOutOfOrderness will trigger the end of the last window
). But the latest record can not be processed in time, or can not be processed
). How can I deal with this problem?
The following is the Flink program ,Flink 1.7.2
---------------------------------------------------------------------------
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
import java.util.{Date, Properties}
import com.alibaba.fastjson.JSON
import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
object SockWordCountRun {
def main(args: Array[String]): Unit = {
// get the execution environment
// val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val configuration : Configuration = ConfigurationUtil.getConfiguration(true)
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
import org.apache.flink.streaming.api.scala._
val dataStream = env.socketTextStream("localhost", 1234, '\n')
// .setParallelism(3)
dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
val maxOutOfOrderness = 2 * 1000L // 3.5 seconds
var currentMaxTimestamp: Long = _
var currentTimestamp: Long = _
override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
val jsonObject = JSON.parseObject(element)
val timestamp = jsonObject.getLongValue("extract_data_time")
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
currentTimestamp = timestamp
/* println("===========watermark begin===========")
println()
println(new Date(currentMaxTimestamp - 20 * 1000))
println(jsonObject)
println("===========watermark end===========")
println()*/
timestamp
}
})
.timeWindowAll(Time.seconds(3))
.process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
println()
println("开始提交window")
println(new Date())
for(e <- elements) out.collect(e)
println("结束提交window")
println(new Date())
println()
}
})
.print()
//.setParallelism(3)
println("==================================以下为执行计划==================================")
println("执行地址(firefox效果更好):https://flink.apache.org/visualizer")
//执行计划
println(env.getStreamGraph.getStreamingPlanAsJSON)
println("==================================以上为执行计划 JSON串==================================\n")
env.execute("Socket 水印作业")
println("结束")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long){
//override def toString: String = Thread.currentThread().getName + word + " : " + count
}
def getConfiguration(isDebug:Boolean = false):Configuration = {
val configuration : Configuration = new Configuration()
if(isDebug){
val timeout = "100000 s"
val timeoutHeartbeatPause = "1000000 s"
configuration.setString("akka.ask.timeout",timeout)
configuration.setString("akka.lookup.timeout",timeout)
configuration.setString("akka.tcp.timeout",timeout)
configuration.setString("akka.transport.heartbeat.interval",timeout)
configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
configuration.setString("akka.watch.heartbeat.pause",timeout)
configuration.setInteger("heartbeat.interval",10000000)
configuration.setInteger("heartbeat.timeout",50000000)
}
configuration
}
}
best thinktothings