Flink DataStream[String] Kafka consumer Avro streaming file sink

Vijayendra Yadav
Hi Flink Team,

Flink streaming: I have a DataStream[String] from a Kafka consumer:

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

I need to write this string stream with StreamingFileSink, which requires a DataStream[GenericRecord]:

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
    .build()
input.addSink(sink)

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html

Question: How can I convert the DataStream[String] to a DataStream[GenericRecord] before sinking, so that I can write Avro files?

Regards,
Vijay

Re: Flink DataStream[String] Kafka consumer Avro streaming file sink

rmetzger0
Thank you for your question. I responded on StackOverflow.
Let's finish the discussion there.

On Fri, Jul 24, 2020 at 5:07 AM Vijayendra Yadav <[hidden email]> wrote:
Question: How can I convert the DataStream[String] to a DataStream[GenericRecord] before sinking, so that I can write Avro files?
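
The StackOverflow answer is not reproduced in this thread. As a rough sketch of one possible approach (not necessarily the one given there), the conversion can be done with a map function that parses each string into a GenericRecord. This assumes that every Kafka message is a JSON document in Avro's JSON encoding for the target schema, and that the Flink 1.11 and Avro APIs referenced in the question are on the classpath. The class name JsonToGenericRecord is hypothetical. The schema is passed as a string and parsed in open(), because Schema is not Serializable in older Avro releases.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical helper: turns a JSON string into an Avro GenericRecord.
// Assumption: the input uses Avro's JSON encoding for the given schema.
class JsonToGenericRecord extends RichMapFunction<String, GenericRecord> {

    // The schema travels to the task managers as plain JSON text.
    private final String schemaJson;

    // Rebuilt on each task in open(); not part of the serialized function.
    private transient Schema schema;
    private transient GenericDatumReader<GenericRecord> reader;

    JsonToGenericRecord(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Override
    public void open(Configuration parameters) {
        schema = new Schema.Parser().parse(schemaJson);
        reader = new GenericDatumReader<>(schema);
    }

    @Override
    public GenericRecord map(String value) throws Exception {
        // Throws if the message does not match the schema; a real job
        // would need to decide how to handle such records.
        return reader.read(null, DecoderFactory.get().jsonDecoder(schema, value));
    }
}
```

With the stream and schema from the question, the wiring would look roughly like stream.map(new JsonToGenericRecord(schema.toString())).returns(new GenericRecordAvroTypeInfo(schema)), and the result can then be passed to addSink(sink). The returns(...) call with GenericRecordAvroTypeInfo (from flink-avro) is there so that Flink serializes the records with Avro instead of falling back to a generic serializer. If the messages are not in Avro's JSON encoding, the body of map() has to build the record differently, for example by filling a GenericData.Record field by field.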