package com.xyz.topology.netflow.beam;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

import com.xyz.schemas.Test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Experiments with producing Avro-encoded records to Kafka from a Beam pipeline on the
 * Flink runner, comparing three approaches: a custom Avro {@link SerializationSchema},
 * Flink's {@link TypeInformationSerializationSchema}, and a plain Kafka producer
 * without Beam.
 *
 * @author kaniska
 */
public class BeamKafkaFlinkAvroProducerTest {

    private static final String TOPIC = "topic3";

    private static BeamKafkaOptions options;
    private static Properties props = new Properties();

    public static void setup(String[] args) {
        PipelineOptionsFactory.register(BeamKafkaOptions.class);
        options = PipelineOptionsFactory.fromArgs(args).as(BeamKafkaOptions.class);
        options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
        // options.setZookeeper(EMBEDDED_ZOOKEEPER.getConnection());
        // options.setBroker(EMBEDDED_KAFKA_CLUSTER.getBrokerList());
        options.setKafkaTopic(TOPIC);
        options.setStreaming(true);
        options.setCheckpointingInterval(1000L);
        options.setNumberOfExecutionRetries(5);
        options.setExecutionRetryDelay(3000L);
        options.setRunner(FlinkPipelineRunner.class);

        System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " "
                + options.getBroker() + " " + options.getGroup());

        props.setProperty("zookeeper.connect", options.getZookeeper());
        props.setProperty("bootstrap.servers", options.getBroker());
        props.setProperty("group.id", options.getGroup());
        props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
    }

    /** Approach 1: Beam pipeline writing Users to Kafka via the custom Avro schema. */
    private static void produceData1() {
        FlinkKafkaProducer08<User> kafkaSink =
                new FlinkKafkaProducer08<>(TOPIC, new AvroSerializationSchema<>(User.class), props);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(Create.of(
                        new User("Joe", 3, "red"),
                        new User("Mary", 4, "blue"),
                        new User("Mark", 1, "green"),
                        new User("Julia", 5, "purple")))
                        // .withCoder(AvroCoder.of(User.class)))
                .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
        pipeline.run();
    }

    /** Approach 2: Beam pipeline writing Test records via Flink's own serialization schema. */
    private static void produceAvroData2() {
        TypeInformation<Test> info = TypeExtractor.getForClass(Test.class);
        TypeInformationSerializationSchema<Test> schema =
                new TypeInformationSerializationSchema<>(info, new ExecutionConfig());
        // AvroSerializationSchema<Test> schema = new AvroSerializationSchema<>(Test.class);

        FlinkKafkaProducer08<Test> kafkaSink = new FlinkKafkaProducer08<>(TOPIC, schema, props);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(Create.of(new Test("Joe", 6)).withCoder(AvroCoder.of(Test.class)))
                .apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
        pipeline.run();
    }

    /** Approach 3: plain Kafka producer sending one Avro-serialized Test record, no Beam involved. */
    private static void produceSimpleData() throws IOException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, byte[]> kafkaProducer = new Producer<>(config);

        Test test = new Test("Don", 32);

        // Serialize the record in Avro binary format.
        // DatumWriter<NetFlowPkt> datumWriter = new SpecificDatumWriter<>(NetFlowPkt.class);
        DatumWriter<Test> datumWriter = new SpecificDatumWriter<>(Test.class);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        datumWriter.write(test, encoder);
        encoder.flush();
        byte[] serializedBytes = out.toByteArray();
        out.close();

        KeyedMessage<String, byte[]> message = new KeyedMessage<>(TOPIC, serializedBytes);
        kafkaProducer.send(message);
        kafkaProducer.close();
    }

    public static void main(String[] args) {
        setup(args);
        try {
            // produceSimpleData();
            produceAvroData2();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
    public void testDeSerialization() {
        try {
            TypeInformation<User> info = TypeExtractor.getForClass(User.class);
            TypeInformationSerializationSchema<User> schema =
                    new TypeInformationSerializationSchema<>(info, new ExecutionConfig());

            User[] types = {
                    new User(72, new Date(763784523L), new Date(88234L)),
                    new User(-1, new Date(11111111111111L)),
                    new User(42),
                    new User(17, new Date(222763784523L))
            };

            for (User val : types) {
                byte[] serialized = schema.serialize(val);
                User deser = schema.deserialize(serialized);
                assertEquals(val, deser);
            }
        } catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }
    }
    **/

    /**
     * A Flink {@link SerializationSchema} that encodes elements in Avro binary format,
     * using a specific writer for generated Avro classes and a reflect writer otherwise.
     */
    private static class AvroSerializationSchema<T> implements SerializationSchema<T> {

        private final Class<T> avroType;

        // DatumWriter is not serializable, so it is created lazily on the worker.
        private transient DatumWriter<T> writer;

        public AvroSerializationSchema(Class<T> avroType) {
            this.avroType = avroType;
        }

        @Override
        public byte[] serialize(T element) {
            ensureInitialized();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            try {
                writer.write(element, encoder);
                encoder.flush();
                byte[] serializedBytes = out.toByteArray();
                out.close();
                return serializedBytes;
            } catch (IOException e) {
                throw new RuntimeException(
                        "Failed to Avro-serialize element of type " + avroType.getName(), e);
            }
        }

        private void ensureInitialized() {
            if (writer == null) {
                if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                    writer = new SpecificDatumWriter<>(avroType);
                    // if (obj instanceof GenericRecord) {
                    //     writer = new GenericDatumWriter(((GenericRecord) obj).getSchema());
                    // } else {
                    //     writer = new SpecificDatumWriter(avroType);
                    // }
                } else {
                    writer = new ReflectDatumWriter<>(avroType);
                }
            }
        }
    }
}