Streaming to Parquet Files in HDFS

classic Classic list List threaded Threaded
18 messages Options
Reply | Threaded
Open this post in threaded view
|

Streaming to Parquet Files in HDFS

William Speirs
I'm trying to stream log messages (syslog fed into Kafak) into Parquet files on HDFS via Flink. I'm able to read, parse, and construct objects for my messages in Flink; however, writing to Parquet is tripping me up. I do *not* need to have this be real-time; a delay of a few minutes, even up to an hour, is fine.

I've found the following articles talking about this being very difficult:

All of these posts speak of troubles using the check-pointing mechanisms and Parquets need to perform batch writes. I'm not experienced enough with Flink's check-pointing or Parquet's file format to completely understand the issue. So my questions are as follows:

1) Is this possible in Flink in an exactly-once way? If not, is it possible in a way that _might_ cause duplicates during an error?

2) Is there another/better format to use other than Parquet that offers compression and the ability to be queried by something like Drill or Impala?

3) Any further recommendations for solving the overall problem: ingesting syslogs and writing them to a file(s) that is searchable by an SQL(-like) framework?

Thanks!

Bill-
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

hao gao
Hi Bill,

I wrote those two medium posts you mentioned above. But clearly, the techlab one is much better
I would suggest just "close the file when checkpointing" which is the easiest way. If you use BucketingSink, you can modify the code to make it work. Just replace the code from line 691 to 693 with closeCurrentPartFile() 
This should guarantee exactly-once. You may have some files with underscore prefix when flink job failed. But usually those files are ignored by the query engine/ readers for example,  Presto

If you use 1.6 and later, I think the issue is already addressed 

Thanks
Hao

On Fri, Sep 28, 2018 at 1:57 PM William Speirs <[hidden email]> wrote:
I'm trying to stream log messages (syslog fed into Kafak) into Parquet files on HDFS via Flink. I'm able to read, parse, and construct objects for my messages in Flink; however, writing to Parquet is tripping me up. I do *not* need to have this be real-time; a delay of a few minutes, even up to an hour, is fine.

I've found the following articles talking about this being very difficult:

All of these posts speak of troubles using the check-pointing mechanisms and Parquets need to perform batch writes. I'm not experienced enough with Flink's check-pointing or Parquet's file format to completely understand the issue. So my questions are as follows:

1) Is this possible in Flink in an exactly-once way? If not, is it possible in a way that _might_ cause duplicates during an error?

2) Is there another/better format to use other than Parquet that offers compression and the ability to be queried by something like Drill or Impala?

3) Any further recommendations for solving the overall problem: ingesting syslogs and writing them to a file(s) that is searchable by an SQL(-like) framework?

Thanks!

Bill-


--
Thanks
 - Hao 
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Fabian Hueske-2
Hi Bill,

Flink 1.6.0 supports writing Avro records as Parquet files to HDFS via the previously mentioned StreamingFileSink [1], [2].

Best, Fabian


Am Fr., 28. Sep. 2018 um 23:36 Uhr schrieb hao gao <[hidden email]>:
Hi Bill,

I wrote those two medium posts you mentioned above. But clearly, the techlab one is much better
I would suggest just "close the file when checkpointing" which is the easiest way. If you use BucketingSink, you can modify the code to make it work. Just replace the code from line 691 to 693 with closeCurrentPartFile() 
This should guarantee exactly-once. You may have some files with underscore prefix when flink job failed. But usually those files are ignored by the query engine/ readers for example,  Presto

If you use 1.6 and later, I think the issue is already addressed 

Thanks
Hao

On Fri, Sep 28, 2018 at 1:57 PM William Speirs <[hidden email]> wrote:
I'm trying to stream log messages (syslog fed into Kafak) into Parquet files on HDFS via Flink. I'm able to read, parse, and construct objects for my messages in Flink; however, writing to Parquet is tripping me up. I do *not* need to have this be real-time; a delay of a few minutes, even up to an hour, is fine.

I've found the following articles talking about this being very difficult:

All of these posts speak of troubles using the check-pointing mechanisms and Parquets need to perform batch writes. I'm not experienced enough with Flink's check-pointing or Parquet's file format to completely understand the issue. So my questions are as follows:

1) Is this possible in Flink in an exactly-once way? If not, is it possible in a way that _might_ cause duplicates during an error?

2) Is there another/better format to use other than Parquet that offers compression and the ability to be queried by something like Drill or Impala?

3) Any further recommendations for solving the overall problem: ingesting syslogs and writing them to a file(s) that is searchable by an SQL(-like) framework?

Thanks!

Bill-


--
Thanks
 - Hao 
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

bdas77
Nice to see this finally!

On Mon, Oct 1, 2018 at 1:53 AM Fabian Hueske <[hidden email]> wrote:
Hi Bill,

Flink 1.6.0 supports writing Avro records as Parquet files to HDFS via the previously mentioned StreamingFileSink [1], [2].

Best, Fabian


Am Fr., 28. Sep. 2018 um 23:36 Uhr schrieb hao gao <[hidden email]>:
Hi Bill,

I wrote those two medium posts you mentioned above. But clearly, the techlab one is much better
I would suggest just "close the file when checkpointing" which is the easiest way. If you use BucketingSink, you can modify the code to make it work. Just replace the code from line 691 to 693 with closeCurrentPartFile() 
This should guarantee exactly-once. You may have some files with underscore prefix when flink job failed. But usually those files are ignored by the query engine/ readers for example,  Presto

If you use 1.6 and later, I think the issue is already addressed 

Thanks
Hao

On Fri, Sep 28, 2018 at 1:57 PM William Speirs <[hidden email]> wrote:
I'm trying to stream log messages (syslog fed into Kafak) into Parquet files on HDFS via Flink. I'm able to read, parse, and construct objects for my messages in Flink; however, writing to Parquet is tripping me up. I do *not* need to have this be real-time; a delay of a few minutes, even up to an hour, is fine.

I've found the following articles talking about this being very difficult:

All of these posts speak of troubles using the check-pointing mechanisms and Parquets need to perform batch writes. I'm not experienced enough with Flink's check-pointing or Parquet's file format to completely understand the issue. So my questions are as follows:

1) Is this possible in Flink in an exactly-once way? If not, is it possible in a way that _might_ cause duplicates during an error?

2) Is there another/better format to use other than Parquet that offers compression and the ability to be queried by something like Drill or Impala?

3) Any further recommendations for solving the overall problem: ingesting syslogs and writing them to a file(s) that is searchable by an SQL(-like) framework?

Thanks!

Bill-


--
Thanks
 - Hao 

Thank you
~/Das
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Averell
In reply to this post by Fabian Hueske-2
Hi Fabian, Kostas,

From the description of this ticket
https://issues.apache.org/jira/browse/FLINK-9753, I understand that now my
output parquet file with StreamingFileSink will span multiple checkpoints.
However, when I tried (as in the here below code snippet) I still see that
one "part-X-X" file is created after each checkpoint. Is there any other
configuration that I'm missing?

BTW, I have another question regarding
StreamingFileSink.BulkFormatBuilder.withBucketCheckInterval(). As per the
notes at the end of this page  StreamingFileSink
<https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html>
, buck-enconding can only combined with OnCheckpointRollingPolicy, which
rolls on every checkpoint. So setting that CheckInterval makes no
difference. So why should we expose that withBucketCheckInterval method?

Thanks and best regards,
Averell

        def buildSink[T <: MyBaseRecord](outputPath: String)(implicit ct:
ClassTag[T]): StreamingFileSink[T] = {
                StreamingFileSink.forBulkFormat(new Path(outputPath),
ParquetAvroWriters.forReflectRecord(ct.runtimeClass)).asInstanceOf[StreamingFileSink.BulkFormatBuilder[T,
String]]
                                .withBucketCheckInterval(5L * 60L * 1000L)
                                .withBucketAssigner(new DateTimeBucketAssigner[T]("yyyy-MM-dd--HH"))
                                .build()
        }




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Kostas Kloudas
Hi Averell,

You are right that for Bulk Formats like Parquet, we roll on every checkpoint.

This is currently a limitation that has to do with the fact that bulk formats gather
and rely on metadata that they keep internally and which we cannot checkpoint
in Flink,as they do not expose them.

Setting the checkpoint interval affects how big your part files are going to be and,
in some cases, how efficient your compression is going to be. In some cases, the
more the data to be compressed, the better to compression ratio.

Exposing the withBucketCheckInterval() you are right that it does not
serve much for the moment.

Cheers,
Kostas

> On Oct 5, 2018, at 1:54 AM, Averell <[hidden email]> wrote:
>
> Hi Fabian, Kostas,
>
> From the description of this ticket
> https://issues.apache.org/jira/browse/FLINK-9753, I understand that now my
> output parquet file with StreamingFileSink will span multiple checkpoints.
> However, when I tried (as in the here below code snippet) I still see that
> one "part-X-X" file is created after each checkpoint. Is there any other
> configuration that I'm missing?
>
> BTW, I have another question regarding
> StreamingFileSink.BulkFormatBuilder.withBucketCheckInterval(). As per the
> notes at the end of this page  StreamingFileSink
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html>
> , buck-enconding can only combined with OnCheckpointRollingPolicy, which
> rolls on every checkpoint. So setting that CheckInterval makes no
> difference. So why should we expose that withBucketCheckInterval method?
>
> Thanks and best regards,
> Averell
>
> def buildSink[T <: MyBaseRecord](outputPath: String)(implicit ct:
> ClassTag[T]): StreamingFileSink[T] = {
> StreamingFileSink.forBulkFormat(new Path(outputPath),
> ParquetAvroWriters.forReflectRecord(ct.runtimeClass)).asInstanceOf[StreamingFileSink.BulkFormatBuilder[T,
> String]]
> .withBucketCheckInterval(5L * 60L * 1000L)
> .withBucketAssigner(new DateTimeBucketAssigner[T]("yyyy-MM-dd--HH"))
> .build()
> }
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Averell
Hi Kostas,

Thanks for the info.
Just one more question regarding writing parquet. I need to write my stream
as parquet to S3. As per this ticket
https://issues.apache.org/jira/browse/FLINK-9752
<https://issues.apache.org/jira/browse/FLINK-9752>  , it is now not
supported. Is there any ready-to-use solution that supports copying/moving
file from HDFS to S3 (something like a trigger from Flink after it has
finished writing to HDFS).

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Kostas Kloudas
Hi Averell,

There is no such “out-of-the-box” solution, but there is an open PR for adding 
S3 support to the StreamingFileSink [1].

Cheers,
Kostas


On Oct 5, 2018, at 11:14 AM, Averell <[hidden email]> wrote:

Hi Kostas,

Thanks for the info.
Just one more question regarding writing parquet. I need to write my stream
as parquet to S3. As per this ticket
https://issues.apache.org/jira/browse/FLINK-9752
<https://issues.apache.org/jira/browse/FLINK-9752>  , it is now not
supported. Is there any ready-to-use solution that supports copying/moving
file from HDFS to S3 (something like a trigger from Flink after it has
finished writing to HDFS).

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Averell
What a great news.
Thanks for that, Kostas.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Averell
This post was updated on .
In reply to this post by Kostas Kloudas
Hi Kostas,

I tried your PR - trying to write to S3 from Flink running on AWS EMR, and I got
the following error. I copied the three jar files
flink-hadoop-fs-1.7-SNAPSHOT.jar, flink-s3-fs-base-1.7-SNAPSHOT.jar,
flink-s3-fs-hadoop-1.7-SNAPSHOT.jar to lib/ directory. Do I need to make any
change to HADOOP configurations?

Thanks and best regards,
Averell

java.lang.Exception: unable to establish the security context
        at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:73)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1118)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2503)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:106)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
        at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:448)
        at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:331)
        at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:359)
        at
org.apache.flink.runtime.security.modules.HadoopModule.install(HadoopModule.java:70)
        at
org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:67)
        ... 1 more
Caused by: java.lang.RuntimeException: class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not org.apache.hadoop.security.GroupMappingServiceProvider
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2497)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Averell
In reply to this post by Kostas Kloudas
Hi Kostas,

Please help ignore my previous email about the issue with security. It seems
to I had mixed version of shaded and unshaded jars.

However, I'm now facing another issue with writing parquet files: only the
first part is closed. All the subsequent parts are kept in in-progress state
forever. My settings are to have checkpoint every 3 minutes. Sink
parallelism set to 1 (my tries to set to 4 or 30 showed no difference).
BucketID assigner is using event-timestamp.
I only got this issue when running Flink on a yarn cluster, either writing
to file:/// or to S3. When I ran it on my laptop, I got one part for every
single checkpoint.
TM logs says something like "*BucketState ... has pending files for
checkpoints: {2 }*"

Could you please help on how can I further debug this?

Here below is the TM log:

2018-10-06 14:39:01.197 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537656300000,meter0219838,R1.S1.LT1.P25).
2018-10-06 14:39:01.197 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537656300000,meter0219838,R1.S1.LT1.P25).
2018-10-06 14:39:01.984 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-0" for bucket id=dt=2018-09-22.
2018-10-06 14:39:01.984 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-0" for bucket id=dt=2018-09-22.
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=2 (max part counter=1).
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=2 (max part counter=1).
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:40:17.855 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:40:18.254 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {2 }
2018-10-06 14:40:18.254 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {2 }
2018-10-06 14:40:44.069 [Async calls on Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
received completion notification for checkpoint with id=2.
2018-10-06 14:40:44.069 [Async calls on Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
received completion notification for checkpoint with id=2.
2018-10-06 14:40:46.691 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537656300000,meter0207081,R1.S1.LT1.P25).
2018-10-06 14:40:46.691 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537656300000,meter0207081,R1.S1.LT1.P25).
2018-10-06 14:40:46.765 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-1" for bucket id=dt=2018-09-22.
2018-10-06 14:40:46.765 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-1" for bucket id=dt=2018-09-22.
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=3 (max part counter=2).
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=3 (max part counter=2).
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:43:17.831 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:43:18.401 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 }
2018-10-06 14:43:18.401 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 }
2018-10-06 14:45:59.276 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537657200000,meter0218455,R1.S1.LT1.P10).
2018-10-06 14:45:59.276 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537657200000,meter0218455,R1.S1.LT1.P10).
2018-10-06 14:45:59.334 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-2" for bucket id=dt=2018-09-22.
2018-10-06 14:45:59.334 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-2" for bucket id=dt=2018-09-22.
2018-10-06 14:46:17.825 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=4 (max part counter=3).
2018-10-06 14:46:17.825 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=4 (max part counter=3).
2018-10-06 14:46:17.825 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:46:17.825 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:46:18.228 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 }
2018-10-06 14:46:18.228 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 }
2018-10-06 14:46:25.041 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537657200000,meter0209471,R1.S1.LT1.P25).
2018-10-06 14:46:25.041 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 due to element
SdcRecord(1537657200000,meter0209471,R1.S1.LT1.P25).
2018-10-06 14:46:25.186 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-3" for bucket id=dt=2018-09-22.
2018-10-06 14:46:25.186 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
opening new part file "part-0-3" for bucket id=dt=2018-09-22.
2018-10-06 14:49:17.848 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=5 (max part counter=4).
2018-10-06 14:49:17.848 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=5 (max part counter=4).
2018-10-06 14:49:17.849 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:49:17.849 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0
closing in-progress part file for bucket id=dt=2018-09-22 on checkpoint.
2018-10-06 14:49:18.385 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 5 }
2018-10-06 14:49:18.385 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 5 }
2018-10-06 14:52:17.824 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=6 (max part counter=4).
2018-10-06 14:52:17.824 [Sink: Unnamed (1/1)] INFO
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=6 (max part counter=4).
2018-10-06 14:52:17.825 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 5 }
2018-10-06 14:52:17.825 [Sink: Unnamed (1/1)] DEBUG
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing: BucketState for bucketId=dt=2018-09-22 and
bucketPath=s3a://assn-averell/Test/output/dt=2018-09-22, has pending files
for checkpoints: {3 4 5 }

Thanks and best regards,
Averell





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Kostas Kloudas
Hi Averell,

From the logs, only checkpoint 2 was acknowledged (search for
“eceived completion notification for checkpoint with id=“) and this is
why no more files are finalized. So only checkpoint 2 was successfully
completed.

BTW you are using the PR you mentioned before or Flink 1.6?
I am asking because on Flink 1.6. there is no support for S3 on the
streaming file sink.

Cheers,
Kostas

> On Oct 7, 2018, at 2:02 AM, Averell <[hidden email]> wrote:
>
> received completion notification for checkpoint wit

Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Averell
Hi Kostas,

I'm using a build with your PR. However, it seemed the issue is not with S3,
as when I tried to write to local file system (file:///, not HDFS), I also
got the same problem - only the first part published. All remaining parts
were in inprogress and had names prefixed with "."

From Flink GUI, all checkpoints were shown as completed successfully.

How could I debug further?

Thanks a lot for your help.
Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Kostas Kloudas
Hi Averell,

Could you set your logging to DEBUG?
This may shed some light on what is happening as it will contain more logs.

Kostas

> On Oct 7, 2018, at 11:03 AM, Averell <[hidden email]> wrote:
>
> Hi Kostas,
>
> I'm using a build with your PR. However, it seemed the issue is not with S3,
> as when I tried to write to local file system (file:///, not HDFS), I also
> got the same problem - only the first part published. All remaining parts
> were in inprogress and had names prefixed with "."
>
> From Flink GUI, all checkpoints were shown as completed successfully.
>
> How could I debug further?
>
> Thanks a lot for your help.
> Regards,
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Kostas Kloudas
Hi,

I just saw that you have already set the level to DEBUG.

These are all your DEBUG logs of the TM when running on YARN?

Also did you try to wait a bit more to see if the acknowledgements of the checkpoints
arrive a bit later? Checkpoints and acknowledgments are not necessarily aligned.

Kostas

> On Oct 7, 2018, at 12:37 PM, Kostas Kloudas <[hidden email]> wrote:
>
> Hi Averell,
>
> Could you set your logging to DEBUG?
> This may shed some light on what is happening as it will contain more logs.
>
> Kostas
>
>> On Oct 7, 2018, at 11:03 AM, Averell <[hidden email]> wrote:
>>
>> Hi Kostas,
>>
>> I'm using a build with your PR. However, it seemed the issue is not with S3,
>> as when I tried to write to local file system (file:///, not HDFS), I also
>> got the same problem - only the first part published. All remaining parts
>> were in inprogress and had names prefixed with "."
>>
>> From Flink GUI, all checkpoints were shown as completed successfully.
>>
>> How could I debug further?
>>
>> Thanks a lot for your help.
>> Regards,
>> Averell
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Averell
Hi Kostas,

Yes, I set the level to DEBUG, but for the
/org.apache.flink.streaming.api.functions.sink.filesystem.bucket/ only.
Will try to enable for /org.apache.flink.streaming/.
I just found one (possibly) issue with my build is that I had not used the
latest master branch when merging with your PR. So I might have missed some
other important PRs.

BTW, regarding your comment /Checkpoints and acknowledgments are not
necessarily aligned/ << If this happens and for some reason, the job crashed
right after the checkpoint and before acknowledgements, does that means that
last checkpoint is not valid?

Thanks and best regards,
Averell





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Kostas Kloudas
Hi,

Yes, please enable DEBUG to streaming to see all the
logs also from the StreamTask.

A checkpoint is “valid” as soon as it get acknowledged.
As the documentation says, the job will restart from
“ the last **successful** checkpoint” which is the most
recent acknowledged one.

Cheers,
Kostas  

> On Oct 7, 2018, at 1:03 PM, Averell <[hidden email]> wrote:
>
> Hi Kostas,
>
> Yes, I set the level to DEBUG, but for the
> /org.apache.flink.streaming.api.functions.sink.filesystem.bucket/ only.
> Will try to enable for /org.apache.flink.streaming/.
> I just found one (possibly) issue with my build is that I had not used the
> latest master branch when merging with your PR. So I might have missed some
> other important PRs.
>
> BTW, regarding your comment /Checkpoints and acknowledgments are not
> necessarily aligned/ << If this happens and for some reason, the job crashed
> right after the checkpoint and before acknowledgements, does that means that
> last checkpoint is not valid?
>
> Thanks and best regards,
> Averell
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Streaming to Parquet Files in HDFS

Averell
Hi Kostas,

Thanks for the info. That error caused by I built your code along with not
up-to-date baseline. I rebased my branch build, and there's no more such
issue.
I've been testing, and until now have some questions/issues as below:

1. I'm not able to write to S3 with the following URI format: *s3*://<path>,
and had to use *s3a*://<path>. Is this behaviour expected? (I am running
Flink on AWS EMR, and I thought that EMR provides a wrapper for HDFS over S3
with something called EMRFS).

2. Occasionally/randomly I got the below message ( parquet_error1.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/parquet_error1.log>
). I'm using ParquetAvroWriters.forReflectRecord() method to write Scala
case classes. Re-running the job doesn't get that error at the same data
location, so I don't think that there's issue with data.
 *java.lang.ArrayIndexOutOfBoundsException: <some random number>* /at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.fallBackDictionaryEncodedData/.

3. Sometimes I got this error message when I use parallelism of 8 for the
sink ( parquet_error2.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/parquet_error2.log>
).
Reducing to 2 solves the issue. But is it possible to increase the pool
size? I could not find any place that I can change the
/fs.s3.maxconnections/ parameter.
/java.io.InterruptedIOException: initiate MultiPartUpload on
Test/output/dt=2018-09-20/part-7-5:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable
to execute HTTP request: Timeout waiting for connection from pool/

4. Where is the temporary folder that you store the parquet file before
uploading to S3?

Thanks a lot for your help.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/