I am reading files from a folder, say /files/*. Files are pushed into that folder, e.g.

/files/file1_2018_03_09.csv
/files/file2_2018_03_09.csv

Flink reads the files from the folder fine, but as the number of files grows, how do I move them into another folder? Currently I use a cron job to move the files every 10 minutes, but then I get an error in Flink saying the file is not found. Is there any way to move only the files that are not being read by Flink?

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
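One way to make the cron job less likely to pull a file away while Flink is still reading it is to move only files that have not been modified for a while. The sketch below uses plain java.nio.file (not Flink's API); the class name AgeBasedMover and the minAge threshold are hypothetical, and file age is only a heuristic: it lowers the chance of a "file not found" error but does not guarantee Flink has finished with a file.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for a cron-style mover (not part of Flink).
class AgeBasedMover {

    // Moves regular files from sourceDir into targetDir, but only files whose
    // last-modified time is older than minAge. Newer files stay where they are,
    // on the (unverified) assumption that Flink may still be reading them.
    // Returns the names of the files that were moved.
    static List<String> moveOldFiles(Path sourceDir, Path targetDir, Duration minAge)
            throws IOException {
        Files.createDirectories(targetDir);
        Instant cutoff = Instant.now().minus(minAge);

        // Collect candidates first so the directory is not modified while it is listed.
        List<Path> candidates = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(sourceDir)) {
            for (Path entry : entries) {
                if (!Files.isRegularFile(entry)) {
                    continue; // skip subdirectories and other non-regular entries
                }
                FileTime modified = Files.getLastModifiedTime(entry);
                if (modified.toInstant().isBefore(cutoff)) {
                    candidates.add(entry);
                }
            }
        }

        // Move each old-enough file, keeping its original file name.
        List<String> moved = new ArrayList<>();
        for (Path file : candidates) {
            Files.move(file, targetDir.resolve(file.getFileName()));
            moved.add(file.getFileName().toString());
        }
        return moved;
    }
}
```

A cron job could call something like moveOldFiles(Paths.get("/files"), Paths.get("/files-archive"), Duration.ofMinutes(30)), where the archive path and the 30-minute threshold are illustrative only. Keeping the target folder outside the monitored /files folder avoids the moved files being picked up again.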
Why don’t you let your Flink job move them once it’s done?
> On 9. Mar 2018, at 03:12, flinkuser101 <[hidden email]> wrote:
Is there any way to do that? I have been searching for a way to do that, but in vain.
Use the FileSystem class provided by Flink (org.apache.flink.core.fs.FileSystem).
Where does Flink expose the file system? Is it from env or from the input stream?

    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(url.toString()));
    DataStream<String> inputStream =
        env.readFile(format, url.toString(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1);

    DataStream<Content> parsedStream = inputStream
        .map(new MapFunction<String, Content>() {
            @Override
            public Content map(String line) {
                // parse one CSV line into a Content object (body not shown in the post)
                ...
            }
        });
org.apache.flink.core.fs.Path has a getFileSystem() method.
> On 10. Mar 2018, at 10:36, flinkuser101 <[hidden email]> wrote:
Alternatively, use the static method FileSystem.get(URI).
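A minimal sketch of the "let the job move the file once it's done" idea, written against plain java.nio.file rather than Flink's API so it runs standalone. In a Flink job the corresponding call would be rename(src, dst) on the org.apache.flink.core.fs.FileSystem obtained from Path.getFileSystem() or FileSystem.get(URI). The class name ProcessedFileArchiver and the archive folder are hypothetical, and the sketch does not address how the job knows that a file has been fully read.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: plain-JDK stand-in for Flink's FileSystem#rename(src, dst).
class ProcessedFileArchiver {

    // Moves one finished file into archiveDir, keeping its file name,
    // and returns the new location.
    static Path archive(Path processedFile, Path archiveDir) throws IOException {
        Files.createDirectories(archiveDir);
        Path target = archiveDir.resolve(processedFile.getFileName());
        try {
            // Prefer an atomic rename so the file never shows up half-copied at the target.
            return Files.move(processedFile, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fallback, e.g. when archiveDir is on a different file system or volume.
            return Files.move(processedFile, target);
        }
    }
}
```

Renaming rather than copying and deleting keeps the move cheap when the archive folder is on the same file system as the input folder.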
From the Flink mailing list, it seems there is a thread <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ContinuousFileMonitoringFunction-deleting-file-after-processing-td9604.html#a9606> that covers this. But I wonder: was this ever resolved? Where can I find out when the final split of a particular file has been processed?
So, if you want your files parsed, try to stay away from the Flink file parser (v1.4). Use NiFi to parse the files, and then you can use Kafka or Flink. My data pipeline looks like this: FTP <-> NiFi <-> Kafka <-> Flink