Implicit inference of TypeInformation for join keys

Timur Fayruzov
Hello,

Another issue I have encountered is incorrect implicit resolution (I'm using Scala 2.11.7). Here's the example (with a workaround):
```
val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where(e => e.f1)
  //.equalTo(e => e) { // this fails to compile because equalTo expects an implicit TypeInformation argument
  .equalTo("f1") {
    (left, right) => 1
  }
```
However, the workaround does not quite work when the key is a tuple (I suspect this applies to other generic classes as well):
```
val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) {
    (left, right) => 1
  } // throws InvalidProgramException
```
Here, I try to provide the implicit TypeInformation explicitly, but apparently it's not compatible with the way implicit inference is done: the TypeInformation I generate is GenericType<scala.Tuple2>, while scala.Tuple2<String, String> is expected.
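The GenericType<scala.Tuple2> result is consistent with JVM type erasure: `classOf[(String, String)]` evaluates to the raw `scala.Tuple2` class, so an API that only receives a `Class` object has no way to recover the element types. A minimal plain-Scala check (no Flink involved; the object name is illustrative only):

```scala
object ErasureDemo {
  // Both expressions evaluate to the same runtime Class object: the type
  // arguments (String, String) and (Int, Int) exist only at compile time.
  val stringPair: Class[_] = classOf[(String, String)]
  val intPair: Class[_] = classOf[(Int, Int)]

  val sameClass: Boolean = stringPair == intPair // true
  val runtimeName: String = stringPair.getName   // "scala.Tuple2"
}
```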

Now, I can split this into two operations like below:
```
val tmp = a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))

tmp { (left, right) => 1 }
```
But I would like to avoid adding clutter to my processing logic, and it's not entirely clear to me how this would be scheduled.

As an option, I can hash the hell out of my keys like that:
```
a.coGroup(b)
  .where(e => (e.f1, e.f2).hashCode)
  .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])) { (left, right) => 1 }
```
But that, again, adds some indirection and clutter, not to mention the hassle of dealing with collisions (which can be alleviated by using fancier hashes, but I'd like to avoid that).
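To make the collision concern concrete: if only a hash of the composite key is used, any two distinct keys with equal hashes end up in the same group. The sketch below uses the well-known Java string pair "Aa"/"BB", which share the hash code 2112; since a Scala tuple's hash is computed from its elements' hashes, the two tuples collide as well. The object name is illustrative only.

```scala
object CollisionDemo {
  // "Aa" and "BB" have the same String.hashCode (2112).
  val k1: (String, String) = ("Aa", "x")
  val k2: (String, String) = ("BB", "x")

  val sameHash: Boolean = k1.hashCode == k2.hashCode // true: hash-only keys would match
  val sameKey: Boolean = k1 == k2                    // false: the real keys differ
}
```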

Any insights into the right way to go here would be highly appreciated.

Thanks,
Timur

Re: Implicit inference of TypeInformation for join keys

Chiwan Park-2
Hi Timur,

You can use a composite key [1] to compare keys consisting of multiple fields. For example:

```
val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where("f1", "f2") // composite key: Flink compares f1 first, then f2 if the f1 values are equal
  .equalTo("f1", "f2") { // note: both sides must specify the same number of key fields
    (left, right) => 1
  }
```

Composite keys can also be applied to Scala tuples:

```
val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
a.coGroup(b)
  .where(0, 1) // Note that field numbers start from 0.
  .equalTo(0, 1) {
    (left, right) => 1
  }
```
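For intuition about what a composite key means for coGroup, here is a plain-collections model (not Flink's distributed implementation): both inputs are grouped by the full key, and the user function is called once per distinct key, receiving an empty iterator for a side with no matching records. The `coGroup` helper is hypothetical, and `Thing` is assumed to be `case class Thing(f1: String, f2: String)`, since the thread never shows its definition.

```scala
object CoGroupModel {
  // Assumed shape of the Thing class used throughout the thread.
  final case class Thing(f1: String, f2: String)

  // Hypothetical model of coGroup semantics: for every key occurring on either
  // side, call f once with the matching left and right elements (either may be empty).
  def coGroup[L, R, K, O](left: Seq[L], right: Seq[R])(lk: L => K, rk: R => K)(
      f: (Iterator[L], Iterator[R]) => O): Seq[O] = {
    val lg = left.groupBy(lk)
    val rg = right.groupBy(rk)
    (lg.keySet ++ rg.keySet).toSeq.map { k =>
      f(lg.getOrElse(k, Seq.empty).iterator, rg.getOrElse(k, Seq.empty).iterator)
    }
  }

  val a = Seq(Thing("a", "b"), Thing("c", "d"))
  val b = Seq(Thing("a", "x"), Thing("z", "m"))

  // (left size, right size) per key group, sorted because key order is unspecified.
  val byF1: Seq[(Int, Int)] =
    coGroup(a, b)(_.f1, _.f1)((l, r) => (l.size, r.size)).sorted
  val byF1F2: Seq[(Int, Int)] =
    coGroup(a, b)(t => (t.f1, t.f2), t => (t.f1, t.f2))((l, r) => (l.size, r.size)).sorted
}
```

Keyed by `f1` alone, the two records with `f1 == "a"` meet in one group; keyed by `(f1, f2)`, every record forms its own group because no composite key appears on both sides.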

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples

Regards,
Chiwan Park

> On Mar 30, 2016, at 3:54 AM, Timur Fayruzov wrote:
> [...]

Re: Implicit inference of TypeInformation for join keys

Timur Fayruzov
Thank you Chiwan! Yes, I understand that there are workarounds that don't use a function argument (and thus do not require implicit arguments). I try to avoid positional and string-based keys because there are no compiler guarantees when you refactor or accidentally change the underlying case classes. Providing a function is the cleanest solution (and arguably the most readable), so it'd be great to make it work.

BTW, TypeInformation.of also has an overload that takes a TypeHint (https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java), which according to the documentation is meant to be used for generic classes, but using it still leads to the same exception.

Thanks,
Timur


On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park wrote:
> [...]

Re: Implicit inference of TypeInformation for join keys

Chiwan Park-2
Hi Timur,

You have to use the `createTypeInformation` method in the `org.apache.flink.api.scala` package object to create TypeInformation objects for Scala-specific types such as case classes, tuples, `Either`, and `Option`. For example:

```
import org.apache.flink.api.scala._ // to import package object

val a: DataSet[Thing] = …
val b: DataSet[Thing] = …

a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
    (left, right) => 1
  }.print()
```
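`createTypeInformation[(String, String)]` succeeds where `TypeInformation.of(classOf[...])` fails because it works from the static type known at compile time (in Flink's Scala API this is done by a macro), so the element types are still available. The sketch below swaps in a plain implicit type class to illustrate the same idea without Flink; `TypeDesc` and `typeDesc` are hypothetical names, not part of Flink's API.

```scala
object DerivationDemo {
  // Hypothetical type class standing in for TypeInformation.
  trait TypeDesc[T] { def describe: String }

  object TypeDesc {
    implicit val stringDesc: TypeDesc[String] = new TypeDesc[String] { def describe = "String" }
    implicit val intDesc: TypeDesc[Int] = new TypeDesc[Int] { def describe = "Int" }
    // Resolved at compile time from the element types' instances, so nothing is erased.
    implicit def tuple2Desc[A, B](implicit a: TypeDesc[A], b: TypeDesc[B]): TypeDesc[(A, B)] =
      new TypeDesc[(A, B)] { def describe = s"Tuple2<${a.describe}, ${b.describe}>" }
  }

  // Loosely analogous to calling createTypeInformation[T] explicitly.
  def typeDesc[T](implicit d: TypeDesc[T]): String = d.describe

  val pair: String = typeDesc[(String, String)]          // "Tuple2<String, String>"
  val nested: String = typeDesc[(String, (Int, String))] // "Tuple2<String, Tuple2<Int, String>>"
}
```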

Note that, internally, Flink wraps each record in a 2-tuple of (key extracted by the KeySelector, original value), so using a KeySelector costs some performance.

Regards,
Chiwan Park

> On Mar 31, 2016, at 12:58 AM, Timur Fayruzov wrote:
> [...]

Re: Implicit inference of TypeInformation for join keys

Timur Fayruzov
Actually, there is an even easier solution (which I saw in your reply to my other question):
```
a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2)).apply {
    (left, right) => 1
  }.print()
```
This does pretty much what I want: the explicit `apply` gives the compiler the hint it was missing before. Nevertheless, `createTypeInformation` works too, thanks for sharing!

Thanks,
Timur

On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park wrote:
> [...]

Re: Implicit inference of TypeInformation for join keys

Aljoscha Krettek
I'm afraid there is no way around that extra `.apply`: without it, the Scala compiler takes the block following `equalTo(...)` as the argument for its additional implicit `TypeInformation` parameter list. It's a bit ugly, though...
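The parameter-list issue can be reproduced without Flink. The sketch below uses hypothetical stand-ins (`TypeInfo`, `HalfKeyed`, `CoGrouped`) that only mimic the shape of the API discussed in this thread: `equalTo` takes the key function in one parameter list and an implicit type description in a second one, so a block written directly after `equalTo(...)` lands in the implicit list. Calling `.apply` explicitly, or supplying the implicit argument by hand as with `createTypeInformation`, routes the block to the right method.

```scala
object ApplyDemo {
  // Hypothetical stand-ins mimicking the shape of Flink's Scala API; not Flink classes.
  trait TypeInfo[T]
  object TypeInfo {
    implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] {}
    implicit def tuple2Info[A, B](implicit a: TypeInfo[A], b: TypeInfo[B]): TypeInfo[(A, B)] =
      new TypeInfo[(A, B)] {}
  }

  final case class Thing(f1: String, f2: String)

  final class CoGrouped[L, R](left: Seq[L], right: Seq[R]) {
    // Simplified: calls fun once on all elements instead of once per key group.
    def apply[O](fun: (Iterator[L], Iterator[R]) => O): O = fun(left.iterator, right.iterator)
  }

  final class HalfKeyed[L, R](left: Seq[L], right: Seq[R]) {
    // A context bound `K: TypeInfo` desugars to exactly this second, implicit list.
    def equalTo[K](key: R => K)(implicit info: TypeInfo[K]): CoGrouped[L, R] =
      new CoGrouped(left, right)
  }

  val half = new HalfKeyed(Seq(Thing("a", "b"), Thing("c", "d")), Seq(Thing("a", "x")))

  // Does NOT compile: the block is taken as the argument for `info`, so the
  // compiler expects a TypeInfo[(String, String)] and finds a function instead.
  // half.equalTo(e => (e.f1, e.f2)) { (l, r) => 1 }

  // Works: `.apply` makes the block the argument of CoGrouped.apply.
  val viaApply: Int = half.equalTo(e => (e.f1, e.f2)).apply { (l, r) => l.size + r.size }

  // Also works: fill the implicit list explicitly; the block then goes to apply.
  val viaExplicit: Int =
    half.equalTo(e => (e.f1, e.f2))(TypeInfo.tuple2Info[String, String]) { (l, r) => l.size + r.size }
}
```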

On Wed, 30 Mar 2016 at 18:34 Timur Fayruzov wrote:
> [...]

Re: Implicit inference of TypeInformation for join keys

Timur Fayruzov
I'm very content with the extra `apply`; it's much cleaner than any of my initial solutions.

On Thu, Mar 31, 2016 at 2:18 AM, Aljoscha Krettek wrote:
> [...]