DataStream EventTime last data cannot be output?

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

DataStream EventTime last data cannot be output?

刘 文
DataStream EventTime last data cannot be output ?


In the verification of EventTime plus watermark processing, I found that the data sent to the socket cannot be output in time or output.
). The verification found that only the timestamp of the current send data of getCurrentWatermark() > TimeWindow + maxOutOfOrderness will trigger the end of the last window
). But the latest record can not be processed in time, or can not be processed
). How can I deal with this problem?



The following is the Flink program ,Flink 1.7.2
---------------------------------------------------------------------------



package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime

import java.util.{Date, Properties}

import com.alibaba.fastjson.JSON
import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector


object SockWordCountRun {



def main(args: Array[String]): Unit = {


// get the execution environment
// val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


val configuration : Configuration = ConfigurationUtil.getConfiguration(true)

val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)



import org.apache.flink.streaming.api.scala._
val dataStream = env.socketTextStream("localhost", 1234, '\n')

// .setParallelism(3)


dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {

val maxOutOfOrderness = 2 * 1000L // 3.5 seconds
var currentMaxTimestamp: Long = _
var currentTimestamp: Long = _

override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
val jsonObject = JSON.parseObject(element)

val timestamp = jsonObject.getLongValue("extract_data_time")
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
currentTimestamp = timestamp

/* println("===========watermark begin===========")
println()
println(new Date(currentMaxTimestamp - 20 * 1000))
println(jsonObject)
println("===========watermark end===========")
println()*/
timestamp
}

})
.timeWindowAll(Time.seconds(3))

.process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {


println()
println("开始提交window")
println(new Date())
for(e <- elements) out.collect(e)
println("结束提交window")
println(new Date())
println()
}
})

.print()
//.setParallelism(3)





println("==================================以下为执行计划==================================")
println("执行地址(firefox效果更好):https://flink.apache.org/visualizer")
//执行计划
println(env.getStreamGraph.getStreamingPlanAsJSON)
println("==================================以上为执行计划 JSON串==================================\n")


env.execute("Socket 水印作业")






println("结束")

}


// Data type for words with count
case class WordWithCount(word: String, count: Long){
//override def toString: String = Thread.currentThread().getName + word + " : " + count
}


def getConfiguration(isDebug:Boolean = false):Configuration = {

val configuration : Configuration = new Configuration()

if(isDebug){
val timeout = "100000 s"
val timeoutHeartbeatPause = "1000000 s"
configuration.setString("akka.ask.timeout",timeout)
configuration.setString("akka.lookup.timeout",timeout)
configuration.setString("akka.tcp.timeout",timeout)
configuration.setString("akka.transport.heartbeat.interval",timeout)
configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
configuration.setString("akka.watch.heartbeat.pause",timeout)
configuration.setInteger("heartbeat.interval",10000000)
configuration.setInteger("heartbeat.timeout",50000000)
}


configuration
}


}





best   thinktothings

Reply | Threaded
Open this post in threaded view
|

Re: DataStream EventTime last data cannot be output?

Stephen Connolly
I had this issue myself.

Your timestamp assigner will only advance the window as it receives data, thus when you reach the end of the data there will be data which is newer than the last window.

One solution is to have the source flag that there will be no more data. If you can do this then that is the best solution.

Another solution is to mix event time and wall clock time in deciding the window, thus the window will eventually move past and output the data. Note that if you use this approach and you are reprocessing the data, because the wall clock will be different, your data may be grouped differently and you could see different results depending on what kind of computation you are using.

The next gotcha that I hit was parallelism, if you are assigning timestamps in a parallel task (say after a keyBy) then each of the parallel tasks will have their own window assigner. If your data is poorly distributed for your key function then you might end up with one of those parallel timestamp assigners only getting one or zero data points and thus all data output is blocked forever!


HTH

On Wed, 6 Mar 2019 at 14:51, 刘 文 <[hidden email]> wrote:
DataStream EventTime last data cannot be output ?


In the verification of EventTime plus watermark processing, I found that the data sent to the socket cannot be output in time or output.
). The verification found that only the timestamp of the current send data of getCurrentWatermark() > TimeWindow + maxOutOfOrderness will trigger the end of the last window
). But the latest record can not be processed in time, or can not be processed
). How can I deal with this problem?



The following is the Flink program ,Flink 1.7.2
---------------------------------------------------------------------------



package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime

import java.util.{Date, Properties}

import com.alibaba.fastjson.JSON
import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector


object SockWordCountRun {



def main(args: Array[String]): Unit = {


// get the execution environment
// val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


val configuration : Configuration = ConfigurationUtil.getConfiguration(true)

val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)



import org.apache.flink.streaming.api.scala._
val dataStream = env.socketTextStream("localhost", 1234, '\n')

// .setParallelism(3)


dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {

val maxOutOfOrderness = 2 * 1000L // 3.5 seconds
var currentMaxTimestamp: Long = _
var currentTimestamp: Long = _

override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
val jsonObject = JSON.parseObject(element)

val timestamp = jsonObject.getLongValue("extract_data_time")
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
currentTimestamp = timestamp

/* println("===========watermark begin===========")
println()
println(new Date(currentMaxTimestamp - 20 * 1000))
println(jsonObject)
println("===========watermark end===========")
println()*/
timestamp
}

})
.timeWindowAll(Time.seconds(3))

.process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {


println()
println("开始提交window")
println(new Date())
for(e <- elements) out.collect(e)
println("结束提交window")
println(new Date())
println()
}
})

.print()
//.setParallelism(3)





println("==================================以下为执行计划==================================")
println("执行地址(firefox效果更好):https://flink.apache.org/visualizer")
//执行计划
println(env.getStreamGraph.getStreamingPlanAsJSON)
println("==================================以上为执行计划 JSON串==================================\n")


env.execute("Socket 水印作业")






println("结束")

}


// Data type for words with count
case class WordWithCount(word: String, count: Long){
//override def toString: String = Thread.currentThread().getName + word + " : " + count
}


def getConfiguration(isDebug:Boolean = false):Configuration = {

val configuration : Configuration = new Configuration()

if(isDebug){
val timeout = "100000 s"
val timeoutHeartbeatPause = "1000000 s"
configuration.setString("akka.ask.timeout",timeout)
configuration.setString("akka.lookup.timeout",timeout)
configuration.setString("akka.tcp.timeout",timeout)
configuration.setString("akka.transport.heartbeat.interval",timeout)
configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
configuration.setString("akka.watch.heartbeat.pause",timeout)
configuration.setInteger("heartbeat.interval",10000000)
configuration.setInteger("heartbeat.timeout",50000000)
}


configuration
}


}





best   thinktothings