Hi all,

When I use tumbling windows, I find that some records are lost. My code is as follows:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.addSource(FlinkKafkaConsumer011......)
        // Event time comes from the "logTime" field; allow up to 3 minutes of out-of-orderness.
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.minutes(3)) {
                @Override
                public long extractTimestamp(JSONObject jsonObject) {
                    return jsonObject.getLongValue("logTime");
                }
            })
        .keyBy(jsonObject -> jsonObject.getString("userId"))
        // 30-second tumbling event-time window per user.
        .timeWindow(Time.seconds(30))
        .process(new ProcessWindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<JSONObject> iterable,
                                Collector<JSONObject> collector) throws Exception {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String start = sdf.format(new Date(context.window().getStart()));
                String end = sdf.format(new Date(context.window().getEnd()));
                System.out.println(start + "----" + end);
                // Forward every record of the window unchanged.
                for (JSONObject jsonObject : iterable) {
                    collector.collect(jsonObject);
                }
            }
        })
        .print("====");

From the printed result, I found that some records are missing from the tumbling windows. I can't figure out why. Can anyone help me?
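One way to narrow this down, offered as a sketch and not as a diagnosis: in event-time mode Flink drops a record without any error once the watermark has passed the end of the record's window, and a window only fires once the watermark reaches its end, so the last windows stay open until newer events arrive. The plain-Java class below replays a list of logTime values in arrival order and reports which ones would count as late for a 30-second tumbling window with a 3-minute out-of-orderness bound. `LateRecordCheck`, `windowStart`, and `lateRecords` are hypothetical names, not Flink API. The sketch assumes a single input partition and a watermark that advances immediately after every record; Flink emits watermarks periodically, so it may flag records that the real job would still accept.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: simulates which records a
// 30 s tumbling event-time window would treat as late.
class LateRecordCheck {
    static final long WINDOW_SIZE_MS = 30_000L;           // Time.seconds(30)
    static final long MAX_OUT_OF_ORDERNESS_MS = 180_000L;  // Time.minutes(3)

    /** Start of the tumbling window (offset 0) that contains the timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    /**
     * Returns the timestamps whose window had already been passed by the
     * watermark when they arrived. Assumption: the watermark equals the
     * largest timestamp seen so far minus the out-of-orderness bound and
     * is updated after every record.
     */
    static List<Long> lateRecords(long[] logTimesInArrivalOrder) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : logTimesInArrivalOrder) {
            // No watermark exists before the first record has been seen.
            long watermark = (maxSeen == Long.MIN_VALUE)
                    ? Long.MIN_VALUE
                    : maxSeen - MAX_OUT_OF_ORDERNESS_MS;
            // Last timestamp that still belongs to this record's window.
            long windowMaxTimestamp = windowStart(ts) + WINDOW_SIZE_MS - 1;
            if (windowMaxTimestamp <= watermark) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }
}
```

Feeding the logTime values of the missing records, in the order they were read from Kafka, into `lateRecords` shows whether lateness can explain the gap. Inside the job itself, `sideOutputLateData` on the windowed stream can route such records to a side output so they can be inspected.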
Hi,

Can you share more details on what you mean by losing records? Which data are you ingesting, what results do you expect, and what results are you actually getting? Without that it is impossible to help you. So far your code looks largely correct.

Best,
Dawid

On 26/03/2020 08:52, Jim Chen wrote: