Partitioning by composite key, but type and number of key fields are dynamic


Partitioning by composite key, but type and number of key fields are dynamic

Gaurav Luthra
I have a data stream of records; let's call them "input records".
I want to partition this stream using keyBy(), based on one or more
fields of each "input record", but the number and types of those fields
are not fixed.
How should I achieve this partitioning?

Note: Technically, I am using Avro's GenericRecord as the "input record",
i.e. I have a DataStream<GenericRecord> that needs to be partitioned. The
schema can differ from job to job, so I do not know in advance which field
names and types to supply to keyBy().




Re: Partitioning by composite key, but type and number of key fields are dynamic

Chesnay Schepler
How do you determine which fields you want to use if you don't know the
names and types beforehand?

I would wrap the GenericRecord in my own type, implement the field
selection logic in hashCode/equals, and unwrap it again in your functions.
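
For example, a minimal sketch of such a wrapper, assuming the key field
names arrive from the job configuration as a List<String> (KeyedRecord
and keyFields are illustrative names, not part of any Flink API):

    import java.util.List;
    import java.util.Objects;
    import org.apache.avro.generic.GenericRecord;

    public class KeyedRecord {
        private final GenericRecord record;
        private final List<String> keyFields; // assumed: field names from user configuration

        public KeyedRecord(GenericRecord record, List<String> keyFields) {
            this.record = record;
            this.keyFields = keyFields;
        }

        public GenericRecord unwrap() {
            return record;
        }

        @Override
        public int hashCode() {
            // Combine the hash codes of only the configured key fields.
            Object[] values = keyFields.stream().map(record::get).toArray();
            return Objects.hash(values);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof KeyedRecord)) return false;
            KeyedRecord other = (KeyedRecord) o;
            if (!keyFields.equals(other.keyFields)) return false;
            for (String field : keyFields) {
                if (!Objects.equals(record.get(field), other.record.get(field))) {
                    return false;
                }
            }
            return true;
        }
    }

    // Usage sketch: wrap, key by the wrapper itself, unwrap in your functions.
    // DataStream<KeyedRecord> wrapped = dataStream.map(r -> new KeyedRecord(r, keyFields));
    // wrapped.keyBy(w -> w)...; // call unwrap() inside the keyed function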



Re: Partitioning by composite key, but type and number of key fields are dynamic

Gaurav Luthra
Hi Chesnay,

My end user will know the fields of the "input records" (GenericRecord).
The user specifies only the names and number of those fields in
configuration; based on these fields of the GenericRecord I have to
partition the DataStream and build a keyed stream.

Currently, I have implemented this with a KeySelector function: I convert
each user-specified field to a string, concatenate the strings for all the
fields the user wants to key by (i.e. partition on), and return the
concatenated string from the KeySelector. Partitioning then happens on
this concatenated string.
See the example below.

    dataStream.keyBy(record ->
            record.get("area").toString() + record.get("age").toString());


But I am looking for a better solution, as I do not want to convert every
field to a string.





Re: Partitioning by composite key, but type and number of key fields are dynamic

Chesnay Schepler
Why don't you calculate the hashCode of each field, combine them, and use that as the key?
You won't get around computing something for each field and combining the results.
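
A sketch of that approach, again assuming the field names come from
configuration as a List<String> (FieldHashKeySelector is an illustrative
name, not an existing class):

    import java.util.List;
    import java.util.Objects;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.functions.KeySelector;

    public class FieldHashKeySelector implements KeySelector<GenericRecord, Integer> {
        private final List<String> keyFields; // assumed: field names from user configuration

        public FieldHashKeySelector(List<String> keyFields) {
            this.keyFields = keyFields;
        }

        @Override
        public Integer getKey(GenericRecord record) {
            int hash = 1;
            for (String field : keyFields) {
                // Fold each field's hash code into the combined key, tolerating nulls.
                hash = 31 * hash + Objects.hashCode(record.get(field));
            }
            return hash;
        }
    }

    // Usage:
    // dataStream.keyBy(new FieldHashKeySelector(keyFields));

One caveat: records whose key fields differ can still collide on the same
combined hash, and keyBy() would then treat them as the same key (sharing
state and grouping), so whether this is acceptable depends on the job.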
