Pending parquet file with BucketingSink


Pending parquet file with BucketingSink

xiatao123
Hi All,
  Does anyone write Parquet files using BucketingSink? I've run into an issue where all the Parquet files stay in pending status. Any ideas?
processedStream is a DataStream of NDEvent.
The output files all look like this one: "_part-0-0.pending"
val parquetSink = new BucketingSink[NDEvent]("/tmp/")
parquetSink.setBucketer(new DateTimeBucketer[NDEvent]("yyyy-MM-dd/HH"))
parquetSink.setWriter(new SinkParquetWriter(NDEvent.getClassSchema.toString))
processedStream.addSink(parquetSink)
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class SinkParquetWriter<T> implements Writer<T> {

    transient ParquetWriter<T> writer = null;
    String schema = null;

    public SinkParquetWriter(String schema) {
        // The writer itself is created in open(); only the schema is kept here.
        this.schema = schema;
    }

    public void open(FileSystem fileSystem, Path path) throws IOException {
        writer = AvroParquetWriter.<T>builder(path)
                .withSchema(new Schema.Parser().parse(schema))
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
    }

    public long flush() throws IOException {
        // ParquetWriter exposes no explicit flush; report the buffered size instead.
        return writer.getDataSize();
    }

    public long getPos() throws IOException {
        return writer.getDataSize();
    }

    public void close() throws IOException {
        writer.close();
    }

    public void write(T t) throws IOException {
        writer.write(t);
    }

    public Writer<T> duplicate() {
        return new SinkParquetWriter<T>(schema);
    }
}

Thanks,
Tao

Re: Pending parquet file with BucketingSink

vipul singh
Hi Tao,

Is checkpointing enabled in your app? Pending files should be moved to the finished state once a checkpoint completes.

Please take a look at the BucketingSink documentation, which notes:
"If checkpointing is not enabled the pending files will never be moved to the finished state"

Thanks,
Vipul




Re: Pending parquet file with BucketingSink

xiatao123
Hi Vipul,
  Thanks for the information. Yes, I do have checkpointing enabled, with a 10 ms interval.
  I think the issue here is that the stream ended before a checkpoint was reached. This is test code in which the DataStream has only 5 events and then ends. Once the stream ends, no checkpoint is triggered, so the files remain in the "pending" state.
  Is there any way to force a checkpoint trigger, or to let the sink know the stream has ended?
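  One workaround I can think of for a test like this, assuming the job only needs to stay up long enough for a single checkpoint, is a source that emits its events and then sleeps past at least one checkpoint interval. A hypothetical sketch (SlowFinishingSource and the sleep time are illustrative):

import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical test source: emits 5 events, then stays alive long enough
// for at least one checkpoint to fire before the stream ends.
class SlowFinishingSource extends SourceFunction[String] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    for (i <- 1 to 5 if running) {
      ctx.collect(s"event-$i")
    }
    // Keep the source open so a checkpoint can complete and the
    // BucketingSink can move its pending files to the final state.
    Thread.sleep(30000)
  }

  override def cancel(): Unit = running = false
}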
Thanks,
Tao




Re: Pending parquet file with BucketingSink

Aljoscha Krettek
Hi,

Your analysis is correct. If the program ends before we can take a checkpoint, the files will never be moved to the "final" state. We could move all files to the "final" stage when the sink is closing, but the problem is that Flink currently doesn't give user functions (which sinks are) a way to distinguish between an "erroneous close" and a "close because the stream ended", so we cannot do this yet. We are aware of the problem, and this is the Jira issue tracking it: https://issues.apache.org/jira/browse/FLINK-2646
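
In the meantime, one pragmatic workaround for finished test jobs (only safe once the job is done and will not be restarted or recovered) is to rename the leftover ".pending" files yourself, for example with the Hadoop FileSystem API. A hypothetical sketch; finalizePendingFiles is an illustrative helper, not a Flink API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative post-job cleanup: rename "_part-0-0.pending" style files
// to their final names ("part-0-0"). Only for jobs that have fully finished.
def finalizePendingFiles(dir: String): Unit = {
  val fs = FileSystem.get(new Configuration())
  val files = fs.listFiles(new Path(dir), true) // recurse into bucket dirs
  while (files.hasNext) {
    val status = files.next()
    val name = status.getPath.getName
    if (name.startsWith("_") && name.endsWith(".pending")) {
      val finalName = name.stripPrefix("_").stripSuffix(".pending")
      fs.rename(status.getPath, new Path(status.getPath.getParent, finalName))
    }
  }
}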

Best,
Aljoscha 
