Hi All,
The flow I am trying to process here is Kafka -> Flink -> Kafka.

I have built a custom POJO called CustomerEvent and have taken care of the serialization and deserialization schemas.

Operators: a map operator to read the input, followed by a filter on the record, followed by a flatMap to add more attributes.

When I execute the program, I observe the following:

1. The mapping is not happening correctly, as seen from the output below.

*****Code Snip*****

    public CustomerEvent map(CustomerEvent input) throws Exception {
        CustomerEvent test = new CustomerEvent();
        System.out.println("******accountNumber:" + input.accountNumber);
        System.out.println("******reason:" + input.reason);
        System.out.println("******reasonGroup:" + input.reasonGroup);
        test.accountNumber = input.accountNumber;
        test.siteID = input.siteID;
        test.careAgentInfo = input.careAgentInfo;
        test.reason = input.reason;
        test.reasonGroup = input.reasonGroup;
        test.reasonDetail = input.reasonDetail;
        test.actionTaken = input.actionTaken;
        test.system = input.system;
        test.startTime = input.startTime;
        test.endTime = input.endTime;
        return test;
    }

@@@@Output from the console@@@@

    ******accountNumber:{"accountNumber":"124568"
    ******reason:"actionTaken":"Educate"
    ******reasonGroup:"system":"Cognizant NA"
    ***Filtering***    // It just enters the filter operator, but gives no result even when the value is valid (DVR)

2. The filter operation does not let any record through, and hence the flatMap is not executed either.

******Code Snip*******

    final String[] config = new String[] { "Educate", "Education", "Customer Ed", "Cust Ed",
            "No Connectivity", "Partial Connectivity", "Slow Speeds", "Device Install", "DVR" };

    @Override
    public boolean filter(CustomerEvent value) throws Exception {
        System.out.println("***Filtering***");
        if (Arrays.asList(config).contains(value.reason)) {
            System.out.println("***reason field found***");
            return true;
        }
        return false;
    }

Call to operators:

    DataStream<CustomerEvent> eoi = cstream.map(new ExtractString())
            .filter(new FilterFunction<CustomerEvent>()) ...
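The console output above, where each field holds a whole "key":"value" fragment, looks like what one would get if the raw JSON message were split on commas and the pieces assigned by position. The deserialization schema is not shown here, so this is only a guess. A minimal sketch outside Flink, assuming a hypothetical sample message and hypothetical helpers named splitOnCommas and fieldValue, shows how that would produce the same output and why the filter then never matches:

```java
import java.util.Arrays;
import java.util.List;

class CommaSplitDemo {
    // Hypothetical stand-in for the suspected deserialization step:
    // splits the raw message on commas instead of parsing it as JSON.
    static String[] splitOnCommas(String raw) {
        return raw.split(",");
    }

    // Naive lookup of a string value by key in a flat JSON object.
    // Illustration only (no escapes, no nesting); a JSON library is the robust choice.
    static String fieldValue(String json, String key) {
        String marker = "\"" + key + "\":\"";
        int start = json.indexOf(marker);
        if (start < 0) {
            return null;
        }
        start += marker.length();
        int end = json.indexOf('"', start);
        return end < 0 ? null : json.substring(start, end);
    }

    public static void main(String[] args) {
        // Hypothetical message; the field order is an assumption.
        String raw = "{\"accountNumber\":\"124568\",\"actionTaken\":\"Educate\","
                + "\"system\":\"Cognizant NA\",\"reason\":\"DVR\"}";
        List<String> config = Arrays.asList("Educate", "Education", "DVR");

        // Positional assignment: the "reason" slot ends up holding a key/value fragment.
        String[] parts = splitOnCommas(raw);
        String reasonByPosition = parts[1];
        System.out.println("by position: " + reasonByPosition
                + " -> filter passes: " + config.contains(reasonByPosition));

        // Lookup by key: the "reason" slot holds the bare value.
        String reasonByKey = fieldValue(raw, "reason");
        System.out.println("by key: " + reasonByKey
                + " -> filter passes: " + config.contains(reasonByKey));
    }
}
```

If this is indeed the cause, the fix would belong in the deserialization schema (parsing the message as JSON and reading each field by name) rather than in the map or filter operators.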
So I am not able to output the events to Kafka. Can somebody please guide me on what I am doing wrong?

Thanks in advance for the help.

Thanks,
Hari