Writing stream to Hadoop

Writing stream to Hadoop

miki haiat
I'm trying to write some data to Hadoop using this code.

The state backend is set, but no checkpoint interval is configured:
StateBackend sb = new FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
env.setStateBackend(sb);
BucketingSink<Tuple2<IntWritable, Text>> sink =
        new BucketingSink<>("hdfs://****:9000/mycity/raw");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
sink.setInactiveBucketCheckInterval(120000);
sink.setInactiveBucketThreshold(120000);
The result is that all the files are stuck in the in-progress state and are never closed.
Is it related to the state backend configuration?

Thanks,

Miki

Re: Writing stream to Hadoop

Marvin777
I think this comment from the BucketingSink Javadoc explains it:
* <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
* The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
* semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
* a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
* pending files will be moved to {@code finished}.
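A toy model may make the lifecycle in that comment concrete. This is plain Java, not Flink code: the class, enum and method names are hypothetical, and it assumes (from my reading of BucketingSink) that a checkpoint snapshot attaches the currently pending files to that checkpoint's ID, and that those files become finished only when that checkpoint completes.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of the part-file lifecycle; not Flink's implementation.
public class PartFileLifecycle {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        PartFile(String name) { this.name = name; }
    }

    private final List<PartFile> files = new ArrayList<>();
    // Closed files not yet attached to any checkpoint.
    private final List<PartFile> pending = new ArrayList<>();
    // Pending files grouped by the checkpoint whose snapshot picked them up.
    private final TreeMap<Long, List<PartFile>> pendingPerCheckpoint = new TreeMap<>();

    PartFile open(String name) {
        PartFile f = new PartFile(name);
        files.add(f);
        return f;
    }

    // Closing a part file (e.g. because its bucket went inactive) only makes it PENDING.
    void close(PartFile f) {
        if (f.state == State.IN_PROGRESS) {
            f.state = State.PENDING;
            pending.add(f);
        }
    }

    // A checkpoint snapshot records which pending files belong to that checkpoint.
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Only a completed checkpoint moves its (and older) pending files to FINISHED.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<PartFile>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<PartFile> batch : done.values()) {
            for (PartFile f : batch) {
                f.state = State.FINISHED;
            }
        }
        done.clear(); // removes the entries from pendingPerCheckpoint as well
    }

    Map<State, Integer> counts() {
        Map<State, Integer> m = new EnumMap<>(State.class);
        for (State s : State.values()) {
            m.put(s, 0);
        }
        for (PartFile f : files) {
            m.merge(f.state, 1, Integer::sum);
        }
        return m;
    }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle();
        PartFile a = sink.open("part-0-0");
        PartFile b = sink.open("part-0-1");

        sink.close(a);                     // 'a' is closed, so it is only PENDING
        System.out.println(sink.counts()); // {IN_PROGRESS=1, PENDING=1, FINISHED=0}

        sink.snapshot(1L);                 // checkpoint 1 picks up 'a'
        sink.close(b);                     // 'b' is closed after checkpoint 1 was taken
        sink.notifyCheckpointComplete(1L); // only 'a' becomes FINISHED
        System.out.println(sink.counts()); // {IN_PROGRESS=0, PENDING=1, FINISHED=1}
    }
}
```

In this model, if snapshot and notifyCheckpointComplete are never called, which corresponds to running without checkpointing, closed files never leave PENDING.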

In reply to miki haiat, 2018-06-05 17:14 GMT+08:00

Re: Writing stream to Hadoop

Kostas Kloudas
In reply to this post by miki haiat
Hi Miki,

Have you enabled checkpointing?

Kostas

Re: Writing stream to Hadoop

miki haiat
I saw the option to enable checkpointing in the docs (enabling-and-configuring-checkpointing),
but in 1.5 the method is marked as deprecated, so I'm a bit confused:
/** @deprecated */
@Deprecated
@PublicEvolving
public StreamExecutionEnvironment enableCheckpointing() {
    this.checkpointCfg.setCheckpointInterval(500L);
    return this;
}

In reply to Kostas Kloudas, Tue, Jun 5, 2018 at 1:11 PM

Re: Writing stream to Hadoop

Chesnay Schepler
This particular overload of the method is deprecated; use enableCheckpointing(long checkpointingInterval) instead.
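For illustration, here is a sketch of the configuration from the question with checkpointing switched on through the non-deprecated overload. It is a configuration fragment, not a complete job: the helper class and method names are hypothetical, the 60-second interval and CheckpointingMode.EXACTLY_ONCE are assumed choices, the masked HDFS hosts are left as they appear in the question, and it assumes Flink 1.5 with flink-streaming-java, flink-connector-filesystem and the Hadoop client on the classpath.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical helper class; only the Flink calls inside it are real API.
public final class CheckpointedHdfsSink {

    static void attachSink(StreamExecutionEnvironment env,
                           DataStream<Tuple2<IntWritable, Text>> stream) {
        // Non-deprecated overload; 60 s is an assumed interval, not a recommendation.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // Same state backend as in the question (masked host kept as-is).
        StateBackend sb = new FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
        env.setStateBackend(sb);

        BucketingSink<Tuple2<IntWritable, Text>> sink =
                new BucketingSink<>("hdfs://****:9000/mycity/raw");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
        sink.setInactiveBucketCheckInterval(120_000L); // check for idle buckets every 2 min
        sink.setInactiveBucketThreshold(120_000L);     // close (-> pending) files idle for 2 min

        stream.addSink(sink);
    }
}
```

By my reading of the sink, a part file is finished only when a checkpoint taken after the file was closed completes, so with these values a file may need up to roughly threshold + check interval + checkpoint interval (about 2 + 2 + 1 = 5 minutes, plus the checkpoint's own duration) after its last write before it is finished. The deprecated no-argument enableCheckpointing() quoted above would instead use a fixed 500 ms interval.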

In reply to miki haiat, 05.06.2018 12:19

Re: Writing stream to Hadoop

miki haiat
OMG, I missed it...

Thanks,

Miki

In reply to Chesnay Schepler, Tue, Jun 5, 2018 at 1:30 PM