Parquet S3 Sink Part files are not rolling over with checkpoint

Parquet S3 Sink Part files are not rolling over with checkpoint

Roshan Punnoose
Hi,

I am trying to get the Parquet writer to write to S3; however, the files do not seem to be rolling over. The same file "part-0-0.parquet" is being created each time, as if the partCounter is not being incremented. Maybe the Bucket is being recreated each time? I don't really know. Here are some logs:

2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
And here is part of my code:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// env.setParallelism(2);
env.enableCheckpointing(60000L); // checkpoint (and roll bulk part files) every 60 s
// PROPERTIES added here
Schema schema = bro_conn.getClassSchema();

OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartSuffix(".parquet")
        .build();

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), ParquetAvroWriters.forGenericRecord(schema))
        // .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(config)
        // .withBucketAssigner(new PartitioningBucketAssigner())
        .build();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
        "kinesis", new SimpleStringSchema(), consumerConfig));

kinesis.flatMap(new JsonAvroParser())
        .addSink(sink);

env.execute("Bro Conn");
```

I'm using Flink 1.10.0 and running in Kubernetes. I also created a custom image to add the Presto and Hadoop S3 plugins.

Thanks again!

Re: Parquet S3 Sink Part files are not rolling over with checkpoint

Kostas Kloudas
Hi Roshan,

Your logs refer to a simple run without any failures or re-running
from a savepoint, right?

I am asking because I am trying to reproduce it by running a modified
ParquetStreamingFileSinkITCase [1] and so far I cannot.
The ITCase runs against the local filesystem, and not S3, but I added
the OutputFileConfig and the part counter increases as expected.

Is there any other information that would help us reproduce the issue?
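
For reference, here is a rough sketch of the kind of local-filesystem setup I mean (not the actual ITCase code; `localTestDir` and `schema` are placeholders). With the default checkpoint-based rolling for bulk formats, each subtask should produce part-<subtask>-<counter> files with the counter going up on every roll, e.g. part-0-0.parquet, part-0-1.parquet, part-0-2.parquet, ...

```
OutputFileConfig config = OutputFileConfig.builder()
        .withPartPrefix("part")
        .withPartSuffix(".parquet")
        .build();

// Bulk formats roll on every checkpoint, so a new part file with an
// incremented counter is expected after each successful checkpoint.
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path(localTestDir), ParquetAvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .build();
```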

Cheers,
Kostas

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

Roshan Punnoose
Sorry, I realized this came off the user list by mistake. Adding the thread back in.

On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <[hidden email]> wrote:
Yes, sorry: no errors on the task manager. However, I am new to Flink, so I don't know all the places to look for logs. I've been looking at the task manager logs and don't see any exceptions there. I'm not sure where to look for S3 exceptions in particular.

On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <[hidden email]> wrote:
Yes, this is why I reached out for further information.

Incrementing the part counter is the responsibility of the
StreamingFileSink, whose code is FS-agnostic, so if the counter logic
were broken it should also fail on the local FS.
If the problem is on the S3 side instead, any additional information,
for example logs from S3, would help to see whether anything went
wrong on their end.

So your logs refer to normal execution, i.e. no failures and no
restarting, right?

Cheers,
Kostas

On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <[hidden email]> wrote:
>
> Surprisingly the same code running against the local filesystem works perfectly. The part counter increments correctly.

Re: Parquet S3 Sink Part files are not rolling over with checkpoint

Roshan Punnoose
Lastly, could it be the way I built the Flink image for Kubernetes? I added both the Presto and Hadoop plugins.


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

Kostas Kloudas
It should not be a problem, because from what you posted you are
using "s3a" as the scheme for S3.
Are you using "s3p" for Presto? That also needs to be set so that
Flink knows which filesystem implementation to use for which path.
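
For example, the split could look like the sketch below (not taken from your job; the checkpoint path is only an assumption, in case you also checkpoint to S3):

```
// s3a:// paths go to the Hadoop S3 plugin (flink-s3-fs-hadoop) -> sink output
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), ParquetAvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .build();

// s3p:// paths go to the Presto S3 plugin (flink-s3-fs-presto) -> e.g. checkpoints
// (hypothetical checkpoint location, only to illustrate the scheme separation)
env.setStateBackend(new FsStateBackend("s3p://<bucket>/checkpoints/"));
```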


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

Roshan Punnoose
Nope, just s3a. I'll keep looking around to see if there is anything else I can find. If you think of anything else to try, let me know.


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

Roshan Punnoose
Btw, I ran the exact same code on a local Flink cluster started with `./bin/start-cluster.sh` on my machine. With `s3a` it did not work (the part files do not roll over); with the local filesystem it works perfectly. Should I be looking at the S3Committer in Flink to see if something odd is going on?


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

Kostas Kloudas
I would say so, yes.
Also, could you set the paths where you want to use Presto to "s3p", as
described in [1], just to be sure that there is no ambiguity.

You could also make use of [2].

And thanks for looking into it!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#s3-specific
[2] https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
