StreamingFileSink Not Flushing All Data

StreamingFileSink Not Flushing All Data

austin.ce
Hi there,

Using Flink 1.9.1, I'm trying to write .tgz files with the StreamingFileSink#BulkWriter. It seems like flushing the output stream doesn't flush all the data written. I've verified I can create valid files using the same APIs and data on their own, so I'm thinking it must be something I'm doing wrong with the bulk format. I'm writing to the local filesystem, with the `file://` protocol.

For tar/gzip compression, I'm using the Apache Commons Compress library, version 1.20.

Here's a runnable example of the issue:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class Scratch {
  public static class Record implements Serializable {
    private static final long serialVersionUID = 1L;

    String id;

    public Record() {}

    public Record(String id) {
      this.id = id;
    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }
  }

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    TarArchiveOutputStream taos = new TarArchiveOutputStream(new GzipCompressorOutputStream(new FileOutputStream("/home/austin/Downloads/test.tgz")));
    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
    String fullText = "hey\nyou\nwork";
    byte[] fullTextData = fullText.getBytes();
    fileEntry.setSize(fullTextData.length);
    taos.putArchiveEntry(fileEntry);
    taos.write(fullTextData, 0, fullTextData.length);
    taos.closeArchiveEntry();
    taos.flush();
    taos.close();

    StreamingFileSink<Record> textSink = StreamingFileSink
        .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
            new BulkWriter.Factory<Record>() {
              @Override
              public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
                final TarArchiveOutputStream compressedOutputStream = new TarArchiveOutputStream(new GzipCompressorOutputStream(out));

                return new BulkWriter<Record>() {
                  @Override
                  public void addElement(Record record) throws IOException {
                    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
                    byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
                    fileEntry.setSize(fullTextData.length);
                    compressedOutputStream.putArchiveEntry(fileEntry);
                    compressedOutputStream.write(fullTextData, 0, fullTextData.length);
                    compressedOutputStream.closeArchiveEntry();
                  }

                  @Override
                  public void flush() throws IOException {
                    compressedOutputStream.flush();
                  }

                  @Override
                  public void finish() throws IOException {
                    this.flush();
                  }
                };
              }
            })
        .withBucketCheckInterval(1000)
        .build();

    env
        .fromElements(new Record("1"), new Record("2"))
        .addSink(textSink)
        .name("Streaming File Sink")
        .uid("streaming-file-sink");
    env.execute("streaming file sink test");
  }
}

From the stat/hex dumps, you can see that the first bits are there, but are then cut off:

~/Downloads » stat test.tgz
  File: test.tgz
  Size: 114       Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d Inode: 30041077    Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:30:06.009028283 -0500
Modify: 2020-02-21 19:30:44.509424406 -0500
Change: 2020-02-21 19:30:44.509424406 -0500
 Birth: -

~/Downloads » tar -tvf test.tgz
-rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt

~/Downloads » hd test.tgz
00000000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |............1.. |
00000010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.....|
00000020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |....w0......7..L|
00000030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."......%.|
00000040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.....w.X...."c|
00000050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|....i_....S..|
00000060  94 97 bf 5b 94 52 4a 7d  e7 00 4d ce eb e7 00 08  |...[.RJ}..M.....|
00000070  00 00                                             |..|
00000072



text-output/37 » tar -xzf part-0-0

gzip: stdin: unexpected end of file
tar: Child returned status 1
tar: Error is not recoverable: exiting now

text-output/37 » stat part-0-0
  File: part-0-0
  Size: 10              Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d      Inode: 4590487     Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:33:06.258888702 -0500
Modify: 2020-02-21 19:33:04.466870139 -0500
Change: 2020-02-21 19:33:05.294878716 -0500
 Birth: -

text-output/37 » hd part-0-0
00000000  1f 8b 08 00 00 00 00 00  00 ff                    |..........|
0000000a

Is there anything simple I'm missing?

Best,
Austin

Re: StreamingFileSink Not Flushing All Data

Dawid Wysakowicz-2

Hi Austin,

If I am not mistaken, the StreamingFileSink by default flushes on checkpoints. If you don't have checkpoints enabled, it might happen that not all data is flushed.

I think you can also adjust that behavior with:

forBulkFormat(...)
    .withRollingPolicy(/* your custom logic */)

I also cc Kostas who should be able to correct me if I am wrong.

Best,

Dawid
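
For reference, a minimal sketch of what enabling checkpointing for this job could look like -- the 10-second interval is only an illustration:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With checkpointing enabled, the StreamingFileSink commits in-progress part files
// on each completed checkpoint instead of leaving them un-finalized.
env.enableCheckpointing(10_000L);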


Re: StreamingFileSink Not Flushing All Data

Kostas Kloudas-5
Hi Austin,

Dawid is correct in that you need to enable checkpointing for the
StreamingFileSink to work.

I hope this solves the problem,
Kostas


Re: StreamingFileSink Not Flushing All Data

austin.ce
Hi Dawid and Kostas,

Sorry for the late reply + thank you for the troubleshooting. I put together an example repo that reproduces the issue [1], because I did have checkpointing enabled in my previous case -- I must still be doing something wrong with that config, though.

Thanks!
Austin




Re: StreamingFileSink Not Flushing All Data

Rafi Aroch
Hi,

This happens because StreamingFileSink does not support a finite input stream.
In the docs it's mentioned under "Important Considerations": 

[screenshot of the "Important Considerations" note from the StreamingFileSink documentation]

This behaviour often surprises users...

There's a FLIP and an issue about fixing this. I'm not sure what the status is though, maybe Kostas can share.

Thanks,
Rafi



Re: StreamingFileSink Not Flushing All Data

Kostas Kloudas-5
Hi Austin and Rafi,

@Rafi Thanks for providing the pointers!
Unfortunately there is no progress on the FLIP (or the issue).

@Austin In the meantime, assuming that your input is bounded, what you could do is simply not stop the job after the whole input is processed, wait until the output is committed, and then cancel the job. I know and agree that this is not an elegant solution, but it is a temporary workaround.

Hopefully the FLIP and the related issue are going to be prioritised soon.

Cheers,
Kostas


Re: StreamingFileSink Not Flushing All Data

austin.ce
Hi all,

Thanks for the docs pointer/FLIP, Rafi, and the workaround strategy, Kostas -- strange though, as I wasn't using a bounded source when I first ran into this issue. I have updated the example repo to use an unbounded source [1], and the same file corruption problems remain.

Anything else I could be doing wrong with the compression stream? 

Thanks again,
Austin
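
For context, by "unbounded source" I mean something along these lines -- an illustrative sketch reusing the Record class from the first message, not the actual code in the linked repo:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A trivial source that never finishes: it emits a Record roughly once per second
// until the job is cancelled, so the pipeline keeps running indefinitely.
public static class UnboundedRecordSource implements SourceFunction<Record> {
  private volatile boolean running = true;

  @Override
  public void run(SourceContext<Record> ctx) throws Exception {
    long counter = 0;
    while (running) {
      // Emit under the checkpoint lock so records and checkpoints don't interleave.
      synchronized (ctx.getCheckpointLock()) {
        ctx.collect(new Record(String.valueOf(counter++)));
      }
      Thread.sleep(1000);
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}

// Wired in as: env.addSource(new UnboundedRecordSource()).addSink(textSink);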



Re: StreamingFileSink Not Flushing All Data

Kostas Kloudas-5
Hi Austin,

I will have a look at your repo. In the meantime, given that [1] is already merged in 1.10, 
would upgrading to 1.10 and using the newly introduced CompressWriterFactory be an option for you?

It is unfortunate that this feature was not documented.

Cheers,
Kostas
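
A rough sketch of what using the new CompressWriterFactory could look like on 1.10 -- the builder methods (CompressWriters.forExtractor, withHadoopCompression) and the "Gzip" codec name are from memory of the flink-compress module and may need adjusting:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Each record is turned into bytes by the extractor and the part file is
// gzip-compressed via the Hadoop codec, so no hand-rolled BulkWriter is needed.
CompressWriterFactory<String> writerFactory =
    CompressWriters.forExtractor(new DefaultExtractor<String>())
        .withHadoopCompression("Gzip");

StreamingFileSink<String> compressedSink = StreamingFileSink
    .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"), writerFactory)
    .build();

Note that this produces gzip-compressed text part files rather than tar archives, so it may or may not cover the .tgz use case as-is.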



On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <[hidden email]> wrote:
Hi all,

Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy Kostas -- strange though, as I wasn't using a bounded source when I first ran into this issue. I have updated the example repo to use an unbounded source[1], and the same file corruption problems remain. 

Anything else I could be doing wrong with the compression stream? 

Thanks again,
Austin


On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas <[hidden email]> wrote:
Hi Austin and Rafi,

@Rafi Thanks for providing the pointers!
Unfortunately there is no progress on the FLIP (or the issue).

@ Austin In the meantime, what you could do --assuming that your input is bounded --  you could simply not stop the job after the whole input is processed, then wait until the output is committed, and then cancel the job. I know and I agree that this is not an elegant solution but it is a temporary workaround.
 
Hopefully the FLIP and related issue is going to be prioritised soon.

Cheers,
Kostas

On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch <[hidden email]> wrote:
Hi,

This happens because StreamingFileSink does not support a finite input stream.
In the docs it's mentioned under "Important Considerations": 

image.png

This behaviour often surprises users...

There's a FLIP and an issue about fixing this. I'm not sure what's the status though, maybe Kostas can share.

Thanks,
Rafi


On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <[hidden email]> wrote:
Hi Dawid and Kostas,

Sorry for the late reply + thank you for the troubleshooting. I put together an example repo that reproduces the issue[1], because I did have checkpointing enabled in my previous case -- still must be doing something wrong with that config though.

Thanks!
Austin



On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <[hidden email]> wrote:
Hi Austin,

Dawid is correct in that you need to enable checkpointing for the
StreamingFileSink to work.

I hope this solves the problem,
Kostas

On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
<[hidden email]> wrote:
>
> Hi Austing,
>
> If I am not mistaken the StreamingFileSink by default flushes on checkpoints. If you don't have checkpoints enabled it might happen that not all data is flushed.
>
> I think you can also adjust that behavior with:
>
> forBulkFormat(...)
>
> .withRollingPolicy(/* your custom logic */)
>
> I also cc Kostas who should be able to correct me if I am wrong.
>
> Best,
>
> Dawid
>
> On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>
> [...]
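
A minimal sketch of the checkpointing side of that advice -- the interval here is illustrative, not something from the thread:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The StreamingFileSink's bulk formats roll and commit part files when a
// checkpoint completes, so without checkpointing the output stays in
// in-progress files and is never finalized.
env.enableCheckpointing(10_000L); // e.g. checkpoint every 10 seconds (illustrative)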

Re: StreamingFileSink Not Flushing All Data

austin.ce
Hey Kostas,

We’re a little bit off from a 1.10 update, but I can certainly see if that CompressWriterFactory might solve my use case once we do upgrade.

If there is anything I can do to help document that feature, please let me know. 

Thanks!

Austin 

On Wed, Mar 4, 2020 at 4:58 AM Kostas Kloudas <[hidden email]> wrote:
Hi Austin,

I will have a look at your repo. In the meantime, given that [1] is already merged in 1.10, 
would upgrading to 1.10 and using the newly introduced CompressWriterFactory be an option for you?

It is unfortunate that this feature was not documented.

Cheers,
Kostas



On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <[hidden email]> wrote:
Hi all,

Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy Kostas -- strange though, as I wasn't using a bounded source when I first ran into this issue. I have updated the example repo to use an unbounded source[1], and the same file corruption problems remain. 

Anything else I could be doing wrong with the compression stream? 

Thanks again,
Austin


On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas <[hidden email]> wrote:
Hi Austin and Rafi,

@Rafi Thanks for providing the pointers!
Unfortunately there is no progress on the FLIP (or the issue).

@Austin In the meantime -- assuming that your input is bounded -- what you could do is simply not stop the job after the whole input is processed, then wait until the output is committed, and then cancel the job. I know and I agree that this is not an elegant solution, but it is a temporary workaround.

Hopefully the FLIP and the related issue are going to be prioritised soon.

Cheers,
Kostas

On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch <[hidden email]> wrote:
Hi,

This happens because StreamingFileSink does not support a finite input stream.
In the docs it's mentioned under "Important Considerations": 

[screenshot of the "Important Considerations" note from the StreamingFileSink documentation]

This behaviour often surprises users...

There's a FLIP and an issue about fixing this. I'm not sure what the status is, though -- maybe Kostas can share.

Thanks,
Rafi


On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <[hidden email]> wrote:
Hi Dawid and Kostas,

Sorry for the late reply + thank you for the troubleshooting. I put together an example repo that reproduces the issue[1], because I did have checkpointing enabled in my previous case -- still must be doing something wrong with that config though.

Thanks!
Austin



On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas <[hidden email]> wrote:
Hi Austin,

Dawid is correct in that you need to enable checkpointing for the
StreamingFileSink to work.

I hope this solves the problem,
Kostas

On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
<[hidden email]> wrote:
>
> [...]
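
One note worth adding here, since the thread itself never pins this down: the BulkWriter in the snippet only flushes from finish(), but both compression layers buffer data until they are explicitly finished -- in particular, Apache Commons Compress' GzipCompressorOutputStream only emits its compressed data and trailer on finish()/close(), which would fit part files that contain nothing beyond the 10-byte gzip header. A sketch of a finish() that ends both layers without closing Flink's underlying stream, assuming the writer keeps references to both streams:

final GzipCompressorOutputStream gzipOut = new GzipCompressorOutputStream(out);
final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(gzipOut);
// ... addElement(...) writes tar entries to tarOut as before ...

@Override
public void finish() throws IOException {
  tarOut.finish();   // writes the tar end-of-archive records without closing the stream
  gzipOut.finish();  // drains the deflater and writes the gzip trailer, but leaves
                     // Flink's FSDataOutputStream open for the sink to manage
}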

Re: StreamingFileSink Not Flushing All Data

Kostas Kloudas-5
Thanks Austin,

If the CompressWriterFactory works for you in 1.10, then you can copy it as-is into 1.9 and use it. The BulkWriter interfaces have not changed between the versions (as far as I recall). But please keep in mind that there is a bug in the CompressWriterFactory, with a pending PR that fixes it (https://github.com/apache/flink/pull/11307). So if you copy it and try to use it, please include the patch from that PR.

As for the documentation, if you are willing to contribute that would be a great help. You can open an issue and submit a PR with an example, as done for the other bulk formats in the documentation here:

Let us know if it works for you!

Cheers,
Kostas 
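
For later readers, a rough sketch of what the 1.10 CompressWriterFactory route might look like for this job -- the builder names (CompressWriters, DefaultExtractor, withHadoopCompression) are recalled from the flink-compress module and should be checked against the actual 1.10 API, and note that this produces gzip-compressed line-delimited files rather than tar archives:

// Requires the flink-compress module; withHadoopCompression also needs
// hadoop-common on the classpath for the codec lookup. Names as recalled,
// verify against the real 1.10 API.
StreamingFileSink<Record> compressedSink = StreamingFileSink
    .forBulkFormat(
        new Path("file:///tmp/compressed-output"),                    // illustrative path
        CompressWriters.forExtractor(new DefaultExtractor<Record>())  // one Record#toString per line
            .withHadoopCompression("Gzip"))                           // Hadoop codec name
    .build();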

On Thu, Mar 5, 2020 at 1:43 AM Austin Cawley-Edwards <[hidden email]> wrote:
[...]

Re: StreamingFileSink Not Flushing All Data

hubertchen
In reply to this post by Kostas Kloudas-5
Hi Kostas,

I'm confused about the implementation of the temporary workaround. Would it
be possible to get a little more detail?

> you could simply not stop the job after the whole input is processed

How does one determine when the job has processed the whole input?  

> then wait until the output is committed

Isn't it necessary to manually commit the in-progress files rather than
wait? The reason being that the source task could have completed
successfully, so checkpoints would no longer be propagated downstream and
any bulk-encoded format would never be committed.

Best,
Hubert



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
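
To make the workaround Kostas described (and Hubert is asking about) concrete: it presupposes a source that keeps running after the finite input has been emitted, so checkpoints keep firing and the sink can commit its part files. A rough sketch of such a source -- the class and its contents are illustrative, not something from the thread:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class IdlingRecordSource implements SourceFunction<Record> {
  private static final long serialVersionUID = 1L;
  private volatile boolean running = true;

  @Override
  public void run(SourceContext<Record> ctx) throws Exception {
    // Emit the finite input.
    ctx.collect(new Record("1"));
    ctx.collect(new Record("2"));
    // Keep the job alive so checkpoints continue and the StreamingFileSink can
    // roll and commit its part files; cancel the job manually once the output
    // has been committed.
    while (running) {
      Thread.sleep(1_000L);
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}

Used with env.addSource(new IdlingRecordSource()) in place of fromElements(...), the job stays up until it is cancelled, which is what gives the checkpoint-driven commit a chance to happen.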