Hdfs -> hdfs ends up with .inprogress files


Johan Rask
Hi!

I am quite new to Apache Flink, but we have been evaluating it for a few weeks to read from Kafka, transform the data, and write to HDFS. Kafka to HDFS with exactly-once configured works as expected, but when we replace the Kafka source with an HDFS source (hdfs -> hdfs), files hang in the .inprogress state.

We first experienced this with Kafka as well, but after properly configuring checkpoints we got the expected results.

We simply changed the Kafka source to an HDFS source and kept streaming mode. This is obviously a bounded data set, so there may be some issue with that which we do not understand.
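For comparison, this is roughly what the working Kafka variant looks like. It is only a sketch: the broker, group id, and topic below are placeholders rather than our real settings, it assumes Flink's FlinkKafkaConsumer from the flink-connector-kafka dependency, and env and hdfsSink are the same as in the program further down.

    // Placeholder Kafka settings; not our real configuration.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "<broker>:9092");
    props.setProperty("group.id", "<group>");

    // Unbounded Kafka source; only the source differs from the hdfs -> hdfs job.
    env.addSource(new FlinkKafkaConsumer<>("<topic>", new SimpleStringSchema(), props))
        .map(Parser::parse)
        .addSink(hdfsSink);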

Any help with this is highly appreciated!

I have created a small gist of our program here: https://gist.github.com/jrask/ef4a8531b0563f1420ce276e7b0f59ce

I have also copied the gist below.

Regards,
/Johan Rask

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// JsonObject, Parser and EventTimeDateTimeBuckerAssigner are our own classes (see the gist).
public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Checkpoint every minute with exactly-once guarantees; checkpoint state goes to HDFS.
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new FsStateBackend("hdfs://<server><dir>/checkpoints", true));
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

    // Row-format sink, bucketed by event time into year/month/day/hour directories.
    StreamingFileSink<JsonObject> hdfsSink = StreamingFileSink
            .<JsonObject>forRowFormat(new Path("hdfs://<server>/<dir>"), new SimpleStringEncoder<>("UTF-8"))
            .withBucketAssigner(new EventTimeDateTimeBuckerAssigner<>("'/year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"))
            .build();

    // Bounded source: read a text file from HDFS, parse each line, write to the sink.
    env.readTextFile("hdfs://<server><dir><file>")
        .map(Parser::parse)
        .addSink(hdfsSink);

    env.execute("some-pipeline");
}


hdfs -> hdfs results in the following files being left in progress. However, if I use Kafka as the source, it works properly.

-rw-rw----+  3 someuser supergroup   87792789 2019-11-16 20:57 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-62.inprogress.8f9c6104-4c6c-4eee-8650-dd5d1d12d668
-rw-rw----+  3 someuser supergroup   64696413 2019-11-16 20:58 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-63.inprogress.42589a04-601b-496d-ae20-7db1d56089dc

... rest is removed for clarity
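
If I read the StreamingFileSink documentation correctly, in-progress part files are only moved to the finished state when a checkpoint completes, and on normal job termination the last in-progress files are not finished. That would match what we see, since readTextFile is a bounded source and the job terminates once the file is consumed. A variant we have not tried yet would be to keep the source unbounded by monitoring the input path continuously, so that checkpoints keep firing. A sketch, where the monitoring interval is arbitrary and TextInputFormat/FileProcessingMode are Flink's own classes:

    TextInputFormat format = new TextInputFormat(new Path("hdfs://<server><dir><file>"));

    // PROCESS_CONTINUOUSLY keeps the job (and its checkpoints) running
    // instead of terminating once the bounded input has been consumed.
    env.readFile(format, "hdfs://<server><dir><file>",
            FileProcessingMode.PROCESS_CONTINUOUSLY, TimeUnit.SECONDS.toMillis(30))
        .map(Parser::parse)
        .addSink(hdfsSink);

Is this reasoning correct, or is there a recommended way to finish the in-progress files for a bounded input?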