Hi
I am new to the Flink world and trying to understand it. I am currently running Flink 1.3.2 on a small cluster of 4 nodes. I have configured the checkpoint directory on HDFS, and I am running the streaming word count example against my own custom input file of 63M entries, with checkpointing enabled every second via env.enableCheckpointing(1000).

The problem I am facing is that a checkpoint is triggered only once, after the first second, and never again. I ran the application for more than 5 minutes, but the checkpoint history shows only one checkpoint, which was successful. I don't know why no further checkpoints are triggered after the first one.

Please suggest what is wrong. Thanks in anticipation.
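For reference, a minimal sketch of the setup being described, assuming the standard streaming word-count skeleton (the input path is hypothetical; the checkpoint directory itself is configured separately, e.g. in flink-conf.yaml):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class CheckpointedWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Ask the JobManager to trigger a checkpoint every 1000 ms.
            env.enableCheckpointing(1000);

            DataStream<Tuple2<String, Integer>> counts = env
                    .readTextFile("hdfs:///path/to/input") // hypothetical path
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                            // Tokenize each line and emit (word, 1) pairs.
                            for (String word : line.toLowerCase().split("\\W+")) {
                                if (!word.isEmpty()) {
                                    out.collect(new Tuple2<>(word, 1));
                                }
                            }
                        }
                    })
                    .keyBy(0)
                    .sum(1);

            counts.print();
            env.execute("Checkpointed word count");
        }
    }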
Hi,
Did you check the TaskManager and JobManager logs for any problems?

Piotrek
Hi,
Please analyse what was going on in the TaskManager and JobManager before this "task is not being executed at the moment" message appeared. What is the reason the task is not being executed? Was there some exception? Depending on your setup, you might also need to check your stdout/stderr files (if your code prints errors there). Another possibility is that your operators/functions are initialising very slowly or getting stuck somewhere.

Thanks,
Piotrek
I ran into a similar issue.
Since it is a "Custom File Source", the first source just listing folder/file path for all existing files. Next operator "Split Reader" will read the content of the file. "Custom File Source" went to "finished" state after first couple secs. That's way we got this error message "Custom File Source (1/1) is not being executed at the moment. Aborting checkpoint". Because the "Custom File Source" finished already. Is this by design? Although the "Custom File Source" finished in secs, the rest of the pipeline can running for hours or days. Whenever anything went wrong, the pipeline will restart and start to reading from the beginning again, since there is not any checkpoint. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi,
It’s not considered a bug, only a missing, not-yet-implemented feature (check my previous responses for the Jira ticket). Generally speaking, using a file input stream for DataStream programs is not very popular, so this has so far been low on our priority list.

Piotrek
Thanks for the reply, Piotr. Which Jira ticket were you referring to?

We are trying to use the same code as our normal stream processing to process very old historical backfill data. The problem for me right now is that backfilling x years of data is very slow, and I cannot take any checkpoint during the whole time because the file source is "FINISHED". When anything goes wrong in the middle, the whole pipeline starts over from the beginning again.

Is there any way I can skip the checkpoint of "Source: Custom File Source" but still have checkpoints on "Split Reader: Custom File Source"?

Thanks,
Tao
Hi,
1. What if you set org.apache.flink.streaming.api.functions.source.FileProcessingMode#PROCESS_CONTINUOUSLY? This will prevent the split source from finishing, so checkpointing should work fine (see the sketch after this message). The downside is that you would have to determine on your own, manually, whether the job has finished/completed or not.

Other things that come to my mind would require some coding:

2. Look at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#createFileInput, copy its code, and replace ContinuousFileMonitoringFunction with something that finishes on some custom event/action/condition. The code you would have to modify/replace is alongside the usages of FileProcessingMode monitoringMode.

3. Probably even more complicated: you could modify ContinuousFileReaderOperator to be a source function with a statically precomputed list of files/splits to process (they would have to be assigned/distributed taking parallelism into account). Your source functions could then complete not when the splits are generated, but when they have finished reading them.

Piotrek
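A minimal sketch of option 1, assuming a plain text input format (the path and the 10-second scan interval are placeholders, not values from this thread):

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class ContinuousBackfill {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(1000);

            String path = "hdfs:///path/to/backfill"; // hypothetical path
            TextInputFormat format = new TextInputFormat(new Path(path));

            // PROCESS_CONTINUOUSLY keeps the monitoring source alive: it
            // rescans the directory every 10 seconds, so it never reaches
            // the FINISHED state and checkpointing keeps working. The
            // trade-off is that the job never terminates on its own.
            DataStream<String> lines = env.readFile(
                    format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

            lines.print();
            env.execute("Continuous file backfill");
        }
    }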
Great, I will give it a try.

Thanks,
Tao