Hello,
I am having an issue with event timestamps and timezones with Flink 1.8 (1.8 because I need it to work on AWS Kinesis).

I have a very simple pipeline that reads events from a stream, transforms to a Table, does a small window (tumbling, 1 min) aggregation and group-by, transforms back to a stream and sinks the result.

I have created a small integration test where I pass a custom Source and a custom Sink collector so that I can verify the results. I got inspired by this project for the testing:
https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/StreamingJobIntegrationTest.java

This is a snippet from my integration test. 0L is the event timestamp that will be used by the Flink job, so here I'm firing all the events at 1970-01-01 00:00:00:

ParallelSourceFunction<List<ObjectNode>> source =
    new ParallelCollectionSource(
        Arrays.asList(
            new Tuple2<>(1L, new Event("1", "111", "impression", "A", 0L)),
            new Tuple2<>(1L, new Event("1", "111", "click", "A", 0L)),
            new Tuple2<>(1L, new Event("2", "111", "impression", "A", 0L)),
            new Tuple2<>(1L, new Event("2", "111", "click", "A", 0L)),
            new Tuple2<>(1L, new Event("3", "111", "impression", "A", 0L)),
            new Tuple2<>(1L, new Event("4", "111", "impression", "A", 0L)),
            new Tuple2<>(1L, new Event("4", "111", "click", "A", 0L))));

CollectingSink sink = new CollectingSink();

new Pipeline().execute(source, sink);

(https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/utils/ParallelCollectionSource.java)

My Flink pipeline uses a tumbling window of 1 minute and I add to the objects (which have a java.sql.Timestamp field) the window.rowtime, which is written to the sink.

When I check the results in sink.result, all the timestamp.getTime() values are using my computer's timezone (GMT+1). For example, the first window, which is 1970-01-01 00:00:59.999, has a timestamp.getTime() of `-3540001`.

I expected it to be 59999, which would really correspond to 1970-01-01 00:00:59.999.

Is this a bug, or do I have to set up something in order for Flink to consider all the timestamps UTC?

Thank you!
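P.S. For reference, the windowed part of the pipeline looks roughly like the self-contained sketch below (trimmed down for this mail; the real job consumes the Event objects from the source above, so field names like `account`/`eventType` are placeholders, not the exact ones from my job):

import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class WindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // (accountId, eventType, eventTimeMillis) -- every event at epoch 0, like in the test
        DataStream<Tuple3<String, String, Long>> events = env
                .fromCollection(Arrays.asList(
                        Tuple3.of("111", "impression", 0L),
                        Tuple3.of("111", "click", 0L)))
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractAscendingTimestamp(Tuple3<String, String, Long> e) {
                                return e.f2;
                            }
                        });

        // expose the event time field as a rowtime attribute
        Table table = tEnv.fromDataStream(events, "account, eventType, eventTime.rowtime");

        // 1-minute tumbling window; w.rowtime is the window end (e.g. 1970-01-01 00:00:59.999)
        Table result = table
                .window(Tumble.over("1.minutes").on("eventTime").as("w"))
                .groupBy("w, account, eventType")
                .select("account, eventType, eventType.count as cnt, w.rowtime as windowEnd");

        // windowEnd comes out as a java.sql.Timestamp; that is the value I call getTime() on
        tEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}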
Hi Faye,
the problem lies in the wrong design of JDK's java.sql.Timestamp. You can also find a nice summary in the answer here [1].

java.sql.Timestamp is timezone dependent. Internally, we subtract/normalize the timezone and work with the UNIX timestamp. Beginning with Flink 1.9 we are using the new Java time classes such as LocalDateTime. Until then, it would be best to set the JVM's timezone to UTC or to remove the timezone in both sources and sinks.

Regards,
Timo

[1] https://stackoverflow.com/a/43883203/806430
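To make the timezone dependence concrete, here is a small standalone snippet (not part of the Flink job itself; it assumes the machine's default zone is GMT+1, e.g. Europe/Paris, which had no DST on 1970-01-01) that reproduces the exact offset from the report:

import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampZoneDemo {
    public static void main(String[] args) {
        // java.sql.Timestamp interprets wall-clock values in the JVM's default timezone,
        // so the same "1970-01-01 00:00:59.999" maps to different epoch millis per zone.
        TimeZone.setDefault(TimeZone.getTimeZone("Europe/Paris")); // GMT+1 on 1970-01-01
        System.out.println(Timestamp.valueOf("1970-01-01 00:00:59.999").getTime()); // -3540001

        TimeZone.setDefault(TimeZone.getTimeZone("UTC")); // the suggested workaround for tests
        System.out.println(Timestamp.valueOf("1970-01-01 00:00:59.999").getTime()); // 59999
    }
}

Running the test JVM with -Duser.timezone=UTC (or calling TimeZone.setDefault(TimeZone.getTimeZone("UTC")) in the test setup before the job starts) therefore yields the expected 59999.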