|
I’m trying to stream data to a file on an S3 compatible system (MINIO):
DataStream<Row> resultStream = tEnv.toAppendStream(log_counts, Types.ROW(Types.STRING, Types.STRING, Types.LONG));
final StreamingFileSink<Row> sink =
StreamingFileSink.forRowFormat(
new Path("s3://argo-artifacts/"), new SimpleStringEncoder<Row>("UTF-8"))
.withBucketAssigner(new KeyBucketAssigner())
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
resultStream.addSink(sink);
... No file is generated. However this method successfully writes a file to the bucket:
resultStream.writeAsText("s3://argo-artifacts/output.txt");
Here are my config settings:
state.backend: filesystem
state.checkpoints.dir: s3://flink/checkpoints
state.savepoints.dir: s3://flink/savepoints
s3.endpoint: http://10.43.42.255:9000
# s3.endpoint: http://10.43.70.109:9000
s3.path-style-access: true
s3.path.style.access: true
s3.access-key: qCEwcLzhi7xfhl5R6sXLn93a5brgRtBs
s3.secret-key: fJWZFviCxWUrfjQoXZ4UAjN4YviQXQOz
-- Robert Cullen 240-475-4490
|