Hi,
I ran a sample word count code. The input is just some text, and the output contains the counts of the words. In the code, I put all the necessary lines to enable checkpointing. However, I did not see any triggered or completed checkpoints (see the attached picture), even though the word count itself still works. The code is below:

package org.apache.flink.flink_quickstart_java;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;

public class StreamingJob {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        // Checking input parameters
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

        // make sure at least 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 10000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(10000);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // or AT_LEAST_ONCE

        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // or DELETE_ON_CANCELLATION

        // restart at most 3 times, with a 100 ms delay between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            //text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(value -> value.f0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Checkpointing");
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
Hi,

Please read the previous email (and also this email) to answer me.

In the attached picture, the interval shows 1s, and the job finished in 1939 ms. According to the code in the previous email, at least some checkpoints should have been triggered and completed. However, the Apache Flink UI shows no triggered or completed checkpoints (see the picture attached to the first email).

What is the problem? Why does checkpointing not complete here?

Thank you

On Mon, Mar 8, 2021 at 3:07 PM Abdullah bin Omar <[hidden email]> wrote:
The picture in the first e-mail shows that the job was completed in 93 ms.
From: Abdullah bin Omar <[hidden email]>
Sent: Monday, March 8, 2021 3:53 PM
To: [hidden email]
Subject: Re: Trigger and completed Checkpointing do not appeared
Hi,

Thank you for your reply. I checked by changing the checkpoint interval (I tried all the possible values; however, it still does not show any triggered or completed checkpoints).

The job now runs for more than 1500 ms, because I increased the size of the input file so that the job takes longer to complete. Since the interval shows 1s, I pushed the runtime above 1s to observe whether a checkpoint would appear. However, it still does not appear in the Apache Flink UI. See the attached picture.

What is the problem? Why do triggered or completed checkpoints not show up in the Apache Flink UI?

Thank you!

On Mon, Mar 8, 2021 at 7:19 PM Abdullah bin Omar <[hidden email]> wrote:
Hi,
Could you please change the source to an endless one, for example a Kafka source or a custom source that implements SourceFunction [1]? env.readTextFile() won't wait for all the data to be processed; it exits immediately after telling the readers what to read. So the source may exit before the first checkpoint is triggered. See [2] for more information.

[1]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-

Regards,
Smile
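For illustration, a minimal custom endless source along these lines could look like the sketch below. The class name EndlessTextSource, the emitted sentence, and the one-second sleep are made up for this sketch; the source simply keeps running until the job is cancelled, so checkpoints get a chance to be triggered:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A hypothetical endless source: emits one line of text per second
// until the job is cancelled, so it outlives the checkpoint interval.
public class EndlessTextSource implements SourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            ctx.collect("to be or not to be");
            Thread.sleep(1000); // keep the source alive past the checkpoint interval
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

It would then replace the readTextFile() call, e.g. DataStream<String> text = env.addSource(new EndlessTextSource());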
Hi Smile,

Thank you for your reply. I read [1] from your last email. So I will have to add "implements SourceFunction<Long>, CheckpointedFunction" to the main class, and then call run() and cancel() inside the main class. Is that correct?

I just ran the sample code from Apache Flink; I cannot understand everything yet. Could you please tell me where I have to change my code (it is in the first email) and what to replace? If you can answer, it will be really helpful for me as a newbie.

Thank you in advance!

On Tue, Mar 9, 2021 at 4:07 AM Abdullah bin Omar <[hidden email]> wrote:
Hi,
After implementing SourceFunction, you can use it to create a DataStream with env.addSource() in your main method. For example, if you have a custom source class named CustomSource that implements SourceFunction<String>, it can be used for getting the input data, and the if-statement after it can be removed:

// get input data
DataStream<String> text = env.addSource(new CustomSource());

ExampleCountSource in [1] implements SourceFunction<Long>, so it can be used to get a DataStream of type Long, not String, such as:

DataStream<Long> numbers = env.addSource(new ExampleCountSource());

If you only want to have a look at how checkpoints are triggered, see [2] for another sample with a custom endless source named TransactionSource. With checkpointing enabled, checkpoints will be triggered according to your configuration. It might be easier for a beginner than implementing a source yourself. However, it may not restore from a checkpoint perfectly, since it doesn't implement CheckpointedFunction. That is to say, if you want your source to be restored successfully after failures, CheckpointedFunction is also necessary, and ExampleCountSource in [1] is a good example.

[1]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html

Regards,
Smile
Hi,

CheckpointedFunction is needed to trigger and complete checkpoints, is that right? What does the code in [1] do? [1] says to enable checkpointing and some other functions. What is the difference between [1] and [2]?

If we use the ExampleCountSource example [2], what will the import section at the beginning of the code and the pom.xml look like?

Thank you for your service!

On Tue, Mar 9, 2021 at 6:21 AM Smile <[hidden email]> wrote:
Hi,

In short, [1] determines whether the job will trigger checkpoints at all, and [2] determines which operators will take action when checkpoints are triggered.

If you use ExampleCountSource, flink-streaming-java should be a dependency in your pom.xml, and classes such as ListState, ListStateDescriptor, FunctionInitializationContext, FunctionSnapshotContext, CheckpointedFunction, and SourceFunction should be imported.

By the way, I'm not sure whether this mail will be displayed well, because it's the first time I've written such a formatted one. If not, please let me know.

------------------------------------------------------------------------------------

Detailed reply for question 1:

CheckpointedFunction is not necessary to trigger or complete a checkpoint. A job will trigger checkpoints when all of its tasks are running and checkpointing has been enabled using code as in [1], such as env.enableCheckpointing(1000). Your job in the first mail didn't trigger a checkpoint because the source was no longer running at the time of the first checkpoint (not because checkpointing was not enabled).

However, for some functions and operators, checkpoints make no sense. Take the code in that word count demo for an example:

source → flatMap → keyBy → sum → print

Assume the input data:

aaa bbb
aaa bbb ccc
aaa
bbb aaa ccc ddd

And assume the job failed because of some error after dealing with the first 3 lines:

aaa bbb
aaa bbb ccc
aaa
-- job fail
-- job recover
bbb aaa ccc ddd

When the source operator and the sum operator recover from a failure, they need a checkpoint. The source operator wants to know where to start (the 4th line), because some data may already have been processed before the failure. The sum operator wants to know the count of every word before the failure (aaa: 3, bbb: 2, ccc: 1), so that new sentences can be counted on top of that correctly. However, the flatMap operator doesn't need a checkpoint at all: whenever a sentence comes, it splits it. This operator requires nothing from a checkpoint to recover.
CheckpointedFunction in [2] is what distinguishes these stateful functions from all the other functions (it's not the only way, but it's the most flexible one). See [3] and [4] for more information.

------------------------------------------------------------------------------------

Detailed reply for question 2:

Here's my sample code for ExampleCountSource.java:
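The sketch below follows the ExampleCountSource example from the SourceFunction javadoc [2]; the cap of 1000 emitted records and the state name "count" are taken from that example:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits the numbers 0..999 and keeps the current count in operator state,
// so the source can resume from where it left off after a failure.
public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {

    private long count = 0L;
    private volatile boolean isRunning = true;

    // operator state holding the current count, restored on recovery
    private transient ListState<Long> checkpointedCount;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning && count < 1000) {
            // emit and update the count under the checkpoint lock, so that
            // snapshots are consistent with the records already emitted
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.checkpointedCount = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));

        // on recovery, restore the count from the last checkpoint
        if (context.isRestored()) {
            for (Long c : this.checkpointedCount.get()) {
                this.count += c;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.checkpointedCount.clear();
        this.checkpointedCount.add(count);
    }
}
```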
[1]. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#java
[2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[3]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html
[4]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html

Regards,
Smile