Flink streaming: I have a DataStream[String] coming from a FlinkKafkaConsumer:
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
I have to write this string stream with a StreamingFileSink. With AvroWriters.forGenericRecord, the sink expects a DataStream[GenericRecord]:
val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
    .build()
input.addSink(sink)
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html
Question: How do I convert the DataStream[String] to a DataStream[GenericRecord] before sinking, so that I can write Avro files?
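
For illustration, one possible shape of such a conversion is a map step that wraps each string in a GenericRecord. This is only a sketch under assumptions, not a confirmed solution: the one-field schema, the field name "value", the class names StringToGenericRecord and StringToAvroJob, the output path, and the fromElements stand-in for the Kafka source are all hypothetical. If the Kafka messages are JSON or another structured format, the map function would need to parse them into the schema's fields instead.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroWriters
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// Hypothetical mapper: wraps each raw string in a record with a single "value" field.
// The schema is passed as a String and parsed in open(), because
// org.apache.avro.Schema is not Serializable in older Avro versions.
class StringToGenericRecord(schemaString: String)
    extends RichMapFunction[String, GenericRecord] {

  @transient private var schema: Schema = _

  override def open(parameters: Configuration): Unit = {
    schema = new Schema.Parser().parse(schemaString)
  }

  override def map(value: String): GenericRecord = {
    val record = new GenericData.Record(schema)
    record.put("value", value)
    record
  }
}

object StringToAvroJob {
  // Hypothetical schema: one string field holding the raw Kafka message.
  val schemaString: String =
    """{"type":"record","name":"Message","fields":[{"name":"value","type":"string"}]}"""

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Bulk formats roll their part files on checkpoints, so checkpointing is enabled.
    env.enableCheckpointing(10000L)

    val schema: Schema = new Schema.Parser().parse(schemaString)

    // Stand-in for the Kafka source from the question.
    val stream: DataStream[String] = env.fromElements("first", "second")

    // The type information is passed explicitly so that Flink serializes
    // the records with Avro rather than falling back to Kryo.
    val input: DataStream[GenericRecord] =
      stream.map(new StringToGenericRecord(schemaString))(new GenericRecordAvroTypeInfo(schema))

    val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(new Path("/tmp/avro-out"), AvroWriters.forGenericRecord(schema)) // hypothetical path
      .build()

    input.addSink(sink)
    env.execute("string-to-avro")
  }
}
```

The sketch assumes the flink-avro dependency is on the classpath, since that module provides both GenericRecordAvroTypeInfo and AvroWriters.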