Hi,
It's hard for me to help you debug your code, but as long as:
- you are using event time for processing records (in operators like `WindowOperator`)
- you do not have late records
- you are replaying the same records
- your code is deterministic
- you do not rely on the order of the records
Flink should behave deterministically and the results should be the same.
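As a rough illustration of the last condition, here is a minimal sketch in plain Java (no Flink dependency; the class and method names are hypothetical and not part of your job). It contrasts an order-dependent update, similar to overwriting the end time with every incoming `event_ts`, with an order-independent one that tracks the minimum and maximum event timestamp:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, Flink-free illustration of order-dependent vs. order-independent session bounds. */
final class SessionBoundsSketch {

    /** Order-dependent: start is the first timestamp seen, end is the last timestamp seen. */
    static long[] lastSeenBounds(List<Long> eventTimestamps) {
        long start = eventTimestamps.get(0);
        long end = start;
        for (long ts : eventTimestamps) {
            end = ts; // overwritten by whichever record happens to arrive last
        }
        return new long[] {start, end};
    }

    /** Order-independent: start is the minimum and end is the maximum event timestamp. */
    static long[] minMaxBounds(List<Long> eventTimestamps) {
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            start = Math.min(start, ts);
            end = Math.max(end, ts);
        }
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        List<Long> inOrder = Arrays.asList(1000L, 2000L, 3000L);
        // Same records, different arrival order (as can happen during a replay).
        List<Long> shuffled = Arrays.asList(3000L, 1000L, 2000L);

        System.out.println(Arrays.toString(lastSeenBounds(inOrder)));
        System.out.println(Arrays.toString(lastSeenBounds(shuffled)));
        System.out.println(Arrays.toString(minMaxBounds(inOrder)));
        System.out.println(Arrays.toString(minMaxBounds(shuffled)));
    }
}
```

With these inputs the last-seen variant yields [1000, 3000] for the in-order list and [3000, 2000] for the shuffled one, while the min/max variant yields [1000, 3000] for both. Whether min/max is the right aggregate for your sessions is an assumption; the point is only that the result should not change with arrival order.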
Maybe try writing unit tests/integration tests for your operators/logic and feeding them some precomputed input? Or try to reproduce the problem, narrow it down to a single user_id/key, create a unit test/IT case for it, and step through your code in a debugger on a local machine?
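A test of that kind could look roughly like the sketch below. It is plain Java with no Flink dependency: `SessionLogicTestSketch`, `Event`, and `sessionize` are hypothetical names, and the session rules (30 minutes of inactivity, or a `keyless_end_trip` / `keyless_booking_cancellation` event) are taken from the description of the job. Unlike the job, it uses the last event's timestamp as the end of a session closed by inactivity. In a real Flink test the same fixed input would instead be fed through the operator, for example with Flink's operator test harnesses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical single-key test: fixed input in, session bounds out. */
final class SessionLogicTestSketch {

    static final long GAP_MS = 30 * 60 * 1000L; // 30 minutes of inactivity

    static final class Event {
        final long ts;
        final String name;

        Event(long ts, String name) {
            this.ts = ts;
            this.name = name;
        }
    }

    static boolean isTerminal(String name) {
        return name.equals("keyless_end_trip") || name.equals("keyless_booking_cancellation");
    }

    /** Returns one {start, end} pair per session for a single user's events. */
    static List<long[]> sessionize(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.ts)); // process in event-time order
        List<long[]> sessions = new ArrayList<>();
        long[] current = null;
        for (Event e : sorted) {
            if (current != null && e.ts - current[1] > GAP_MS) {
                sessions.add(current); // previous session closed by inactivity
                current = null;
            }
            if (current == null) {
                current = new long[] {e.ts, e.ts};
            } else {
                current[1] = e.ts;
            }
            if (isTerminal(e.name)) {
                sessions.add(current); // session closed by end trip or cancellation
                current = null;
            }
        }
        if (current != null) {
            sessions.add(current); // still-open session at end of input
        }
        return sessions;
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<Event> input = Arrays.asList(
            new Event(0, "search_list_keyless"),
            new Event(5 * min, "search_list_keyless"),
            new Event(10 * min, "keyless_end_trip"),
            new Event(50 * min, "search_list_keyless"),
            new Event(55 * min, "search_list_keyless"));
        // The same records in a different arrival order, as a replay might deliver them.
        List<Event> replayed = Arrays.asList(
            input.get(2), input.get(4), input.get(0), input.get(3), input.get(1));

        List<long[]> expected = sessionize(input);
        List<long[]> actual = sessionize(replayed);
        check(expected.size() == 2, "expected two sessions");
        check(actual.size() == expected.size(), "replay changed the number of sessions");
        for (int i = 0; i < expected.size(); i++) {
            check(Arrays.equals(expected.get(i), actual.get(i)), "replay changed session " + i);
        }
        System.out.println("sessions are identical for both arrival orders");
    }
}
```

Once a failing key is found in the real data, its records can replace the hand-written input here, which makes the problem reproducible in a debugger.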
One thing to note: are you sure you are reprocessing the same records? Kafka, for example, has the concept of retention time, after which it can drop older records from the topic.
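A quick sanity check along those lines, as a minimal sketch: the helper below is hypothetical plain Java (it does not call Kafka), and the 7-day retention used in `main` is an assumption to be replaced with the actual retention configured on your topics.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: could the records at the backfill start point already be gone? */
final class RetentionCheckSketch {

    /** True if a record with this timestamp is older than the retention period and may have been deleted. */
    static boolean mayHaveBeenDropped(Instant recordTime, Instant now, Duration retention) {
        return recordTime.isBefore(now.minus(retention));
    }

    public static void main(String[] args) {
        Duration retention = Duration.ofDays(7); // assumption: check the topic configuration
        Instant now = Instant.now();
        Instant backfillStart = now.minus(Duration.ofDays(10)); // example value only

        if (mayHaveBeenDropped(backfillStart, now, retention)) {
            System.out.println("Backfill start is older than retention; a replay may see fewer records.");
        } else {
            System.out.println("Backfill start is within the retention period.");
        }
    }
}
```

If the start of the backfill falls outside the retention period, the replay may simply not see the records that opened some of the sessions.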
Piotrek
I have a streaming job where I am doing a window operation on "user_id" and then doing some summarization based on time-based logic like:
1. End the session after 30 minutes of inactivity of the user.
2. End the session when the end_trip event or cancellation event has arrived for the user.
When I rerun the job from an old offset for backfilling, I get wrong results: some of the sessions end with the same start and end time. How can I control the streaming job when a lot of data has accumulated in Kafka and I have to replay it? Please help me understand what is going wrong.
My assumption is that it may be due to:
1. Out-of-order events
2. I am reading data from multiple topics, so an end_trip event that happened at a later time can be read earlier and end the session.
I am using a KeyedProcessFunction like this:
public class DemandFunnelProcessFunction extends
    KeyedProcessFunction<Tuple, Tuple2<Long, GenericRecord>, DemandFunnelSummaryTuple> {

  private static final Logger LOGGER = LoggerFactory.getLogger(DemandFunnelProcessFunction.class);

  private transient ValueState<DemandFunnelSummaryTuple> sessionSummary;
  private transient ValueState<ArrayList<Integer>> distanceListState;

  @SuppressWarnings("checkstyle:LocalVariableName")
  @Override
  public void processElement(Tuple2<Long, GenericRecord> recordTuple2, Context context,
      Collector<DemandFunnelSummaryTuple> collector) throws Exception {
    GenericRecord record = recordTuple2.f1;
    String event_name = record.get("event_name").toString();
    long event_ts = (Long) record.get("event_ts");
    DemandFunnelSummaryTuple currentTuple = sessionSummary.value();
    ArrayList<Integer> distanceList =
        distanceListState.value() != null ? distanceListState.value() : new ArrayList<Integer>();
    try {
      if (currentTuple == null) {
        currentTuple = new DemandFunnelSummaryTuple();
        String demandSessionId = UUID.randomUUID().toString();
        currentTuple.setDemandSessionId(demandSessionId);
        currentTuple.setStartTime(event_ts);
        currentTuple.setUserId(recordTuple2.f0);
        currentTuple.setEventName("demand_funnel_summary");
        int geo_id = record.get("geo_id") != null ? (int) record.get("geo_id") : 0;
        currentTuple.setGeoId(geo_id);
      }
      long endTime = currentTuple.getEndTime();
      if (event_name.equals("search_list_keyless")) {
        //System.out.println("inside search_list_keyless " + recordTuple2.f0);
        currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
        SearchEventUtil.searchSummaryCalculation(record, currentTuple, distanceList);
      }
      currentTuple.setEndTime(event_ts);
      sessionSummary.update(currentTuple);
      distanceListState.update(distanceList);
      if (event_name.equals("keyless_booking_cancellation") || event_name
          .equals("keyless_end_trip")) {
        try {
          DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
          if (sessionSummaryTuple != null) {
            sessionSummaryTuple.setAvgResultCount(
                (double) distanceList.size() / sessionSummaryTuple.getTotalSearch());
            if (distanceList.size() > 0) {
              int distanceSum = distanceList.stream()
                  .collect(Collectors.summingInt(Integer::intValue));
              sessionSummaryTuple.setAvgBikeDistance((double) distanceSum / distanceList.size());
              sessionSummaryTuple
                  .setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50));
              sessionSummaryTuple
                  .setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90));
            }
            sessionSummaryTuple.setEndTime(event_ts);
            collector.collect(sessionSummaryTuple);
          }
        } catch (Exception e) {
          DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
          LOGGER.info("Error in collecting event for user_id " + sessionSummaryTuple.getUserId());
          e.printStackTrace();
        }
        sessionSummary.clear();
        distanceListState.clear();
      }
    } catch (Exception e) {
      LOGGER.info("error in processing event --" + recordTuple2.f1.toString());
      LOGGER.info(e.toString());
      e.printStackTrace();
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<DemandFunnelSummaryTuple> out)
      throws Exception {
    try {
      DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
      if (sessionSummaryTuple != null) {
        System.out.println(
            "calling on timer" + sessionSummaryTuple.getUserId() + " " + sessionSummaryTuple
                .getEndTime() + " " + timestamp);
        ArrayList<Integer> distanceList = distanceListState.value();
        if (distanceList != null && distanceList.size() > 0) {
          sessionSummaryTuple
              .setAvgResultCount(
                  (double) distanceList.size() / sessionSummaryTuple.getTotalSearch());
          int distanceSum = distanceList.stream().collect(Collectors.summingInt(Integer::intValue));
          sessionSummaryTuple.setAvgBikeDistance((double) distanceSum / distanceList.size());
          sessionSummaryTuple.setP50DistNearestBike(SearchEventUtil.percentile(distanceList, 50));
          sessionSummaryTuple.setP90DistNearestBike(SearchEventUtil.percentile(distanceList, 90));
        }
        sessionSummaryTuple.setEndTime(timestamp);
        out.collect(sessionSummaryTuple);
      }
    } catch (Exception e) {
      DemandFunnelSummaryTuple sessionSummaryTuple = sessionSummary.value();
      if (sessionSummaryTuple != null) {
        LOGGER.info("Error in collecting event for user_id " + sessionSummaryTuple.getUserId());
      }
      e.printStackTrace();
    }
    sessionSummary.clear();
    distanceListState.clear();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<DemandFunnelSummaryTuple> descriptor =
        new ValueStateDescriptor<DemandFunnelSummaryTuple>(
            "demand_session", // the state name
            TypeInformation.of(new TypeHint<DemandFunnelSummaryTuple>() {
            }));
    sessionSummary = getRuntimeContext().getState(descriptor);
    ValueStateDescriptor<ArrayList<Integer>> disDescriptor =
        new ValueStateDescriptor<ArrayList<Integer>>(
            "distance_state", // the state name
            TypeInformation.of(new TypeHint<ArrayList<Integer>>() {
            }));
    distanceListState = getRuntimeContext().getState(disDescriptor);
  }
}
--
Thanks & Regards,
Anuj Jain