Hi,
I have a Flink streaming application and I want to count the records received per second, as a way of measuring the application's throughput. However, I am using the EventTime time characteristic, as follows:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val s = env.socketTextStream("localhost", 1234)

// Count records per second (throughput measurement)
s.map(line => Tuple1(1))
  .keyBy(0)
  .timeWindow(Time.seconds(1))
  .sum(0)
  .writeAsCsv("records-per-second-" + System.currentTimeMillis())

// Main pipeline: parse each line into (String, Int) and assign timestamps
val mainStream = s.map(line => {
    val Array(p1, p2) = line.split(" ")
    (p1, p2.toInt)
  })
  .assignAscendingTimestamps(p => System.currentTimeMillis())
This naturally gives me the following error:
[error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
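For context, the error seems to come from the counting branch: it reads from s directly, where no timestamps have been assigned yet, while timeWindow follows the EventTime characteristic. One possible direction, sketched below and not verified here, is to give that branch an explicit processing-time window so that it does not depend on record timestamps. The sketch assumes a Flink 1.x Scala DataStream API in which setStreamTimeCharacteristic and TumblingProcessingTimeWindows are available. The object name ThroughputSketch, the parse helper, the final print() sink and the job name are hypothetical and only there to make the example complete. It also swaps keyBy(0) for windowAll, since every record carries the same key anyway.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object ThroughputSketch {

  // Hypothetical helper: same parsing logic as in the job above.
  def parse(line: String): (String, Int) = {
    val Array(p1, p2) = line.split(" ")
    (p1, p2.toInt)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val s = env.socketTextStream("localhost", 1234)

    // Counting branch: the window assigner is chosen explicitly, so this
    // branch uses processing time even though the job default is EventTime.
    s.map(line => Tuple1(1))
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .sum(0)
      .writeAsCsv("records-per-second-" + System.currentTimeMillis())

    // Main branch: unchanged, still event time with assigned timestamps.
    val mainStream = s
      .map(line => parse(line))
      .assignAscendingTimestamps(p => System.currentTimeMillis())

    // Placeholder sink (assumption) so the sketch forms a complete job.
    mainStream.print()

    env.execute("throughput-sketch")
  }
}
```

With this layout, each CSV row would hold the number of records that arrived within one wall-clock second, which is what a throughput measurement usually wants, and the event-time logic of the main stream is left untouched.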
How can I count records per second while still using EventTime for the rest of the job?
Thanks.