Timeout Callbacks issue -Flink

classic Classic list List threaded Threaded
14 messages Options
Reply | Threaded
Open this post in threaded view
|

Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.

/**
* CoProcessFuntion to process cart and pg messages connected using connect operator.
* @author jaswin.shah
* @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
*/
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
* Map state for cart messages, orderId+mid is key and cartMessage is value.
*/
private static MapState<String, CartPG> cartPgState = null;

/**
* Intializations for cart and pg mapStates
*
* @param config
*/
@Override
public void open(Configuration config) {

MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartPG.class)
);
cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
}

/**
*
* @return
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
logger.info("On timer called key is {}",ctx.getCurrentKey());
String searchKey = ctx.getCurrentKey();
CartPG cartPg = cartPgState.get(searchKey);
if(Objects.nonNull(cartPg)) {
ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
cartPgState.remove(searchKey);
}
}

/**
* 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
* 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
* 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
* @param cartMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
Long cartEventTimeStamp = context.timestamp();
logger.info("cart time : {} ",cartEventTimeStamp);
context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

String searchKey = cartMessage.createJoinStringCondition();

CartPG cartPG = cartPgState.get(searchKey);

if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
cartPgState.remove(searchKey);
} else {
cartPG = new CartPG();
cartPG.setCartMessage(cartMessage);
cartPgState.put(searchKey,cartPG);
}
}

/**
* 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
* 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
* 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
* @param pgMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

Long pgEventTimeStamp = context.timestamp();
logger.info("pg time : {} ",pgEventTimeStamp);
context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

String searchKey = pgMessage.createJoinStringCondition();
CartPG cartPG = cartPgState.get(searchKey);

if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
cartPgState.remove(searchKey);
} else {
cartPG = new CartPG();
cartPG.setPgMessage(pgMessage);
cartPgState.put(searchKey,cartPG);
}
}

/**
* Create ResultMessage from cart and pg messages.
*
* @param cartMessage
* @param pgMessage
* @return
*/
private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;

//Logic should be in cart: check
for (Payment pay : cartMessage.getPayments()) {
if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
payment = pay;
break;
}
}
if(Objects.isNull(payment)) {
return;
}

resultMessage.setOrderId(cartMessage.getId());
resultMessage.setMid(payment.getMid());

resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

resultMessage.setCartPaymethod(payment.getPayment_method());
resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

checkDescripancyAndCollectResult(resultMessage,collector);
}

/**
* Evaluate if there is descripancy of any fields between the messages from two different systems.
* Write all the descripancy logic here.
*
* @param resultMessage
*/
private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
collector.collect(resultMessage.clone());
}

if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
collector.collect(resultMessage.clone());
}

if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
collector.collect(resultMessage.clone());
}
}
}

/**
* Connect to cart and pg streams and process
*
* @param cartStream
* @param pgStream
* @return
*/
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
.process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
try {
Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
return orderTimeStamp.getTime();
} catch (ParseException e) {
logger.error("Exception in converting cart message timeStamp..",e);
}
return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
try {
Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
return orderTimeStamp.getTime();
} catch (ParseException e) {
logger.error("Exception in converting pg message timeStamp..",e);
}
return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
//1. Consume cartStream
SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
cartStream.name(Constants.CART_SYSTEM);

//2. Filter cart messages
SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction()) ;

//3. Map carts data
filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

//4. Assign timestamps and watermarks
filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
@Override
public long extractTimestamp(CartMessage cartMessage) {
return DateTimeUtils.extractCartTimeStamp(cartMessage);
}
});
return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

//1. Consume pg streams
SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

pgStream.name(Constants.PG_SYSTEM);

//2. Assign timestamps and watermarks to pg messages
pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
@Override
public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
return DateTimeUtils.extractPGTimeStamp(pgMessage);
}
});
return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah <[hidden email]>
Sent: 23 May 2020 17:18
To: [hidden email] <[hidden email]>; Arvid Heise <[hidden email]>; Yun Tang <[hidden email]>
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.

/**
* CoProcessFuntion to process cart and pg messages connected using connect operator.
* @author jaswin.shah
* @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
*/
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
* Map state for cart messages, orderId+mid is key and cartMessage is value.
*/
private static MapState<String, CartPG> cartPgState = null;

/**
* Intializations for cart and pg mapStates
*
* @param config
*/
@Override
public void open(Configuration config) {

MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartPG.class)
);
cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
}

/**
*
* @return
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
logger.info("On timer called key is {}",ctx.getCurrentKey());
String searchKey = ctx.getCurrentKey();
CartPG cartPg = cartPgState.get(searchKey);
if(Objects.nonNull(cartPg)) {
ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
cartPgState.remove(searchKey);
}
}

/**
* 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
* 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
* 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
* @param cartMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
Long cartEventTimeStamp = context.timestamp();
logger.info("cart time : {} ",cartEventTimeStamp);
context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

String searchKey = cartMessage.createJoinStringCondition();

CartPG cartPG = cartPgState.get(searchKey);

if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
cartPgState.remove(searchKey);
} else {
cartPG = new CartPG();
cartPG.setCartMessage(cartMessage);
cartPgState.put(searchKey,cartPG);
}
}

/**
* 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
* 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
* 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
* @param pgMessage
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

Long pgEventTimeStamp = context.timestamp();
logger.info("pg time : {} ",pgEventTimeStamp);
context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

String searchKey = pgMessage.createJoinStringCondition();
CartPG cartPG = cartPgState.get(searchKey);

if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
cartPgState.remove(searchKey);
} else {
cartPG = new CartPG();
cartPG.setPgMessage(pgMessage);
cartPgState.put(searchKey,cartPG);
}
}

/**
* Create ResultMessage from cart and pg messages.
*
* @param cartMessage
* @param pgMessage
* @return
*/
private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;

//Logic should be in cart: check
for (Payment pay : cartMessage.getPayments()) {
if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
payment = pay;
break;
}
}
if(Objects.isNull(payment)) {
return;
}

resultMessage.setOrderId(cartMessage.getId());
resultMessage.setMid(payment.getMid());

resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

resultMessage.setCartPaymethod(payment.getPayment_method());
resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

checkDescripancyAndCollectResult(resultMessage,collector);
}

/**
* Evaluate if there is descripancy of any fields between the messages from two different systems.
* Write all the descripancy logic here.
*
* @param resultMessage
*/
private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
collector.collect(resultMessage.clone());
}

if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
collector.collect(resultMessage.clone());
}

if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
collector.collect(resultMessage.clone());
}
}
}

/**
* Connect to cart and pg streams and process
*
* @param cartStream
* @param pgStream
* @return
*/
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
.process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
try {
Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
return orderTimeStamp.getTime();
} catch (ParseException e) {
logger.error("Exception in converting cart message timeStamp..",e);
}
return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
try {
Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
return orderTimeStamp.getTime();
} catch (ParseException e) {
logger.error("Exception in converting pg message timeStamp..",e);
}
return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
//1. Consume cartStream
SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
cartStream.name(Constants.CART_SYSTEM);

//2. Filter cart messages
SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction()) ;

//3. Map carts data
filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

//4. Assign timestamps and watermarks
filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
@Override
public long extractTimestamp(CartMessage cartMessage) {
return DateTimeUtils.extractCartTimeStamp(cartMessage);
}
});
return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

//1. Consume pg streams
SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

pgStream.name(Constants.PG_SYSTEM);

//2. Assign timestamps and watermarks to pg messages
pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
@Override
public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
return DateTimeUtils.extractPGTimeStamp(pgMessage);
}
});
return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

Dawid Wysakowicz-2

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.

signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
In reply to this post by jaswin.shah@outlook.com
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
One question Dawid:
If I maintain a ValueState of Maps if this is what you were referring to:
  1. In my use case, I am registering the timeout for a key when I store that in state. i.e. If I do not receive the matching event with same key from other stream, then I would like to receive a callback and invoke a onTimer method from which I am pushing the unmatched events to sideouput.
  2. In case of ValueState, if I register the callback timeout on current key, for the past information in ValueState will flink give me the callback for the events state at that point of time in past? as ValueState will get updated with most recent user key on every events arrival.

From: Jaswin Shah <[hidden email]>
Sent: 25 May 2020 14:47
To: Dawid Wysakowicz <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Timeout Callbacks issue -Flink
 
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

Dawid Wysakowicz-2
In reply to this post by jaswin.shah@outlook.com

You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key.


If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed it is the same because you are always using the ctx.getCurrent in the onTimer method.


See this example [1]. There even though a ValueState is used, we calculate counts per key.



Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example



On 25/05/2020 11:17, Jaswin Shah wrote:
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.

signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
In reply to this post by jaswin.shah@outlook.com
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the callbacks.

I hope here my understanding is correct.
Please correct me if I am wrong here.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:14
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key.


If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed it is the same because you are always using the ctx.getCurrent in the onTimer method.


See this example [1]. There even though a ValueState is used, we calculate counts per key.



Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example



On 25/05/2020 11:17, Jaswin Shah wrote:
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
correcting: 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the events from other stream.

I hope here my understanding is correct.
Please correct me if I am wrong here.


From: Jaswin Shah <[hidden email]>
Sent: 25 May 2020 15:19
To: Dawid Wysakowicz <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Timeout Callbacks issue -Flink
 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the callbacks.

I hope here my understanding is correct.
Please correct me if I am wrong here.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:14
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key.


If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed it is the same because you are always using the ctx.getCurrent in the onTimer method.


See this example [1]. There even though a ValueState is used, we calculate counts per key.



Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example



On 25/05/2020 11:17, Jaswin Shah wrote:
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

Dawid Wysakowicz-2

I don't necessarily know how can I better describe it. The MapState/ValueState is always implicitly scoped to the current key. It will be scoped this way in all functions of the operator. In processElement1, processElement2, onTimer. It will always hold whatever you stored there for the current key. It will not have anything you stored for different keys. ValueState will have the value that you stored there for OnTimerContext.getCurrentKey(in onTimer)/Context.getCurrentKey(in processElementX).


I really encourage you to analyze the example I posted as it it quite similar to what you are doing.

Best,

Dawid


 On 25/05/2020 11:51, Jaswin Shah wrote:
correcting: 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the events from other stream.

I hope here my understanding is correct.
Please correct me if I am wrong here.


From: Jaswin Shah [hidden email]
Sent: 25 May 2020 15:19
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the callbacks.

I hope here my understanding is correct.
Please correct me if I am wrong here.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:14
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key.


If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed it is the same because you are always using the ctx.getCurrent in the onTimer method.


See this example [1]. There even though a ValueState is used, we calculate counts per key.



Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example



On 25/05/2020 11:17, Jaswin Shah wrote:
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.

signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
In reply to this post by jaswin.shah@outlook.com
OMG!!! If this is the case Dawid, I think I am solving the problem in an incorrect way. 
Here I would like to explain my use-case:
Basically, I have two streams 1 and 2, and I am storing events of stream1 to MapState and when ever events are arrived from stream2, I check in MapState if that event with same key is already present in MapState. If present I take it, match and purge from MapState. While an event arrives, I register that event with its key to timerService to with expiryTimeout. When expiryTimeout for that key/event reaches and no event from stream2 is matched yet, callback is received to onTimer method where I fetch the event from MapState registered for that key and send that to sideoutput which I send to some sink.
As you are telling ctx.getCurrentKey() will give me currently scoped key only to KeyedCoProcessFunction which actually a bit scaring me here.
If this is a case, I don't understand why callback mechanism (onTimer) is then designed in flink.
Or if this is the only case you talked, what is the way to get the key registered in past from callback method i.e. the key for which I received the callback.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:57
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

I don't necessarily know how can I better describe it. The MapState/ValueState is always implicitly scoped to the current key. It will be scoped this way in all functions of the operator. In processElement1, processElement2, onTimer. It will always hold whatever you stored there for the current key. It will not have anything you stored for different keys. ValueState will have the value that you stored there for OnTimerContext.getCurrentKey(in onTimer)/Context.getCurrentKey(in processElementX).


I really encourage you to analyze the example I posted as it it quite similar to what you are doing.

Best,

Dawid


 On 25/05/2020 11:51, Jaswin Shah wrote:
correcting: 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the events from other stream.

I hope here my understanding is correct.
Please correct me if I am wrong here.


From: Jaswin Shah [hidden email]
Sent: 25 May 2020 15:19
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the callbacks.

I hope here my understanding is correct.
Please correct me if I am wrong here.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:14
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key.


If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed it is the same because you are always using the ctx.getCurrent in the onTimer method.


See this example [1]. There even though a ValueState is used, we calculate counts per key.



Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example



On 25/05/2020 11:17, Jaswin Shah wrote:
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
Like, callback mechanism is designed to get a callback for key registered when the timer reaches the expiry registered. So, at the moment flink gives the callback, there must be a mechanism to get a key for which I am receiving the call back, right? If this is not possible what is sole purpose of callback mechanism? As anyone will for sure would like to get the key for which callback arrived to take some action on that key.
If this is not possible, is callback mechanism only for state clearance, in that case only TTL would be sufficient for me.

Regards,
Jaswin

From: Jaswin Shah <[hidden email]>
Sent: 25 May 2020 16:06
To: Dawid Wysakowicz <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Timeout Callbacks issue -Flink
 
OMG!!! If this is the case Dawid, I think I am solving the problem in an incorrect way. 
Here I would like to explain my use-case:
Basically, I have two streams 1 and 2, and I am storing events of stream1 to MapState and when ever events are arrived from stream2, I check in MapState if that event with same key is already present in MapState. If present I take it, match and purge from MapState. While an event arrives, I register that event with its key to timerService to with expiryTimeout. When expiryTimeout for that key/event reaches and no event from stream2 is matched yet, callback is received to onTimer method where I fetch the event from MapState registered for that key and send that to sideoutput which I send to some sink.
As you are telling ctx.getCurrentKey() will give me currently scoped key only to KeyedCoProcessFunction which actually a bit scaring me here.
If this is a case, I don't understand why callback mechanism (onTimer) is then designed in flink.
Or if this is the only case you talked, what is the way to get the key registered in past from callback method i.e. the key for which I received the callback.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:57
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

I don't necessarily know how can I better describe it. The MapState/ValueState is always implicitly scoped to the current key. It will be scoped this way in all functions of the operator. In processElement1, processElement2, onTimer. It will always hold whatever you stored there for the current key. It will not have anything you stored for different keys. ValueState will have the value that you stored there for OnTimerContext.getCurrentKey(in onTimer)/Context.getCurrentKey(in processElementX).


I really encourage you to analyze the example I posted as it it quite similar to what you are doing.

Best,

Dawid


 On 25/05/2020 11:51, Jaswin Shah wrote:
correcting: 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the events from other stream.

I hope here my understanding is correct.
Please correct me if I am wrong here.


From: Jaswin Shah [hidden email]
Sent: 25 May 2020 15:19
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the callbacks.

I hope here my understanding is correct.
Please correct me if I am wrong here.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:14
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key.


If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed it is the same because you are always using the ctx.getCurrent in the onTimer method.


See this example [1]. There even though a ValueState is used, we calculate counts per key.



Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example



On 25/05/2020 11:17, Jaswin Shah wrote:
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

Dawid Wysakowicz-2

Where did I say you cannot get the key for which the onTimer method is called? You can and you do it with OnTimerContext.getCurrentKey, the way you do it. My only point is that if I understand your code correctly you will only ever have a single entry in the MapState for every key. Remember that my assumption is that the


String searchKey = cartMessage.createJoinStringCondition(); // if my assumption is correct you could also use Context.getCurrentKey();


is the same with what you use in the keyBy:


.keyBy(new CartJoinColumnsSelector(), ...)


Best,

Dawid



On 25/05/2020 13:06, Jaswin Shah wrote:
Like, callback mechanism is designed to get a callback for key registered when the timer reaches the expiry registered. So, at the moment flink gives the callback, there must be a mechanism to get a key for which I am receiving the call back, right? If this is not possible what is sole purpose of callback mechanism? As anyone will for sure would like to get the key for which callback arrived to take some action on that key.
If this is not possible, is callback mechanism only for state clearance, in that case only TTL would be sufficient for me.

Regards,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 25 May 2020 16:06
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
OMG!!! If this is the case Dawid, I think I am solving the problem in an incorrect way. 
Here I would like to explain my use-case:
Basically, I have two streams 1 and 2, and I am storing events of stream1 to MapState and when ever events are arrived from stream2, I check in MapState if that event with same key is already present in MapState. If present I take it, match and purge from MapState. While an event arrives, I register that event with its key to timerService to with expiryTimeout. When expiryTimeout for that key/event reaches and no event from stream2 is matched yet, callback is received to onTimer method where I fetch the event from MapState registered for that key and send that to sideoutput which I send to some sink.
As you are telling ctx.getCurrentKey() will give me currently scoped key only to KeyedCoProcessFunction which actually a bit scaring me here.
If this is a case, I don't understand why callback mechanism (onTimer) is then designed in flink.
Or if this is the only case you talked, what is the way to get the key registered in past from callback method i.e. the key for which I received the callback.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:57
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

I don't necessarily know how can I better describe it. The MapState/ValueState is always implicitly scoped to the current key. It will be scoped this way in all functions of the operator. In processElement1, processElement2, onTimer. It will always hold whatever you stored there for the current key. It will not have anything you stored for different keys. ValueState will have the value that you stored there for OnTimerContext.getCurrentKey(in onTimer)/Context.getCurrentKey(in processElementX).


I really encourage you to analyze the example I posted as it it quite similar to what you are doing.

Best,

Dawid


 On 25/05/2020 11:51, Jaswin Shah wrote:
correcting: 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the events from other stream.

I hope here my understanding is correct.
Please correct me if I am wrong here.


From: Jaswin Shah [hidden email]
Sent: 25 May 2020 15:19
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the callbacks.

I hope here my understanding is correct.
Please correct me if I am wrong here.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:14
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key.


If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed it is the same because you are always using the ctx.getCurrent in the onTimer method.


See this example [1]. There even though a ValueState is used, we calculate counts per key.



Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example



On 25/05/2020 11:17, Jaswin Shah wrote:
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.

signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Timeout Callbacks issue -Flink

jaswin.shah@outlook.com
In reply to this post by jaswin.shah@outlook.com
Yeah yes, I got it what u tried to convey.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 16:48
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Where did I say you cannot get the key for which the onTimer method is called? You can and you do it with OnTimerContext.getCurrentKey, the way you do it. My only point is that if I understand your code correctly you will only ever have a single entry in the MapState for every key. Remember that my assumption is that the


String searchKey = cartMessage.createJoinStringCondition(); // if my assumption is correct you could also use Context.getCurrentKey();


is the same with what you use in the keyBy:


.keyBy(new CartJoinColumnsSelector(), ...)


Best,

Dawid



On 25/05/2020 13:06, Jaswin Shah wrote:
Like, callback mechanism is designed to get a callback for key registered when the timer reaches the expiry registered. So, at the moment flink gives the callback, there must be a mechanism to get a key for which I am receiving the call back, right? If this is not possible what is sole purpose of callback mechanism? As anyone will for sure would like to get the key for which callback arrived to take some action on that key.
If this is not possible, is callback mechanism only for state clearance, in that case only TTL would be sufficient for me.

Regards,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 25 May 2020 16:06
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
OMG!!! If this is the case Dawid, I think I am solving the problem in an incorrect way. 
Here I would like to explain my use-case:
Basically, I have two streams 1 and 2, and I am storing events of stream1 to MapState and when ever events are arrived from stream2, I check in MapState if that event with same key is already present in MapState. If present I take it, match and purge from MapState. While an event arrives, I register that event with its key to timerService to with expiryTimeout. When expiryTimeout for that key/event reaches and no event from stream2 is matched yet, callback is received to onTimer method where I fetch the event from MapState registered for that key and send that to sideoutput which I send to some sink.
As you are telling ctx.getCurrentKey() will give me currently scoped key only to KeyedCoProcessFunction which actually a bit scaring me here.
If this is a case, I don't understand why callback mechanism (onTimer) is then designed in flink.
Or if this is the only case you talked, what is the way to get the key registered in past from callback method i.e. the key for which I received the callback.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:57
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

I don't necessarily know how can I better describe it. The MapState/ValueState is always implicitly scoped to the current key. It will be scoped this way in all functions of the operator. In processElement1, processElement2, onTimer. It will always hold whatever you stored there for the current key. It will not have anything you stored for different keys. ValueState will have the value that you stored there for OnTimerContext.getCurrentKey(in onTimer)/Context.getCurrentKey(in processElementX).


I really encourage you to analyze the example I posted as it it quite similar to what you are doing.

Best,

Dawid


 On 25/05/2020 11:51, Jaswin Shah wrote:
correcting: 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the events from other stream.

I hope here my understanding is correct.
Please correct me if I am wrong here.


From: Jaswin Shah [hidden email]
Sent: 25 May 2020 15:19
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
By ctx.getCurrentKey()=> I meant to get the key registered at a timestamp when callback timeout for a key was registered.
This was a reason to use getCurrentKey().
So, with that I am fetching the events registered at that point of time for which till the current moment I didn't receive the callbacks.

I hope here my understanding is correct.
Please correct me if I am wrong here.



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 15:14
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

You are right that a ValueState can keep a single value at any point of time. It is scoped to the current key of the operator though. So it keeps a single value for a key.


If your cartMessage.createJoinStringCondition()/pgMessage.createJoinStringCondition()/new CartJoinColumnsSelector()/new PGJoinColumnsSelector() are basically the same thing a ValueState should be enough. It will always be scoped to the result of new CartJoinColumnsSelector()/new PGJoinColumnsSelector(). I assumed it is the same because you are always using the ctx.getCurrent in the onTimer method.


See this example [1]. There even though a ValueState is used, we calculate counts per key.



Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example



On 25/05/2020 11:17, Jaswin Shah wrote:
If I understand correctly, you are trying to tell that I should have valueState of Map?

From: Jaswin Shah
Sent: 25 May 2020 14:43
To: Dawid Wysakowicz [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]; [hidden email] [hidden email]
Subject: Re: Timeout Callbacks issue -Flink
 
Thanks for responding Dawid.
I would like to know more about MapState solution you talked about. As per my understanding valueState maintains a single value at any point of time. So, here what I want to maintain is the first streams information until matching event have not found in second stream. So, in that case how valueState could benefit me? Can you please explain me that, might be I have understood it incorrectly what you are trying to convey here.

Thanks,
Jaswin



From: Dawid Wysakowicz
Sent: Monday, May 25, 2020 14:23
To: Jaswin Shah; [hidden email]; [hidden email]; [hidden email]
Subject: Re: Timeout Callbacks issue -Flink

Hi Jaswin,

I can't see any obvious problems in your code. It looks rather correct. What exactly do you mean that "callback is coming earlier than registered callback timeout"? Could you explain that with some examples?


As for the different timezones. Flink does not make any assumptions on the timestamp. It uses it simply as longs. I'd suggest revisiting your timestamp extraction logic to make sure it performs the extraction correctly. I don't know how your data encodes the timestamps, but I think you have a bug or two there ;)


The "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" gives me an impression that this field has timestamps in UTC, but you are parsing it in your JVM local time zone. You treat the 'Z' as a literal (https://stackoverflow.com/questions/19112357/java-simpledateformatyyyy-mm-ddthhmmssz-gives-timezone-as-ist). I don't know what the other field represents but you are also parsing it in a local timezone. If the field represents local date it is probably correct.


To mitigate those problems I'd strongly recommend using the java.time API. For the extractCartTimeStamp you could use Instant.parse("...").toEpochMilli. It expects the format you are receiving. For the extractPGTimeStamp you could use LocalDateTime.parse("..."), by default it uses the format you are receiving. Then you should convert the local date time to an instant LocalDateTime.parse("...").atZone(/* the zone which this date represents */).toInstant().toEpochMilli(); This has nothing to do with Flink though ;)


BTW one Flink issue I can see is that I think you don't need to use a MapState there. Any kind of state in a KeyedCoProcessFunction is always scoped to the current key. Therefore if you only ever put items under the currentKey you will have at most single element in your map. Think of the MapState as a map of maps MapState<UserKey, Value> = Map<CurrentKey, Map<UserKey, Value>>. In your case a ValueState should be enough imo.


Best,

Dawid


On 23/05/2020 14:39, Jaswin Shah wrote:
++
Here, I am registering the callback time for an even with processing time and calculating the time value as events time + expiryTimeout value.

Can this be the issue here due to hybrid timings usage?
Also, do we need any special handling if we use event time semantics for callback timeouts registrations?

Thanks,
Jaswin

From: Jaswin Shah [hidden email]
Sent: 23 May 2020 17:18
To: [hidden email] [hidden email]; Arvid Heise [hidden email]; Yun Tang [hidden email]
Subject: Timeout Callbacks issue -Flink
 
Hi,
I am running flink job with following functionality:
  1. I consume stream1 and stream2 from two kafka topics and assign the watermarks to the events of two streams by extracting the timestamps from the events in streams.
  2. Then, I am connecting two streams and calling KeyedCoProcessFunction on connectedStream.
  3. I have processElement1 method and processElement2 methods which receive the events of two streams 1 and 2 and do the join logic as shown in below code snippet.
  4. I have shared mapstate for two streams.
  5. When an event comes to processElement method, I register the callback time for that message to ensure if corresponding matching message is not arrived from other stream, I will send the message to sideOutput on invocation of callback method i.e. onTimer.
Something is getting wrong in the callback times registrations for events due to which for many messages of stream2 the callback is coming earlier than registered callback timeout.
Also, the events from stream 2 are based on GMT times +5:30 as I can see in the timevalue in event message, for stream1 it;s normal TZ only. Though I am weak in analysing the timeout formats so could be wrong in analysis this side.

Below is code snippets I have implemented for KeyedCoProcessFunctions and timestamp calculations and watermarks registrations.
/**
 * CoProcessFuntion to process cart and pg messages connected using connect operator.
 * @author jaswin.shah
 * @version $Id: CartPGCoprocessFunction.java, v 0.1 2020-05-15 11:52 PM jaswin.shah Exp $$
 */
public class CartPGCoprocessFunction extends KeyedCoProcessFunction<String,CartMessage, PaymentNotifyRequestWrapper, ResultMessage> {

    private static final Logger logger = LoggerFactory.getLogger(CartPGCoprocessFunction.class);

    /**
     * Map state for cart messages, orderId+mid is key and cartMessage is value.
     */
    private static MapState<String, CartPG> cartPgState = null;

    /**
     * Intializations for cart and pg mapStates
     *
     * @param config
     */
    @Override
    public void open(Configuration config) {

        MapStateDescriptor<String, CartPG> cartPgMapStateDescriptor = new MapStateDescriptor<> (
            Constants.CART_DATA,
            TypeInformation.of(String.class),
            TypeInformation.of(CartPG.class)
        );
        cartPgState = getRuntimeContext().getMapState(cartPgMapStateDescriptor);
    }

    /**
     *
     * @return
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultMessage> out) throws Exception {
        logger.info("On timer called key is {}",ctx.getCurrentKey());
        String searchKey = ctx.getCurrentKey();
        CartPG  cartPg = cartPgState.get(searchKey);
        if(Objects.nonNull(cartPg)) {
            ctx.output(CartPGSideOutput.getOutputTag(), cartPgState.get(ctx.getCurrentKey()));
            cartPgState.remove(searchKey);
        }
    }

    /**
     * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from pgMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in cartMapState.
     * @param cartMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement1(CartMessage cartMessage, Context context, Collector<ResultMessage> collector) throws Exception {
        Long cartEventTimeStamp = context.timestamp();
        logger.info("cart time : {} ",cartEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(cartEventTimeStamp+ ConfigurationsManager.getMaxWaitTimeForPGMessage());

        String searchKey = cartMessage.createJoinStringCondition();

        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getPgMessage())) {
            generateResultMessage(cartMessage,cartPG.getPgMessage(),collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setCartMessage(cartMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry is present.
     * 2. If present, match, checkDescripancy, process and delete entry from cartMapState.
     * 3. If not present, add orderId+mid as key and cart object as value in pgMapState.
     * @param pgMessage
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context context, Collector<ResultMessage> collector) throws Exception {

        Long pgEventTimeStamp = context.timestamp();
        logger.info("pg time : {} ",pgEventTimeStamp);
        context.timerService().registerProcessingTimeTimer(pgEventTimeStamp+ConfigurationsManager.getMaxWaitTimeForCartMessage());

        String searchKey = pgMessage.createJoinStringCondition();
        CartPG cartPG = cartPgState.get(searchKey);

        if(Objects.nonNull(cartPG) && Objects.nonNull(cartPG.getCartMessage())) {
            generateResultMessage(cartPG.getCartMessage(),pgMessage,collector);
            cartPgState.remove(searchKey);
        } else {
            cartPG = new CartPG();
            cartPG.setPgMessage(pgMessage);
            cartPgState.put(searchKey,cartPG);
        }
    }

    /**
     * Create ResultMessage from cart and pg messages.
     *
     * @param cartMessage
     * @param pgMessage
     * @return
     */
    private void generateResultMessage(CartMessage cartMessage, PaymentNotifyRequestWrapper pgMessage,Collector<ResultMessage> collector) {
        ResultMessage resultMessage = new ResultMessage();
        Payment payment = null;

        //Logic should be in cart: check
        for (Payment pay : cartMessage.getPayments()) {
            if (StringUtils.equals(Constants.FORWARD_PAYMENT, pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
                payment = pay;
                break;
            }
        }
        if(Objects.isNull(payment)) {
            return;
        }

        resultMessage.setOrderId(cartMessage.getId());
        resultMessage.setMid(payment.getMid());

        resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
        resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());

        resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
        resultMessage.setPgOrderCompletionTime(pgMessage.getCreatedTime());

        resultMessage.setPgOrderAmount(pgMessage.getOrderAmount().getValue());
        resultMessage.setCartOrderAmount(String.valueOf(Math.round(cartMessage.getGrandtotal())));

        resultMessage.setCartPaymethod(payment.getPayment_method());
        resultMessage.setPgPaymethod(pgMessage.getPaymentView().getPayOptionInfos()[0].getPayMethod());

        checkDescripancyAndCollectResult(resultMessage,collector);
    }

    /**
     * Evaluate if there is descripancy of any fields between the messages from two different systems.
     * Write all the descripancy logic here.
     *
     * @param resultMessage
     */
    private void checkDescripancyAndCollectResult(ResultMessage resultMessage, Collector<ResultMessage> collector) {

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartOrderStatus(), resultMessage.getPgOrderStatus())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.ORDER_STATUS_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!resultMessage.getCartOrderAmount().equals(resultMessage.getPgOrderAmount())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_AMOUNT_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }

        if (!StringUtils.equalsIgnoreCase(resultMessage.getCartPaymethod(), resultMessage.getPgPaymethod())) {
            resultMessage.setDescripancyType(DescripancyTypeEnum.PAY_METHOD_DESCRIPANCY);
            collector.collect(resultMessage.clone());
        }
    }
}

/**
 * Connect to cart and pg streams and process
 *
 * @param cartStream
 * @param pgStream
 * @return
 */
private SingleOutputStreamOperator<ResultMessage> connectCartPGStreamsAndProcess(SingleOutputStreamOperator<CartMessage> cartStream, SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream) {
    return cartStream.connect(pgStream).keyBy(new CartJoinColumnsSelector(),new PGJoinColumnsSelector())
        .process(new CartPGCoprocessFunction());
}


private final static SimpleDateFormat cartInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final static SimpleDateFormat pgInputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

public static Long extractCartTimeStamp(CartMessage cartMessage){
    try {
        Date orderTimeStamp = cartInputFormat.parse(cartMessage.fetchOrderCompletionTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting cart message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}

public static Long extractPGTimeStamp(PaymentNotifyRequestWrapper pgMessage){
    try {
        Date orderTimeStamp = pgInputFormat.parse(pgMessage.getPaymentView().getPaidTime());
        return orderTimeStamp.getTime();
    } catch (ParseException e) {
        logger.error("Exception in converting pg message timeStamp..",e);
    }
    return Instant.now().toEpochMilli();
}


private SingleOutputStreamOperator<CartMessage> processCartStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {
    //1. Consume cartStream
    SingleOutputStreamOperator<CartMessage> cartStream = executionEnvironment.addSource(createCartConsumer());
    cartStream.name(Constants.CART_SYSTEM);

    //2. Filter cart messages
    SingleOutputStreamOperator<CartMessage> filteredCartStream = cartStream.filter(new CartFilterFunction())    ;

    //3. Map carts data
    filteredCartStream = CartMappingService.mapCartsData(filteredCartStream);

    //4. Assign timestamps and watermarks
    filteredCartStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CartMessage>(Time.milliseconds(Long.parseLong(parameter.get(Constants.CART_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(CartMessage cartMessage) {
            return DateTimeUtils.extractCartTimeStamp(cartMessage);
        }
    });
    return filteredCartStream;
}

private SingleOutputStreamOperator<PaymentNotifyRequestWrapper> processPgStream(ParameterTool parameter, StreamExecutionEnvironment executionEnvironment) {

    //1. Consume pg streams
    SingleOutputStreamOperator<PaymentNotifyRequestWrapper> pgStream = executionEnvironment.addSource(createPGConsumer());

    pgStream.name(Constants.PG_SYSTEM);

    //2. Assign timestamps and watermarks to pg messages
    pgStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PaymentNotifyRequestWrapper>(Time.milliseconds(Long.parseLong(parameter.get(Constants.PG_MAX_OUT_OF_ORDERNESS)))) {
        @Override
        public long extractTimestamp(PaymentNotifyRequestWrapper pgMessage) {
            return DateTimeUtils.extractPGTimeStamp(pgMessage);
        }
    });
    return pgStream;
}

Can anyone please help what can be the issue here and if there is somewrong time values handled in the code here.

Help will be highly appreciated.