Hi everyone,
I have implemented a way to measure latency in a DataStream (I hope): I'm consuming a Kafka topic and unioning the resulting stream with a custom source that emits a (machine-local) timestamp every 1000 ms (using currentTimeMillis). On the consuming end I distinguish between the Kafka events and the timestamps. When I encounter a timestamp, I take the difference between the processing machine's local time and the timestamp found in the stream, expecting a positive difference (the processing machine's timestamp should be larger than the timestamp found in the stream). However, the opposite is the case.

Now I am wondering when events are actually processed. I union the stream from Kafka with my custom source and batch the result in 10 s windows, so I expect 10 timestamps per window, with ascending values and a gap of roughly 1000 ms between them. On the receiving end I again take currentTimeMillis in my fold function, expecting the resulting value to be larger (most of the time) than the timestamps encountered in the stream. The system clocks are in sync up to 1 ms.

Maybe I am not clear about when certain timestamps are created (i.e. when the UDFs are invoked) or how windows are processed. Any advice is greatly appreciated, as are alternative approaches to calculating latency. I'm on Flink 0.10.2, by the way.

Thanks in advance for the help!
Robert

My GPG Key ID: 336E2680
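For readers following along, the marker idea described above can be sketched in plain Java without any Flink dependency. This is only an illustration under assumptions: the class `LatencyMarkerSketch`, the `Element` type and the `latencies` helper are hypothetical names, not the code from the original job, and the window is modelled as a simple list that is processed once at a single point in time.

```java
import java.util.ArrayList;
import java.util.List;

public class LatencyMarkerSketch {

    /** One element of the unioned stream: either a data record or a timestamp marker. */
    public static final class Element {
        public final boolean isMarker;
        public final long emittedAtMillis; // only meaningful for markers
        public final String payload;       // only meaningful for data records

        private Element(boolean isMarker, long emittedAtMillis, String payload) {
            this.isMarker = isMarker;
            this.emittedAtMillis = emittedAtMillis;
            this.payload = payload;
        }

        public static Element marker(long emittedAtMillis) {
            return new Element(true, emittedAtMillis, null);
        }

        public static Element record(String payload) {
            return new Element(false, 0L, payload);
        }
    }

    /** For every marker in the window, returns processing time minus emission time. */
    public static List<Long> latencies(List<Element> window, long processingTimeMillis) {
        List<Long> result = new ArrayList<>();
        for (Element e : window) {
            if (e.isMarker) {
                result.add(processingTimeMillis - e.emittedAtMillis);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical 10 s window: one marker per second, interleaved with data records.
        List<Element> window = new ArrayList<>();
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            window.add(Element.record("event-" + i));
            window.add(Element.marker(start + i * 1000L));
        }
        // Pretend the window is processed 10 s after the first marker was emitted.
        long processingTime = start + 10_000L;
        System.out.println(latencies(window, processingTime));
    }
}
```

With both clocks identical, every difference is non-negative. Note that in this model the oldest marker in a window shows a difference close to the window length, because the difference is taken when the window is processed rather than when the marker arrived.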
1. Why are you doing a join instead of something like System.currentTimeMillis()? In the end you have a tuple of your data with a timestamp anyway, so why not just wrap your data in a Tuple2 with the creation timestamp as additional info?

You can use NTP for this.

On 2 May 2016 at 20:02, Robert Schmidtke <[hidden email]> wrote:
Hi Igor, thanks for your reply.

As for your first point, I'm not sure I understand correctly. I'm ingesting records at a rate of about 50k records per second, and those records are fairly small. If I add a timestamp to each of them, I will have a lot more data, which is not exactly what I want. Instead, I wanted to add something like a watermark once every second, put a timestamp only on that element, and calculate the latency from it.

For your second point, the clocks are in fact up to 8 s apart -.-" Not sure how I missed this yesterday. As I'm not an admin of the machines, I will request that NTP be set up.

Thanks!
Robert

On Mon, May 2, 2016 at 10:19 PM, Igor Berman <[hidden email]> wrote:
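As a small worked example of why unsynchronized clocks can produce the negative differences reported earlier: if the machine that stamps the marker runs ahead of the processing machine by some offset, the measured difference is the true latency minus that offset. The sketch below is an assumption-laden illustration; the thread does not say which machine was ahead, the 200 ms true latency is made up, and `ClockSkewSketch` and its methods are hypothetical names.

```java
public class ClockSkewSketch {

    /**
     * Difference the consumer measures when the source clock runs ahead of the
     * consumer clock by sourceAheadMillis (negative if the source clock lags).
     */
    public static long observedLatencyMillis(long trueLatencyMillis, long sourceAheadMillis) {
        return trueLatencyMillis - sourceAheadMillis;
    }

    /** Undoes a known, constant offset at the application level. */
    public static long correctedLatencyMillis(long observedLatencyMillis, long sourceAheadMillis) {
        return observedLatencyMillis + sourceAheadMillis;
    }

    public static void main(String[] args) {
        long trueLatency = 200L;   // assumed value, for illustration only
        long sourceAhead = 8_000L; // clocks 8 s apart, source assumed to be ahead

        long observed = observedLatencyMillis(trueLatency, sourceAhead);
        System.out.println("observed:  " + observed + " ms");
        System.out.println("corrected: " + correctedLatencyMillis(observed, sourceAhead) + " ms");
    }
}
```

Correcting with a fixed offset only works while the offset stays constant; synchronizing the clocks (for example via NTP, as suggested above) avoids having to track it.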
After fixing the clock issue on the application level, the latency is as expected. Thanks again! Robert On Tue, May 3, 2016 at 9:54 AM, Robert Schmidtke <[hidden email]> wrote: