Using event timestamps, the sink gets them back in the machine's time zone


Faye Pressly
Hello,

I am having an issue with event timestamps and time zones in Flink 1.8 (1.8 because I need it to work on AWS Kinesis).

I have a very simple pipeline that reads events from a stream, converts them to a Table, does a small windowed aggregation (1-minute tumbling window) with a group by, converts back to a stream, and sinks the result.

I have created a small integration test where I pass in a custom source and a custom collecting sink so that I can verify the results.

I based the testing approach on this project: https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/StreamingJobIntegrationTest.java

Here is a snippet from my integration test. 0L is the event timestamp that the Flink job will use, so all events are fired at 1970-01-01 00:00:00:

ParallelSourceFunction<List<ObjectNode>> source =
    new ParallelCollectionSource(
        Arrays.asList(
            // Every event carries event timestamp 0L (1970-01-01 00:00:00 UTC)
            new Tuple2<>(1L, new Event("1", "111", "impression", "A", 0L)),
            new Tuple2<>(1L, new Event("1", "111", "click", "A", 0L)),
            new Tuple2<>(1L, new Event("2", "111", "impression", "A", 0L)),
            new Tuple2<>(1L, new Event("2", "111", "click", "A", 0L)),
            new Tuple2<>(1L, new Event("3", "111", "impression", "A", 0L)),
            new Tuple2<>(1L, new Event("4", "111", "impression", "A", 0L)),
            new Tuple2<>(1L, new Event("4", "111", "click", "A", 0L))));

// Collects the pipeline output so the test can inspect sink.result
CollectingSink sink = new CollectingSink();
new Pipeline().execute(source, sink);
(https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/utils/ParallelCollectionSource.java)

My Flink pipeline uses a 1-minute tumbling window, and I add the window's rowtime (window.rowTime) to the output objects, which have a java.sql.Timestamp field that is written to the sink.
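For reference, the value 59999 expected further down follows from plain window arithmetic. This is a standalone sketch, not Flink code: the class `TumblingWindowMath` and its methods are hypothetical, the start formula is modeled on Flink's `TimeWindow.getWindowStartWithOffset` with an offset of 0, and it assumes the Table API's window `rowtime` is the inclusive upper bound (window end minus 1 ms).

```java
import java.time.Instant;

// Hypothetical helper mirroring tumbling-window arithmetic (not Flink's own class).
final class TumblingWindowMath {
    static final long WINDOW_SIZE_MS = 60_000L; // 1-minute tumbling window

    // Start of the window containing `timestamp`, assuming window offset 0.
    // Modeled on Flink's TimeWindow.getWindowStartWithOffset; intended for non-negative timestamps.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp + windowSize) % windowSize;
    }

    // Assumption: the window rowtime is the inclusive upper bound, i.e. end - 1 ms.
    static long windowRowtime(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize - 1;
    }

    public static void main(String[] args) {
        long rowtime = windowRowtime(0L, WINDOW_SIZE_MS);
        // Prints "59999 = 1970-01-01T00:00:59.999Z"
        System.out.println(rowtime + " = " + Instant.ofEpochMilli(rowtime));
    }
}
```

With every event at 0L, all of them land in the window [0, 60000), so the only rowtime the sink should see is 59999 epoch milliseconds.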

When I check the results in sink.result, all the timestamp.getTime() values are shifted by my computer's time zone (GMT+1).
For example, the first window, ending at 1970-01-01 00:00:59.999, has a timestamp.getTime() of `-3540001`.

I expected it to be 59999, which really corresponds to 1970-01-01 00:00:59.999 (UTC).
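The observed value is exactly 59999 minus one hour (3,600,000 ms). The sketch below reproduces that shift with the JDK alone, assuming the window end ends up as a java.sql.Timestamp whose wall-clock fields read 00:00:59.999 in the JVM's default time zone. `TimestampZoneDemo` and `epochMillisOfWallClock` are hypothetical names for illustration.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical demo: java.sql.Timestamp interprets wall-clock fields in the JVM default zone.
final class TimestampZoneDemo {
    // Parses wall-clock text under the given default zone, then restores the previous default.
    static long epochMillisOfWallClock(String wallClock, String zoneId) {
        TimeZone previous = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
            // Timestamp.valueOf reads the text in the *default* zone, not in UTC.
            return Timestamp.valueOf(wallClock).getTime();
        } finally {
            TimeZone.setDefault(previous);
        }
    }

    public static void main(String[] args) {
        String windowEnd = "1970-01-01 00:00:59.999";
        System.out.println(epochMillisOfWallClock(windowEnd, "UTC"));       // 59999
        System.out.println(epochMillisOfWallClock(windowEnd, "GMT+01:00")); // -3540001
    }
}
```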

Is this a bug, or do I have to set something up so that Flink treats all timestamps as UTC?

Thank you!

Re: Using event timestamps, the sink gets them back in the machine's time zone

Timo Walther
Hi Faye,

the problem lies in the flawed design of the JDK's java.sql.Timestamp. You
can find a nice summary in the answer here [1]. java.sql.Timestamp
is time-zone dependent. Internally, we subtract/normalize the time zone
and work with the UNIX timestamp. Starting with Flink 1.9, we are using
the new Java time classes such as LocalDateTime. Until then, it would be
best to set the JVM's time zone to UTC or to remove the time zone both
in sources and sinks.
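A minimal sketch of the two workarounds, using only the JDK. The class `UtcTimestamps` and its methods are hypothetical, and the "remove the time zone" part is one possible interpretation: reading a Timestamp's wall-clock fields as if they were UTC at the sink, and building a Timestamp whose wall-clock fields show UTC time at the source.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.TimeZone;

// Hypothetical helpers for the two workarounds (not part of Flink).
final class UtcTimestamps {
    // Workaround 1: call once before the job is built and executed, e.g. at the start of the test.
    static void pinJvmToUtc() {
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    }

    // Workaround 2, sink side: take the wall-clock fields and interpret them as UTC,
    // which undoes the local-zone shift whatever the JVM default is.
    static long toUtcEpochMillis(Timestamp ts) {
        return ts.toLocalDateTime().toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Workaround 2, source side: a Timestamp whose wall-clock fields show the UTC time of epochMillis.
    static Timestamp fromUtcEpochMillis(long epochMillis) {
        return Timestamp.valueOf(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+01:00")); // simulate a GMT+1 machine
        Timestamp fromSink = new Timestamp(-3_540_001L);          // value seen in sink.result
        System.out.println(toUtcEpochMillis(fromSink));            // 59999
        System.out.println(fromUtcEpochMillis(59_999L).getTime()); // -3540001
    }
}
```

Passing `-Duser.timezone=UTC` to the JVM that runs the tests should have the same effect as `pinJvmToUtc()`, and is likely more robust because it does not depend on the call running before any time-zone-dependent code.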

Regards,
Timo

[1] https://stackoverflow.com/a/43883203/806430


On 11.08.20 22:55, Faye Pressly wrote: