package com.xyz.topology.netflow.beam;

import java.util.Properties;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.joda.time.Duration;

import com.xyz.schemas.Test;

/**
 * Reads Avro-encoded {@link Test} records from Kafka through the Flink runner,
 * windows them, and counts the elements per window.
 */
public class BeamKafkaFlinkAvroConsumerTest {

    private static final String TOPIC = "topic3";

    private static BeamKafkaOptions options;
    private static Properties props = new Properties();

    /** Parses the pipeline options and derives the Kafka consumer properties from them. */
    public static void setup(String[] args) {
        PipelineOptionsFactory.register(BeamKafkaOptions.class);
        options = PipelineOptionsFactory.fromArgs(args).as(BeamKafkaOptions.class);
        options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
        // options.setZookeeper(EMBEDDED_ZOOKEEPER.getConnection());
        // options.setBroker(EMBEDDED_KAFKA_CLUSTER.getBrokerList());
        options.setKafkaTopic(TOPIC);
        options.setStreaming(true);
        options.setCheckpointingInterval(1000L);
        options.setNumberOfExecutionRetries(5);
        options.setExecutionRetryDelay(3000L);
        options.setRunner(FlinkPipelineRunner.class);
        System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " "
                + options.getBroker() + " " + options.getGroup());

        props.setProperty("zookeeper.connect", options.getZookeeper());
        props.setProperty("bootstrap.servers", options.getBroker());
        props.setProperty("group.id", options.getGroup());
    }

    /** Wraps a Flink Kafka consumer for the topic as an unbounded Beam source. */
    public static UnboundedSource<Test, CheckpointMark> consumeMessages() {
        // AvroDeserializationSchema schema = new AvroDeserializationSchema(Test.class);
        TypeInformation<Test> info = TypeExtractor.getForClass(Test.class);
        TypeInformationSerializationSchema<Test> schema =
                new TypeInformationSerializationSchema<>(info, new ExecutionConfig());
        FlinkKafkaConsumer08<Test> kafkaConsumer = new FlinkKafkaConsumer08<>(TOPIC, schema, props);
        return UnboundedFlinkSource.of(kafkaConsumer);
    }
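    // A hedged alternative, not wired into main(): the commented-out line in
    // consumeMessages() suggests the custom AvroDeserializationSchema defined at
    // the bottom of this file was meant to decode raw Avro bytes directly,
    // instead of Flink's TypeInformationSerializationSchema (which expects
    // records written with Flink's own serializers). A minimal sketch, assuming
    // the producer writes plain Avro binary; the method name is illustrative.
    public static UnboundedSource<Test, CheckpointMark> consumeAvroMessages() {
        AvroDeserializationSchema<Test> schema = new AvroDeserializationSchema<>(Test.class);
        FlinkKafkaConsumer08<Test> kafkaConsumer = new FlinkKafkaConsumer08<>(TOPIC, schema, props);
        return UnboundedFlinkSource.of(kafkaConsumer);
    }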
    public static void main(String[] args) {
        setup(args);
        Pipeline pipeline = Pipeline.create(options);

        // Read from Kafka and assign fixed windows with a watermark trigger.
        PCollection<Test> users = pipeline
                .apply(Read.named("StreamingWordCount").from(consumeMessages()))
                .apply(Window.<Test>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes());
        // users.apply(ConsoleIO.Write.create());

        // Count all elements in each window.
        PCollection<Long> counts = users.apply(Count.globally());
        // .apply(ConsoleIO.Write.create());
        // .apply(TextIO.Write.to("outputKafka.txt"));
        System.out.println("***************** " + counts);

        PipelineResult result = pipeline.run();
        System.out.println("***************** " + result.toString());
    }

    /** Formats a {@link Test} record as its user name, for debugging output. */
    public static class FormatAsStringFn extends DoFn<Test, String> {
        @Override
        public void processElement(ProcessContext c) {
            CharSequence row = c.element().getUname();
            System.out.println("$$$$$$$$$$$$$$$$");
            System.out.println(row);
            c.output(row.toString());
        }
    }
}

/**
 * Flink {@link DeserializationSchema} that decodes Avro binary into records of
 * the given type, using a {@link SpecificDatumReader} for generated classes and
 * a {@link ReflectDatumReader} otherwise.
 */
class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private final Class<T> avroType;

    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            // Reuse the decoder across records to avoid reallocating buffers.
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        // The Kafka stream is unbounded; there is no end-of-stream marker.
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        // The reader is transient, so it must be recreated after this schema
        // instance itself is deserialized (e.g. when Flink ships it to a task).
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<>(avroType);
            } else {
                reader = new ReflectDatumReader<>(avroType);
            }
        }
    }
}
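// A minimal producer-side sketch, included only to show the byte format the
// schema above expects: plain Avro binary written with the matching
// SpecificDatumWriter/ReflectDatumWriter. The class and method names here are
// illustrative assumptions, not part of the original test.
class AvroSerializationSketch {

    /** Serializes a record to the raw Avro binary that AvroDeserializationSchema decodes. */
    static <T> byte[] serialize(Class<T> avroType, T record) throws java.io.IOException {
        // Mirror the reader selection in AvroDeserializationSchema.ensureInitialized().
        org.apache.avro.io.DatumWriter<T> writer;
        if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
            writer = new org.apache.avro.specific.SpecificDatumWriter<>(avroType);
        } else {
            writer = new org.apache.avro.reflect.ReflectDatumWriter<>(avroType);
        }
        java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
        // binaryEncoder(...) is the counterpart of the binaryDecoder(...) call above.
        org.apache.avro.io.BinaryEncoder encoder =
                org.apache.avro.io.EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}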