Hi,
I am trying to get the Parquet writer to write to S3; however, the files do not seem to be rolling over. The same file, "part-0-0.parquet", is being created each time, as if the `partCounter` is not being updated. Maybe the bucket is being recreated each time? I don't really know... Here are some logs:

2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI

And a part of my code:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// env.setParallelism(2);
env.enableCheckpointing(60000L);
// PROPERTIES added here
Schema schema = bro_conn.getClassSchema();

OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartSuffix(".parquet")
        .build();

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), ParquetAvroWriters.forGenericRecord(schema))
        // .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(config)
        // .withBucketAssigner(new PartitioningBucketAssigner())
        .build();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
        "kinesis", new SimpleStringSchema(), consumerConfig));

kinesis.flatMap(new JsonAvroParser())
        .addSink(sink);

env.execute("Bro Conn");
```

I'm using Flink 1.10.0 and running in Kubernetes. I also created a custom image to add the Presto/Hadoop plugins.

Thanks again!
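(For context on what "rolling over" should look like: with a bulk format the sink rolls on every checkpoint, and each part file is named `<prefix>-<subtaskIndex>-<partCounter><suffix>`, with the prefix defaulting to "part". A minimal sketch of the relevant config, matching the snippet above, is shown below; only the suffix is set, so the prefix stays at its default.)

```
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// With this config, subtask 0 should produce part-0-0.parquet after the first
// checkpoint, part-0-1.parquet after the second, and so on. Seeing only
// part-0-0.parquet committed on every checkpoint means the counter is not advancing.
OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartSuffix(".parquet")   // prefix stays at the default "part"
        .build();
```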
Hi Roshan,
Your logs refer to a simple run without any failures or re-running from a savepoint, right?

I am asking because I am trying to reproduce it by running a modified ParquetStreamingFileSinkITCase [1], and so far I cannot. The ITCase runs against the local filesystem, not S3, but I added the OutputFileConfig and the part counter increases as expected.

Is there any other information that would help us reproduce the issue?

Cheers,
Kostas

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
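(A minimal, self-contained sketch of this kind of local-filesystem check, for anyone who wants to try it, is below. It is not the actual ITCase: it writes plain text via SimpleStringEncoder rather than Parquet, just to exercise the bucket/part-counter machinery, and the class name, output path, and dummy source are illustrative.)

```
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PartCounterLocalCheck {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(1_000L); // frequent checkpoints so parts roll quickly

        OutputFileConfig config = OutputFileConfig.builder()
                .withPartSuffix(".parquet") // suffix kept from the report; content here is plain text
                .build();

        // Row-encoded sink against the local filesystem. With the on-checkpoint
        // rolling policy, a new part file (part-0-0.parquet, part-0-1.parquet, ...)
        // should be finalized after every checkpoint if the counter advances.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("file:///tmp/part-counter-check"), new SimpleStringEncoder<String>())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(config)
                .build();

        // Unbounded dummy source so several checkpoints complete before the job ends.
        env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                long i = 0;
                while (running) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect("record-" + i++);
                    }
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).addSink(sink);

        env.execute("part-counter local check");
    }
}
```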
Sorry, realized this came off the user list by mistake. Adding the thread back in.

On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <[hidden email]> wrote:
> Yes sorry, no errors on the task manager. However, I am new to Flink, so I don't know all the places to look for the logs. I've been looking at the task manager logs and don't see any exceptions there. Not sure where to look for S3 exceptions in particular.
>
> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <[hidden email]> wrote:
>> Yes, this is why I reached out for further information.
>>
>> Incrementing the part counter is the responsibility of the StreamingFileSink, whose code is FS-agnostic, so it should also fail in the local FS. Now, if it is on the S3 side, it would help if you have any more info, for example any logs from S3, to see if anything went wrong on their end.
>>
>> So your logs refer to normal execution, i.e. no failures and no restarting, right?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <[hidden email]> wrote:
>>> Surprisingly, the same code running against the local filesystem works perfectly. The part counter increments correctly.
Lastly, could it be the way I built the Flink image for Kubernetes? I added both the Presto and Hadoop plugins.
It should not be a problem because from what you posted, you are using
"s3a" as the scheme for s3. Are you using "s3p" for Presto? This should also be done in order for Flink to understand where to use the one or the other. On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose <[hidden email]> wrote: > > Lastly, could it be the way I built the flink image for kube? I added both the presto and Hadoop plugins > > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <[hidden email]> wrote: >> >> Sorry realized this came off the user list by mistake. Adding the thread back in. >> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <[hidden email]> wrote: >>> >>> Yes sorry, no errors on the task manager. However, I am new to flink so don't know all the places to look for the logs. Been looking at the task manager logs and don't see any exceptions there. Not sure where to look for s3 exceptions in particular. >>> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <[hidden email]> wrote: >>>> >>>> Yes, this is why I reached out for further information. >>>> >>>> Incrementing the part counter is the responsibility of the >>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail >>>> in the local FS. >>>> Now if it is on the S3 side, it would help if you have any more info, >>>> for example any logs from S3, to see if anything went wrong on their >>>> end. >>>> >>>> So your logs refer to normal execution, i.e. no failures and no >>>> restarting, right? >>>> >>>> Cheers, >>>> Kostas >>>> >>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <[hidden email]> wrote: >>>> > >>>> > Surprisingly the same code running against the local filesystem works perfectly. The part counter increments correctly. >>>> > >>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <[hidden email]> wrote: >>>> >> >>>> >> Hi Roshan, >>>> >> >>>> >> Your logs refer to a simple run without any failures or re-running >>>> >> from a savepoint, right? >>>> >> >>>> >> I am asking because I am trying to reproduce it by running a modified >>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot. >>>> >> The ITCase runs against the local filesystem, and not S3, but I added >>>> >> the OutputFileConfig and it seems that the part counter is increases >>>> >> as expected. >>>> >> >>>> >> Is there any other information that would help us reproduce the issue? >>>> >> >>>> >> Cheers, >>>> >> Kostas >>>> >> >>>> >> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java >>>> >> >>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <[hidden email]> wrote: >>>> >> > >>>> >> > Hi, >>>> >> > >>>> >> > I am trying to get the parquet writer to write to s3; however, the files do not seem to be rolling over. The same file "part-0-0.parquet" is being created each time. Like the 'partCounter" is not being updated? Maybe the Bucket is being recreated each time? I don't really know... Here are some logs: >>>> >> > >>>> >> > 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2). >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2. 
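(For reference, a minimal sketch of how the two schemes are usually split in Flink 1.10: checkpoints can go through the Presto plugin, while the StreamingFileSink has to use the Hadoop-based S3 filesystem, as described in the S3-specific notes of the StreamingFileSink docs. Bucket names and paths are placeholders, and `schema` and `config` are the ones from the snippet earlier in the thread.)

```
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Checkpoints can be directed at the Presto filesystem, e.g. via
//   state.checkpoints.dir: s3p://<bucket>/checkpoints/
// in flink-conf.yaml ("s3p://" is served by the flink-s3-fs-presto plugin).

// The StreamingFileSink itself should go through the Hadoop-based S3
// filesystem, i.e. an "s3a://" path served by the flink-s3-fs-hadoop plugin:
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
                ParquetAvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .build();
```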
Nope, just the s3a. I'll keep looking around to see if there is anything else I can find. If you think of anything else to try, let me know.
Btw, I ran the exact same code on a local Flink cluster started with `./bin/start-cluster.sh` on my machine. With `s3a` it did not work (the part files do not roll over), but against the local filesystem it works perfectly. Should I be looking at the S3Committer in Flink to see if something odd is going on there?
I would say so, yes.
Also, could you set the paths where you want to use Presto to "s3p", as described in [1], just to be sure there is no ambiguity? You could also make use of remote debugging [2].

And thanks for looking into it!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#s3-specific
[2] https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters