When I use tumbling windows, some records are lost


Jim Chen
Hi all,

When I use tumbling windows, I find that some records are lost. My code is as follows:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(FlinkKafkaConsumer011......)
    // Event time comes from the "logTime" field (must be epoch milliseconds);
    // watermarks trail the largest timestamp seen so far by 3 minutes.
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.minutes(3)) {
        @Override
        public long extractTimestamp(JSONObject jsonObject) {
            return jsonObject.getLongValue("logTime");
        }
    })
    .keyBy(jsonObject -> jsonObject.getString("userId"))
    // 30-second tumbling event-time windows per userId
    .timeWindow(Time.seconds(30))
    .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws Exception {
            // Print the window bounds each time a window fires
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String start = sdf.format(new Date(context.window().getStart()));
            String end = sdf.format(new Date(context.window().getEnd()));
            System.out.println(start + "----" + end);
            // Forward every record in the window unchanged
            for (JSONObject jsonObject : iterable) {
                collector.collect(jsonObject);
            }
        }
    })
    .print("====");

From the printed output, I can see that some records are missing from the tumbling windows. I can't figure out why. Can anyone help me?

Re: When I use tumbling windows, some records are lost

Dawid Wysakowicz-2

Hi,

Can you share more details about what you mean by losing some records? Can you share what data you are ingesting, what results you expect, and what results you actually get? Without that, it's impossible to help you. So far your code looks largely correct.

Best,

Dawid
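
One way to produce the expected-versus-actual comparison asked for above is to replay a sample of the ingested logTime values offline and check which of them would arrive too late for their window. The following is only a minimal sketch in plain Java, not Flink code and not a diagnosis of this job. It assumes, as in the posted code, 30-second tumbling windows with no offset, a 3-minute bounded out-of-orderness, and no allowed lateness. It also assumes the watermark advances immediately after every record, which is stricter than a real job, where watermarks are emitted periodically. The class and method names (LateRecordCheck, windowStart, findLateRecords) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Flink dependency) of event-time tumbling windows. */
final class LateRecordCheck {

    /** Start of the tumbling window that contains the timestamp (offset 0, non-negative timestamps). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /**
     * Replays logTime values in arrival order and returns the ones that would be
     * treated as late: their window's last timestamp (end - 1) is already at or
     * below the current watermark when they arrive.
     */
    static List<Long> findLateRecords(long[] logTimesMs, long windowSizeMs, long maxOutOfOrdernessMs) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE; // no watermark before the first record
        for (long ts : logTimesMs) {
            long windowMaxTimestamp = windowStart(ts, windowSizeMs) + windowSizeMs - 1;
            if (windowMaxTimestamp <= watermark) {
                late.add(ts); // the window has already fired and been cleaned up
            }
            // Assumption: the watermark is updated right after each record.
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - maxOutOfOrdernessMs;
        }
        return late;
    }

    public static void main(String[] args) {
        // Arrival order matters: 20_000 arrives after 300_000 has pushed the watermark to 120_000.
        long[] logTimes = {0L, 10_000L, 300_000L, 20_000L, 290_000L};
        System.out.println(findLateRecords(logTimes, 30_000L, 180_000L));
    }
}
```

If such a replay flags records, one option in the real job is to call sideOutputLateData(...) on the windowed stream with an OutputTag and print that side output, so that late records become visible instead of being dropped silently. If nothing is flagged, the cause is probably elsewhere, for example windows that never fire because the watermark stops advancing.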

(In reply to Jim Chen's message of 26/03/2020 08:52.)