http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Rocksdb-implementation-tp35199p35200.html
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
*/
package com.paytm.reconsys.functions.processfunctions;
import com.paytm.reconsys.Constants;
import com.paytm.reconsys.configs.ConfigurationsManager;
import com.paytm.reconsys.enums.DescripancyTypeEnum;
import com.paytm.reconsys.exceptions.MissingConfigurationsException;
import com.paytm.reconsys.messages.ResultMessage;
import com.paytm.reconsys.messages.cart.CartMessage;
import com.paytm.reconsys.messages.cart.Payment;
import com.paytm.reconsys.messages.pg.PGMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;
/**
* CoProcessFuntion to process cart and pg messages connected using connect operator.
* @author jaswin.shah
* @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
*/
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PGMessage, ResultMessage> {
/**
* Map state for cart messages, orderId+mid is key and cartMessage is value.
*/
private MapState<String, CartMessage> cartState = null;
/**
* Map state for pg messages, orderId+mid is key and pgMessage is value.
*/
private MapState<String, PGMessage> pgState = null;
/**
* Intializations for cart and pg mapStates
*
* @param config
*/
@Override
public void open(Configuration config) {
MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<> (
"cartData",
TypeInformation.of(String.class),
TypeInformation.of(CartMessage.class)
);
cartState = getRuntimeContext().getMapState(cartStateDescriptor);
MapStateDescriptor<String, PGMessage> pgStateDescriptor = new MapStateDescriptor<>(
"pgData",
TypeInformation.of(String.class),
TypeInformation.of(PGMessage.class)
);
pgState = getRuntimeContext().getMapState(pgStateDescriptor);
}
/**
* 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
* 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
* 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
* @param cartMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
String searchKey = cartMessage.createJoinStringCondition();
if(pgState.contains(searchKey)) {
generateResultMessage(cartMessage,pgState.get(searchKey));
pgState.remove(searchKey);
} else {
cartState.put(searchKey,cartMessage);
}
}
/**
* 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
* 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
* 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
* @param pgMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement2(PGMessage pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
String searchKey = pgMessage.createJoinStringCondition();
if(cartState.contains(searchKey)) {
generateResultMessage(cartState.get(searchKey),pgMessage);
cartState.remove(searchKey);
} else {
pgState.put(searchKey,pgMessage);
}
}
/**
* Create ResultMessage from cart and pg messages.
*
* @param cartMessage
* @param pgMessage
* @return
*/
private ResultMessage generateResultMessage(CartMessage cartMessage, PGMessage pgMessage) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;
//Logic should be in cart: check
for (Payment pay : cartMessage.getPayments()) {
if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
payment = pay;
break;
}
}
resultMessage.setOrderId(cartMessage.getId());
resultMessage.setMid(payment.getMid());
resultMessage.setCartOrderStatus(payment.mapToOrderStatus().getCode());
resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
resultMessage.setPgOrderCompletionTime(pgMessage.getOrderCompletionTime());
resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());
resultMessage.setCartPaymethod(payment.getPayment_method());
resultMessage.setPgPaymethod(pgMessage.getPayMethod());
checkDescripancyAndTriggerAlert(resultMessage);
return resultMessage;
}
/**
* Evaluate if there is descripancy of any fields between the messages from two different systems.
* Write all the descripancy logic here.
*
* @param resultMessage
*/
private void checkDescripancyAndTriggerAlert(ResultMessage resultMessage) {
if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
//Send message to kafka queue for order status discrepancy.
sendMessageToKafkaTopic(resultMessage.toString());
}
if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
//Send message to kafka queue for pay method discrepancy.
sendMessageToKafkaTopic(resultMessage.toString());
}
if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
//Send message to kafka queue for pay amount discrepancy.
sendMessageToKafkaTopic(resultMessage.toString());
}
}
/**
* Send a message to kafka topic
*
* @param message
*/
private void sendMessageToKafkaTopic(String message) {
Properties kafkaProperties = ConfigurationsManager.getResultSystemKafkaProperties();
//kafkaProperties.put("transactional.id","trans123");
Producer<String, String> producer = new KafkaProducer<>(kafkaProperties, new StringSerializer(), new StringSerializer());
//producer.initTransactions();
try {
//producer.beginTransaction();
producer.send(new ProducerRecord<>(ConfigurationsManager.getResultTopicName(), message));
//producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
} catch (MissingConfigurationsException e) {
e.printStackTrace();
}
producer.close();
}
}
This is the snapshot of implementation I have done
Hi,
I have implemented the flink job with MapStates. The functionality is like,
- I have two datastreams which I connect with connect operator and then call coprocessfunction with every pair of objects.
- For element of first datastream, processElement1 method is called and for an element of second datastream, processElement2 method is called.
- I have two MapStates in CoProcessFunction for both streams separately.
- When processElement1 is called, it checks in MapState2 if corresponding element with given id is present, if present, I match, and delete. If not present, I add the object in MapState1.
- When processElement2 is called, it checks in MapState1 if corresponding element with given id is present, if present, I match and delete. I fnot present I add object in MapState2.
- Now, I want all the state data to be stored in Rocksdb.
- After few days, I want to run a batch streaming job on Rocksdb to check if there are any objects which have not match found to create a report of those.
I need a help how can I store this state data in Rocksdb and how to do setups, configurations and codes for those which I am not understanding. Also, is it possible to run batch streaming job on Rocksdb data?
Help will be highly appreciated.
Thanks,
Jaswin