public class ElasticSearchTest1 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set elasticsearch connection details Map config = new HashMap<>(); config.put("bulk.flush.max.actions", "1"); config.put("cluster.name", ""); List transports = new ArrayList<>(); transports.add(new InetSocketAddress(InetAddress.getByName(""), 9300)); //Set properties for Kafka Streaming Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ""+":9092"); properties.setProperty("group.id", "testGroup"); properties.setProperty("auto.offset.reset", "latest"); //Create consumer for log records FlinkKafkaConsumer011 inputConsumer1 = new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties); DataStream firstStream = env .addSource(inputConsumer1) .flatMap(new CreateRecordOne()); firstStream .addSink(new ElasticsearchSink(config, transports, new ElasticSearchOutputRecord("elastic_test_index1","elastic_test_index1"))); FlinkKafkaConsumer011 inputConsumer2 = new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties); DataStream secondStream = env .addSource(inputConsumer2) .flatMap(new CreateRecordTwo()); secondStream .addSink(new ElasticsearchSink(config, transports, new ElasticSearchOutputRecord2("elastic_test_index2","elastic_test_index2"))); env.execute("Elastic Search Test"); } } public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction { String index; String type; // Initialize filter function public ElasticSearchOutputRecord(String index, String type) { this.index = index; this.type = type; } // construct index request @Override public void process( RecordOne record, RuntimeContext ctx, RequestIndexer indexer) { // construct JSON document to index Map json = new HashMap<>(); json.put("item_one", record.item1); json.put("item_two", record.item2); IndexRequest rqst = Requests.indexRequest() .index(index) // index name .type(type) // mapping name .source(json); indexer.add(rqst); } } public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction { String index; String type; // Initialize filter function public ElasticSearchOutputRecord2(String index, String type) { this.index = index; this.type = type; } // construct index request @Override public void process( RecordTwo record, RuntimeContext ctx, RequestIndexer indexer) { // construct JSON document to index Map json = new HashMap<>(); json.put("item_three", record.item3); json.put("item_four", record.item4); IndexRequest rqst = Requests.indexRequest() .index(index) // index name .type(type) // mapping name .source(json); indexer.add(rqst); } } public class CreateRecordOne implements FlatMapFunction { static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class); @Override public void flatMap(ObjectNode value, Collector out) throws Exception { try { out.collect(new RecordOne(value.get("item1").asText(),value.get("item2").asText())); } catch(Exception e) { log.error("error while creating RecordOne", e); } } } public class CreateRecordTwo implements FlatMapFunction { static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class); @Override public void flatMap(ObjectNode value, Collector out) throws Exception { try { out.collect(new RecordTwo(value.get("item1").asText(),value.get("item2").asText())); } catch(Exception e) { log.error("error while creating RecordTwo", e); } } } public class RecordOne { public String item1; public String item2; public RecordOne() {}; public RecordOne ( String item1, String item2 ) { this.item1 = item1; this.item2 = item2; } } public class RecordTwo { public String item3; public String item4; public RecordTwo() {}; public RecordTwo ( String item3, String item4 ) { this.item3 = item3; this.item4 = item4; } }