Hi!
I am quite new to Apache Flink, but we have been evaluating it for a few weeks to read from Kafka, transform the data and write it to HDFS. With Kafka as the source and HDFS as the sink, configured for exactly-once, this works as expected. However, when we replace the Kafka source with an HDFS source, the output files hang in the .inprogress state.
We first experienced this with Kafka as well, but after properly configuring checkpoints we got the expected results.
We have simply replaced the Kafka source with an HDFS source and kept streaming mode, even though this is obviously a bounded data set, so there may be something here that we do not understand.
Any help with this is highly appreciated!
Regards /Johan Rask
I have also copied the gist here:
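import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
// Imports for JsonObject, Parser and EventTimeDateTimeBuckerAssigner are not shown.

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Checkpoint every minute; StreamingFileSink only finalizes part files on completed checkpoints.
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new FsStateBackend("hdfs://<server><dir>/checkpoints", true));
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

    StreamingFileSink<JsonObject> hdfsSink = StreamingFileSink
        .<JsonObject>forRowFormat(new Path("hdfs://<server>/<dir>"), new SimpleStringEncoder<>("UTF-8"))
        // yyyy is the calendar year; YYYY would be the week-based year in Java date patterns.
        .withBucketAssigner(new EventTimeDateTimeBuckerAssigner<>("'/year'=yyyy/'month'=MM/'day'=dd/'hour'=HH"))
        .build();

    env.readTextFile("hdfs://<server><dir><file>")
        .map(Parser::parse)
        .addSink(hdfsSink);
    env.execute("some-pipeline");
}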
With HDFS as both source and sink, the output looks like the listing below. The same job with Kafka as the source works properly.
-rw-rw----+ 3 someuser supergroup 87792789 2019-11-16 20:57 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-62.inprogress.8f9c6104-4c6c-4eee-8650-dd5d1d12d668
-rw-rw----+ 3 someuser supergroup 64696413 2019-11-16 20:58 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-63.inprogress.42589a04-601b-496d-ae20-7db1d56089dc
... (rest removed for clarity)
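The symptom is easier to reason about with a toy model of how a checkpoint-committed sink such as StreamingFileSink handles part files: a file is written while in-progress, becomes pending when it is rolled, and is renamed to its final name only when the checkpoint covering it completes. The StreamingFileSink documentation also notes that, on normal termination of a job (e.g. a finite input), the last in-progress files are not moved to the finished state. The sketch below is hypothetical: `PartFileLifecycle`, `Sink` and its methods are illustrative names, not Flink's API, and for simplicity it assumes that every checkpoint rolls the open file. It contrasts a run where a checkpoint completes with a bounded run that ends before any checkpoint is taken.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a checkpoint-committed file sink.
// Not Flink code: class and method names are illustrative only.
class PartFileLifecycle {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;

        PartFile(String name) { this.name = name; }
    }

    static final class Sink {
        final List<PartFile> files = new ArrayList<>();
        private PartFile current;
        private int counter = 0;

        // Each line goes to the current in-progress file, opening one if needed.
        // The content itself is ignored in this model.
        void write(String line) {
            if (current == null) {
                current = new PartFile("part-0-" + counter++);
                files.add(current);
            }
        }

        // Model assumption: every checkpoint rolls the open file, making it pending.
        void snapshotState() {
            if (current != null) {
                current.state = State.PENDING;
                current = null;
            }
        }

        // Pending files are committed (finished) only once the checkpoint completes.
        void notifyCheckpointComplete() {
            for (PartFile f : files) {
                if (f.state == State.PENDING) {
                    f.state = State.FINISHED;
                }
            }
        }

        // End of a bounded input: the writer is closed, but nothing is committed.
        void close() {
            current = null;
        }

        long count(State s) {
            return files.stream().filter(f -> f.state == s).count();
        }
    }

    public static void main(String[] args) {
        // Unbounded source: a checkpoint completes after the first line.
        Sink streaming = new Sink();
        streaming.write("a");
        streaming.snapshotState();
        streaming.notifyCheckpointComplete();
        streaming.write("b");
        // prints: streaming finished=1 in-progress=1
        System.out.println("streaming finished=" + streaming.count(State.FINISHED)
                + " in-progress=" + streaming.count(State.IN_PROGRESS));

        // Bounded source: input ends before any checkpoint is taken.
        Sink bounded = new Sink();
        bounded.write("a");
        bounded.write("b");
        bounded.close();
        // prints: bounded finished=0 in-progress=1
        System.out.println("bounded finished=" + bounded.count(State.FINISHED)
                + " in-progress=" + bounded.count(State.IN_PROGRESS));
    }
}
```

In the real sink, when a file is rolled also depends on the rolling policy (part size, rollover interval, inactivity), so this model only captures the commit step that ties finished files to completed checkpoints.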