BucketAssigner - Confusion

BucketAssigner - Confusion

Jeff Crane
I have had an issue understanding the documentation, in regard to BucketAssigner.
BucketID getBucketId(IN element,
                     BucketAssigner.Context context)
SimpleVersionedSerializer<BucketID> getSerializer()
First of all, I don't understand what the type "BucketID" means. I assume that's the return type of getBucketId, which doesn't make sense. The description says getBucketId (returns?) "A string representing the identifier of the bucket". So BucketID is not a type, it's always a string?
Based on the docs, I implemented it like this, which doesn't write anything!
public final class CustomBucketAssigner implements BucketAssigner<MyEvent, String> {

    public String getBucketId(final MyEvent element, final Context context) {

        DateTime dateTimeL = new DateTime(context.currentWatermark());

        return String.join("_",
                String.valueOf(dateTimeL.getYear()),
                String.valueOf(dateTimeL.getMonthOfYear()),
                String.valueOf(dateTimeL.getDayOfMonth()),
                String.valueOf(dateTimeL.getHourOfDay()),
                String.valueOf(dateTimeL.getMinuteOfHour())
        );
    }

    // I assume <String> because BucketID is always string?
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

Can someone explain what a BucketAssigner is supposed to do, in plainer English? I don't think the docs are clear and I'm lost.
Re: BucketAssigner - Confusion

Jeff Crane
According to my IDE (JetBrains), I get an error saying the getBucketId(IN, Context) signature requires a String return type (Flink 1.7 libs), so I still don't think BucketID is a variable type.

I still don't understand the role of:
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
Where does that come into play if getBucketId returns a String anyway?




On Monday, April 1, 2019, 11:44:14 AM PDT, Jeff Crane <[hidden email]> wrote:
[...]
Re: BucketAssigner - Confusion

Chesnay Schepler
BucketID is a variable type (a generic type parameter), and conceptually you can use any type so long as you can provide a serializer for it (BucketAssigner#getSerializer). The documentation is wrong in this instance.

The convenience Flink APIs (StreamingFileSink#forRowFormat/StreamingFileSink#forBulkFormat) default to Strings, but you can change the type by setting both an assigner and a rolling policy via "withBucketAssignerAndPolicy"; you should be able to use "DefaultRollingPolicy.create().build()" as a default policy.
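
To make the role of getSerializer more concrete, here is a minimal sketch of a non-String bucket ID together with a serializer for it. It deliberately does not depend on Flink: VersionedSerializer is a local stand-in that mirrors the three methods of SimpleVersionedSerializer (getVersion, serialize, deserialize), and HourBucket, HourBucketSerializer and roundTrip are hypothetical names invented for this illustration. The assumption behind it is that the sink keeps the IDs of its active buckets in checkpointed state, so it needs a way to turn an ID into bytes and back on restore, whatever type the ID has.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

final class BucketIdSerializerSketch {

    // Local stand-in with the same three methods as Flink's SimpleVersionedSerializer.
    interface VersionedSerializer<E> {
        int getVersion();
        byte[] serialize(E obj) throws IOException;
        E deserialize(int version, byte[] serialized) throws IOException;
    }

    // Hypothetical bucket ID that is not a String: one bucket per hour.
    static final class HourBucket {
        final int year;
        final int month;
        final int day;
        final int hour;

        HourBucket(int year, int month, int day, int hour) {
            this.year = year;
            this.month = month;
            this.day = day;
            this.hour = hour;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof HourBucket)) {
                return false;
            }
            HourBucket other = (HourBucket) o;
            return year == other.year && month == other.month
                    && day == other.day && hour == other.hour;
        }

        @Override
        public int hashCode() {
            return Objects.hash(year, month, day, hour);
        }

        // A readable form of the ID, in the same style as the String.join("_", ...) above.
        @Override
        public String toString() {
            return year + "_" + month + "_" + day + "_" + hour;
        }
    }

    // Writes the four fields as ints and reads them back in the same order.
    static final class HourBucketSerializer implements VersionedSerializer<HourBucket> {
        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(HourBucket bucket) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(bucket.year);
                out.writeInt(bucket.month);
                out.writeInt(bucket.day);
                out.writeInt(bucket.hour);
            }
            return bytes.toByteArray();
        }

        @Override
        public HourBucket deserialize(int version, byte[] serialized) throws IOException {
            if (version != 1) {
                throw new IOException("Unknown serializer version: " + version);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
                return new HourBucket(in.readInt(), in.readInt(), in.readInt(), in.readInt());
            }
        }
    }

    // Serializes and deserializes a bucket ID, as a restore from saved state would.
    static HourBucket roundTrip(HourBucket bucket) {
        HourBucketSerializer serializer = new HourBucketSerializer();
        try {
            return serializer.deserialize(serializer.getVersion(), serializer.serialize(bucket));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HourBucket original = new HourBucket(2019, 4, 1, 11);
        System.out.println(original + " -> " + roundTrip(original));
    }
}
```

With String bucket IDs none of this is needed, which is why returning SimpleVersionedStringSerializer.INSTANCE from getSerializer is enough in the String case.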

On 02/04/2019 20:18, Jeff Crane wrote:
[...]