Avro writer has already been opened


Chengzhi Zhao
Hi there,

I am writing data to S3 in Avro format. I recently upgraded from Flink 1.3.2 to 1.5 and started seeing the errors shown in the log below.

I am using the RocksDB state backend, and checkpointDataUri is an S3 location.
My writer setup looks something like this:

val writer = new AvroKeyValueSinkWriter[String, R](properties).duplicate()
sink.setWriter(writer.duplicate())


2018-07-24 17:50:44,012 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Unnamed (4/4) (28f918a31d273e176409de3d4cb46c3c) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Writer has already been opened
	at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69)
	at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
2018-07-24 17:50:44,015 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 28 of job cc73a9db44814dc3d5a5ce497c8b0389 because: Writer has already been opened
2018-07-24 17:50:44,016 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Enrollment Log Member and Chapter (cc73a9db44814dc3d5a5ce497c8b0389) switched from state RUNNING to FAILING.
java.lang.IllegalStateException: Writer has already been opened
	at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69)
	at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
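
As far as I can tell from the stack trace, the exception comes from a guard in StreamWriterBase.open that rejects a second open on a writer instance that is still holding a stream. To make clear what I think is happening, here is a minimal standalone sketch of that kind of guard. It is my own simplified model and not Flink's code; SketchWriter and SketchWriterDemo are hypothetical names, and the in-memory stream is only a stand-in for a real file stream.

```scala
import java.io.ByteArrayOutputStream

// Simplified, hypothetical model of an "open once" guard. This is NOT
// Flink's StreamWriterBase; it only mimics the check that produces the message.
class SketchWriter {
  // Stand-in for the output stream a real writer would hold.
  private var out: Option[ByteArrayOutputStream] = None

  def isOpen: Boolean = out.isDefined

  def open(): Unit = {
    // Reject a second open() while a stream is still held.
    if (out.isDefined) {
      throw new IllegalStateException("Writer has already been opened")
    }
    out = Some(new ByteArrayOutputStream())
  }

  def close(): Unit = {
    out.foreach(_.close())
    // Clearing the reference is what makes a later open() legal again.
    out = None
  }

  // A duplicate starts unopened, independent of this instance's state.
  def duplicate(): SketchWriter = new SketchWriter
}

object SketchWriterDemo {
  def main(args: Array[String]): Unit = {
    val writer = new SketchWriter
    writer.open()
    try {
      writer.open() // second open without an intervening close
    } catch {
      case e: IllegalStateException => println(s"Caught: ${e.getMessage}")
    }
    writer.close()
    writer.open() // accepted again after close()
    println(s"Reopened after close: ${writer.isOpen}")
  }
}
```

If the real writer behaves like this model, the error would mean that open() is being reached on an instance whose previous stream was never released, but I have not confirmed that this is what happens in 1.5.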

Any help would be greatly appreciated. Thanks!

Regards,
Chengzhi