Hi,
I'm taking the timestamp from the event payload that I'm receiving from Kafka.
I'm struggling to get the time and I'm confused about how I should use the function ".withTimestampAssigner()". I'm getting an error on event.getTime() that says:
"Cannot resolve method 'getTime' in 'Object'", and I really don't understand how I can fix it. My class returns a long, so the value itself should be fine. Any help would be really appreciated.
This is my code:
FlinkKafkaConsumer<Event> kafkaData =
    new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
WatermarkStrategy<Event> wmStrategy =
    WatermarkStrategy
        .forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> {
            return event.getTime();
        });
DataStream<Event> stream = env.addSource(
    kafkaData.assignTimestampsAndWatermarks(wmStrategy));
And to give you the idea of the whole project,
This is the EventDeserializationSchema class:
public class EventDeserializationSchema implements DeserializationSchema<Event> {
    private static final long serialVersionUID = 1L;
    private static final CsvSchema schema = CsvSchema.builder()
        .addColumn("firstName")
        .addColumn("lastName")
        .addColumn("age", CsvSchema.ColumnType.NUMBER)
        .addColumn("time")
        .build();
    private static final ObjectMapper mapper = new CsvMapper();
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }
    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }
    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
And this is the Event Class:
public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public Long time;
    public Event() {
    }
    public String getFirstName() {
        return firstName;
    }
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }
    public String getLastName() {
        return lastName;
    }
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public long getTime() {
        return time;
    }
    public void setTime(String kafkaTime) {
        long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
        this.time = tn;
    }
}
|
Hi Simone,

The problem is that the Java 1.8 compiler cannot do type inference when chaining methods [1].

The solution would be

WatermarkStrategy<Event> wmStrategy =
    WatermarkStrategy
        .<Event>forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> {
            return event.getTime();
        });

@Aljoscha Krettek <[hidden email]> I think we need to update the documentation about it. We have some examples which don't take this into account.

[1] https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/

Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <[hidden email]> wrote:
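
For readers who want to reproduce the effect without a Flink setup, here is a minimal sketch using only the JDK. The `Strategy` class, its `create()` and `withAssigner()` methods, and the simplified `Event` are hypothetical stand-ins for `WatermarkStrategy`, `forMonotonousTimestamps()` and `withTimestampAssigner()`; they are assumptions made for illustration, not Flink's actual API. The sketch shows why the chained call loses the type and how the explicit type argument restores it.

```java
import java.util.function.ToLongFunction;

public class TypeWitnessDemo {

    // Hypothetical stand-in for WatermarkStrategy<T>; NOT the real Flink class.
    static final class Strategy<T> {
        private final ToLongFunction<T> assigner;

        private Strategy(ToLongFunction<T> assigner) {
            this.assigner = assigner;
        }

        // Generic static factory, playing the role of forMonotonousTimestamps().
        static <T> Strategy<T> create() {
            return new Strategy<>(element -> Long.MIN_VALUE);
        }

        // Chained configuration method, playing the role of withTimestampAssigner(...).
        Strategy<T> withAssigner(ToLongFunction<T> newAssigner) {
            return new Strategy<>(newAssigner);
        }

        long timestampOf(T element) {
            return assigner.applyAsLong(element);
        }
    }

    // Simplified event type that only carries a timestamp.
    static final class Event {
        private final long time;

        Event(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    static Strategy<Event> build() {
        // Does not compile: create() is the receiver of a chained call, so it has
        // no target type, T is inferred as Object, and Object has no getTime().
        // Strategy<Event> broken = Strategy.create().withAssigner(e -> e.getTime());

        // Compiles: the explicit type argument fixes T = Event before the chain continues.
        return Strategy.<Event>create().withAssigner(e -> e.getTime());
    }

    public static void main(String[] args) {
        System.out.println(build().timestampOf(new Event(42L))); // prints 42
    }
}
```

Uncommenting the `broken` line should produce the same kind of "cannot resolve method in Object" error as in the original question.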
|
Hi Till,
That's great! Thank you so much!!! I have spent one week on this. I'm so relieved!
Cheers
s
From: Till Rohrmann <[hidden email]>
Sent: 06 November 2020 17:56
To: Simone Cavallarin <[hidden email]>
Cc: [hidden email] <[hidden email]>; Aljoscha Krettek <[hidden email]>
Subject: Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..
|
Glad to hear it!

Cheers,
Till

On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <[hidden email]> wrote:
|
@Till For instances where we use withTimestampAssigner() the examples in the docs always use the explicit generic parameter. (See event_timestamps_watermarks.md and streaming_analytics.md). For cases where we don't use withTimestampAssigner() we don't need the extra generic parameter because the compiler can figure it out.

But yes, the Java compiler is not very helpful here... 😅

Best,
Aljoscha

On 09.11.20 09:35, Till Rohrmann wrote:
|
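
The distinction between the chained and unchained cases can also be seen with a plain JDK API. The sketch below uses `Comparator.comparingLong`, which behaves analogously: a standalone call takes its type from the assignment target, while a chained call needs the explicit generic parameter. The `Event` class here is a simplified, hypothetical version of the one in the question, and the class and method names are illustrative only.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TargetTypingDemo {

    // Simplified event type that only carries a timestamp.
    static final class Event {
        private final long time;

        Event(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    // No chaining: the declared type of cmp supplies T = Event,
    // so no explicit generic parameter is needed.
    static Comparator<Event> byTime() {
        Comparator<Event> cmp = Comparator.comparingLong(e -> e.getTime());
        return cmp;
    }

    // Chaining: without the explicit <Event>, T would be inferred as Object
    // and e.getTime() would not resolve.
    static Comparator<Event> byTimeDescending() {
        // Does not compile:
        // return Comparator.comparingLong(e -> e.getTime()).reversed();
        return Comparator.<Event>comparingLong(e -> e.getTime()).reversed();
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(new Event(3L), new Event(1L), new Event(2L));

        events.sort(byTime());
        System.out.println(events.get(0).getTime()); // prints 1

        events.sort(byTimeDescending());
        System.out.println(events.get(0).getTime()); // prints 3
    }
}
```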
I think in the JavaDocs of the WatermarkStrategy we give an incorrect example. I have created an issue [1] to fix the problem.

On Mon, Nov 9, 2020 at 12:06 PM Aljoscha Krettek <[hidden email]> wrote: