Reading csv-files


Esa Heikkinen

I want to read CSV files that contain time series data, where one column is a timestamp.

Is it better to use addSource() (like in the data Artisans RideCleansing exercise) or CsvTableSource()?

I am not sure whether CsvTableSource() understands timestamps; I have not found good examples of that.

Is it perhaps a bit more work to write a CSV parser in the addSource() case?

Best, Esa


Re: Reading csv-files

Fabian Hueske
Hi Esa,

Reading records from files with timestamps that need watermarks can be tricky.
If you are aware of Flink's watermark mechanism, you know that records should be ingested in (roughly) increasing timestamp order.
This means that files usually cannot be split (i.e., each needs to be read by a single task from start to end) and also need to be read in the right order (files with smaller timestamps first).
Also, each file should contain records of a certain time interval that should not overlap (too much) with the time intervals of other files.

Unfortunately, Flink does not provide good built-in support for reading files in a specific order.
If all files that you want to process are already present, you can implement a custom InputFormat by extending a CsvInputFormat, setting unsplittable to true, and overriding getInputSplitAssigner() to return an assigner that hands out the splits in the correct order.
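
A rough sketch of that idea (against Flink 1.4-era APIs; constructor and method signatures vary across versions, and the file naming scheme that encodes the time order is an assumption):

```java
// A rough sketch, not a tested implementation. Assumes Flink 1.4-era APIs and
// file names that sort lexicographically in timestamp order (e.g. events-001.csv).
import java.util.Arrays;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

public class OrderedCsvInputFormat extends TupleCsvInputFormat<Tuple2<Long, String>> {

    public OrderedCsvInputFormat(Path filePath, TupleTypeInfo<Tuple2<Long, String>> typeInfo) {
        super(filePath, typeInfo);
        this.unsplittable = true;  // each file is read by a single task from start to end
    }

    // Note: depending on the Flink version, the inherited method may declare a
    // narrower return type than InputSplitAssigner; adjust accordingly.
    @Override
    public InputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits) {
        return new OrderedSplitAssigner(splits);
    }

    /** Hands out splits in ascending file-name order. */
    private static class OrderedSplitAssigner implements InputSplitAssigner {

        private final Queue<FileInputSplit> pending;

        OrderedSplitAssigner(FileInputSplit[] splits) {
            FileInputSplit[] sorted = splits.clone();
            Arrays.sort(sorted, Comparator.comparing(s -> s.getPath().getName()));
            this.pending = new ConcurrentLinkedQueue<>(Arrays.asList(sorted));
        }

        @Override
        public InputSplit getNextInputSplit(String host, int taskId) {
            return pending.poll();  // null signals that no splits are left
        }
    }
}
```

With one global assigner the splits are handed out in order, but note that with parallelism > 1 the ingestion order across tasks is still only approximate.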

If you want to process files as they appear, things might be a bit easier, given that the timestamps in each new file are larger than the timestamps of the previous files. In this case, you can use StreamExecutionEnvironment.readFile() with an interval and the FileProcessingMode parameter. With a correctly configured watermark assigner, it should be possible to get valid watermarks.
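
A minimal sketch of that variant (again assuming Flink 1.4-era APIs; the directory path, the Event POJO, and its CSV parser are placeholders):

```java
// A minimal sketch, assuming Flink 1.4-era APIs. The directory path, the Event
// POJO, and its CSV parser are placeholders, not part of any existing codebase.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ReadCsvFilesJob {

    // Placeholder event type: one CSV row with an epoch-millis timestamp.
    public static class Event {
        public long timestamp;
        public String payload;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String dir = "file:///data/csv";  // placeholder input directory

        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path(dir)),
                dir,
                FileProcessingMode.PROCESS_CONTINUOUSLY,  // pick up files as they appear
                10_000L);                                 // re-scan the directory every 10s

        DataStream<Event> events = lines
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String line) {
                        // placeholder parser: first column is the timestamp
                        String[] fields = line.split(",", 2);
                        Event e = new Event();
                        e.timestamp = Long.parseLong(fields[0]);
                        e.payload = fields[1];
                        return e;
                    }
                })
                .assignTimestampsAndWatermarks(
                        // tolerate records up to 10 seconds out of order
                        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Event e) {
                                return e.timestamp;
                            }
                        });

        events.print();
        env.execute("read-csv-files");
    }
}
```

PROCESS_CONTINUOUSLY keeps the source running and re-processes modified files, so it fits the files-appear-over-time case; for a fixed set of files, PROCESS_ONCE would be the fit.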

In any case, reading timestamped data from files is much trickier than ingesting data from an event log that provides the events in the same order in which they were written.

Best, Fabian


Re: Reading csv-files

Esa Heikkinen


Hi

Thanks for the answer. All CSV files are already present and they will not change during the processing.

Because Flink can read many streams in parallel, I think it is also possible to read many CSV files in parallel.

From what I have understood, it is possible to convert CSV files to streams internally in Flink? But the problem may be how to synchronize the parallel reading of the CSV files based on timestamps?

Maybe I should develop an external "replayer" of CSV files, which generates parallel streams of events (based on their timestamps) for Flink?

But I think the "replayer" could also be built with Flink, and it could also be run at an accelerated speed?

The RideCleansing example does something like that, but I don't know whether it is otherwise appropriate for my purpose.

Best, Esa



Re: Reading csv-files

Fabian Hueske
Yes, that is mostly correct.
You can of course read files in parallel, assign watermarks, and obtain a DataStream with correct timestamps and watermarks.
If you do that, you should ensure that each parallel source task reads the files in the order of increasing timestamps.
As I said before, you can do that by providing a custom InputSplitAssigner that hands out the splits in order of their timestamps.
The timestamp order would need to be encoded in the file name because the assigner cannot look into the file.
Reading unsplit files in a single task makes the problem a bit easier to handle, but parallel reads are also possible.
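
For instance, with a made-up naming scheme like events-<epoch-millis>.csv, the ordering key could be parsed from the name; the assigner itself never opens the file (a small sketch, not part of any Flink API):

```java
// Small sketch: derive the ordering key from the file name. The naming scheme
// events-<epoch-millis>.csv is made up; the assigner never looks into the file.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.core.fs.FileInputSplit;

final class SplitOrdering {

    private static final Pattern NAME = Pattern.compile("events-(\\d+)\\.csv");

    /** Returns the timestamp encoded in the split's file name. */
    static long timestampOf(FileInputSplit split) {
        Matcher m = NAME.matcher(split.getPath().getName());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected file name: " + split.getPath());
        }
        return Long.parseLong(m.group(1));
    }
}
```

Sorting the splits with Comparator.comparingLong(SplitOrdering::timestampOf) then gives the order in which they are handed out.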

The RideCleansing example that you are referring to does not have these problems because the source reads the data in a single thread from a single file.
This is done in order to avoid all the issues that I described before.

Best, Fabian



RE: Reading csv-files

Esa Heikkinen

Because I have no time to learn all the features of Flink, and because there can be some issues in my case, I am very interested in implementing an external “logs replayer” or some batch-to-stream data converter.

Do you have any ideas or suggestions on how to build this kind of logs replayer? Or could one even be found ready-made?

Could Kafka do something like this?

I think I could also write this logs replayer in Python.

What kind of parallel streams would be best and easiest for Flink?
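
Roughly, I imagine a Kafka-based replayer could look something like the following sketch (using the standard Kafka Java producer; the topic name, CSV layout, and speed-up factor are just placeholders):

```java
// A minimal replayer sketch, assuming kafka-clients' standard producer API.
// The file name, topic name, and speed-up factor are placeholders.
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CsvReplayer {

    public static void main(String[] args) throws IOException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        double speedup = 10.0;  // replay 10x faster than real time
        long prevTs = -1;

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader reader = Files.newBufferedReader(Paths.get("events.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                long ts = Long.parseLong(line.split(",")[0]);  // first column: epoch millis
                if (prevTs >= 0) {
                    // sleep proportionally to the timestamp gap, scaled by the speed-up
                    Thread.sleep((long) ((ts - prevTs) / speedup));
                }
                prevTs = ts;
                producer.send(new ProducerRecord<>("events", line));
            }
        }
    }
}
```

Flink would then consume the topic with its Kafka consumer connector and assign watermarks there.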

 

By the way, I am writing a conference paper comparing Flink and my LOGDIG log file analyzer, which is described in my old paper (LOGDIG Log File Analyzer for Mining Expected Behavior from Log Files):

https://www.researchgate.net/profile/Timo_Haemaelaeinen/publication/283264599_LOGDIG_Log_File_Analyzer_for_Mining_Expected_Behavior_from_Log_Files/links/562f7ea208ae4742240ae977.pdf

LOGDIG is a very simple and slow analyzer, and it runs only on a local computer (at the moment), but it is capable of analyzing very complex cases from many parallel log files. The analysis LOGDIG performs is close to CEP. I have written it in Python.

I don’t know whether Flink is the best benchmarking target, but I do not know a better one. I also tried Spark, but it had its own problems; for example, CEP is not as good in Spark as in Flink.

Best, Esa

Re: Reading csv-files

Fabian Hueske
Hi Esa,

IMO, the easiest approach would be to implement a custom source function that reads the CSV files line-wise (in the correct timestamp order) and extracts timestamps.
At the end of each file, you can emit a watermark.
The order of files can either be hardcoded or determined from the file name.
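
A minimal sketch of such a source function (assuming Flink 1.4-era APIs; the CSV layout, with an epoch-millis timestamp in the first column, is a placeholder):

```java
// A minimal sketch, assuming Flink 1.4-era APIs. The file list and CSV layout
// (first column = epoch-millis timestamp) are placeholders.
import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CsvFilesSource implements SourceFunction<String> {

    private final String[] filesInTimestampOrder;  // hardcoded or derived from file names
    private volatile boolean running = true;

    public CsvFilesSource(String... filesInTimestampOrder) {
        this.filesInTimestampOrder = filesInTimestampOrder;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long maxTs = Long.MIN_VALUE;
        for (String file : filesInTimestampOrder) {
            try (BufferedReader reader = Files.newBufferedReader(Paths.get(file))) {
                String line;
                while (running && (line = reader.readLine()) != null) {
                    long ts = Long.parseLong(line.split(",")[0]);
                    maxTs = Math.max(maxTs, ts);
                    ctx.collectWithTimestamp(line, ts);
                }
            }
            // end of file: no later record should be older than maxTs
            ctx.emitWatermark(new Watermark(maxTs));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

With event time enabled, it would be used as env.addSource(new CsvFilesSource("a.csv", "b.csv")).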

This approach is similar to the source function in the RideCleansing exercise [1] (without the alignment of timestamps with the actual time).

Once you have a DataStream with correctly assigned timestamps and watermarks, you should be able to use the CEP library.

Best, Fabian


RE: Reading csv-files

Esa Heikkinen

Hi

Should the custom source function be written in Java rather than in Scala, as in that RideCleansing exercise?

Best, Esa


Re: Reading csv-files

Fabian Hueske
That does not matter.
