I have a simple Flink streaming program that uses a socket as its continuous source.
My window size is 2 seconds. My window process function is not triggering, and even if I send events out of order, Flink does not drop them. I only see output when I kill the socket. Please find the code snippet below:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

    DataStream<Price> price = env.socketTextStream("localhost", 9998)
            .uid("price source")
            .map(new MapFunction<String, Price>() {
                @Override
                public Price map(String s) throws Exception {
                    return new Price(s.split(",")[0],
                            LocalDate.parse(s.split(",")[1]),
                            new BigDecimal(s.split(",")[2]),
                            new BigDecimal(s.split(",")[3]),
                            s.split(",")[4],
                            new BigDecimal(s.split(",")[5]),
                            LocalDateTime.parse(s.split(",")[6]));
                }
            });

    DataStream<Price> priceStream = price
            .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps()
                    .withTimestampAssigner((p, timestamp) -> {
                        ZoneId zoneId = ZoneId.systemDefault();
                        long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
                        System.out.println(epoch);
                        return epoch;
                    }))
            .keyBy(new KeySelector<Price, String>() {
                @Override
                public String getKey(Price price) throws Exception {
                    return price.getPerformanceId();
                }
            })
            .window(TumblingEventTimeWindows.of(Time.seconds(2)))
            .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
                @Override
                public void process(String s, Context context, Iterable<Price> iterable,
                                    Collector<Price> collector) throws Exception {
                    System.out.println(context.window().getStart() + "Current watermark: " + context.window().getEnd());
                    Price p1 = null;
                    for (Price p : iterable) {
                        System.out.println(p.toString());
                        p1 = p;
                    }
                    collector.collect(p1);
                }
            });

    priceStream.writeAsText("c:\\ab.txt");

The data I am sending is:

    p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
    p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
    p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
    p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
    p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
    p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
    p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
    p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01

---Regards---
Sagar Bandal This is confidential mail ,All Rights are Reserved.If you are not intended receipiant please ignore this email.
I see one potential issue: your timestamp assigner returns timestamps in seconds, while Flink expects epoch milliseconds.

Kezhu Wang
On February 24, 2021 at 11:49:59, sagar ([hidden email]) wrote:
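To illustrate the seconds-versus-milliseconds point, the plain-Java sketch below (no Flink dependency; the class and method names are hypothetical) converts two of the sample timestamps both ways and applies tumbling-window start arithmetic for a 2-second window. This is a simplified form, assuming offset 0 and non-negative timestamps, of what Flink's `TimeWindow.getWindowStartWithOffset` computes. It uses UTC rather than `ZoneId.systemDefault()` so the numbers are reproducible. In the Flink job itself, the assigner would return something like `p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()`.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Compares epoch-second and epoch-millisecond timestamps when they are
// assigned to 2-second tumbling windows (window size given in milliseconds).
public class EventTimeMillis {
    // Epoch milliseconds: the unit Flink expects from a TimestampAssigner.
    public static long toEpochMillis(LocalDateTime t, ZoneOffset offset) {
        return t.toInstant(offset).toEpochMilli();
    }

    // Epoch seconds: what the original assigner returned via toEpochSecond().
    public static long toEpochSeconds(LocalDateTime t, ZoneOffset offset) {
        return t.toEpochSecond(offset);
    }

    // Start of the tumbling window containing ts (window offset 0, ts >= 0).
    public static long windowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    public static void main(String[] args) {
        ZoneOffset utc = ZoneOffset.UTC; // assumption: UTC instead of systemDefault()
        LocalDateTime a = LocalDateTime.parse("2019-12-31T00:00:00");
        LocalDateTime b = LocalDateTime.parse("2019-12-31T00:00:03");
        long size = 2_000L; // Time.seconds(2) expressed in milliseconds

        // Milliseconds: the two events fall into different 2-second windows.
        System.out.println(windowStart(toEpochMillis(a, utc), size)
                == windowStart(toEpochMillis(b, utc), size)); // false

        // Seconds read as milliseconds: both land in the same 2000-unit window,
        // which actually spans 2000 seconds (about 33 minutes) of real time.
        System.out.println(windowStart(toEpochSeconds(a, utc), size)
                == windowStart(toEpochSeconds(b, utc), size)); // true
    }
}
```

With second-resolution timestamps, every sample event from 2019-12-31T00:00:00 to 00:00:04 is grouped into a single "window", so the window boundaries no longer mean 2 seconds.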
Hi,

I corrected it with the code below, but I am still getting the same issue:

    Instant instant = p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();

On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang <[hidden email]> wrote:
It is a fairly simple requirement. If I change it to processing time, it works fine, but it does not work with event time. Any help is appreciated!

On Wed, Feb 24, 2021 at 10:51 AM sagar <[hidden email]> wrote:
Try `env.setParallelism(1)`. The default parallelism for a local environment is `Runtime.getRuntime().availableProcessors()`. Your test data set is so small that, when it is scattered across multiple parallel instances, some instances receive no events with assigned event times. Their watermarks never advance, and because the window operator's watermark is the minimum over all of its inputs, the downstream window computation is never triggered. Alternatively, you could try `WatermarkStrategy.withIdleness`.

Best,
Kezhu Wang

On February 24, 2021 at 15:43:47, sagar ([hidden email]) wrote:
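The parallelism explanation can be modelled in a few lines. The sketch below is a hypothetical, simplified model in plain Java (it does not use Flink's classes; `WatermarkMinSketch` and its methods are illustrative names): the window operator's watermark is the minimum of its input channels' watermarks, an input that never received an event stays at the initial `Long.MIN_VALUE`, and marking inputs idle (what `withIdleness` enables) lets the operator ignore them. The value 3999 assumes one event at t = 4000 ms, since `forMonotonousTimestamps` emits the largest seen timestamp minus 1.

```java
import java.util.List;

// Hypothetical, simplified model (not Flink code) of how a downstream window
// operator combines the watermarks of its parallel upstream instances.
public class WatermarkMinSketch {
    // Watermark of an input that has not emitted anything yet.
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    // One upstream parallel instance as seen by the window operator.
    public static final class Channel {
        final long watermark;
        final boolean idle;

        public Channel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    // Combined watermark = minimum over inputs not marked idle.
    // Simplification: if every input is idle, this model returns NO_WATERMARK.
    public static long combined(List<Channel> channels) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Channel c : channels) {
            if (c.idle) {
                continue; // idle inputs do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, c.watermark);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1.
    public static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 2_000L; // window [0, 2000)

        // Parallelism 1: the only instance has seen an event at t = 4000 ms.
        List<Channel> single = List.of(new Channel(3_999L, false));

        // Parallelism 4: one instance got the events, three never received any.
        List<Channel> scattered = List.of(
                new Channel(3_999L, false),
                new Channel(NO_WATERMARK, false),
                new Channel(NO_WATERMARK, false),
                new Channel(NO_WATERMARK, false));

        // Same as above, but the empty instances have been marked idle.
        List<Channel> withIdleness = List.of(
                new Channel(3_999L, false),
                new Channel(NO_WATERMARK, true),
                new Channel(NO_WATERMARK, true),
                new Channel(NO_WATERMARK, true));

        System.out.println(fires(windowEnd, combined(single)));       // true
        System.out.println(fires(windowEnd, combined(scattered)));    // false
        System.out.println(fires(windowEnd, combined(withIdleness))); // true
    }
}
```

The first case corresponds to `env.setParallelism(1)` and the third to `withIdleness`; the second is the situation in the original job.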
Thanks Kezhu, it worked!

On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang <[hidden email]> wrote:
Hi,

Glad to hear it. Normally you would not run into this with large volumes of data, because every parallel instance then receives events and advances its watermark. `WatermarkStrategy.withIdleness` is likely the more appropriate option in production.

Best,
Kezhu Wang

On February 24, 2021 at 22:35:11, sagar ([hidden email]) wrote:
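In the Flink job, idleness would be enabled on the existing strategy, for example `WatermarkStrategy.<Price>forMonotonousTimestamps().withTimestampAssigner(...).withIdleness(Duration.ofSeconds(5))`, where the 5-second timeout is an arbitrary assumption. The idea behind the timeout can be sketched in plain Java as below. `IdlenessSketch` is a hypothetical, simplified model, not Flink's implementation (Flink performs its check as part of periodic watermark emission): an input counts as idle once it has received no events for the configured timeout, and it becomes active again as soon as the next event arrives.

```java
// Hypothetical, simplified model of an idleness timeout: an input is treated
// as idle once no event has arrived for idleTimeoutMillis of wall-clock time.
public class IdlenessSketch {
    private final long idleTimeoutMillis;
    private long lastEventAt;

    public IdlenessSketch(long idleTimeoutMillis, long startedAt) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastEventAt = startedAt;
    }

    // Record that an event arrived at wall-clock time 'now' (milliseconds).
    public void onEvent(long now) {
        lastEventAt = now;
    }

    // Idle if the input has been silent for at least the timeout.
    public boolean isIdle(long now) {
        return now - lastEventAt >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        IdlenessSketch input = new IdlenessSketch(5_000L, 0L);
        input.onEvent(1_000L);
        System.out.println(input.isIdle(4_000L)); // false: silent for 3 s
        System.out.println(input.isIdle(6_000L)); // true: silent for 5 s
        input.onEvent(7_000L);
        System.out.println(input.isIdle(8_000L)); // false: a new event arrived
    }
}
```

The trade-off is the choice of timeout: too short and a slow but active input is ignored, so its late events may be dropped; too long and windows fire later when a partition really is empty.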