How can I identify the type of element for which onTime is called in flink?
I want to store the objects for which onTimer is called to sideOutputs and then streamout the sideoutput data to kafka topic. I am not understanding how to stream out the sideoutput data like where should I write that processing logic. Below is the code snippet
I have done so far
/**
* CoProcessFuntion to process cart and pg messages connected using connect operator.
* @author jaswin.shah
* @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
*/
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {
private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);
/**
* Map state for cart messages, orderId+mid is key and cartMessage is value.
*/
private static MapState<String, CartMessage> cartState = null;
/**
* Map state for pg messages, orderId+mid is key and pgMessage is value.
*/
private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;
/**
* Intializations for cart and pg mapStates
*
* @param config
*/
@Override
public void open(Configuration config) {
MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartMessage.class)
);
cartState = getRuntimeContext().getMapState(cartStateDescriptor);
MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>(
Constants.PG_DATA,
TypeInformation.of(String.class),
TypeInformation.of(PaymentNotifyRequestWrapper.class)
);
pgState = getRuntimeContext().getMapState(pgStateDescriptor);
}
/**
*
* @return
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
}
/**
* 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
* 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
* 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
* @param cartMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
context.timerService().registerEventTimeTimer(context.timestamp()+3600000);
String searchKey = cartMessage.createJoinStringCondition();
PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey);
if(Objects.nonNull(paymentNotifyObject)) {
generateResultMessage(cartMessage,paymentNotifyObject,collector);
pgState.remove(searchKey);
} else {
cartState.put(searchKey,cartMessage);
}
}
/**
* 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
* 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
* 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
* @param pgMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
context.timerService().registerEventTimeTimer(context.timestamp()+3600000);
String searchKey = pgMessage.createJoinStringCondition();
CartMessage cartMessage = cartState.get(searchKey);
if(Objects.nonNull(cartMessage)) {
generateResultMessage(cartMessage,pgMessage,collector);
cartState.remove(searchKey);
} else {
pgState.put(searchKey,pgMessage);
}
}
/**
* Create ResultMessage from cart and pg messages.
*
* @param cartMessage
* @param pgMessage
* @return
*/
private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;
//Logic should be in cart: check
for (Payment pay : cartMessage.getPayments()) {
if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
payment = pay;
break;
}
}
if(Objects.isNull(payment)) {
return;
}
resultMessage.setOrderId(cartMessage.getId());
resultMessage.setMid(payment.getMid());
resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
resultMessage.setPgOrderAmount(Double.valueOf(pgMessage.getOrderAmount().getValue()));
resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());
logger.info("cart amount {} pg amount {} ",resultMessage.getCartOrderAmount(),resultMessage.getPgOrderAmount());
resultMessage.setCartPaymethod(payment.getPayment_method());
resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
checkDescripancyAndCollectResult(resultMessage,collector);
}
/**
* Evaluate if there is descripancy of any fields between the messages from two different systems.
* Write all the descripancy logic here.
*
* @param resultMessage
*/
private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {
if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
collector.collect(resultMessage.clone());
}
if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
collector.collect(resultMessage.clone());
}
if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
collector.collect(resultMessage.clone());
}
}
}
Help will be highly appreciated.