[Compression] Flink DataStream[GenericRecord] AvroWriters

7 messages

Vijayendra Yadav
Hi Team,

Could you please provide a sample for enabling Snappy compression when writing Avro from a DataStream[GenericRecord] with:

AvroWriters.forGenericRecord(schema)

Regards,
Vijay

Re: [Compression] Flink DataStream[GenericRecord] AvroWriters

Ravi Bhushan Ratnakar
Hi Vijayendra,

Currently AvroWriters doesn't support compression. If you want compression, you need a custom implementation of the Avro writer that adds it. Please find attached a sample customization of AvroWriters that supports compression; you can use it as in the example below.

codecName = org.apache.hadoop.io.compress.SnappyCodec

CustomAvroWriters.forGenericRecord(schema, codecName)

Regards,
Ravi


[Attachment: CustomAvroWriters.java (5K)]
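The attachment is not reproduced in this archive. For orientation, a minimal sketch of how such a bulk writer factory plugs into StreamingFileSink; stream, outPath, schema, and codecName are placeholders:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(new Path(outPath), CustomAvroWriters.forGenericRecord(schema, codecName))
  .build()
stream.addSink(sink)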

Re: [Compression] Flink DataStream[GenericRecord] AvroWriters

Ravi Bhushan Ratnakar
There is another alternative which you could try, like this:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.AvroOutputFormat
import org.apache.flink.streaming.api.scala.DataStream

val stream: DataStream[GenericRecord] = ??? // your input stream
val aof: AvroOutputFormat[GenericRecord] =
  new AvroOutputFormat(new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
aof.setSchema(schema)
aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
stream.writeUsingOutputFormat(aof)
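Note that AvroOutputFormat is a plain OutputFormat rather than a BulkWriter.Factory, so it is used with writeUsingOutputFormat and does not plug into StreamingFileSink.forBulkFormat. Also, writeUsingOutputFormat does not participate in Flink's checkpointing, so unlike StreamingFileSink it provides no exactly-once guarantees for the written files.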
Regards,
Ravi



Re: [Compression] Flink DataStream[GenericRecord] AvroWriters

Vijayendra Yadav
Hi Ravi,

Thanks for the details. CustomAvroWriters is working for now, although it's failing for the Snappy codec in my local IDE with "java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
I think I will have to run it in an EMR/Hadoop environment to get the Snappy native library resolved.

About the other approach with AvroOutputFormat:

Does it fit into the StreamingFileSink API?

StreamingFileSink.forBulkFormat(new Path(outPath),
    CustomAvroWriters.forGenericRecord(schema, codecName))

Or is it a different API? Could you send a sample if you have one for the other sink API?

Regards,
Vijay


Re: [Compression] Flink DataStream[GenericRecord] AvroWriters

Vijayendra Yadav
Hi Ravi,

With CustomAvroWriters (Snappy), when I run on a cluster it does create files, but the files are not recognized as Avro files by the avro-tools jar when I try to deserialize them to JSON.

The Flink logs show:
2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec - Successfully loaded & initialized native-lzo library [hadoop-lzo rev ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool - Got brand-new compressor [.snappy]

2020-07-29 23:54:28,931 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro with MPU ID

Avro tools:

java -jar avro-tools-1.7.4.jar tojson /tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro
Exception in thread "main" java.io.IOException: Not an Avro data file


Am I missing something?

Regards,
Vijay

Re: [Compression] Flink DataStream[GenericRecord] AvroWriters

Ravi Bhushan Ratnakar
Hi Vijayendra,

There is an issue with the CustomAvroWriters.java which I shared with you earlier. I am sending you the fixed version; hopefully this will resolve the issue of reading the output with avro-tools.

Please use one of the supported string values below for codecName:

null - for nullCodec
deflate - for deflateCodec
snappy - for snappyCodec
bzip2 - for bzip2Codec
xz - for xzCodec

Regards,
Ravi

On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <[hidden email]> wrote:
If it is possible, please share the sample output file.
Regards,
Ravi


[Attachment: CustomAvroWriters.java (4K)]
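The fixed attachment is likewise not reproduced here. The codec names above are exactly the ones accepted by Avro's CodecFactory.fromString, which suggests the fix sets block-level compression on Avro's DataFileWriter instead of wrapping the whole output stream in a Hadoop codec (the likely reason avro-tools could not read the earlier files as Avro data files). A rough sketch along those lines, assuming Flink 1.11's AvroWriterFactory/AvroBuilder; CompressedAvroWriters is an illustrative name, not the attached class:

import java.io.OutputStream
import org.apache.avro.Schema
import org.apache.avro.file.{CodecFactory, DataFileWriter}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.flink.formats.avro.{AvroBuilder, AvroWriterFactory}

object CompressedAvroWriters {
  def forGenericRecord(schema: Schema, codecName: String): AvroWriterFactory[GenericRecord] = {
    val schemaString = schema.toString // Schema is not serializable, so capture it as a string
    val builder: AvroBuilder[GenericRecord] = (out: OutputStream) => {
      val parsed = new Schema.Parser().parse(schemaString)
      val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](parsed))
      // Must be called before create(); "null", "deflate", "snappy", "bzip2", "xz" are valid names
      writer.setCodec(CodecFactory.fromString(codecName))
      writer.create(parsed, out)
      writer
    }
    new AvroWriterFactory[GenericRecord](builder)
  }
}

Because the codec compresses only the data blocks inside the Avro container, the file header stays intact and tools like avro-tools can still read the output.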

Re: [Compression] Flink DataStream[GenericRecord] AvroWriters

Vijayendra Yadav
Hi Ravi,

Perfect, this is looking good.
Thanks for your help.

Regards,
Vijay
