Rolling File Sink Exception


Rolling File Sink Exception

clay4444
When I want to write compressed string data to HDFS, I found that Flink only
provides StringWriter, so I used a custom writer, as follows:

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Objects;

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;

    private transient Charset charset;

    private transient CompressionOutputStream outStream;


    public StringCompressWriter() {
        this("UTF-8");
    }

    public StringCompressWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected StringCompressWriter(StringCompressWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }


    /**
     * Opens the underlying HDFS stream and wraps it in a Gzip compression stream.
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);

        this.charset = Charset.forName(charsetName);

        Configuration conf = fs.getConf();

        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");

        FSDataOutputStream dataOutputStream = getStream();
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
        outStream = codec.createOutputStream(dataOutputStream, compressor);
    }

    @Override
    public void write(T element) throws IOException {
        getStream(); // Throws if the stream is not open
        outStream.write(element.toString().getBytes(charset));
        outStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            outStream.close();
//            outStream = null;
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new StringCompressWriter<>(this);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), charsetName);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (getClass() != other.getClass()) {
            return false;
        }
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
        // field comparison
        return Objects.equals(charsetName, writer.charsetName)
                && super.equals(other);
    }
}

But when I run my app on YARN, the TaskManager always reports the following
error:
2018-09-03 15:25:54,187 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
        at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
        at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
        at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.

Moreover, when I looked at the data, I found that the data stream did not
seem to be closed properly.

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l

text: Unexpected end of ZLIB input stream
3268

Can someone tell me what happened?





Re: Rolling File Sink Exception

Chesnay Schepler
You're closing the stream and then calling super.close(), which calls flush();
that flush fails because the stream has already been closed.

If you don't close the stream yourself, the problem should disappear.
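As a minimal sketch of one way to follow that advice while still terminating the gzip stream cleanly (using CompressionOutputStream.finish() is an assumption here, not something spelled out in the reply): finish() writes the gzip trailer without closing the wrapped FSDataOutputStream, so super.close() can still flush and close the underlying HDFS stream itself:

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            // finish() completes the gzip stream (writes the trailer) but,
            // unlike close(), leaves the wrapped FSDataOutputStream open,
            // so the flush inside super.close() no longer hits a closed channel.
            outStream.finish();
            outStream = null;
        }
        super.close(); // flushes and closes the underlying HDFS stream
    }

A missing gzip trailer would also be consistent with the "Unexpected end of ZLIB input stream" message on part files that were still in progress when the job failed. Ideally the Compressor taken from CodecPool would also be kept in a field and returned via CodecPool.returnCompressor() in close(), but that is omitted to stay close to the original code.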

On 03.09.2018 09:30, clay4444 wrote:

> When I want to write compressed string data to HDFS, I found that Flink only
> provides StringWriter, so I used a custom writer, as follows:
> [...]