Thanks for responding Alexander.
We have solved the problem now with ValueState now. Basically, here we are implementing outer join logic with custom keyedCoprocessFunction implementations.
Hi Jaswin,
I would like to clarify something first - what do you key your streams by, when joining them?
It seems that what you want to do is to match each CartMessage with a corresponding Payment that has the same orderId+mid. If this is the case, you probably do not need the MapState in the first place.
Alexander Fedulov
| Solutions Architect
us @VervericaData
Forward - The
Apache Flink Conference
Processing | Event Driven | Real Time
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {
private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);
/** * Map state for cart messages, orderId+mid is key and cartMessage is value. */ private static MapState<String, CartMessage> cartState = null;
/** * Map state for pg messages, orderId+mid is key and pgMessage is value. */ private static MapState<String, PaymentNotifyRequestWrapper> pgState = null;
/** * Intializations for cart and pg mapStates * * @param config */ @Override public void open(Configuration config) { MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> ( Constants.CART_DATA, TypeInformation.of(String.class), TypeInformation.of(CartMessage.class) ); cartState = getRuntimeContext().getMapState(cartStateDescriptor);
MapStateDescriptor<String, PaymentNotifyRequestWrapper> pgStateDescriptor = new MapStateDescriptor<>( Constants.PG_DATA, TypeInformation.of(String.class), TypeInformation.of(PaymentNotifyRequestWrapper.class) ); pgState = getRuntimeContext().getMapState(pgStateDescriptor); }
/** * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present. * 2. If present, match, checkDescripancy, process and delete entry from pgMapState. * 3. If not present, add orderId+mid as key and cart object as value in cartMapState. * @param cartMessage * @param context * @param collector * @throws Exception */ @Override public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception { String searchKey = cartMessage.createJoinStringCondition(); PaymentNotifyRequestWrapper paymentNotifyObject = pgState.get(searchKey); if(Objects.nonNull(paymentNotifyObject)) { generateResultMessage(cartMessage,paymentNotifyObject,collector); pgState.remove(searchKey); } else { cartState.put(searchKey,cartMessage); } }
/** * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present. * 2. If present, match, checkDescripancy, process and delete entry from cartMapState. * 3. If not present, add orderId+mid as key and cart object as value in pgMapState. * @param pgMessage * @param context * @param collector * @throws Exception */ @Override public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception { String searchKey = pgMessage.createJoinStringCondition(); CartMessage cartMessage = cartState.get(searchKey); if(Objects.nonNull(cartMessage)) { generateResultMessage(cartMessage,pgMessage,collector); cartState.remove(searchKey); } else { pgState.put(searchKey,pgMessage); } }
/** * Create ResultMessage from cart and pg messages. * * @param cartMessage * @param pgMessage * @return */ private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) { ResultMessage resultMessage = new ResultMessage(); Payment payment = null;
//Logic should be in cart: check for (Payment pay : cartMessage.getPayments()) { if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) { payment = pay; break; } } if(Objects.isNull(payment)) { return; }
resultMessage.setOrderId(cartMessage.getId()); resultMessage.setMid(payment.getMid());
resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode()); resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
resultMessage.setCartOrderCompletionTime(payment.getUpdated_at()); resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());
resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue()); resultMessage.setCartOrderAmount(String.valueOf(cartMessage.getGrandtotal().longValue()));
resultMessage.setCartPaymethod(payment.getPayment_method()); resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());
checkDescripancyAndCollectResult(resultMessage,collector); }
/** * Evaluate if there is descripancy of any fields between the messages from two different systems. * Write all the descripancy logic here. * * @param resultMessage */ private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {
if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) { resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY); collector.collect(resultMessage.clone()); }
if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) { resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY); collector.collect(resultMessage.clone()); }
if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) { resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY); collector.collect(resultMessage.clone()); } } }
I have implemented the flink job with MapStates. The functionality is like,
- I have two datastreams which I connect with connect operator and then call coprocessfunction with every pair of objects.
- For element of first datastream, processElement1 method is called and for an element of second datastream, processElement2 method is called.
- I have two MapStates in CoProcessFunction for both streams separately.
- When processElement1 is called, it checks in MapState2 if corresponding element with given id is present, if present, I match, and delete. If not present, I add the object in MapState1.
- When processElement2 is called, it checks in MapState1 if corresponding element with given id is present, if present, I match and delete. I fnot present I add object in MapState2.
- Now, I want all the state data to be stored in Rocksdb.
- After few days, I want to run a batch streaming job on Rocksdb to check if there are any objects which have not match found to create a report of those.
I need a help on how to configure TTL for messages and collect them to ontimer method on missing element timeout expiry and how to collect this data in sideoutputs and run a batch process over side output. Few code examples would be appreciated.