Not all files are processed? Stream source with ContinuousFileMonitoringFunction

juanmirocks
Dear flinksters,


I'm using the class `ContinuousFileMonitoringFunction` as a source to monitor a folder for new incoming files. My problem is that not all the files sent to the folder get processed / triggered by the function. Two details of my workflow: I send up to 1k files per minute, and I consume the stream as an `AsyncDataStream`.

I myself raised an unrelated issue with the `ContinuousFileMonitoringFunction` class some time ago (https://issues.apache.org/jira/browse/FLINK-8046): if two or more files shared the very same timestamp, only the first one (chosen non-deterministically) would be processed. I patched the class myself to fix that problem, using a LinkedHashMap to remember which files had already been processed. As far as I can tell, my patch works fine.
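The difference between a timestamp-only check and a path-aware check can be sketched in plain Java. This is a simplified model of the symptom described above, not Flink's code and not the actual patch; the class `SeenFilesTracker` and its methods are hypothetical names chosen for illustration, and a directory listing is modelled as a map from path to modification time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of a directory monitor (hypothetical; not Flink's implementation). */
final class SeenFilesTracker {
    // path -> modification time of every file already forwarded (insertion-ordered)
    private final Map<String, Long> processed = new LinkedHashMap<>();
    // highest modification time seen so far (used only by the timestamp-only rule)
    private long maxModTime = Long.MIN_VALUE;

    /** Timestamp-only rule: forward a file only if it is strictly newer than any earlier scan. */
    List<String> scanByTimestamp(Map<String, Long> listing) {
        List<String> fresh = new ArrayList<>();
        long newMax = maxModTime;
        for (Map.Entry<String, Long> e : listing.entrySet()) {
            if (e.getValue() > maxModTime) {
                fresh.add(e.getKey());
                newMax = Math.max(newMax, e.getValue());
            }
        }
        maxModTime = newMax;
        return fresh;
    }

    /** Path-aware rule: forward a file if its path is new or its modification time changed. */
    List<String> scanByPath(Map<String, Long> listing) {
        List<String> fresh = new ArrayList<>();
        for (Map.Entry<String, Long> e : listing.entrySet()) {
            Long previous = processed.put(e.getKey(), e.getValue());
            if (previous == null || !previous.equals(e.getValue())) {
                fresh.add(e.getKey());
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        Map<String, Long> firstScan = new LinkedHashMap<>();
        firstScan.put("a.txt", 1000L);
        Map<String, Long> secondScan = new LinkedHashMap<>(firstScan);
        secondScan.put("b.txt", 1000L); // arrives later with the same timestamp as a.txt

        SeenFilesTracker byTime = new SeenFilesTracker();
        byTime.scanByTimestamp(firstScan);
        // b.txt is not newer than the recorded maximum, so it is skipped
        System.out.println("timestamp-only: " + byTime.scanByTimestamp(secondScan));

        SeenFilesTracker byPath = new SeenFilesTracker();
        byPath.scanByPath(firstScan);
        // b.txt has an unseen path, so it is forwarded
        System.out.println("path-aware: " + byPath.scanByPath(secondScan));
    }
}
```

Note that in this sketch the map of processed paths grows without bound; a real patch would need some eviction policy.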

Rather, the problem seems to be that some files (when many are sent at once to the same folder) never even get triggered/activated/registered by the class.


Am I properly explaining my problem?


Any hints to solve this challenge would be greatly appreciated! ❤ THANK YOU

--
Juanmi, CEO and co-founder @ 🍃tagtog.net

    Follow tagtog updates on 🐦 Twitter: @tagtog_net

Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

Fabian Hueske-2
Hi,

Which file system are you reading from? If you are reading from S3, this might be caused by S3's eventual consistency property.
Have a look at FLINK-9940 [1] for a more detailed discussion.
There is also an open PR [2] that you could use to patch the source operator.

Best, Fabian
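To illustrate why an eventually consistent listing can cause files to be skipped, here is a generic sketch of one way to tolerate files that become visible late: re-examine a lookback window behind the newest timestamp and remember the paths already emitted inside that window. This is not necessarily what the PR mentioned above does; the class `LookbackMonitor` and the `lookbackMs` parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Generic lookback-window monitor (hypothetical sketch; not Flink's code). */
final class LookbackMonitor {
    private final long lookbackMs;                        // assumed tolerance for late-visible files
    private final Map<String, Long> seen = new HashMap<>(); // paths emitted inside the window
    private long highWaterMark = Long.MIN_VALUE;          // newest modification time seen so far

    LookbackMonitor(long lookbackMs) {
        this.lookbackMs = lookbackMs;
    }

    // Files at or below the cutoff are considered too old to be new arrivals.
    private long cutoff() {
        return highWaterMark == Long.MIN_VALUE ? Long.MIN_VALUE : highWaterMark - lookbackMs;
    }

    /** Returns the paths in this listing that have not been emitted before. */
    List<String> scan(Map<String, Long> listing) {
        long cutoff = cutoff();
        long newMark = highWaterMark;
        List<String> fresh = new ArrayList<>();
        for (Map.Entry<String, Long> e : listing.entrySet()) {
            long modTime = e.getValue();
            if (modTime > cutoff && !seen.containsKey(e.getKey())) {
                fresh.add(e.getKey());
                seen.put(e.getKey(), modTime);
            }
            newMark = Math.max(newMark, modTime);
        }
        highWaterMark = newMark;
        // Forget entries that can no longer pass the cutoff, so the map stays bounded.
        long newCutoff = cutoff();
        seen.values().removeIf(t -> t <= newCutoff);
        return fresh;
    }

    public static void main(String[] args) {
        LookbackMonitor monitor = new LookbackMonitor(500);
        Map<String, Long> listing = new HashMap<>();
        listing.put("a.txt", 1000L);
        System.out.println(monitor.scan(listing));
        listing.put("c.txt", 1200L);   // b.txt (written at 1100) is not visible yet
        System.out.println(monitor.scan(listing));
        listing.put("b.txt", 1100L);   // b.txt finally shows up in the listing
        System.out.println(monitor.scan(listing));
    }
}
```

With a lookback of 0 the same sequence of listings skips `b.txt`, because by the time it becomes visible its timestamp is older than the newest one already recorded.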


On Fri, 12 Oct 2018 at 20:41, Juan Miguel Cejuela <[hidden email]> wrote:
> I'm using the class `ContinuousFileMonitoringFunction` as a source to monitor a folder for new incoming files. [...]

Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

juanmirocks
I’m using both a local (Unix) file system and HDFS.

I’m going to check those to get ideas, thank you!

I’m also checking the internal code of the class and my own older patch code.
On Fri, 12 Oct 2018 at 21:32, Fabian Hueske <[hidden email]> wrote:
> Which file system are you reading from? [...]

Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

juanmirocks
Update:

I'm not 100% sure, but I think I fixed my bug. This is what I did:

I reduced (quite a lot) the capacity of my `AsyncDataStream`, that is, the maximum number of asynchronous operations in flight at once. I had initially set it to 1000. The default of 100 did not work for me either. But somehow, when I set the value to 10, everything started working fine.

```java
// Timeout of 5 minutes per element; capacity 10 = at most 10 async operations in flight
AsyncDataStream.unorderedWait(dataSource, new AsyncProcessing(), 5, TimeUnit.MINUTES, 10)
```

Perhaps too much memory was used at once and therefore some files were discarded? I don't know, but hopefully my solution gives some clues to other people in the future.
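What a capacity limit means can be modelled in plain Java. Flink's Async I/O documentation describes the capacity as the number of requests that may be in progress at once and says the operator back-pressures when it is exhausted, so the limit by itself should not discard elements; the memory explanation above remains unconfirmed. The sketch below is a toy model of that idea using a `Semaphore`, not Flink's implementation; `BoundedAsyncDemo` and its `run` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a capacity-bounded async stage (hypothetical; not Flink's implementation). */
final class BoundedAsyncDemo {

    /** Runs one simulated async call per input and returns {completed, peakInFlight}. */
    static int[] run(int inputs, int capacity) {
        ExecutorService pool = Executors.newFixedThreadPool(32);
        Semaphore slots = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < inputs; i++) {
                // The producer blocks here when capacity is exhausted:
                // this is back-pressure, the input is delayed, not dropped.
                slots.acquireUninterruptibly();
                pending.add(CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(2); // stand-in for a slow external call
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        slots.release();
                    }
                }, pool));
            }
            // Wait for every pending operation before reporting the counters.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return new int[] {completed.get(), peak.get()};
    }

    public static void main(String[] args) {
        int[] result = run(200, 10);
        System.out.println("completed=" + result[0] + ", peak in flight=" + result[1]);
    }
}
```

In this model every input completes regardless of the capacity; a smaller capacity only lowers the peak number of concurrent operations (and with it the load on whatever the async function calls).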

On Sat, 13 Oct 2018 at 12:48, Juan Miguel Cejuela <[hidden email]> wrote:
> I’m using both a local (Unix) file system and HDFS. [...]