Checkpointing & File stream with

Sung Gon Yi
Hello,

I am working on joining two streams: one comes from Kafka and the other from a small file.
Stream processing works well, but checkpointing fails.
The file has fewer than 100 lines, and the part of the pipeline that reads it switches to the FINISHED state as soon as the job is deployed.

After that, checkpointing fails with the following message:
——
2019-06-17 20:25:13,575 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom File Source (1/1) of job d26afe055f249c172c1dcb3311508e83 is not in state RUNNING but FINISHED instead. Aborting checkpoint.
——
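The log line comes from the checkpoint coordinator's pre-trigger check: before triggering a checkpoint, it verifies that every task it must trigger (the source subtasks) is still RUNNING, and aborts the checkpoint otherwise. The plain-Java sketch below models only that rule; `TaskState`, `TriggerTask` and `CheckpointPreCheck` are hypothetical names, not Flink's `CheckpointCoordinator` code. Later Flink versions (1.14 and newer) added support for checkpointing after some tasks have finished, but the behavior in this thread is the abort shown above.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical model of the coordinator's pre-trigger check; names are
// illustrative and this is not Flink's CheckpointCoordinator source.
enum TaskState { RUNNING, FINISHED }

// A task the coordinator must trigger, such as a source subtask.
final class TriggerTask {
    final String name;
    final TaskState state;

    TriggerTask(String name, TaskState state) {
        this.name = name;
        this.state = state;
    }
}

final class CheckpointPreCheck {
    private CheckpointPreCheck() {}

    /** Returns an abort reason if any triggering task is not RUNNING, otherwise empty. */
    static Optional<String> abortReason(List<TriggerTask> tasks, String jobId) {
        for (TriggerTask t : tasks) {
            if (t.state != TaskState.RUNNING) {
                return Optional.of("Checkpoint triggering task " + t.name + " of job " + jobId
                        + " is not in state RUNNING but " + t.state + " instead. Aborting checkpoint.");
            }
        }
        return Optional.empty();
    }
}
```

In this model a single FINISHED source is enough to abort every checkpoint, even while the Kafka source keeps running.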

"Custom File Source" refers to the following code:
——
DataStream<String> specificationFileStream = env.readTextFile(specFile);
——

To make checkpointing succeed, I wrote a custom source function that keeps running (it mostly sleeps after reading the file). I wonder whether this is the correct approach.
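The workaround described here keeps the source task alive after it has emitted the file, so the task stays RUNNING and the coordinator keeps triggering checkpoints. The sketch below models that pattern in plain Java. Its shape mirrors the `run`/`cancel` contract of Flink's legacy `SourceFunction`, but `KeepAliveLineSource` and the `Consumer<String>` emitter are hypothetical stand-ins, not Flink API; in a real `SourceFunction`, records go through `SourceContext#collect`, normally while holding `ctx.getCheckpointLock()`.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of a source that emits a small, fixed set of
// lines and then idles until cancelled, so its task never reaches FINISHED.
final class KeepAliveLineSource {
    private final List<String> lines;         // stand-in for the file contents
    private volatile boolean running = true;  // flipped by cancel() from another thread

    KeepAliveLineSource(List<String> lines) {
        this.lines = List.copyOf(lines);
    }

    /** Emits every line once, then sleeps in a loop until cancel() is called. */
    void run(Consumer<String> emit) throws InterruptedException {
        for (String line : lines) {
            if (!running) {
                return;
            }
            emit.accept(line);
        }
        while (running) {
            Thread.sleep(50); // idle without busy-waiting; keeps the task alive
        }
    }

    void cancel() {
        running = false;
    }
}
```

Usage: call `run` on its own thread and `cancel` from another when the job stops. Unlike a continuously monitored file, this reads the contents only once, at the cost of maintaining custom source code.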

Sincerely,
Sung Gon

Re: Checkpointing & File stream with

Yun Tang
Hi Sung

How about using FileProcessingMode.PROCESS_CONTINUOUSLY [1] as the watch type when reading the data from HDFS? PROCESS_CONTINUOUSLY periodically monitors the source path, whereas the default, FileProcessingMode.PROCESS_ONCE, processes the data only once and then exits.
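In PROCESS_ONCE mode the source scans the path once and then exits, so its task reaches FINISHED; the Flink documentation notes that once the source has closed, no further checkpoints are taken. In PROCESS_CONTINUOUSLY mode it rescans the path every `interval` milliseconds and forwards files whose modification time is newer than any it has already forwarded. The plain-Java model below illustrates that difference with `java.nio.file`; `WatchMode` and `PathMonitor` are hypothetical names, not Flink classes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of the two watch modes; WatchMode and PathMonitor are
// illustrative names, not Flink classes.
enum WatchMode { PROCESS_ONCE, PROCESS_CONTINUOUSLY }

final class PathMonitor {
    private final Path dir;
    private final WatchMode mode;
    private long maxSeenModTime = Long.MIN_VALUE; // newest modification time forwarded so far
    private boolean finished = false;

    PathMonitor(Path dir, WatchMode mode) {
        this.dir = dir;
        this.mode = mode;
    }

    /**
     * Performs one scan and returns files modified after the newest file already
     * forwarded. In PROCESS_ONCE mode the monitor finishes after its first scan.
     */
    List<Path> scan() {
        if (finished) {
            return List.of();
        }
        List<Path> fresh = new ArrayList<>();
        long newest = maxSeenModTime;
        try (Stream<Path> listing = Files.list(dir)) {
            for (Path p : listing.sorted().collect(Collectors.toList())) {
                long mod = Files.getLastModifiedTime(p).toMillis();
                if (mod > maxSeenModTime) {
                    fresh.add(p);
                    newest = Math.max(newest, mod);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        maxSeenModTime = newest;
        if (mode == WatchMode.PROCESS_ONCE) {
            finished = true; // the real source task would now reach FINISHED
        }
        return fresh;
    }

    boolean isFinished() {
        return finished;
    }
}
```

A continuous monitor would call `scan()` in a loop with a sleep of `interval` milliseconds between calls and never finish, which is what keeps checkpoints going.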


Best
Yun Tang

(In reply to Sung Gon Yi's message of Tuesday, June 18, 2019 14:13.)


Re: Checkpointing & File stream with

Sung Gon Yi
It works well now with the following code:
——
TextInputFormat specFileFormat = new TextInputFormat(new Path(specFile));
specFileFormat.setFilesFilter(FilePathFilter.createDefaultFilter());
// Re-scan the path every 100 ms; the source task stays RUNNING, so checkpoints can complete.
DataStream<String> specificationFileStream = env
        .readFile(specFileFormat, specFile, FileProcessingMode.PROCESS_CONTINUOUSLY, 100L, BasicTypeInfo.STRING_TYPE_INFO);
——
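One caveat with this setup: the Flink documentation warns that in PROCESS_CONTINUOUSLY mode a modified file is re-processed in its entirety, so appending lines to the spec file re-emits all of its lines, not only the new ones, which can break exactly-once semantics. The `100L` argument is the scan interval in milliseconds. The plain-Java sketch below models the resulting duplicates; `WholeFileReader` is a hypothetical name, and emitting once per changed scan is a simplifying assumption standing in for Flink's split readers.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: whenever the watched file's modification time increases,
// the whole file is read again and every line is emitted. Not Flink code.
final class WholeFileReader {
    private final Path file;
    private long lastModTime = Long.MIN_VALUE;
    private final List<String> emitted = new ArrayList<>();

    WholeFileReader(Path file) {
        this.file = file;
    }

    /** Called once per scan interval; re-reads the entire file if it changed. */
    void poll() {
        try {
            long mod = Files.getLastModifiedTime(file).toMillis();
            if (mod > lastModTime) {
                lastModTime = mod;
                emitted.addAll(Files.readAllLines(file)); // all lines, not just new ones
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    List<String> emitted() {
        return List.copyOf(emitted);
    }
}
```

For a small reference file that is replaced rarely this is usually harmless, but downstream logic should tolerate seeing the same lines more than once.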

Thanks.

(In reply to Yun Tang's message of 18 Jun 2019, 3:38 PM.)