Rocksdb implementation

Rocksdb implementation

jaswin.shah@outlook.com
Hi,
I have implemented the Flink job with MapStates. The functionality is as follows:
  1. I have two datastreams which I connect with the connect operator and then process with a CoProcessFunction.
  2. For an element of the first datastream, processElement1 is called, and for an element of the second datastream, processElement2 is called.
  3. I have two MapStates in the CoProcessFunction, one for each stream.
  4. When processElement1 is called, it checks in MapState2 whether an element with the given id is present; if present, I match and delete it. If not present, I add the object to MapState1.
  5. When processElement2 is called, it checks in MapState1 whether an element with the given id is present; if present, I match and delete it. If not present, I add the object to MapState2.
  6. Now, I want all the state data to be stored in RocksDB.
  7. After a few days, I want to run a batch job on the RocksDB data to check whether there are any objects for which no match was found, and create a report of those.
I need help with how to store this state data in RocksDB and how to do the setup, configuration and code for it, which I am not understanding. Also, is it possible to run a batch job on the RocksDB data?

Help will be highly appreciated.

Thanks,
Jaswin

Re: Rocksdb implementation

jaswin.shah@outlook.com
/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2020 All Rights Reserved.
 */
package com.paytm.reconsys.functions.processfunctions;

import com.paytm.reconsys.Constants;
import com.paytm.reconsys.configs.ConfigurationsManager;
import com.paytm.reconsys.enums.DescripancyTypeEnum;
import com.paytm.reconsys.exceptions.MissingConfigurationsException;
import com.paytm.reconsys.messages.ResultMessage;
import com.paytm.reconsys.messages.cart.CartMessage;
import com.paytm.reconsys.messages.cart.Payment;
import com.paytm.reconsys.messages.pg.PGMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import java.util.Properties;

/**
 * CoProcessFunction to process cart and pg messages connected using the connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String, CartMessage, PGMessage, ResultMessage> {

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private MapState<String, CartMessage> cartState = null;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private MapState<String, PGMessage> pgState = null;

    /**
     * Initializations for the cart and pg map states.
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
            "cartData",
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PGMessage> pgStateDescriptor = new MapStateDescriptor<>(
            "pgData",
            TypeInformation.of(String.class),
            TypeInformation.of(PGMessage.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in pgState if an entry is present.
     * 2. If present, match, check the discrepancy, process and delete the entry from pgState.
     * 3. If not present, add orderId+mid as key and the cart object as value in cartState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = cartMessage.createJoinStringCondition();
        if (pgState.contains(searchKey)) {
            generateResultMessage(cartMessage, pgState.get(searchKey));
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartState if an entry is present.
     * 2. If present, match, check the discrepancy, process and delete the entry from cartState.
     * 3. If not present, add orderId+mid as key and the pg object as value in pgState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PGMessage pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = pgMessage.createJoinStringCondition();
        if (cartState.contains(searchKey)) {
            generateResultMessage(cartState.get(searchKey), pgMessage);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private ResultMessage generateResultMessage(CartMessage cartMessage, PGMessage pgMessage) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(payment.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getOrderCompletionTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPayMethod());

        checkDescripancyAndTriggerAlert(resultMessage);

        return resultMessage;
    }

    /**
     * Evaluate whether there is a discrepancy in any of the fields between the messages from the two systems.
     * Write all the discrepancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndTriggerAlert(ResultMessage resultMessage) {
        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            //Send message to kafka queue for order status discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }

        if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            //Send message to kafka queue for pay amount discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            //Send message to kafka queue for pay method discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }
    }

    /**
     * Send a message to the kafka topic.
     *
     * @param message
     */
    private void sendMessageToKafkaTopic(String message) {
        Properties kafkaProperties = ConfigurationsManager.getResultSystemKafkaProperties();
        //kafkaProperties.put("transactional.id","trans123");
        Producer<String, String> producer = new KafkaProducer<>(kafkaProperties, new StringSerializer(), new StringSerializer());
        //producer.initTransactions();
        try {
            //producer.beginTransaction();
            producer.send(new ProducerRecord<>(ConfigurationsManager.getResultTopicName(), message));
            //producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // Transactions are not initialized above, so abortTransaction() would fail here.
            //producer.abortTransaction();
        } catch (MissingConfigurationsException e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
This is a snapshot of the implementation I have done.


Re: Rocksdb implementation

Arvid Heise
Hi Jaswin,

I'd discourage using RocksDB directly. It's more of an implementation detail of Flink. I'd also discourage writing to Kafka directly without using our Kafka sink, as you will receive duplicates upon recovery.

If you run the KeyedCoProcessFunction continuously anyway, I'd register a timer (2 days?) [1] for all unmatched records and, when the timer fires, output the record through a side output [2], where you do your batch logic. Then you don't need a separate batch job to clean that up. If you actually want to output to Kafka for some other application, you just need to stream the side output to a Kafka producer.
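A minimal sketch of that timer plus side-output approach, applied to the CartPGCoprocessFunction posted above. The OutputTag names, the use of processing-time timers and the 2-day timeout are assumptions for illustration, not part of the original code; only the added or changed members are shown, and the class would additionally need imports for org.apache.flink.util.OutputTag and java.util.Map:

    // Assumed additions to CartPGCoprocessFunction -- not part of the original posting.
    private static final long MATCH_TIMEOUT_MS = 2L * 24 * 60 * 60 * 1000; // assumed 2-day timeout

    // Hypothetical side-output tags; downstream logic reads unmatched records from these.
    public static final OutputTag<CartMessage> UNMATCHED_CART = new OutputTag<CartMessage>("unmatched-cart") {};
    public static final OutputTag<PGMessage> UNMATCHED_PG = new OutputTag<PGMessage>("unmatched-pg") {};

    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = cartMessage.createJoinStringCondition();
        if (pgState.contains(searchKey)) {
            generateResultMessage(cartMessage, pgState.get(searchKey));
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
            // Ask to be called back roughly two days later for this key.
            context.timerService().registerProcessingTimeTimer(
                    context.timerService().currentProcessingTime() + MATCH_TIMEOUT_MS);
        }
    }

    // processElement2 would register the same kind of timer when it buffers into pgState.

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        // Anything still buffered for this key found no match within the timeout:
        // emit it to the side outputs and clear the state.
        for (Map.Entry<String, CartMessage> entry : cartState.entries()) {
            ctx.output(UNMATCHED_CART, entry.getValue());
        }
        for (Map.Entry<String, PGMessage> entry : pgState.entries()) {
            ctx.output(UNMATCHED_PG, entry.getValue());
        }
        cartState.clear();
        pgState.clear();
        // Note: one timer is registered per buffered element, so onTimer may fire again
        // later for the same key; by then the state is empty, which is harmless.
    }

Downstream, the unmatched records would be read via getSideOutput on the operator's result stream, as sketched further down the thread.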




--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Rocksdb implementation

Yun Tang
Hi Jaswin

As Arvid suggested, it's not encouraged to query the internal RocksDB directly. Apart from Arvid's solution, I think queryable state [1] might also help you. I think you just want to know the entries left in both map states after several days, and querying the state should meet that need; please refer to the official doc and this example [2] for more details.
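For illustration, a rough sketch of how that could look, under hedged assumptions: on the job side, the only change would be marking the descriptors queryable in open(), e.g. `cartStateDescriptor.setQueryable("cart-unmatched")` (the name is made up), before calling getMapState; the queryable state proxy also has to be enabled on the cluster (flink-queryable-state-runtime jar in lib/ and `queryable-state.enable: true`). A separate client could then read the remaining entries for a given key:

    // Hypothetical client application; host, port, job id and key are placeholders.
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.queryablestate.client.QueryableStateClient;
    import com.paytm.reconsys.messages.cart.CartMessage;

    public class UnmatchedCartQuery {
        public static void main(String[] args) throws Exception {
            QueryableStateClient client = new QueryableStateClient("<taskmanager-host>", 9069);

            // Must match the descriptor used in CartPGCoprocessFunction#open().
            MapStateDescriptor<String, CartMessage> descriptor = new MapStateDescriptor<>(
                    "cartData",
                    TypeInformation.of(String.class),
                    TypeInformation.of(CartMessage.class));

            CompletableFuture<MapState<String, CartMessage>> future = client.getKvState(
                    JobID.fromHexString("<job-id>"),   // id of the running streaming job
                    "cart-unmatched",                  // the queryable name set via setQueryable(...)
                    "<orderId+mid>",                   // the key the connected stream is keyed by
                    Types.STRING,
                    descriptor);

            for (Map.Entry<String, CartMessage> entry : future.join().entries()) {
                System.out.println("Unmatched cart entry: " + entry.getKey());
            }
            client.shutdownAndWait();
        }
    }

Queryable state serves reads per key, so this fits checking specific orderId+mid values rather than scanning all state.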



Best
Yun Tang


Re: Rocksdb implementation

jaswin.shah@outlook.com
++


Re: Rocksdb implementation

Congxian Qiu
Hi

Flink stores state in a StateBackend; there are two kinds: HeapStateBackend, which stores state on the heap, and RocksDBStateBackend, which stores state in RocksDB.

You can enable RocksDB in the following ways [1] (see the sketch below):
1. add `env.setStateBackend(...);` in your code
2. add the configuration `state.backend: rocksdb` in `flink-conf.yaml`
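As an illustration of option 1, a minimal sketch assuming the flink-statebackend-rocksdb dependency is on the classpath; the class name, checkpoint URI and interval are placeholders:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReconJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // All keyed state (the cart/pg MapStates above) then lives in RocksDB on the TaskManagers.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // true = incremental checkpoints
            env.enableCheckpointing(60_000); // example: checkpoint every 60s so the RocksDB state is durable
            // ... build the two sources, connect, keyBy, and apply CartPGCoprocessFunction here ...
            env.execute("cart-pg-reconciliation");
        }
    }

For option 2, you would instead set `state.backend: rocksdb` and a `state.checkpoints.dir` in flink-conf.yaml and leave the code unchanged.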

Best,
Congxian



Re: Rocksdb implementation

jaswin.shah@outlook.com
Thanks Yun and Arvid.
Just a question: is it possible to have batch execution inside the same streaming job? You mean I should collect the missing messages from both streams in a side output on timer expiry. So I would execute a batch job on the side output, as the side output will be shared with the same streaming job that I have. Basically, I need that missing-message information outside the job.
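For reference, a sketch of how the side output from Arvid's suggestion could be consumed inside the same streaming job and pushed to Kafka through Flink's Kafka sink rather than a raw producer. The stream variables, topic name, serialization and the UNMATCHED_CART tag are assumptions carried over from the earlier sketch; it uses org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer and org.apache.flink.api.common.serialization.SimpleStringSchema:

    // Assumed wiring in the job's main method; cartStream, pgStream and kafkaProperties
    // are placeholders for the existing sources and producer configuration.
    SingleOutputStreamOperator<ResultMessage> matched = cartStream
            .connect(pgStream)
            .keyBy(CartMessage::createJoinStringCondition, PGMessage::createJoinStringCondition)
            .process(new CartPGCoprocessFunction());

    // Records that never found a match within the timeout (see the onTimer sketch above).
    DataStream<CartMessage> unmatchedCarts =
            matched.getSideOutput(CartPGCoprocessFunction.UNMATCHED_CART);

    // Write the report data through Flink's Kafka sink so restarts are handled by Flink.
    unmatchedCarts
            .map(CartMessage::toString)   // or a proper JSON serialization of the report row
            .addSink(new FlinkKafkaProducer<>("unmatched-report-topic",
                    new SimpleStringSchema(), kafkaProperties));

An external report job can then consume that topic, so no separate batch job over the RocksDB files is needed.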

From: Jaswin Shah <[hidden email]>
Sent: 19 May 2020 13:29
To: Yun Tang <[hidden email]>; Arvid Heise <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Rocksdb implementation
 
++

From: Yun Tang <[hidden email]>
Sent: 18 May 2020 23:47
To: Arvid Heise <[hidden email]>; Jaswin Shah <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Rocksdb implementation
 
Hi Jaswin

As Arvid suggested, it's not encouraged to query the internal RocksDB directly. Apart from Arvid's solution, I think queryable state [1] might also help you. I think you just want to know the left entries in both of map state after several days and query the state should make the meet, please refer to the official doc and this example [2] to know more details.



Best
Yun Tang

From: Arvid Heise <[hidden email]>
Sent: Monday, May 18, 2020 23:40
To: Jaswin Shah <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Rocksdb implementation
 
Hi Jaswin,

I'd discourage using rocksdb directly. It's more of an implementation detail of Flink. I'd also discourage to write to Kafka directly without using our Kafka Sink, as you will receive duplicates upon recovery.

If you run the KeyedCoProcessFunction continuously anyways, I'd add a timer (2 days?) [1] for all unmatched records and on triggering of the timer, output the record through a side output [2], where you do your batch logic. Then you don't need a separate batch job to clean that up. If you actually want to output to Kafka for some other application, you just need to stream the side output to a KafkaProducer.


On Mon, May 18, 2020 at 10:30 AM Jaswin Shah <[hidden email]> wrote:
/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2020 All Rights Reserved.
 */
package com.paytm.reconsys.functions.processfunctions;

import com.paytm.reconsys.Constants;
import com.paytm.reconsys.configs.ConfigurationsManager;
import com.paytm.reconsys.enums.DescripancyTypeEnum;
import com.paytm.reconsys.exceptions.MissingConfigurationsException;
import com.paytm.reconsys.messages.ResultMessage;
import com.paytm.reconsys.messages.cart.CartMessage;
import com.paytm.reconsys.messages.cart.Payment;
import com.paytm.reconsys.messages.pg.PGMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * CoProcessFunction to process cart and pg messages connected using the connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String, CartMessage, PGMessage, ResultMessage> {

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private MapState<String, CartMessage> cartState = null;

    /**
     * Map state for pg messages, orderId+mid is key and pgMessage is value.
     */
    private MapState<String, PGMessage> pgState = null;

    /**
     * Initializations for cart and pg map states.
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, CartMessage> cartStateDescriptor = new MapStateDescriptor<>(
            "cartData",
            TypeInformation.of(String.class),
            TypeInformation.of(CartMessage.class)
        );
        cartState = getRuntimeContext().getMapState(cartStateDescriptor);

        MapStateDescriptor<String, PGMessage> pgStateDescriptor = new MapStateDescriptor<>(
            "pgData",
            TypeInformation.of(String.class),
            TypeInformation.of(PGMessage.class)
        );
        pgState = getRuntimeContext().getMapState(pgStateDescriptor);
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in pgState if an entry is present.
     * 2. If present, match, check discrepancies, process and delete the entry from pgState.
     * 3. If not present, add orderId+mid as key and the cart object as value in cartState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = cartMessage.createJoinStringCondition();
        if (pgState.contains(searchKey)) {
            // The generated result is currently only used to trigger Kafka alerts; nothing is emitted through the collector.
            generateResultMessage(cartMessage, pgState.get(searchKey));
            pgState.remove(searchKey);
        } else {
            cartState.put(searchKey, cartMessage);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartState if an entry is present.
     * 2. If present, match, check discrepancies, process and delete the entry from cartState.
     * 3. If not present, add orderId+mid as key and the pg object as value in pgState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PGMessage pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        String searchKey = pgMessage.createJoinStringCondition();
        if (cartState.contains(searchKey)) {
            generateResultMessage(cartState.get(searchKey), pgMessage);
            cartState.remove(searchKey);
        } else {
            pgState.put(searchKey, pgMessage);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private ResultMessage generateResultMessage(CartMessage cartMessage, PGMessage pgMessage) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        // Pick the forward PayTM payment from the cart; assumes such a payment is always present,
        // otherwise payment stays null and the calls below will fail.
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(payment.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getOrderCompletionTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(cartMessage.getGrandtotal());

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPayMethod());

        checkDescripancyAndTriggerAlert(resultMessage);

        return resultMessage;
    }

    /**
     * Evaluate whether there is a discrepancy in any field between the messages from the two systems.
     * Write all the discrepancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndTriggerAlert(ResultMessage resultMessage) {
        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            // Send message to kafka queue for order status discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }

        if (!StringUtils.equals(resultMessage.getCartOrderAmount(), resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            // Send message to kafka queue for pay amount discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            // Send message to kafka queue for pay method discrepancy.
            sendMessageToKafkaTopic(resultMessage.toString());
        }
    }

    /**
     * Send a message to the kafka topic.
     *
     * @param message
     */
    private void sendMessageToKafkaTopic(String message) {
        Properties kafkaProperties = ConfigurationsManager.getResultSystemKafkaProperties();
        //kafkaProperties.put("transactional.id","trans123");
        Producer<String, String> producer = new KafkaProducer<>(kafkaProperties, new StringSerializer(), new StringSerializer());
        //producer.initTransactions();
        try {
            //producer.beginTransaction();
            producer.send(new ProducerRecord<>(ConfigurationsManager.getResultTopicName(), message));
            //producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions; the producer is closed in the finally block below.
        } catch (KafkaException e) {
            // abortTransaction() is only valid when transactions are initialized above, so just log here.
            e.printStackTrace();
        } catch (MissingConfigurationsException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
This is a snapshot of the implementation I have done.
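
For context, a rough sketch of how this function might be wired into the job, assuming cartStream and pgStream are the two DataStreams and createJoinStringCondition() is the keying function on both sides (names are illustrative only):

SingleOutputStreamOperator<ResultMessage> processed = cartStream
    .connect(pgStream)
    .keyBy(CartMessage::createJoinStringCondition, PGMessage::createJoinStringCondition)
    .process(new CartPGCoprocessFunction());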




Re: Rocksdb implementation

Arvid Heise-3
Hi Jaswin,

you cannot run a DataSet program inside a DataStream program. However, you can perform the same query on a windowed stream: if you would otherwise execute the batch part every day, you can just create a 24h tumbling window and then perform your batch analysis on that window.

Alternatively, you can dump the data into Kafka or a file system and then run the batch part as a separate program.
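
A rough sketch of the windowed variant described above, assuming `unmatched` is the side-output stream of unmatched records (window assigner and Time are the ones from org.apache.flink.streaming.api.windowing.*):

unmatched
    .windowAll(TumblingProcessingTimeWindows.of(Time.hours(24)))
    .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
        @Override
        public void process(Context context, Iterable<String> records, Collector<String> out) {
            // Count the unmatched records that accumulated during the 24h window.
            long count = 0;
            for (String ignored : records) {
                count++;
            }
            out.collect("Unmatched records in window " + context.window() + ": " + count);
        }
    })
    .print();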

On Tue, May 19, 2020 at 1:36 PM Jaswin Shah <[hidden email]> wrote:
Thanks Yun and Arvid.
Just a question: is it possible to have a batch execution inside the same streaming job? You mean I should collect the missing messages from both streams in a side output on timer expiry, and then I would execute a batch job on that side output, since the side output is shared with the same streaming job that I have. Basically, I need that missing-message info outside.


Re: Rocksdb implementation

jaswin.shah@outlook.com
If I create such a large tumbling window, that data will stay in memory for a long time until the window is triggered, right? So, won't there be a possibility of data loss, or would Flink recover in case of any outage?


Re: Rocksdb implementation

Arvid Heise-3
If you enabled checkpointing (which is strongly recommended) [1], no data is lost.
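
For completeness, a minimal sketch of enabling checkpointing together with the RocksDB state backend. The interval, checkpoint URI and job name are placeholders; this needs the flink-statebackend-rocksdb dependency, and the same can also be set in flink-conf.yaml via state.backend: rocksdb and state.checkpoints.dir.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);                                                  // checkpoint every 60 seconds
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));  // incremental checkpoints
        // ... build the cart/pg pipeline here, then:
        // env.execute("cart-pg-reconciliation");
    }
}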



Re: Rocksdb implementation

jaswin.shah@outlook.com
Okay, so on checkpointing, the window's data would also be persisted.
