Trigger and completed Checkpointing do not appeared

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

Trigger and completed Checkpointing do not appeared

Abdullah bin Omar
Hi,

I run a sample code for word count. The input is just some text, and it contains output. In the output, it counts the words. Then in the code, I put all necessary lines to enable the checkpoint. However, I did not see any triggered or completed checkpoints (in the attached pic). But the word count is still working.

The code is below:


package org.apache.flink.flink_quickstart_java;



import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.utils.MultipleParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

//import org.apache.flink.streaming.examples.wordcount.util.WordCountData;

import org.apache.flink.util.Collector;

import org.apache.flink.util.Preconditions;


import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;



public class StreamingJob {




    // *************************************************************************

    // PROGRAM

    // *************************************************************************


    public static void main(String[] args) throws Exception {


        // Checking input parameters

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);


        // set up the execution environment

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        

     // start a checkpoint every 1000 ms

     env.enableCheckpointing(1000);

     

     // to set minimum progress time to happen between checkpoints

     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

     

     // checkpoints have to complete within 10000 ms, or are discarded

     env.getCheckpointConfig().setCheckpointTimeout(10000);

     

     // set mode to exactly-once (this is the default)

     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // AT_LEAST_ONCE

     

     // allow only one checkpoint to be in progress at the same time

     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


     // enable externalized checkpoints which are retained after job cancellation

     env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

     

     //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

     env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, 100 ));

                                                    // number of restart attempts , delay in each restart



        // make parameters available in the web interface

        env.getConfig().setGlobalJobParameters(params);


        // get input data

        DataStream<String> text = null;

        if (params.has("input")) {

            // union all the inputs from text files

            for (String input : params.getMultiParameterRequired("input")) {

                if (text == null) {

                    text = env.readTextFile(input);

                } else {

                    text = text.union(env.readTextFile(input));

                }

            }

            Preconditions.checkNotNull(text, "Input DataStream should not be null.");

        } else {

            System.out.println("Executing WordCount example with default input data set.");

            System.out.println("Use --input to specify file input.");

            // get default test text data

            //text = env.fromElements(WordCountData.WORDS);

        }


        DataStream<Tuple2<String, Integer>> counts =

                // split up the lines in pairs (2-tuples) containing: (word,1)

                text.flatMap(new Tokenizer())

                        // group by the tuple field "0" and sum up tuple field "1"

                        .keyBy(value -> value.f0)

                        .sum(1);


        // emit result

        if (params.has("output")) {

            counts.writeAsText(params.get("output"));

        } else {

            System.out.println("Printing result to stdout. Use --output to specify output path.");

            counts.print();

        }

        // execute program

        env.execute("Checkpointing");

    }


    // *************************************************************************

    // USER FUNCTIONS

    // *************************************************************************


    /**

     * Implements the string tokenizer that splits sentences into words as a user-defined

     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the

     * form of "(word,1)" ({@code Tuple2<String, Integer>}).

     */

    public static final class Tokenizer

            implements FlatMapFunction<String, Tuple2<String, Integer>> {


        @Override

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

            // normalize and split the line

            String[] tokens = value.toLowerCase().split("\\W+");


            // emit the pairs

            for (String token : tokens) {

                if (token.length() > 0) {

                    out.collect(new Tuple2<>(token, 1));

                }

            }

        }

    }

}



Screen Shot 2021-03-08 at 2.55.29 PM.png (194K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Trigger and completed Checkpointing do not appeared

Abdullah bin Omar
Hi,

Please read the previous email (and also this email) to answer me.

Here in the attached pic, the interval showed 1s. and the job is finished 1939ms

According to the code in the previous email, at least there should be some checkpoint triggered and completed.

However, in the apache flink UI, it showed no trigger and completed checkpoint (according to the attached pic in the first email)

What is the problem? Why does the completed checkpointing not work in here?

Thank you




On Mon, Mar 8, 2021 at 3:07 PM Abdullah bin Omar <[hidden email]> wrote:
Hi,

I run a sample code for word count. The input is just some text, and it contains output. In the output, it counts the words. Then in the code, I put all necessary lines to enable the checkpoint. However, I did not see any triggered or completed checkpoints (in the attached pic). But the word count is still working.

The code is below:


package org.apache.flink.flink_quickstart_java;



import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.utils.MultipleParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

//import org.apache.flink.streaming.examples.wordcount.util.WordCountData;

import org.apache.flink.util.Collector;

import org.apache.flink.util.Preconditions;


import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;



public class StreamingJob {




    // *************************************************************************

    // PROGRAM

    // *************************************************************************


    public static void main(String[] args) throws Exception {


        // Checking input parameters

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);


        // set up the execution environment

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        

     // start a checkpoint every 1000 ms

     env.enableCheckpointing(1000);

     

     // to set minimum progress time to happen between checkpoints

     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

     

     // checkpoints have to complete within 10000 ms, or are discarded

     env.getCheckpointConfig().setCheckpointTimeout(10000);

     

     // set mode to exactly-once (this is the default)

     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // AT_LEAST_ONCE

     

     // allow only one checkpoint to be in progress at the same time

     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


     // enable externalized checkpoints which are retained after job cancellation

     env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

     

     //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

     env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, 100 ));

                                                    // number of restart attempts , delay in each restart



        // make parameters available in the web interface

        env.getConfig().setGlobalJobParameters(params);


        // get input data

        DataStream<String> text = null;

        if (params.has("input")) {

            // union all the inputs from text files

            for (String input : params.getMultiParameterRequired("input")) {

                if (text == null) {

                    text = env.readTextFile(input);

                } else {

                    text = text.union(env.readTextFile(input));

                }

            }

            Preconditions.checkNotNull(text, "Input DataStream should not be null.");

        } else {

            System.out.println("Executing WordCount example with default input data set.");

            System.out.println("Use --input to specify file input.");

            // get default test text data

            //text = env.fromElements(WordCountData.WORDS);

        }


        DataStream<Tuple2<String, Integer>> counts =

                // split up the lines in pairs (2-tuples) containing: (word,1)

                text.flatMap(new Tokenizer())

                        // group by the tuple field "0" and sum up tuple field "1"

                        .keyBy(value -> value.f0)

                        .sum(1);


        // emit result

        if (params.has("output")) {

            counts.writeAsText(params.get("output"));

        } else {

            System.out.println("Printing result to stdout. Use --output to specify output path.");

            counts.print();

        }

        // execute program

        env.execute("Checkpointing");

    }


    // *************************************************************************

    // USER FUNCTIONS

    // *************************************************************************


    /**

     * Implements the string tokenizer that splits sentences into words as a user-defined

     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the

     * form of "(word,1)" ({@code Tuple2<String, Integer>}).

     */

    public static final class Tokenizer

            implements FlatMapFunction<String, Tuple2<String, Integer>> {


        @Override

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

            // normalize and split the line

            String[] tokens = value.toLowerCase().split("\\W+");


            // emit the pairs

            for (String token : tokens) {

                if (token.length() > 0) {

                    out.collect(new Tuple2<>(token, 1));

                }

            }

        }

    }

}



Screen Shot 2021-03-08 at 5.48.34 PM.png (811K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Trigger and completed Checkpointing do not appeared

Alexey Trenikhun
The picture in first e-mail shows that job was completed in 93ms


From: Abdullah bin Omar <[hidden email]>
Sent: Monday, March 8, 2021 3:53 PM
To: [hidden email] <[hidden email]>
Subject: Re: Trigger and completed Checkpointing do not appeared
 
Hi,

Please read the previous email (and also this email) to answer me.

Here in the attached pic, the interval showed 1s. and the job is finished 1939ms

According to the code in the previous email, at least there should be some checkpoint triggered and completed.

However, in the apache flink UI, it showed no trigger and completed checkpoint (according to the attached pic in the first email)

What is the problem? Why does the completed checkpointing not work in here?

Thank you




On Mon, Mar 8, 2021 at 3:07 PM Abdullah bin Omar <[hidden email]> wrote:
Hi,

I run a sample code for word count. The input is just some text, and it contains output. In the output, it counts the words. Then in the code, I put all necessary lines to enable the checkpoint. However, I did not see any triggered or completed checkpoints (in the attached pic). But the word count is still working.

The code is below:


package org.apache.flink.flink_quickstart_java;



import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.utils.MultipleParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

//import org.apache.flink.streaming.examples.wordcount.util.WordCountData;

import org.apache.flink.util.Collector;

import org.apache.flink.util.Preconditions;


import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;



public class StreamingJob {




    // *************************************************************************

    // PROGRAM

    // *************************************************************************


    public static void main(String[] args) throws Exception {


        // Checking input parameters

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);


        // set up the execution environment

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        

     // start a checkpoint every 1000 ms

     env.enableCheckpointing(1000);

     

     // to set minimum progress time to happen between checkpoints

     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

     

     // checkpoints have to complete within 10000 ms, or are discarded

     env.getCheckpointConfig().setCheckpointTimeout(10000);

     

     // set mode to exactly-once (this is the default)

     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // AT_LEAST_ONCE

     

     // allow only one checkpoint to be in progress at the same time

     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


     // enable externalized checkpoints which are retained after job cancellation

     env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

     

     //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

     env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, 100 ));

                                                    // number of restart attempts , delay in each restart



        // make parameters available in the web interface

        env.getConfig().setGlobalJobParameters(params);


        // get input data

        DataStream<String> text = null;

        if (params.has("input")) {

            // union all the inputs from text files

            for (String input : params.getMultiParameterRequired("input")) {

                if (text == null) {

                    text = env.readTextFile(input);

                } else {

                    text = text.union(env.readTextFile(input));

                }

            }

            Preconditions.checkNotNull(text, "Input DataStream should not be null.");

        } else {

            System.out.println("Executing WordCount example with default input data set.");

            System.out.println("Use --input to specify file input.");

            // get default test text data

            //text = env.fromElements(WordCountData.WORDS);

        }


        DataStream<Tuple2<String, Integer>> counts =

                // split up the lines in pairs (2-tuples) containing: (word,1)

                text.flatMap(new Tokenizer())

                        // group by the tuple field "0" and sum up tuple field "1"

                        .keyBy(value -> value.f0)

                        .sum(1);


        // emit result

        if (params.has("output")) {

            counts.writeAsText(params.get("output"));

        } else {

            System.out.println("Printing result to stdout. Use --output to specify output path.");

            counts.print();

        }

        // execute program

        env.execute("Checkpointing");

    }


    // *************************************************************************

    // USER FUNCTIONS

    // *************************************************************************


    /**

     * Implements the string tokenizer that splits sentences into words as a user-defined

     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the

     * form of "(word,1)" ({@code Tuple2<String, Integer>}).

     */

    public static final class Tokenizer

            implements FlatMapFunction<String, Tuple2<String, Integer>> {


        @Override

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

            // normalize and split the line

            String[] tokens = value.toLowerCase().split("\\W+");


            // emit the pairs

            for (String token : tokens) {

                if (token.length() > 0) {

                    out.collect(new Tuple2<>(token, 1));

                }

            }

        }

    }

}


Reply | Threaded
Open this post in threaded view
|

Re: Trigger and completed Checkpointing do not appeared

Abdullah bin Omar
Hi, 

Thank you for your reply.

I checked by changing enable checkpointing time (I mean by changing the all possible time; however, it still does not show the triggered and completed checkpoint).

Now the code is run in more than 1500 ms (because I increase the size of the input file so that it can take more time to complete the job. As the interval shows 1s, so I increase the runtime more than 1s to observe whether the checkpoint will appear or not. However, it still does not in the apache flink UI)

See the attached pic at the attachment.

what is the problem? why the checkpoint trigger or completed can not show in the apache flink UI?

Thank you!



On Mon, Mar 8, 2021 at 7:19 PM Abdullah bin Omar <[hidden email]> wrote:
Yes, later I increased the size of the input.txt file. Now the job is completed more than 1500 ms.

Thank you!

On Mon, Mar 8, 2021 at 7:16 PM Alexey Trenikhun <[hidden email]> wrote:
The picture in first e-mail shows that job was completed in 93ms


From: Abdullah bin Omar <[hidden email]>
Sent: Monday, March 8, 2021 3:53 PM
To: [hidden email] <[hidden email]>
Subject: Re: Trigger and completed Checkpointing do not appeared
 
Hi,

Please read the previous email (and also this email) to answer me.

Here in the attached pic, the interval showed 1s. and the job is finished 1939ms

According to the code in the previous email, at least there should be some checkpoint triggered and completed.

However, in the apache flink UI, it showed no trigger and completed checkpoint (according to the attached pic in the first email)

What is the problem? Why does the completed checkpointing not work in here?

Thank you




On Mon, Mar 8, 2021 at 3:07 PM Abdullah bin Omar <[hidden email]> wrote:
Hi,

I run a sample code for word count. The input is just some text, and it contains output. In the output, it counts the words. Then in the code, I put all necessary lines to enable the checkpoint. However, I did not see any triggered or completed checkpoints (in the attached pic). But the word count is still working.

The code is below:


package org.apache.flink.flink_quickstart_java;



import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.utils.MultipleParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

//import org.apache.flink.streaming.examples.wordcount.util.WordCountData;

import org.apache.flink.util.Collector;

import org.apache.flink.util.Preconditions;


import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;



public class StreamingJob {




    // *************************************************************************

    // PROGRAM

    // *************************************************************************


    public static void main(String[] args) throws Exception {


        // Checking input parameters

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);


        // set up the execution environment

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        

     // start a checkpoint every 1000 ms

     env.enableCheckpointing(1000);

     

     // to set minimum progress time to happen between checkpoints

     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

     

     // checkpoints have to complete within 10000 ms, or are discarded

     env.getCheckpointConfig().setCheckpointTimeout(10000);

     

     // set mode to exactly-once (this is the default)

     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // AT_LEAST_ONCE

     

     // allow only one checkpoint to be in progress at the same time

     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


     // enable externalized checkpoints which are retained after job cancellation

     env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

     

     //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

     env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, 100 ));

                                                    // number of restart attempts , delay in each restart



        // make parameters available in the web interface

        env.getConfig().setGlobalJobParameters(params);


        // get input data

        DataStream<String> text = null;

        if (params.has("input")) {

            // union all the inputs from text files

            for (String input : params.getMultiParameterRequired("input")) {

                if (text == null) {

                    text = env.readTextFile(input);

                } else {

                    text = text.union(env.readTextFile(input));

                }

            }

            Preconditions.checkNotNull(text, "Input DataStream should not be null.");

        } else {

            System.out.println("Executing WordCount example with default input data set.");

            System.out.println("Use --input to specify file input.");

            // get default test text data

            //text = env.fromElements(WordCountData.WORDS);

        }


        DataStream<Tuple2<String, Integer>> counts =

                // split up the lines in pairs (2-tuples) containing: (word,1)

                text.flatMap(new Tokenizer())

                        // group by the tuple field "0" and sum up tuple field "1"

                        .keyBy(value -> value.f0)

                        .sum(1);


        // emit result

        if (params.has("output")) {

            counts.writeAsText(params.get("output"));

        } else {

            System.out.println("Printing result to stdout. Use --output to specify output path.");

            counts.print();

        }

        // execute program

        env.execute("Checkpointing");

    }


    // *************************************************************************

    // USER FUNCTIONS

    // *************************************************************************


    /**

     * Implements the string tokenizer that splits sentences into words as a user-defined

     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the

     * form of "(word,1)" ({@code Tuple2<String, Integer>}).

     */

    public static final class Tokenizer

            implements FlatMapFunction<String, Tuple2<String, Integer>> {


        @Override

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {

            // normalize and split the line

            String[] tokens = value.toLowerCase().split("\\W+");


            // emit the pairs

            for (String token : tokens) {

                if (token.length() > 0) {

                    out.collect(new Tuple2<>(token, 1));

                }

            }

        }

    }

}



Screen Shot 2021-03-08 at 5.48.34 PM.png (811K) Download Attachment
Screen Shot 2021-03-08 at 7.27.25 PM.png (716K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Trigger and completed Checkpointing do not appeared

Smile
Hi,

Could you please change the source to an endless one? For example a Kafka
source or a custom source that implements SourceFunction([1])?
env.readTextFile() won't wait for all data to be finished, but exit
immediately after telling readers what to read. So it may exit before the
first checkpoint being triggered. See [2] for more information.

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-

Regards,
Smile



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Trigger and completed Checkpointing do not appeared

Abdullah bin Omar
Hi Smile,

Thank you for your reply.

I read [1] according to the last email. I will have to add implements SourceFunction <Long>  CheckpointedFunction with the main class. Then calling run() and cancel() inside the main class. Is it correct?

I just run the sample code from apache flink. I can not understand everything.

In this case, could you please inform me where I will have to change in my code (it is in the first email) and which one to replace? If you can answer, it will be really helpful for me as a newbie.

Thank you in advance!

On Tue, Mar 9, 2021 at 4:07 AM Abdullah bin Omar <[hidden email]> wrote:
Hi Smile,

Thank you for your reply.

I read [1] according to the last email. I will have to add implements SourceFunction <Long>  CheckpointedFunction with the main class. Then calling run() and cancel() inside the main class. Is it correct?

I just run the sample code from apache flink. I can not understand everything.

In this case, could you please inform me where I will have to change in my code (it is in the first email) and which one to replace? If you can answer, it will be really helpful for me as a newbie.

Thank you in advance!

        

On Mon, Mar 8, 2021 at 10:21 PM Smile <[hidden email]> wrote:
Hi,

Could you please change the source to an endless one? For example a Kafka
source or a custom source that implements SourceFunction([1])?
env.readTextFile() won't wait for all data to be finished, but exit
immediately after telling readers what to read. So it may exit before the
first checkpoint being triggered. See [2] for more information.

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-

Regards,
Smile



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Trigger and completed Checkpointing do not appeared

Smile
Hi,

After implementing SourceFunction, you can use it to create a DataStream
using env.addSource() in your main method.
For example, if you have your custom source class with the name CustomSource
that implements SourceFunction<String>, then it can be used for getting
input data and the if-statement after it can be removed:

// get input data
DataStream<String> text = env.addSource(new CustomSource());


ExampleCountSource in [1] implements SourceFunction<Long>, which can be used
to get a DataStream with type Long, not String, such as:

DataStream<Long> numbers = env.addSource(new ExampleCountSource());


If you only want to have a look at how checkpoint being triggered, see [2]
for another sample that has a custom endless source named TransactionSource.
When enabled checkpoint it can be triggered with your rules. It might be
easier for a beginner than implement it by yourself.
However, it may not restore from a checkpoint perfectly since it doesn't
implement CheckpointedFunction. That is to say, if you want your source to
be restored successfully after failures, CheckpointedFunction is also
necessary and ExampleCountSource in [1] is a good example.


[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html

Regards,
Smile




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Trigger and completed Checkpointing do not appeared

Abdullah bin Omar
Hi, 

CheckpointedFunction is needed to trigger and complete the checkpoint, is it right?

What is the work for the code in here [1]?  it [1] said to enable checkpointing or some other function. What is the difference between [1] and [2]?

If we use ExampleCountSource[2] example, what will be the import portion at the beginning of code and the pom.xml? 


Thank you for your service!



On Tue, Mar 9, 2021 at 6:21 AM Smile <[hidden email]> wrote:
Hi,

After implementing SourceFunction, you can use it to create a DataStream
using env.addSource() in your main method.
For example, if you have your custom source class with the name CustomSource
that implements SourceFunction<String>, then it can be used for getting
input data and the if-statement after it can be removed:

// get input data
DataStream<String> text = env.addSource(new CustomSource());


ExampleCountSource in [1] implements SourceFunction<Long>, which can be used
to get a DataStream with type Long, not String, such as:

DataStream<Long> numbers = env.addSource(new ExampleCountSource());


If you only want to have a look at how checkpoint being triggered, see [2]
for another sample that has a custom endless source named TransactionSource.
When enabled checkpoint it can be triggered with your rules. It might be
easier for a beginner than implement it by yourself.
However, it may not restore from a checkpoint perfectly since it doesn't
implement CheckpointedFunction. That is to say, if you want your source to
be restored successfully after failures, CheckpointedFunction is also
necessary and ExampleCountSource in [1] is a good example.


[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html

Regards,
Smile




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re:Re: Trigger and completed Checkpointing do not appeared

Smile
Hi,

In short, [1] means whether the job will trigger checkpoints, and [2] means which operators will take action when checkpoints are triggered.
If use ExampleCountSource, flink-streaming-java should be a dependency in pom.xml and classes such as ListState, ListStateDescriptor, FunctionInitializationContext, FunctionSnapshotContext, CheckpointedFunction, SourceFunction should be import.

By the way, I'm not sure whether this mail will be displayed well because it's the first time for me to write such a formatted one. If not, please let me know.


------------------------------------------------------------------------------------
Detailed reply for question 1:

CheckpointedFunction is not necessary to trigger or complete a checkpoint. 

A job will trigger a checkpoint when all its tasks are running and checkpointing was enabled using code in [1], such as env.enableCheckpointing(xxx). Your job in the first mail didn't trigger a checkpoint because the source was not running at the time of the first checkpoint (rather than checkpoint was not enabled).

However, for some functions and operators, checkpoints make no sense. 

Take the code in that word count demo for an example:

source → flatMap → keyBy → sum → print

Assume the data:

aaa bbb aaa
bbb ccc
aaa
bbb
aaa ccc ddd

And assume the job failed because of somewhat error after dealing with the first 3 lines.

aaa bbb aaa
bbb ccc
aaa
-- job fail
-- job recover
bbb
aaa ccc ddd

When the source operator and the sum operator recover from a failure, they'll need a checkpoint.
The source operator wants to know where to start (the 4th line) because some data may already be done before the failure. The sum operator wants to know what's the count of every word before the failure (aaa:3, bbb:2, ccc:1) so that when new sentences coming they can be calculated correctly.

However, the flatMap operator doesn't need a checkpoint at all. Whenever a sentence comes, split it. This operator requires nothing from a checkpoint to recover. 

CheckpointedFunction in [2] is to distinguish these stateful functions from all the functions (It's not the only way, but the most flexible one). See [3] and [4] for more information.

------------------------------------------------------------------------------------
Detailed reply for question 2:

Here's my sample code for ExampleCountSource.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
    private long count = 0L;
    private volatile boolean isRunning = true;

    private transient ListState<Long> checkpointedCount;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning && count < 1000) {
            // this synchronized block ensures that state checkpointing,
            // internal state updates and emission of elements are an atomic operation
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.checkpointedCount = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));

        if (context.isRestored()) {
            for (Long count : this.checkpointedCount.get()) {
                this.count = count;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.checkpointedCount.clear();
        this.checkpointedCount.add(count);
    }
}


[1]. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#java
[2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[3]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html
[4]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html

Regards,
Smile