Difference between BucketingSink and StreamingFileSink


Difference between BucketingSink and StreamingFileSink

Averell
Hi everyone,

I am trying to persist my stream into Parquet files. In the documentation, I can
see two different file sinks: BucketingSink (Rolling File Sink) and
StreamingFileSink, but I could not find any information on the differences
between the two.
Which one should I choose for writing Parquet? Is it possible to
partition my output based on event time?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Difference between BucketingSink and StreamingFileSink

Averell
Hi,

According to this ticket, https://issues.apache.org/jira/browse/FLINK-9749,
StreamingFileSink is the newer option and is better suited than BucketingSink
for Parquet.
I would love to see an example that uses it.

Thanks and best regards,
Averell




Re: Difference between BucketingSink and StreamingFileSink

Averell
Hi,

Sorry for wasting your time. I found the answer to my question about
event time: a class that extends BucketAssigner does the job:

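import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Buckets each record by its own (event-time) timestamp. MyClass is the
// application's record type, assumed to expose getTimestamp in epoch millis.
class SdcTimeBucketAssigner[T <: MyClass](prefix: String, formatString: String)
    extends BucketAssigner[T, String] {

  // @transient: not shipped with the serialized assigner; re-created on the
  // first call to getBucketId after deserialization on the task manager.
  @transient
  private var dateFormatter = new SimpleDateFormat(formatString)

  override def getBucketId(in: T, context: BucketAssigner.Context): String = {
    if (dateFormatter == null) dateFormatter = new SimpleDateFormat(formatString)
    s"$prefix${dateFormatter.format(new Date(in.getTimestamp))}"
  }

  // Bucket ids are plain strings, so Flink's built-in string serializer suffices.
  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}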
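The key point of the assigner is that the bucket id comes from the record's own timestamp rather than from the wall clock (Flink's built-in DateTimeBucketAssigner formats the processing time instead). A minimal standalone sketch of just that mapping, with no Flink dependency, is below. The names BucketIdSketch and bucketId are hypothetical, and it assumes epoch-millisecond timestamps and swaps in java.time with an explicit UTC zone, whereas the assigner above uses SimpleDateFormat with the JVM's default time zone.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object BucketIdSketch {
  // Hypothetical helper mirroring getBucketId: prefix + formatted event time.
  // Assumption: epochMillis is the record's event timestamp and buckets use UTC,
  // so a late record still lands in the bucket of the hour it belongs to.
  def bucketId(prefix: String, pattern: String, epochMillis: Long): String = {
    val formatter = DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC)
    prefix + formatter.format(Instant.ofEpochMilli(epochMillis))
  }

  def main(args: Array[String]): Unit = {
    // An arbitrary example timestamp: 2018-10-04T13:23:00Z
    println(bucketId("dt=", "yyyy-MM-dd--HH", 1538659380000L)) // prints dt=2018-10-04--13
  }
}
```

Fixing the zone explicitly keeps bucket names identical across task managers whose JVMs may be configured with different default time zones.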

Thanks and best regards,
Averell





Re: Difference between BucketingSink and StreamingFileSink

Aljoscha Krettek
No worries! :-) It's nice that you also posted the solution.

> On 4. Oct 2018, at 13:23, Averell <[hidden email]> wrote:
>
> Hi,
>
> Sorry for wasting your time. I found the solution for that question
> regarding event-time: a class that extends BucketAssigner would do the
> needful:
>
> class SdcTimeBucketAssigner[T <: MyClass](prefix: String, formatString:
> String) extends BucketAssigner[T, String]{
> @transient
> var dateFormatter = new SimpleDateFormat(formatString)
>
> override def getBucketId(in: T, context: BucketAssigner.Context): String =
> {
> if (dateFormatter == null) dateFormatter = new
> SimpleDateFormat(formatString)
> s"$prefix${dateFormatter.format(new java.util.Date(in.getTimestamp))}"
> }
>
> override def getSerializer = SimpleVersionedStringSerializer.INSTANCE
> }
>
> Thanks and best regards,
> Averell
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/