We are creating files in S3 and we want to update the S3 object metadata with some security-related information for governance purposes.
Right now Apache Flink totally abstracts how and when S3 object gets created in the system. Is there a way that we can pass the S3 object metadata and update it for the object created. If not, How can we know when Apache Flink has created an S3 file. Deterministically. Since once its created in S3 we can write Java code after that to add those metadata information? -- Thank you and regards, Dhurandar |
Hi Dhurandar, With my understand I think what you need is to get notified when a file is written successfully (committed) on the S3 FileSystem. However, currently there is no public API for the listener and there an issue tracking it [1]. With the current version, one possible method comes to me is that may have to use reflection to access some internal states of StreamFileSink to get the committed files. As a whole, you may need to implement a customized StreamingFileSink and override the notifyCheckpointComplete method, where the new S3 file get committed and visible: class CustomizedStreamingFileSink extends StreamingFileSink { public void notifyCheckpointComplete(long checkpointId) throws Exception { // 1. First use reflection to get the list of files will be committed in this call. // The list of files should be get via StreamingFile -> ( StreamingFileSink Helper if 1.11 is used ) -> Buckets -> activeBuckets (there will be multiple Buckets) -> (for each Bucket) pendingFileRecoverablesPerCheckpoint // Then we could get the iterator of pending files to commit in this time via pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)[2] // Then you could get the S3 object names via (PendingFileRecover if 1.11 is used) -> CommitRecoverable (Will must be S3Recoverable ) -> objectName. super.notifyCheckpointComplete(checkpointId); // Get files committed normally. // 3. Then here could start writing meta info for S3 objects recorded in step 1. } } For a single file it may get committed multiple times, therefore the writing meta info action must also be able to handle the repeat writing. Another possible method will be to use a seperate source operator to periodly scans the S3 file system to detect the newly added files and modify their meta data. There should be embedding source function ContinuousFileMonitoringFunction[3] for this work, and I think it might be modified or reused for scanning the files. Best, Yun [2] https://github.com/apache/flink/blob/a5527e3b2ff4abea2ff8fa05cb755561549be06a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L268 ------------------------------------------------------------------ |
In reply to this post by dhurandar S
Hi, Maybe a bit crazy idea, but you could also try extending the S3 filesystem and add the metadata there. You could write a thin wrapper for the existing filesystem. If you'd like to go that route you might want to check this page[1]. You could use that filesystem with your custom scheme. Best, Dawid [1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/#file-systems On 19/06/2020 21:19, dhurandar S wrote:
signature.asc (849 bytes) Download Attachment |
Free forum by Nabble | Edit this page |