For compressed output with StreamingFileSink, see the StreamingFileSink.forBulkFormat method and the BulkWriter.Factory interface. A simple example (using GzipCompressorOutputStream from Apache commons-compress):
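import java.nio.charset.StandardCharsets
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.core.fs.{FSDataOutputStream, Path}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val writer = new BulkWriter.Factory[String] {
  override def create(out: FSDataOutputStream): BulkWriter[String] = new BulkWriter[String] {
    val compressed = new GzipCompressorOutputStream(out)
    // One record per line (an assumed delimiter), encoded as UTF-8 rather than the platform default.
    override def addElement(element: String): Unit = compressed.write((element + "\n").getBytes(StandardCharsets.UTF_8))
    override def flush(): Unit = compressed.flush()
    // Write the gzip trailer only; finish() must not close the stream, Flink closes it afterwards.
    override def finish(): Unit = compressed.finish()
  }
}
val sink = StreamingFileSink.forBulkFormat[String](new Path("/some/path"), writer).build()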
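One thing worth checking in a writer like this: BulkWriter.finish() is documented as having to complete the encoding without closing the stream Flink passes in, because Flink closes that stream itself. Below is a minimal sketch of that behaviour. It uses only the JDK's java.util.zip.GZIPOutputStream as a stand-in for GzipCompressorOutputStream (an assumption, to keep the example free of dependencies), and TrackingStream, GzipFinishDemo and roundTrip are hypothetical names invented for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

// Hypothetical stand-in for the stream Flink hands to the writer:
// it records whether anyone called close() on it.
class TrackingStream extends ByteArrayOutputStream {
  var closed = false
  override def close(): Unit = { closed = true; super.close() }
}

object GzipFinishDemo {
  // Writes newline-delimited UTF-8 records through gzip, calls finish()
  // instead of close(), then decompresses the bytes again.
  // Returns the decoded records and whether the target stream was closed.
  def roundTrip(records: Seq[String]): (List[String], Boolean) = {
    val target = new TrackingStream
    val gzip = new GZIPOutputStream(target)
    records.foreach(r => gzip.write((r + "\n").getBytes(StandardCharsets.UTF_8)))
    gzip.finish() // writes the gzip trailer, leaves `target` open

    val in = new GZIPInputStream(new ByteArrayInputStream(target.toByteArray))
    val decoded =
      try Source.fromInputStream(in, "UTF-8").getLines().toList
      finally in.close()
    (decoded, target.closed)
  }
}
```

Calling GzipFinishDemo.roundTrip(Seq("a", "b")) should give back the two records together with closed = false, i.e. the gzip data is complete even though the underlying stream was never closed by the writer.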
StreamingFileSink still has some usability issues (for example, the resulting file names cannot be customized), but these are due to be fixed in Flink 1.10.
On Fri, Oct 11, 2019, at 23:07, John O wrote:
Hello,
Question 1
I don’t see any reference material showing how to write compressed (gzip) files with StreamingFileSink. Can someone point me in the right direction?
Question 2
We currently have a use case for a “StreamingFileProcessFunction”. Basically we need an output for the StreamingFileSink that will be used by a downstream processor. What would be the best way to implement this feature?
Best,
Song