Hello,
I'm evaluating Flink, and one thing I noticed is that Option[A] can't be used as a key for coGroup (specifically, I'm looking here: https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39). I'm not clear on the reason for this and would appreciate it if someone could explain.

Thanks,
Timur
There is one more detail to this question that I missed initially. It turns out that my key is a case class of the form MyKey(k1: Option[String], k2: Option[String]). Keys.SelectorFunctionKeys performs a recursive check of whether every field of the MyKey class can be a key, and it fails when it encounters an Option.

Is it possible to work around this without giving up Options? Being unable to use Options in domain objects could be really frustrating.

Thanks,
Timur

On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov <[hidden email]> wrote:
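To make the failure concrete, here is a toy model of a recursive key check like the one described above. The types `Basic`, `CaseClassOf`, and `OptionOf` are hypothetical stand-ins, not Flink's `TypeInformation` classes, and the rule that an Option is never a key simply encodes the behavior reported in this mail.

```scala
object KeyCheckModel {
  // Hypothetical, simplified type descriptions; these are not Flink classes.
  sealed trait TypeDesc
  final case class Basic(name: String) extends TypeDesc                 // e.g. String or Int
  final case class CaseClassOf(fields: List[TypeDesc]) extends TypeDesc // e.g. MyKey
  final case class OptionOf(elem: TypeDesc) extends TypeDesc            // e.g. Option[String]

  // A composite type is a valid key only if every one of its fields is.
  // OptionOf is rejected outright, which encodes the behavior reported above.
  def isKeyType(t: TypeDesc): Boolean = t match {
    case Basic(_)            => true
    case CaseClassOf(fields) => fields.forall(isKeyType)
    case OptionOf(_)         => false
  }
}
```

Under this rule MyKey fails because of its Option fields, while the same case class with plain String fields passes.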
Hi Timur,
Because Option[T] is not a comparable type in general (e.g., if T is a POJO type), you cannot use Option[T] as a key type. I think you have to implement a KeyExtractor to compare objects that contain Option[T]s.

```
import org.apache.flink.api.scala._

case class MyKey(k1: Option[String], k2: Option[String])

val env = ExecutionEnvironment.getExecutionEnvironment
val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"), Some("c")))
val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"), Some("c")))

// Join on the Int hash code of MyKey, so the key type is Int rather than MyKey.
// Caution: two different MyKeys whose hash codes collide would also be paired.
data1.join(data2)
  .where(_.hashCode())
  .equalTo(_.hashCode()).apply {
    (left: MyKey, right: MyKey) => (left, right)
  }.print()
```

Note that the approach in this example (using hashCode()) cannot be applied to sorting, because the order of the hash codes has nothing to do with the order of the keys.

Regards,
Chiwan Park

> On Mar 30, 2016, at 2:37 AM, Timur Fayruzov <[hidden email]> wrote:
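The caution about hash collisions can be seen without Flink. This plain-Scala sketch (the object and method names are hypothetical) joins two in-memory collections on `hashCode()`, the same key the Flink example uses. The strings `"Aa"` and `"BB"` have equal String hash codes, so two different MyKeys built from them are falsely matched unless the join re-checks equality.

```scala
object HashKeyJoin {
  case class MyKey(k1: Option[String], k2: Option[String])

  // In-memory equi-join on the Int hash code, standing in for
  // .where(_.hashCode()).equalTo(_.hashCode()) in the Flink example.
  def joinOnHash(left: Seq[MyKey], right: Seq[MyKey]): Seq[(MyKey, MyKey)] = {
    val byHash = right.groupBy(_.hashCode())
    for {
      l <- left
      r <- byHash.getOrElse(l.hashCode(), Seq.empty)
    } yield (l, r)
  }

  // Same join, but drops pairs whose hash codes matched while the keys differ.
  def joinOnHashChecked(left: Seq[MyKey], right: Seq[MyKey]): Seq[(MyKey, MyKey)] =
    joinOnHash(left, right).filter { case (l, r) => l == r }
}
```

In the Flink job, the equivalent safeguard would be to compare `left == right` inside the join function and emit only the matching pairs.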
Thank you for your answers, Chiwan! Does that mean a generic type can't be used as a key in general? This is a non-obvious limitation of the Flink DSL that I didn't see in the documentation.

Could you please elaborate on what you mean by KeyExtractor? I see that an instance of KeySelector is initialized inside the `where` operator, but I don't see how I can pass in a custom KeySelector.

Thanks,
Timur

On Wed, Mar 30, 2016 at 12:53 AM, Chiwan Park <[hidden email]> wrote:
Hi Timur,
Sorry for the confusion. I meant KeySelector.

`GenericType<T>` can be used as a key type if `T` implements `Comparable`. For example, `GenericType<Integer>` can be used as a key type, but `GenericType<scala.Tuple2>` cannot.

Regarding the example in my previous mail, the key type is `Int` because the KeySelector returns `Int`, and `TypeInformation<Int>` is not a generic type.

Regards,
Chiwan Park

> On Mar 31, 2016, at 1:09 AM, Timur Fayruzov <[hidden email]> wrote:
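The rule for generic types stated above can be checked directly with Java reflection. The helper below is a hypothetical sketch that mirrors the rule as stated in this mail; it is not Flink's own type-analysis code.

```scala
object ComparableKeyCheck {
  // Mirrors the stated rule: a generic type T qualifies as a key
  // only if T implements java.lang.Comparable.
  def implementsComparable(cls: Class[_]): Boolean =
    classOf[java.lang.Comparable[_]].isAssignableFrom(cls)
}
```

`java.lang.Integer` and `String` pass, while `scala.Tuple2` and `scala.Option` do not implement `Comparable`, which is why an `Option` field fell outside this rule.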
OK, I can't make Option comparable, so the only way I see is to translate the key into a Comparable data structure and use that instead (as alluded to in your example above). Thank you for the clarification!

Thanks,
Timur

On Wed, Mar 30, 2016 at 9:22 AM, Chiwan Park <[hidden email]> wrote:
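A minimal sketch of that translation, assuming a hypothetical `(isDefined, value)` encoding: each `Option[String]` becomes a Boolean flag plus a String, so `None` and `Some("")` stay distinct, equal keys map to equal tuples, and the resulting tuple has a natural ordering.

```scala
object ComparableKey {
  case class MyKey(k1: Option[String], k2: Option[String])

  // Hypothetical encoding: None -> (false, ""), Some(s) -> (true, s).
  // The flag keeps None distinct from Some(""), so the mapping is one-to-one.
  def toComparable(k: MyKey): (Boolean, String, Boolean, String) =
    (k.k1.isDefined, k.k1.getOrElse(""), k.k2.isDefined, k.k2.getOrElse(""))
}
```

In a Flink job, `toComparable` could take the place of `_.hashCode()` in the key selectors, e.g. `.where(k => ComparableKey.toComparable(k))`, assuming Flink accepts a Scala tuple of Boolean and String fields as a key type. Unlike the hash code, this key cannot collide for distinct MyKeys and also orders them meaningfully.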
Actually, I think it’s not correct that the OptionType cannot be used as a key type. In fact, it is similar to a composite type and should be usable as a key iff its element can be used as a key. Then we only have to provide an OptionTypeComparator that compares the elements if they are set. If not, then the None element will be smaller, for example.

@Timur, if you want, you can file a JIRA issue to add that.

Cheers,
Till

On Wed, Mar 30, 2016 at 7:17 PM, Timur Fayruzov <[hidden email]> wrote:

I just found that Timur created a JIRA issue for this (FLINK-3698).

Regards,
Chiwan Park

> On Mar 31, 2016, at 7:27 PM, Till Rohrmann <[hidden email]> wrote:
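Till's proposed comparator can be sketched in plain Scala as an `Ordering[Option[T]]`. This is a hypothetical illustration of the ordering rule he describes, not the OptionTypeComparator that was eventually merged: it compares the wrapped elements when both are set and sorts None before any Some. Scala's standard `Ordering.Option` follows the same None-first convention.

```scala
object OptionComparatorSketch {
  // Compare the wrapped elements when both are set; otherwise None sorts first.
  def optionOrdering[T](implicit elem: Ordering[T]): Ordering[Option[T]] =
    new Ordering[Option[T]] {
      def compare(x: Option[T], y: Option[T]): Int = (x, y) match {
        case (Some(a), Some(b)) => elem.compare(a, b)
        case (None, None)       => 0
        case (None, _)          => -1
        case (_, None)          => 1
      }
    }
}
```

Because the element ordering is only consulted when both values are set, such a comparator exists exactly when the element type is itself comparable, which matches the "key iff its element is a key" rule.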
I’ve merged a patch [1] for this issue. Now we can use Option as a key.
[1]: https://git-wip-us.apache.org/repos/asf?p=flink.git;a=commit;h=c60326f85faaa38bcc359d555cd2d2818ef2e4e7

Regards,
Chiwan Park

> On Apr 5, 2016, at 2:08 PM, Chiwan Park <[hidden email]> wrote: