When I want to write compressed string data to HDFS, I found that Flink only
provides a StringWriter, so I wrote a custom writer, as follows:

public class StringCompressWriter<T> extends StreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;
    private transient Charset charset;
    private transient CompressionOutputStream outStream;

    public StringCompressWriter() {
        this("UTF-8");
    }

    public StringCompressWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected StringCompressWriter(StringCompressWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    /**
     * Opens the underlying stream and wraps it in a gzip-compressed stream.
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);

        this.charset = Charset.forName(charsetName);

        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName("GzipCodec");

        FSDataOutputStream dataOutputStream = getStream();
        Compressor compressor = CodecPool.getCompressor(codec, fs.getConf());
        outStream = codec.createOutputStream(dataOutputStream, compressor);
    }

    @Override
    public void write(T element) throws IOException {
        getStream(); // Throws if the stream is not open
        outStream.write(element.toString().getBytes(charset));
        outStream.write('\n');
    }

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            outStream.close();
            // outStream = null;
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new StringCompressWriter<>(this);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), charsetName);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (getClass() != other.getClass()) {
            return false;
        }
        StringCompressWriter<T> writer = (StringCompressWriter<T>) other;
        // field comparison
        return Objects.equals(charsetName, writer.charsetName)
            && super.equals(other);
    }
}

But when I run my app on YARN, the taskmanager always reports the following
error:

2018-09-03 15:25:54,187 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: bucketSink (10/15) (67b40f43fc72371f19e61a6ac3f60819) switched from RUNNING to FAILED.
java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1618)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1982)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1942)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:83)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:99)
    at com.vipkid.bigdata.sink.StringCompressWriter.close(StringCompressWriter.java:73)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:570)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
2018-09-03 15:25:54,191 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job TestBucketing (01e7088de6b4f9cbe22d9f6a3fdbd2fe) switched from state RUNNING to FAILING.

Moreover, when I looked at the data, I found that the data stream did not
seem to have been closed properly:

hdfs dfs -text /tmp/Test/2018-08-27/part-8-96 | wc -l
text: Unexpected end of ZLIB input stream
3268

Can someone tell me what happened?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
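For reference, a writer like this is normally registered on the BucketingSink. A minimal sketch of that wiring, assuming a DataStream<String> source and the /tmp/Test base path seen in the output above (the source and the bucketer settings are illustrative assumptions, not taken from the original job):

    DataStream<String> lines = ...; // some upstream stream of strings

    BucketingSink<String> sink = new BucketingSink<>("/tmp/Test");
    // Bucket by day, matching paths like /tmp/Test/2018-08-27/part-8-96
    sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd"));
    // Plug in the custom writer instead of the default StringWriter
    sink.setWriter(new StringCompressWriter<>());
    lines.addSink(sink).name("bucketSink");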
You're closing the stream and then calling super.close(), which flushes the
underlying stream; that flush fails because you already closed it. If you
don't close the stream yourself, the problem should disappear.
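The "Unexpected end of ZLIB input stream" message also fits this: if the gzip stream is never properly finished, the trailer is missing from the file. One way to get both a valid trailer and a successful super.close() is to finish the compression stream without closing the underlying one; a minimal sketch, assuming Hadoop's CompressionOutputStream#finish() (untested):

    @Override
    public void close() throws IOException {
        if (outStream != null) {
            // finish() flushes the compressor and writes the gzip trailer,
            // but leaves the underlying FSDataOutputStream open, so
            // StreamWriterBase.close() can still hflush and close it.
            outStream.finish();
            outStream = null;
        }
        super.close();
    }

You would also want to hold on to the Compressor obtained in open() and hand it back with CodecPool.returnCompressor(...) once the writer is done; the sketch above, like the original code, otherwise leaks it.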