Hi, I was assigned to migrate our Flink 1.7 jobs to 1.10. So far it's going well; however, I've encountered a problem with writing Avro to HDFS. Currently we're using the BucketingSink, which is deprecated. I've managed to replace a few BucketingSinks with StreamingFileSink in row format. However, I have no idea how to tackle Avro and the Writer<> implementation.
@Override
protected void applySink(DataStream<Feature> outputStream) {
    outputStream
        .keyBy(Feature::getSessionId)
        .addSink(createSink())
        .uid(UID_PART.concat("sink-v1"))
        .name(UID_PART.concat("hdfs_bucketing_sink"));
}

private SinkFunction<GenericRecord> createSFSink() {
    return StreamingFileSink
        .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
            ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema()))
        .build();
}

private BucketingSink<Feature> createSink() {
    return new BucketingSink<Feature>(hdfsPath)
        .setBucketer(new DateTypeComboFeatureBucketer("yyyy-MM-dd", ZoneOffset.UTC))
        .setBatchSize(batchSize)
        .setBatchRolloverInterval(batchRollingInterval)
        .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
        .setInactiveBucketThreshold(inactiveBucketThreshold)
        .setUseTruncate(useTruncate)
        .setWriter(new ComboFeatureAvroWriter());
}

I took the createSFSink() function above from https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html. I've tried changing GenericRecord to the Feature class – failed. I've tried writing an empty GenericRecord map just to get rid of the compilation error – failed (still giving an improper type error). I've also tried ParquetAvroWriters.forSpecificRecord(Feature.class), and that failed as well.
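[Editor's note: the improper type error above comes from the element type. ParquetAvroWriters.forGenericRecord(schema) produces a BulkWriter.Factory<GenericRecord>, so forBulkFormat infers a StreamingFileSink<GenericRecord>, which cannot be attached to a DataStream<Feature>. A minimal sketch of one way to bridge the mismatch on Flink 1.10, assuming a hypothetical static toGenericRecord(feature, schemaJson) converter:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    org.apache.avro.Schema schema = new ComboFeatureAvroWriter().createSchema();
    // Capture the schema as a JSON string: the Avro 1.8 Schema class is not
    // Serializable, so it must not be captured directly in the map closure.
    final String schemaJson = schema.toString();

    DataStream<GenericRecord> records = outputStream
        .map(feature -> toGenericRecord(feature, schemaJson)) // hypothetical converter
        .returns(new GenericRecordAvroTypeInfo(schema));      // Avro serializer, avoids Kryo

    records.addSink(
        StreamingFileSink
            .forBulkFormat(new Path(hdfsPath), ParquetAvroWriters.forGenericRecord(schema))
            .build());

The keyBy and uid/name calls from the original applySink would attach to this sink the same way as before.]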
Hi Alan,
In the upcoming Flink 1.11 release there will be support for Avro through the AvroWriterFactory, as seen in [1]. Do you think this can solve your problem? You can also check out the current release-1.11 branch and test it to see if it fits your needs.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-11395
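[Editor's note: the AvroWriterFactory added by [1] plugs straight into StreamingFileSink.forBulkFormat. A minimal sketch of the 1.11-style usage, adapted to the Feature type from the question; the reflect-based schema derivation is an assumption, and a specific-record or generic-record DatumWriter would slot in the same way:

    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.reflect.ReflectDatumWriter;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.AvroBuilder;
    import org.apache.flink.formats.avro.AvroWriterFactory;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    // The AvroBuilder callback creates one DataFileWriter per part file.
    AvroWriterFactory<Feature> factory = new AvroWriterFactory<>((AvroBuilder<Feature>) out -> {
        Schema schema = ReflectData.get().getSchema(Feature.class);
        DatumWriter<Feature> datumWriter = new ReflectDatumWriter<>(schema);
        DataFileWriter<Feature> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.setCodec(CodecFactory.snappyCodec());
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    });

    outputStream.addSink(
        StreamingFileSink.forBulkFormat(new Path(hdfsPath), factory).build());

The flink-avro module in 1.11 also ships convenience factories (AvroWriters.forSpecificRecord, forGenericRecord, forReflectRecord) for the common cases.]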
Hi Kostas,
Is this release available in Maven Central, or should I download the project from GitHub?

Thanks,
Alan
Hi Alan,
Unfortunately not, but the release is expected to come out in the next couple of weeks, so it will be available then. Until then, you can either copy the code of the AvroWriterFactory into your project and use it from there, or build the project from GitHub, as you said.

Cheers,
Kostas
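[Editor's note: if copying the class, note that 1.11's AvroWriterFactory is essentially a thin adapter between an Avro DataFileWriter and Flink's existing BulkWriter SPI. A rough, self-contained sketch of that shape, written against the interfaces already present in 1.10 — this is not the verbatim 1.11 source, and copying the real files from the release-1.11 branch is the safer route:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.Serializable;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.core.fs.FSDataOutputStream;

    @FunctionalInterface
    interface AvroBuilder<T> extends Serializable {
        // Creates a DataFileWriter that writes its container file to the given stream.
        DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
    }

    class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
        private static final long serialVersionUID = 1L;
        private final AvroBuilder<T> avroBuilder;

        AvroWriterFactory(AvroBuilder<T> avroBuilder) {
            this.avroBuilder = avroBuilder;
        }

        @Override
        public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
            final DataFileWriter<T> dataFileWriter = avroBuilder.createWriter(out);
            return new BulkWriter<T>() {
                @Override
                public void addElement(T element) throws IOException {
                    dataFileWriter.append(element);
                }

                @Override
                public void flush() throws IOException {
                    dataFileWriter.flush();
                }

                @Override
                public void finish() throws IOException {
                    // Finalizes the Avro container file for this part file.
                    dataFileWriter.close();
                }
            };
        }
    }
]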
Hi Kostas,
I'll try copying the class into my project for now and wait for the release - I'm not expecting to finish my migration by then ;) Have a nice day, and thanks for the update. I'll keep this thread open in case I encounter any problems.

Thanks