Hi,
`FileProcessingMode.PROCESS_CONTINUOUSLY` processes the file continuously - the stream will never end.
The simple `writeAsCsv(…)` sink, on the other hand, only flushes its output file when the stream ends (see `OutputFormatSinkFunction`).
You can either use `PROCESS_ONCE` mode or use a more advanced data sink:
- `BucketingSink`
- re-use the `writeAsCsv(…)` code by extending `OutputFormatSinkFunction` and implementing `CheckpointedFunction` to flush on snapshots, for at-least-once semantics (a rough sketch of this idea follows below)
- write your own sink by extending `TwoPhaseCommitSinkFunction` (to support exactly-once)
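With `PROCESS_ONCE` the fix is a one-line change to your `readFile` call:

    val ds = senv.readFile(input_format, input_path, FileProcessingMode.PROCESS_ONCE, 1000)

For the second option, here is a minimal sketch of the idea (`FlushingCsvSink` is hypothetical, not a Flink API, and it writes through a plain java.io writer instead of re-using `CsvOutputFormat`, just to keep the sketch self-contained):

    import java.io.{BufferedWriter, FileWriter}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    // Hypothetical sketch: write one CSV line per record and flush on
    // every checkpoint, so records reach disk even though the stream
    // never ends. This gives at-least-once semantics: after a failure,
    // records since the last checkpoint may be written again.
    class FlushingCsvSink(path: String) extends RichSinkFunction[String]
        with CheckpointedFunction {

      @transient private var writer: BufferedWriter = _

      override def open(parameters: Configuration): Unit = {
        writer = new BufferedWriter(new FileWriter(path, true)) // append mode
      }

      override def invoke(value: String): Unit = {
        writer.write(value)
        writer.newLine()
      }

      // Called on every checkpoint: force buffered bytes out to disk.
      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        writer.flush()
      }

      override def initializeState(context: FunctionInitializationContext): Unit = {
        // No Flink-managed state is needed for this simple sketch.
      }

      override def close(): Unit = {
        if (writer != null) writer.close()
      }
    }

Usage would be roughly `ds.map(_.toString).addSink(new FlushingCsvSink("output.csv"))`, and checkpointing must be enabled (e.g. `senv.enableCheckpointing(10000)`), otherwise `snapshotState` is never called.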
Piotrek
> On 2 Feb 2018, at 18:32, geoff halmo <[hidden email]> wrote:
>
> Hi Flink community:
>
> I am testing Flink but can't write the final 18 or so elements out to disk.
>
> Setup:
> Using NYC yellow taxi data from 2017-09.csv, I sorted the data on
> pickup_datetime in bash. I am working in event time.
>
> Skeleton program:
> val ds = senv.readFile(input_format, input_path,
>   FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
>
> ds.flatMap(row => parse(row))
>   .assignAscendingTimestamps(_.datetime)
>   .timeWindowAll(Time.hours(1))
>   .process(new MyProcessAllWindowFunction())
>   .writeAsCsv("output.csv")
>
> Issue:
> The last line is a half line:
> tail -n1 output.csv
> 1506553200000,2017-09-27T:19:00-4:00[user@computer]
>
> When I use .print instead of .writeAsCsv, the last line on the console is
> 1506826800000,2017-09-30T23:00-400[America/New_York],21353