Writer has already been opened on BucketingSink to S3


Chengzhi Zhao
Hi Flink Community,

I am using Flink 1.6.0 with BucketingSink writing to S3.

After the application has been running for a while (about 20 minutes), I get an exception: java.lang.IllegalStateException: Writer has already been opened

I have attached the job manager log, and the sink code is here:
val avroOutputPath = output
var properties = new util.HashMap[String, String]()
val stringSchema = Schema.create(Type.STRING)
val keySchema = stringSchema.toString
val valueSchema = schema.toString

val compress = true
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC)

val sink = new BucketingSink[tuple.Tuple2[String, R]](output)
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/"))
sink.setBatchSize(1024 * 1024 * batchSize) // this is 64 MB
sink.setBatchRolloverInterval(20 * 60 * 1000)

def getWriter(): BucketingSink[tuple.Tuple2[String, R]] = {
val writer = new AvroKeyValueSinkWriter[String, R](properties)
Any suggestions on why this could happen and how to debug it? Thanks for your help in advance!


Attachment: log_job_manager.txt (310K)

Re: Writer has already been opened on BucketingSink to S3

Chengzhi Zhao
After checking the code, I think the issue might be related to AvroKeyValueSinkWriter.java, whose close() can leave the writer not completely closed. I also noticed that this change affects 1.5+:

public void close() throws IOException {
if (keyValueWriter != null) {
} else {
// need to make sure we close this if we never created the Key/Value Writer.
I created my own AvroKeyValueSinkWriter class and implemented close() similarly to v1.4; it seems to be running fine now.
public void close() throws IOException {
try {
} finally {
if (keyValueWriter != null) {

I am curious whether anyone has had a similar issue. I would appreciate any insights. Thanks!
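To make the suspected mechanism concrete, here is a minimal, self-contained sketch. It is a simplified model and not Flink's code: BaseWriter, InnerWriter, SkippingWriter and AlwaysClosingWriter are hypothetical names, and the assumption is that the base writer refuses to open() again until its own close() has reset its stream. Under that assumption, a close() that only closes the inner writer leaves the base writer "open", and the next open() fails with the same message as in the report.

```java
class WriterCloseSketch {

    /** Hypothetical stand-in for a stream-based writer base class (not Flink's code). */
    static class BaseWriter {
        private Object outStream; // non-null while a part file is open

        void open() {
            if (outStream != null) {
                throw new IllegalStateException("Writer has already been opened");
            }
            outStream = new Object();
        }

        void close() {
            outStream = null; // resets the state so open() can be called again
        }
    }

    /** Hypothetical inner key/value writer. */
    static class InnerWriter {
        boolean closed;

        void close() {
            closed = true;
        }
    }

    /** close() skips the base class whenever the inner writer exists. */
    static class SkippingWriter extends BaseWriter {
        private InnerWriter inner;

        @Override
        void open() {
            super.open();
            inner = new InnerWriter();
        }

        @Override
        void close() {
            if (inner != null) {
                inner.close(); // base state is never reset on this path
            } else {
                super.close();
            }
        }
    }

    /** close() always resets the base class, then closes the inner writer. */
    static class AlwaysClosingWriter extends BaseWriter {
        private InnerWriter inner;

        @Override
        void open() {
            super.open();
            inner = new InnerWriter();
        }

        @Override
        void close() {
            try {
                super.close();
            } finally {
                if (inner != null) {
                    inner.close();
                    inner = null;
                }
            }
        }
    }

    /** Opens, closes, and tries to open again, as a sink would when rolling to a new part file. */
    static boolean canReopen(BaseWriter writer) {
        writer.open();
        writer.close();
        try {
            writer.open();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canReopen(new SkippingWriter()));      // prints "false"
        System.out.println(canReopen(new AlwaysClosingWriter())); // prints "true"
    }
}
```

If this model is right, it might also explain the timing: the exception showed up after about 20 minutes, which matches the setBatchRolloverInterval(20 * 60 * 1000) in my sink code, i.e. possibly the first time the sink closes a part file and opens a new one with the same writer.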



Re: Writer has already been opened on BucketingSink to S3

Stefan Richter

Thanks for putting some effort into debugging the problem. Could you open a Jira issue with the problem and your analysis so that we can discuss how to proceed?



Re: Writer has already been opened on BucketingSink to S3

Chengzhi Zhao
Thanks for replying, Stefan. I created a JIRA ticket: https://issues.apache.org/jira/browse/FLINK-10382

