How to count number of records received per second in processing time while using event time characteristic

Saiph Kappa
Hi,

I have a Flink streaming application and I want to count the records received per second (as a way of measuring the throughput of my application). However, I am using the EventTime time characteristic, as follows:

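import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val s = env.socketTextStream("localhost", 1234)

// Count records in 1-second windows and write the counts to a CSV file
s.map(line => Tuple1(1)).keyBy(0).timeWindow(Time.seconds(1)).sum(0)
  .writeAsCsv("records-per-second-" + System.currentTimeMillis())

// Parse "<key> <int>" lines and stamp each one with the current wall-clock time
val mainStream = s.map(line => {
  val Array(p1, p2) = line.split(" ")
  (p1, p2.toInt)
})
  .assignAscendingTimestamps(p => System.currentTimeMillis())

This, naturally, gives me the following error: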

[error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

How can I do this?


Thanks.

Re: How to count number of records received per second in processing time while using event time characteristic

Aljoscha Krettek
Hi,
You can explicitly request processing-time windows, regardless of the stream's time characteristic, like this:

stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(...)
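As a minimal sketch of the whole job, assuming the Flink 1.x Scala DataStream API (the thread does not name a version, and this API was deprecated in later releases), the throughput counter below keeps TimeCharacteristic.EventTime on the environment but windows explicitly on processing time, so its records need no timestamps or watermarks. The object name RecordsPerSecond, the ("records", 1L) tuple and the print() sink are illustrative choices, not part of the original code.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical job name; assumes the Flink 1.x Scala DataStream API.
object RecordsPerSecond {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The rest of the job can keep using event time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val s = env.socketTextStream("localhost", 1234)

    // Throughput counter: every record becomes ("records", 1L), all records
    // share one key, and the explicit processing-time assigner closes a window
    // every wall-clock second, independent of the EventTime setting above.
    s.map(_ => ("records", 1L))
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .sum(1)
      .print()

    env.execute("records per second")
  }
}
```

Because every record carries the same key, each second's count is computed by a single parallel instance; that is fine for a throughput probe, but the counter itself does not scale out.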

Also note that the timestamp you append in "writeAsCsv("records-per-second-" + System.currentTimeMillis())" is evaluated only once, when that call executes while your program is being set up at startup; it does not change per window or per record.
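If the aim is to tell the per-second counts apart, each window's own start time is a better label than a startup timestamp. The plain-Scala sketch below involves no Flink at all; the countPerWindow helper and the sample arrival times are hypothetical. It mimics how a tumbling window of sizeMs milliseconds (offset 0, non-negative timestamps) assigns a record arriving at time t to the window starting at t - t % sizeMs, and reports one (windowStart, count) pair per window.

```scala
// Plain-Scala illustration only (no Flink); names and data are hypothetical.
object TumblingCountSketch {
  /** Groups arrival times (epoch millis, assumed non-negative) into tumbling
    * windows of `sizeMs` and returns (windowStart, count) pairs in time order.
    */
  def countPerWindow(arrivalsMs: Seq[Long], sizeMs: Long = 1000L): Seq[(Long, Int)] =
    arrivalsMs
      .groupBy(t => t - t % sizeMs)                  // window start for each arrival
      .map { case (start, ts) => (start, ts.size) }  // records per window
      .toSeq
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val arrivals = Seq(1000L, 1200L, 1999L, 2000L, 3500L, 3501L)
    // One CSV-style line per window: <windowStart>,<count>
    countPerWindow(arrivals).foreach { case (start, n) => println(s"$start,$n") }
  }
}
```

Running main prints 1000,3 then 2000,1 then 3000,2. Seconds with no arrivals produce no row, much as a window that never receives an element never fires.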

Best,
Aljoscha

In reply to Saiph Kappa's message of Tue, 28 Jun 2016 at 17:33.