Troubles with Avro migration from 1.7 to 1.10

Troubles with Avro migration from 1.7 to 1.10

Alan Żur

Hi,

I was assigned to migrate our Flink jobs from 1.7 to 1.10. So far it's going well; however, I've run into a problem with writing Avro to HDFS. We currently use BucketingSink, which is deprecated. I've managed to replace a few BucketingSinks with StreamingFileSink in row format, but I have no idea how to tackle Avro and the Writer<> implementation.

@Override
protected void applySink(DataStream<Feature> outputStream) {
    outputStream
            .keyBy(Feature::getSessionId)
            .addSink(createSink())
            .uid(UID_PART.concat("sink-v1"))
            .name(UID_PART.concat("hdfs_bucketing_sink"));
}

// Attempted replacement: StreamingFileSink with a bulk (Parquet) format.
private SinkFunction<GenericRecord> createSFSink() {
    return StreamingFileSink
            .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
                    ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema()))
            .build();
}

// Current (deprecated) sink: BucketingSink with our custom Avro Writer<>.
private BucketingSink<Feature> createSink() {
    return new BucketingSink<Feature>(hdfsPath)
            .setBucketer(new DateTypeComboFeatureBucketer("yyyy-MM-dd", ZoneOffset.UTC))
            .setBatchSize(batchSize)
            .setBatchRolloverInterval(batchRollingInterval)
            .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
            .setInactiveBucketThreshold(inactiveBucketThreshold)
            .setUseTruncate(useTruncate)
            .setWriter(new ComboFeatureAvroWriter());
}

The createSFSink() function above I took from https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html. I've tried changing GenericRecord to the Feature class – failed. I've tried writing an empty GenericRecord map just to get rid of the compilation error – failed (it still gives an improper-type error). I've also tried ParquetAvroWriters.forSpecificRecord(Feature.class), which failed as well.
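
For reference, the compile error seems to be just the stream/sink type mismatch: forGenericRecord(...) builds a sink for GenericRecord, while the stream carries Feature. A rough sketch of one way to make the types line up (assuming Feature extends Avro's SpecificRecordBase, which implements GenericRecord in Avro 1.8+, and using GenericRecordAvroTypeInfo so Flink can serialize GenericRecord between operators):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Rough sketch: bridge DataStream<Feature> to a GenericRecord bulk sink.
Schema schema = new ComboFeatureAvroWriter().createSchema();
outputStream
        .map(feature -> (GenericRecord) feature)         // legal if Feature extends SpecificRecordBase
        .returns(new GenericRecordAvroTypeInfo(schema))  // Flink cannot infer a GenericRecord serializer
        .addSink(StreamingFileSink
                .forBulkFormat(new Path(hdfsPath),
                        ParquetAvroWriters.forGenericRecord(schema))
                .build());

Note that this still writes Parquet rather than the Avro container files our old Writer<> produced, which is the part I don't know how to replace.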

Re: Troubles with Avro migration from 1.7 to 1.10

Kostas Kloudas
Hi Alan,

In the upcoming Flink 1.11 release, there will be support for Avro
using the AvroWriterFactory, as seen in [1].
Do you think this can solve your problem?

You can also download the current release-1.11 branch and test it
out to see if it fits your needs.
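
Roughly, writing Avro container files with the new factory could look like the sketch below (based on the API added by [1] in the flink-avro module; Feature stands in for your generated SpecificRecord class):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch against the release-1.11 API from [1].
StreamingFileSink<Feature> sink = StreamingFileSink
        .forBulkFormat(new Path(hdfsPath),
                AvroWriters.forSpecificRecordType(Feature.class))
        .build();

Unlike the Parquet factories, this writes plain Avro container files, which should be closer to what your old Writer<> produced.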

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-11395

Re: Troubles with Avro migration from 1.7 to 1.10

Alan Żur
Hi Kostas,

Is this release available in Maven Central, or should I download the
project from GitHub?

Thanks,
Alan



Re: Troubles with Avro migration from 1.7 to 1.10

Kostas Kloudas
Hi Alan,

Unfortunately not, but the release is expected to come out in the next
couple of weeks, so it will be available then.
Until then, you can either copy the code of the AvroWriterFactory into
your project and use it from there, or download the project from
GitHub, as you said.
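
If you go the copy-the-class route, wiring the copied factory up yourself could look roughly like this (a sketch only; it assumes you also copy the small AvroBuilder interface that AvroWriterFactory depends on, and that Feature is an Avro-generated class with a static getClassSchema()):

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumWriter;

// Sketch: an AvroWriterFactory built by hand, mirroring what
// AvroWriters.forSpecificRecordType does on the release-1.11 branch.
AvroWriterFactory<Feature> factory = new AvroWriterFactory<>(out -> {
    Schema schema = Feature.getClassSchema();
    DataFileWriter<Feature> writer =
            new DataFileWriter<>(new SpecificDatumWriter<>(schema));
    writer.create(schema, out);
    return writer;
});

You can then pass the factory to StreamingFileSink.forBulkFormat(...) like any other bulk writer factory.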

Cheers,
Kostas

Re: Troubles with Avro migration from 1.7 to 1.10

Alan Żur
Hi Kostas,

I'll try copying the class into my project for now and wait for the
release – I'm not expecting to finish my migration by then ;) Have a nice day,
and thanks for the update. I'll keep this thread open in case I encounter any
problems. Thanks!


