File not generated using StreamingFileSink path 1.12.0

Robert Cullen

I’m trying to stream data to a file on an S3-compatible system (MinIO) with Flink 1.12.0:

DataStream<Row> resultStream = tEnv.toAppendStream(log_counts, Types.ROW(Types.STRING, Types.STRING, Types.LONG));

final StreamingFileSink<Row> sink =
        StreamingFileSink.forRowFormat(
                new Path("s3://argo-artifacts/"), new SimpleStringEncoder<Row>("UTF-8"))
                .withBucketAssigner(new KeyBucketAssigner())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

resultStream.addSink(sink);

No file is generated. However, this call does write a file to the bucket:

resultStream.writeAsText("s3://argo-artifacts/output.txt");

Here are my config settings:

    state.backend: filesystem
    state.checkpoints.dir: s3://flink/checkpoints
    state.savepoints.dir: s3://flink/savepoints
    s3.endpoint: http://10.43.42.255:9000
    # s3.endpoint: http://10.43.70.109:9000
    s3.path-style-access: true
    s3.path.style.access: true
    s3.access-key: qCEwcLzhi7xfhl5R6sXLn93a5brgRtBs
    s3.secret-key: fJWZFviCxWUrfjQoXZ4UAjN4YviQXQOz
--
Robert Cullen
240-475-4490