How to use properly the function: withTimestampAssigner((event, timestamp) ->..

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

How to use properly the function: withTimestampAssigner((event, timestamp) ->..

SimAzz
Hi,

I'm taking the timestamp from the event payload that I'm receiving from Kafka.

I'm struggling to get the time and I'm confused on how I should use the function ".withTimestampAssigner()". I'm receiving an error on event.getTime() that is telling me: "cannot resolve method "Get Time" in "Object" and I really don't understand how I can fix it.  My class is providing a long so the variable itself should be fine. Any help would be really appreciated.

This is my code:

    FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> { return event.getTime();
                        });

        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));


And to give you the idea of the whole project,

This is the EventDeserializationSchema class:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;


    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {

        return TypeInformation.of(Event.class);
    }
}

And this is the Event Class:

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public Long time;



    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public long getTime() {
        return time;
    }

    public void setTime(String kafkaTime) {
        long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
        this.time = tn;
    }
}





Reply | Threaded
Open this post in threaded view
|

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Till Rohrmann
Hi Simone,

The problem is that the Java 1.8 compiler cannot do type inference when chaining methods [1].

The solution would be 

WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> { return event.getTime();
                        });

[hidden email] I think we need to update the documentation about it. We have some examples which don't take this into account.


Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <[hidden email]> wrote:
Hi,

I'm taking the timestamp from the event payload that I'm receiving from Kafka.

I'm struggling to get the time and I'm confused on how I should use the function ".withTimestampAssigner()". I'm receiving an error on event.getTime() that is telling me: "cannot resolve method "Get Time" in "Object" and I really don't understand how I can fix it.  My class is providing a long so the variable itself should be fine. Any help would be really appreciated.

This is my code:

    FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> { return event.getTime();
                        });

        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));


And to give you the idea of the whole project,

This is the EventDeserializationSchema class:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;


    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {

        return TypeInformation.of(Event.class);
    }
}

And this is the Event Class:

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public Long time;



    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public long getTime() {
        return time;
    }

    public void setTime(String kafkaTime) {
        long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
        this.time = tn;
    }
}





Reply | Threaded
Open this post in threaded view
|

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

SimAzz
Hi Till,

That's great! thank you so much!!! I have spent one week on this. I'm so relieved!

Cheers

s



From: Till Rohrmann <[hidden email]>
Sent: 06 November 2020 17:56
To: Simone Cavallarin <[hidden email]>
Cc: [hidden email] <[hidden email]>; Aljoscha Krettek <[hidden email]>
Subject: Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..
 
Hi Simone,

The problem is that the Java 1.8 compiler cannot do type inference when chaining methods [1].

The solution would be 

WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> { return event.getTime();
                        });

[hidden email] I think we need to update the documentation about it. We have some examples which don't take this into account.


Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <[hidden email]> wrote:
Hi,

I'm taking the timestamp from the event payload that I'm receiving from Kafka.

I'm struggling to get the time and I'm confused on how I should use the function ".withTimestampAssigner()". I'm receiving an error on event.getTime() that is telling me: "cannot resolve method "Get Time" in "Object" and I really don't understand how I can fix it.  My class is providing a long so the variable itself should be fine. Any help would be really appreciated.

This is my code:

    FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> { return event.getTime();
                        });

        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));


And to give you the idea of the whole project,

This is the EventDeserializationSchema class:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;


    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {

        return TypeInformation.of(Event.class);
    }
}

And this is the Event Class:

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public Long time;



    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public long getTime() {
        return time;
    }

    public void setTime(String kafkaTime) {
        long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
        this.time = tn;
    }
}





Reply | Threaded
Open this post in threaded view
|

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Till Rohrmann
Glad to hear it!

Cheers,
Till

On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <[hidden email]> wrote:
Hi Till,

That's great! thank you so much!!! I have spent one week on this. I'm so relieved!

Cheers

s



From: Till Rohrmann <[hidden email]>
Sent: 06 November 2020 17:56
To: Simone Cavallarin <[hidden email]>
Cc: [hidden email] <[hidden email]>; Aljoscha Krettek <[hidden email]>
Subject: Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..
 
Hi Simone,

The problem is that the Java 1.8 compiler cannot do type inference when chaining methods [1].

The solution would be 

WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> { return event.getTime();
                        });

[hidden email] I think we need to update the documentation about it. We have some examples which don't take this into account.


Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <[hidden email]> wrote:
Hi,

I'm taking the timestamp from the event payload that I'm receiving from Kafka.

I'm struggling to get the time and I'm confused on how I should use the function ".withTimestampAssigner()". I'm receiving an error on event.getTime() that is telling me: "cannot resolve method "Get Time" in "Object" and I really don't understand how I can fix it.  My class is providing a long so the variable itself should be fine. Any help would be really appreciated.

This is my code:

    FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> { return event.getTime();
                        });

        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));


And to give you the idea of the whole project,

This is the EventDeserializationSchema class:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;


    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {

        return TypeInformation.of(Event.class);
    }
}

And this is the Event Class:

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public Long time;



    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public long getTime() {
        return time;
    }

    public void setTime(String kafkaTime) {
        long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
        this.time = tn;
    }
}





Reply | Threaded
Open this post in threaded view
|

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Aljoscha Krettek
@Till For instances where we use withTimestampAssigner() the examples in
the docs always use the explicit generic parameter. (See
event_timestamps_watermarks.md and streaming_analytics.md). For cases
where we don't use withTimestampAssigner() we don't need the extra
generic parameter because the compiler can figure it out.

But yes, the Java compiler is not very helpful here... 😅


Best,
Aljoscha

On 09.11.20 09:35, Till Rohrmann wrote:

> Glad to hear it!
>
> Cheers,
> Till
>
> On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <[hidden email]>
> wrote:
>
>> Hi Till,
>>
>> That's great! thank you so much!!! I have spent one week on this. I'm so
>> relieved!
>>
>> Cheers
>>
>> s
>>
>>
>> ------------------------------
>> *From:* Till Rohrmann <[hidden email]>
>> *Sent:* 06 November 2020 17:56
>> *To:* Simone Cavallarin <[hidden email]>
>> *Cc:* [hidden email] <[hidden email]>; Aljoscha Krettek <
>> [hidden email]>
>> *Subject:* Re: How to use properly the function:
>> withTimestampAssigner((event, timestamp) ->..
>>
>> Hi Simone,
>>
>> The problem is that the Java 1.8 compiler cannot do type inference when
>> chaining methods [1].
>>
>> The solution would be
>>
>> WatermarkStrategy<Event> wmStrategy =
>>                  WatermarkStrategy
>>                          .<Event>forMonotonousTimestamps()
>>                          .withTimestampAssigner((event, timestamp) -> {
>> return event.getTime();
>>                          });
>>
>> @Aljoscha Krettek <[hidden email]> I think we need to update the
>> documentation about it. We have some examples which don't take this into
>> account.
>>
>> [1]
>> https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/
>>
>> Cheers,
>> Till
>>
>> On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <[hidden email]>
>> wrote:
>>
>> Hi,
>>
>> I'm taking the timestamp from the event payload that I'm receiving from
>> Kafka.
>>
>> I'm struggling to get the time and I'm confused on how I should use the
>> function ".withTimestampAssigner()". I'm receiving an error on event.
>> getTime() that is telling me: *"cannot resolve method "Get Time" in
>> "Object"* and I really don't understand how I can fix it.  My class is
>> providing a long so the variable itself should be fine. Any help would be
>> really appreciated.
>>
>> *This is my code:*
>>
>> *    FlinkKafkaConsumer<Event> kafkaData =*
>> *                new FlinkKafkaConsumer("CorID_0", new
>> EventDeserializationSchema(), p);*
>> *        WatermarkStrategy<Event> wmStrategy =*
>> *                WatermarkStrategy*
>> *                        .forMonotonousTimestamps()*
>> *                        .withTimestampAssigner((event, timestamp) -> {
>> return event.**getTime();*
>> *                        });*
>>
>> *        DataStream<Event> stream = env.addSource(*
>> *                kafkaData.assignTimestampsAndWatermarks(wmStrategy));*
>>
>>
>> And to give you the idea of the whole project,
>>
>> *This is the EventDeserializationSchema class:*
>>
>> *public class EventDeserializationSchema implements
>> DeserializationSchema<Event> {*
>>
>> *    private static final long serialVersionUID = 1L;*
>>
>>
>> *    private static final CsvSchema schema = CsvSchema.builder()*
>> *            .addColumn("firstName")*
>> *            .addColumn("lastName")*
>> *            .addColumn("age", CsvSchema.ColumnType.NUMBER)*
>> *            .addColumn("time")*
>> *            .build();*
>>
>> *    private static final ObjectMapper mapper = new CsvMapper();*
>>
>> *    @Override*
>> *    public Event deserialize(byte[] message) throws IOException {*
>> *        return
>> mapper.readerFor(Event.class).with(schema).readValue(message);*
>> *    }*
>>
>> *    @Override*
>> *    public boolean isEndOfStream(Event nextElement) {*
>> *        return false;*
>> *    }*
>>
>> *    @Override*
>> *    public TypeInformation<Event> getProducedType() {*
>>
>> *        return TypeInformation.of(Event.class);*
>> *    }*
>> *}*
>>
>> *And this is the Event Class:*
>>
>> *public class Event implements Serializable {*
>> *    public String firstName;*
>> *    public String lastName;*
>> *    private int age;*
>> *    public Long time;*
>>
>>
>>
>> *    public Event() {*
>> *    }*
>>
>> *    public String getFirstName() {*
>> *        return firstName;*
>> *    }*
>>
>> *    public void setFirstName(String firstName) {*
>> *        this.firstName = firstName;*
>> *    }*
>>
>> *    public String getLastName() {*
>> *        return lastName;*
>> *    }*
>>
>> *    public void setLastName(String lastName) {*
>> *        this.lastName = lastName;*
>> *    }*
>>
>> *    public int getAge() {*
>> *        return age;*
>> *    }*
>>
>> *    public void setAge(int age) {*
>> *        this.age = age;*
>> *    }*
>>
>> *    public long getTime() {*
>> *        return time;*
>> *    }*
>>
>> *    public void setTime(String kafkaTime) {*
>> *        long tn =
>> OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();*
>> *        this.time = tn;*
>> *    }*
>> *}*
>>
>>
>>
>>
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Till Rohrmann
I think in the JavaDocs of the WatermarkStrategy we give an incorrect example. I have created an issue [1] to fix the problem.


On Mon, Nov 9, 2020 at 12:06 PM Aljoscha Krettek <[hidden email]> wrote:
@Till For instances where we use withTimestampAssigner() the examples in
the docs always use the explicit generic parameter. (See
event_timestamps_watermarks.md and streaming_analytics.md). For cases
where we don't use withTimestampAssigner() we don't need the extra
generic parameter because the compiler can figure it out.

But yes, the Java compiler is not very helpful here... 😅


Best,
Aljoscha

On 09.11.20 09:35, Till Rohrmann wrote:
> Glad to hear it!
>
> Cheers,
> Till
>
> On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <[hidden email]>
> wrote:
>
>> Hi Till,
>>
>> That's great! thank you so much!!! I have spent one week on this. I'm so
>> relieved!
>>
>> Cheers
>>
>> s
>>
>>
>> ------------------------------
>> *From:* Till Rohrmann <[hidden email]>
>> *Sent:* 06 November 2020 17:56
>> *To:* Simone Cavallarin <[hidden email]>
>> *Cc:* [hidden email] <[hidden email]>; Aljoscha Krettek <
>> [hidden email]>
>> *Subject:* Re: How to use properly the function:
>> withTimestampAssigner((event, timestamp) ->..
>>
>> Hi Simone,
>>
>> The problem is that the Java 1.8 compiler cannot do type inference when
>> chaining methods [1].
>>
>> The solution would be
>>
>> WatermarkStrategy<Event> wmStrategy =
>>                  WatermarkStrategy
>>                          .<Event>forMonotonousTimestamps()
>>                          .withTimestampAssigner((event, timestamp) -> {
>> return event.getTime();
>>                          });
>>
>> @Aljoscha Krettek <[hidden email]> I think we need to update the
>> documentation about it. We have some examples which don't take this into
>> account.
>>
>> [1]
>> https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/
>>
>> Cheers,
>> Till
>>
>> On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <[hidden email]>
>> wrote:
>>
>> Hi,
>>
>> I'm taking the timestamp from the event payload that I'm receiving from
>> Kafka.
>>
>> I'm struggling to get the time and I'm confused on how I should use the
>> function ".withTimestampAssigner()". I'm receiving an error on event.
>> getTime() that is telling me: *"cannot resolve method "Get Time" in
>> "Object"* and I really don't understand how I can fix it.  My class is
>> providing a long so the variable itself should be fine. Any help would be
>> really appreciated.
>>
>> *This is my code:*
>>
>> *    FlinkKafkaConsumer<Event> kafkaData =*
>> *                new FlinkKafkaConsumer("CorID_0", new
>> EventDeserializationSchema(), p);*
>> *        WatermarkStrategy<Event> wmStrategy =*
>> *                WatermarkStrategy*
>> *                        .forMonotonousTimestamps()*
>> *                        .withTimestampAssigner((event, timestamp) -> {
>> return event.**getTime();*
>> *                        });*
>>
>> *        DataStream<Event> stream = env.addSource(*
>> *                kafkaData.assignTimestampsAndWatermarks(wmStrategy));*
>>
>>
>> And to give you the idea of the whole project,
>>
>> *This is the EventDeserializationSchema class:*
>>
>> *public class EventDeserializationSchema implements
>> DeserializationSchema<Event> {*
>>
>> *    private static final long serialVersionUID = 1L;*
>>
>>
>> *    private static final CsvSchema schema = CsvSchema.builder()*
>> *            .addColumn("firstName")*
>> *            .addColumn("lastName")*
>> *            .addColumn("age", CsvSchema.ColumnType.NUMBER)*
>> *            .addColumn("time")*
>> *            .build();*
>>
>> *    private static final ObjectMapper mapper = new CsvMapper();*
>>
>> *    @Override*
>> *    public Event deserialize(byte[] message) throws IOException {*
>> *        return
>> mapper.readerFor(Event.class).with(schema).readValue(message);*
>> *    }*
>>
>> *    @Override*
>> *    public boolean isEndOfStream(Event nextElement) {*
>> *        return false;*
>> *    }*
>>
>> *    @Override*
>> *    public TypeInformation<Event> getProducedType() {*
>>
>> *        return TypeInformation.of(Event.class);*
>> *    }*
>> *}*
>>
>> *And this is the Event Class:*
>>
>> *public class Event implements Serializable {*
>> *    public String firstName;*
>> *    public String lastName;*
>> *    private int age;*
>> *    public Long time;*
>>
>>
>>
>> *    public Event() {*
>> *    }*
>>
>> *    public String getFirstName() {*
>> *        return firstName;*
>> *    }*
>>
>> *    public void setFirstName(String firstName) {*
>> *        this.firstName = firstName;*
>> *    }*
>>
>> *    public String getLastName() {*
>> *        return lastName;*
>> *    }*
>>
>> *    public void setLastName(String lastName) {*
>> *        this.lastName = lastName;*
>> *    }*
>>
>> *    public int getAge() {*
>> *        return age;*
>> *    }*
>>
>> *    public void setAge(int age) {*
>> *        this.age = age;*
>> *    }*
>>
>> *    public long getTime() {*
>> *        return time;*
>> *    }*
>>
>> *    public void setTime(String kafkaTime) {*
>> *        long tn =
>> OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();*
>> *        this.time = tn;*
>> *    }*
>> *}*
>>
>>
>>
>>
>>
>>
>