Streaming Protobuf into Parquet file not working with StreamingFileSink

9 messages

Streaming Protobuf into Parquet file not working with StreamingFileSink

Rafi Aroch
Hi,

I'm trying to stream events in Protobuf format into a Parquet file.
I looked into both streaming-file options: BucketingSink & StreamingFileSink.
I first tried the newer StreamingFileSink with the forBulkFormat API. I noticed that only the Avro format is currently supported, via ParquetAvroWriters.
I followed the same convention as Avro and wrote a ParquetProtoWriters builder class:

public class ParquetProtoWriters {

    private static final int pageSize = 64 * 1024;

    public static <T extends Message> ParquetWriterFactory<T> forType(final Class<T> protoClass) {
        final ParquetBuilder<T> builder = (out) -> createProtoParquetWriter(protoClass, out);
        return new ParquetWriterFactory<>(builder);
    }

    private static <T extends Message> ParquetWriter<T> createProtoParquetWriter(
            Class<T> type,
            OutputFile out) throws IOException {

        return ProtoParquetWriter.<T>builder(out)
                .withPageSize(pageSize)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withProtoClass(type)
                .build();
    }
}
And then I use it as follows:
StreamingFileSink
        .forBulkFormat(new Path("some-path"), ParquetProtoWriters.forType(SomeProtoType.class))
        .build();
I ran tests on ParquetProtoWriters itself; it writes everything properly and I'm able to read the resulting files.
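
For reference, the test is roughly a local round trip like the sketch below (hedged; SomeProtoType stands in for a generated protobuf class and the path is arbitrary, this is not the exact test code):

// Hedged sketch of the round-trip test; SomeProtoType and the path are placeholders.
@Test
public void writesReadableParquetFile() throws Exception {
    ParquetWriterFactory<SomeProtoType> factory = ParquetProtoWriters.forType(SomeProtoType.class);
    Path path = new Path("/tmp/test.snappy.parquet");

    try (FSDataOutputStream out =
             FileSystem.getLocalFileSystem().create(path, FileSystem.WriteMode.OVERWRITE)) {
        BulkWriter<SomeProtoType> writer = factory.create(out);
        writer.addElement(SomeProtoType.getDefaultInstance()); // placeholder record
        writer.finish(); // flushes remaining data and writes the Parquet footer
    }
    // The file can then be read back, e.g. with parquet-tools or a ProtoParquetReader.
}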

When I use the sink as part of a job, I see invalid Parquet files being created:
# parquet-tools cat .part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea
.part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea is not a Parquet file (too small length: 4)

Can anyone suggest what I am missing here?

When trying to use the BucketingSink, I wrote a Writer class for Protobuf and everything worked perfectly:
public class FlinkProtoParquetWriter<T extends MessageOrBuilder> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private Path path;
    private Class<? extends Message> protoClass;
    private transient ParquetWriter<T> writer;

    private int position;
    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    public FlinkProtoParquetWriter(Class<? extends Message> protoClass) {
        this.protoClass = protoClass;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new FlinkProtoParquetWriter<>(protoClass);
    }

    private ParquetWriter<T> createWriter() throws IOException {
        return ProtoParquetWriter
                .<T>builder(path)
                .withPageSize(pageSize)
                .withCompressionCodec(compressionCodecName)
                .withProtoClass(protoClass)
                .build();
    }
}
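
For completeness, the writer is plugged into the BucketingSink roughly like this (simplified sketch; "some-path", SomeProtoType and the stream variable are placeholders):

BucketingSink<SomeProtoType> sink = new BucketingSink<>("some-path");
sink.setWriter(new FlinkProtoParquetWriter<>(SomeProtoType.class));
sink.setBatchSize(128 * 1024 * 1024); // example: roll part files at ~128 MB
stream.addSink(sink); // stream is the DataStream<SomeProtoType> being written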

Rafi

Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

Piotr Nowojski-3
Hi,

I’m not sure, but shouldn’t you be reading only the committed files and ignoring the in-progress ones? Maybe Kostas can add more insight on this topic.

Piotr Nowojski


Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

Kostas Kloudas-2
Hi Rafi,

Piotr is correct. In-progress files are not necessarily readable.
The valid files are the ones that are "committed" or finalized.
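
For illustration, with the default naming scheme a bucket directory looks roughly like this (the exact names can differ between versions):

.part-0-0.inprogress.b3a65d7f-f297-46bd-bdeb-ad24106ad3ea   <- still being written, not a readable Parquet file
part-0-0                                                    <- finalized on a completed checkpoint, valid Parquet

So parquet-tools should only be pointed at the finalized part files.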

Cheers,
Kostas


Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

Rafi Aroch
Hi Piotr and Kostas,

Thanks for your reply.

The issue is that I don't see any committed files, only in-progress ones.
I tried to debug the code for more details. I see that in BulkPartWriter I do reach the write methods and see events getting written, but I never reach closeForCommit. Instead I go straight to the close function, where all parts are disposed.

In my job I have a finite stream (the source reads from Parquet file(s)), do some windowed aggregation and write the results back to a Parquet file.
As far as I know, the sink should commit files during checkpoints and when the stream has finished. I did enable checkpointing.
I verified that when I connect other sinks, I do see the events.
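
For reference, the job is shaped roughly like this (heavily simplified sketch; the source, key and window below are placeholders, not the actual code):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // example interval

env.fromElements(SomeProtoType.getDefaultInstance())                // stands in for the bounded Parquet source
        .keyBy(e -> e.getSomeKey())                                 // hypothetical key field
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))  // placeholder window
        .reduce((a, b) -> a)                                        // placeholder aggregation
        .addSink(StreamingFileSink
                .forBulkFormat(new Path("output-path"), ParquetProtoWriters.forType(SomeProtoType.class))
                .build());

env.execute("proto-to-parquet");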

Let me know if I can provide any further information that could be helpful.

Would appreciate your help.

Thanks,
Rafi



Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

Kostas Kloudas-2
Hi Rafi, 

Have you enabled checkpointing for your job?

Cheers,
Kostas


Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

Rafi Aroch
Hi Kostas, 

Yes I have. 

Rafi 


Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

Rafi Aroch
Hi Kostas,

Thank you.
I'm currently testing my job against a small file, so it finishes before checkpointing starts.
But even if it were a larger file and checkpoints did happen, there would always be trailing events arriving after the last checkpoint and before the source finishes.
So would these events be lost?
In this case, any flow of the form (bounded stream) => (StreamingFileSink) would not give the expected results...

The other alternative would be using BucketingSink, but it would not guarantee exactly-once delivery into S3, which is not preferable.

Can you suggest any workaround? Some way of making sure a checkpoint is triggered at the end?

Rafi


On Thu, Mar 21, 2019 at 9:40 PM Kostas Kloudas <[hidden email]> wrote:
Sorry Rafi,

I just read your previous response where you say that you have already activated checkpointing.
My bad for not paying attention.

Unfortunately, currently in-progress files only roll (or get finalized) on checkpoint barriers and NOT when calling close(). 
This is due to the fact that at the function level, Flink does not differentiate between failures and normal termination.
But there are plans to fix it: https://issues.apache.org/jira/browse/FLINK-2646

So given the above, you should check whether checkpoints go through your pipeline before your source stream reaches its end. If there are no checkpoints, your in-progress files will not be finalized and Parquet, for example, will not write the footer that is needed to properly read the file.

Kostas



Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

Piotr Nowojski-3
Hi Rafi,

There is also an ongoing effort to support bounded streams in the DataStream API [1], which might provide the backbone for the functionality that you need.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11875

On 25 Mar 2019, at 10:00, Kostas Kloudas <[hidden email]> wrote:

Hi Rafi,

Although not the most elegant option, one solution could be to write your program using the file source in PROCESS_CONTINUOUSLY mode, as described here, and cancel the job once you are sure that the processing of your file is done.

As I said, this is not the most elegant solution but it will do the job.

Cheers,
Kostas
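
A rough sketch of that workaround (the input format, paths and interval below are just examples):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // checkpoints keep coming, so the sink's part files get finalized

TextInputFormat format = new TextInputFormat(new Path("input-path")); // example input format
DataStream<String> lines = env.readFile(
        format,
        "input-path",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        60_000); // re-scan the input every 60 seconds; cancel the job manually once processing is done

// ... transform the stream and write it with the StreamingFileSink as before ...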




Re: Streaming Protobuf into Parquet file not working with StreamingFileSink

Rafi Aroch
Thanks Piotr & Kostas.

Really looking forward to this :)

Rafi

