Chaining the creation of a WatermarkStrategy doesn't work?

Niels Basjes
Hi,

I'm migrating some of my code to Flink 1.11 and I ran into something I find strange.

This works:
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));
watermarkStrategy
    .withTimestampAssigner((SerializableTimestampAssigner<String>) (element, recordTimestamp) -> 42L);
However, this does NOT work:
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
    .withTimestampAssigner((SerializableTimestampAssigner<String>) (element, recordTimestamp) -> 42L);

When I try to compile this last one I get:

Error:(109, 13) java: no suitable method found for withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String>)
    method org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier<java.lang.Object>) is not applicable
      (argument mismatch; org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String> cannot be converted to org.apache.flink.api.common.eventtime.TimestampAssignerSupplier<java.lang.Object>)
    method org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.Object>) is not applicable
      (argument mismatch; org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String> cannot be converted to org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.Object>)

Why is that?

--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Chaining the creation of a WatermarkStrategy doesn't work?

Chesnay Schepler
WatermarkStrategy.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES)) returns a WatermarkStrategy<T>, but T is determined entirely by the variable declaration the result is assigned to (i.e., it does not depend on any argument).

So when you assign the strategy to a variable, the compiler can infer the generic type from that declaration. Without a variable, as in the chained call, it is treated as a WatermarkStrategy<Object>, because there is nothing to infer the type from. withTimestampAssigner then expects a SerializableTimestampAssigner<Object>, which is what the error message reports.
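
The same behaviour can be reproduced without Flink. The following is a minimal sketch: Strategy, create, withAssigner and timestampOf are hypothetical stand-ins invented for illustration and are not part of the Flink API. It only shows how the inferred type differs between the two call shapes.

```java
import java.util.function.ToLongFunction;

public class TargetTypingDemo {

    /** Hypothetical stand-in for WatermarkStrategy<T>; it is not the Flink class. */
    static final class Strategy<T> {
        private final ToLongFunction<T> assigner;

        private Strategy(ToLongFunction<T> assigner) {
            this.assigner = assigner;
        }

        // Like forBoundedOutOfOrderness(Duration): no parameter mentions T,
        // so T can only come from the context in which the call appears.
        static <T> Strategy<T> create() {
            return new Strategy<>(element -> -1L);
        }

        Strategy<T> withAssigner(ToLongFunction<T> newAssigner) {
            return new Strategy<>(newAssigner);
        }

        long timestampOf(T element) {
            return assigner.applyAsLong(element);
        }
    }

    static long assignedFirst() {
        // The declared type is the target type, so T is inferred as String.
        Strategy<String> strategy = Strategy.create();
        ToLongFunction<String> byLength = String::length;
        return strategy.withAssigner(byLength).timestampOf("flink");
    }

    static long chained() {
        // create() is the receiver of the next call and has no target type,
        // so T falls back to Object and the lambda parameter is an Object.
        Strategy<Object> strategy =
                Strategy.create().withAssigner(element -> element.toString().length());
        // Passing a ToLongFunction<String> to withAssigner here would not compile.
        return strategy.timestampOf(1234567);
    }

    public static void main(String[] args) {
        System.out.println(assignedFirst()); // 5
        System.out.println(chained());       // 7
    }
}
```

Note that the chained variant only compiles because its lambda accepts an Object; it is the String-typed assigner that the compiler rejects.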

In reply to the post by Niels Basjes of 08/07/2020 08:54.

Re: Chaining the creation of a WatermarkStrategy doesn't work?

Tzu-Li (Gordon) Tai
In reply to this post by Niels Basjes
Hi,

This is more of a Java question than a Flink one.
In short, Java does not propagate the target type of an assignment back
through chained invocations, so the first call in the chain is inferred as
WatermarkStrategy<Object> and the type has to be supplied explicitly.

If you'd like to chain the calls, this would work:

WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
    .withTimestampAssigner((SerializableTimestampAssigner<String>) (element, recordTimestamp) -> 42L);
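
The explicit type argument (often called a type witness) is plain Java and is not specific to Flink. As a rough, JDK-only sketch of the same situation: Stream.builder() also takes no arguments, so in a chain its element type would default to Object unless it is stated up front. The class and method names below are made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TypeWitnessDemo {

    static List<String> chainedWithWitness() {
        // Without <String>, Stream.builder() in receiver position yields a
        // Stream.Builder<Object>, and map(String::toUpperCase) is rejected.
        // The witness fixes the element type before the chain continues.
        return Stream.<String>builder()
                .add("flink")
                .add("java")
                .build()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(chainedWithWitness()); // [FLINK, JAVA]
    }
}
```

Once the type is fixed this way, the parameter types of a following lambda are known to the compiler, so the cast in the chained WatermarkStrategy call is probably redundant; that part is an expectation and was not verified in this thread.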

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Chaining the creation of a WatermarkStrategy doesn't work?

Tzu-Li (Gordon) Tai
Ah, I didn't realize Chesnay had already answered; sorry for the concurrent
reply :)




Re: Chaining the creation of a WatermarkStrategy doesn't work?

Niels Basjes
Thanks guys,

It is clear this is a Java thing.

Niels

In reply to the post by Tzu-Li (Gordon) Tai of Wed, Jul 8, 2020 at 9:56 AM.