Unexpected Results from Map / Filter Operators

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Unexpected Results from Map / Filter Operators

Coder88
Hi All,

The Flow am trying to process here is Kafka -> Flink -> Kafka

Have built a custom POJO called CustomerEvent and have taken care of serialize and de-serialize schemas

Operators : Map operator to read the input followed by filtering the record followed by flatmap to add more attributes

When i execute the program , I observe the following :

1. The mapping is not happening correctly as seen from the output below
*****Code Snip*****

public CustomerEvent
          map(CustomerEvent input) throws Exception {
          CustomerEvent test = new CustomerEvent();
          System.out.println("******accountNumber:" +input.accountNumber);
          System.out.println("******reason:" +input.reason);
          System.out.println("******reasonGroup:" +input.reasonGroup);
          test.accountNumber = input.accountNumber;
          test.siteID = input.siteID;
                test.careAgentInfo = input.careAgentInfo;
                test.reason = input.reason;
                test.reasonGroup =input.reasonGroup;
                test.reasonDetail = input.reasonDetail;
                test.actionTaken = input.actionTaken;
                test.system = input.system;
                test.startTime = input.startTime;
                test.endTime = input.endTime;
            return test;
          }
@@@@Out from the console@@@@

******accountNumber:{"accountNumber":"124568"
******reason:"actionTaken":"Educate"
******reasonGroup:"system":"Cognizant NA"
***Filtering*** // It just enters the Filter operator, but no result even it its valid value (DVR)

2. The Filtering operation is not executed and hence the flatmap as well.

******Code Snip*******

final String[] config = new String[] { "Educate", "Education", "Customer Ed", "Cust Ed",
                                                        "No Connectivity", "Partial Connectivity", "Slow Speeds", "Device Install", "DVR" };

                                        @Override
                                        public boolean filter(CustomerEvent value) throws Exception {
                                                System.out.println("***Filtering***");
                                                if (Arrays.asList(config).contains(value.reason)) {
                                                        System.out.println("***reason field found***");
                                                        return true;
                                                }
                                                return false;
                                        }

Call to operators

DataStream<CustomerEvent> eoi =
                        cstream.map(new ExtractString())

                        .filter(new FilterFunction<CustomerEvent>()) ...

So am not able to output the events to Kafka. Can somebody pls guide me on what I am donig wrong .

Thanks for the help in advance

Thanks
Hari