ingesting time for TimeCharacteristic.IngestionTime on unit test

avilevi
Hi,
Our stream is not based on a time sequence and we do not use time-based operations. We do want to clean up the state after x days, so we register a timer. My problem is that in our unit test the timer fires immediately (there is no ingestion time). How can I inject ingestion time?

Cheers
Avi

Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

Andrey Zagrebin-3
Hi Avi,

Do you use a processing-time timer (timerService().registerProcessingTimeTimer)?
Why do you need ingestion time? Do you set TimeCharacteristic.IngestionTime?

Best,
Andrey

Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

avilevi
Any idea what I should do to overcome this?

On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <[hidden email]> wrote:
Hi Andrey,
I am testing a filter operator that receives a key from the stream and checks whether it is new. If it is new, the operator keeps it in state and registers a timer. All of that is done using a ProcessFunction.
The test uses a CollectSink as described here, and the source is an implementation of SourceFunction that accepts a collection of values and passes each one to ctx.collect.
ctx.timestamp() is null, but even if I set the timer to some time in the future with ctx.timerService.registerProcessingTimeTimer(currenttimestamp + x), the timer fires immediately.


On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <[hidden email]> wrote:
Hi Avi,

What is the structure of your unit test? Do you create a source and then apply the function, or do you test only the ProcessFunction methods in isolation?
Does ctx.timestamp() return zero, or some other value?

Best,
Andrey


On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <[hidden email]> wrote:
Hi Andrey,
I'm using IngestionTime:
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

This is my timer in processElement:
   val nextTime: Long = ctx.timestamp() + daysInMilliseconds(14)
   ctx.timerService.registerProcessingTimeTimer(nextTime)

The problem is how to use this in my unit tests. Since there is no ingestion time there, the timers fire immediately, so the timer actions (such as state cleanup) run too early and cause the tests to fail.
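
For reference, the timer arithmetic in the message above can be sketched in plain Java. This is only an illustration: daysInMilliseconds is not shown in the thread, so the version here is a guess at its behaviour, and the fallback to the current processing time when the element has no timestamp is an assumption of this sketch, not something the poster's code does. CleanupTimerMath and nextCleanupTime are hypothetical names.

```java
import java.time.Duration;

final class CleanupTimerMath {
    // Hypothetical stand-in for the daysInMilliseconds helper mentioned in the thread.
    static long daysInMilliseconds(long days) {
        return Duration.ofDays(days).toMillis();
    }

    // Computes the cleanup time. Assumption of this sketch: if the element
    // carries no timestamp (null), use the current processing time as the base.
    static long nextCleanupTime(Long elementTimestamp, long currentProcessingTime, long days) {
        long base = (elementTimestamp != null) ? elementTimestamp : currentProcessingTime;
        return base + daysInMilliseconds(days);
    }
}
```

With a null element timestamp the result is still a point 14 days after "now", so a null timestamp on its own does not move the timer into the past.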




Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

Kostas Kloudas-2
Hi Avi,

Just to verify your ITCase, I wrote the following dummy example and it seems to be "working" (i.e. I can see non-null timestamps and timers firing).


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setParallelism(1);

env
    .addSource(new LongSource())
    .keyBy(elmnt -> elmnt)
    .process(new KeyedProcessFunction<Long, Long, Long>() {

        @Override
        public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
            // With ingestion time set, the element arrives with a non-null timestamp.
            long timestamp = ctx.timestamp();
            long timerTimestamp = timestamp + Time.seconds(10).toMilliseconds();

            System.out.println(timestamp + " " + timerTimestamp);

            // Processing-time timer, ten seconds after the element's timestamp.
            ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            System.out.println("TIMER: " + timestamp + " " + ctx.timeDomain());
        }
    })
    .print();
env.execute();

The source is:

private static final class LongSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    private long element = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // Emit the keys 0..9 in a loop, one element every 10 ms.
        while (running) {
            ctx.collect(element++ % 10);
            Thread.sleep(10L);
        }
    }

    @Override
    public void cancel() {
        // Stop the emit loop in run().
        running = false;
    }
}

Could you provide more details on how your use case differs from the above dummy example, so that we can pin down the problem?

As a side note, ingestion time is essentially event time; the only difference is that the timestamp assigner at the beginning of the pipeline gives each element
the timestamp System.currentTimeMillis(). So in this case you could also consider setting event-time timers, but then keep in mind your
watermark emission interval.
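
To illustrate why the watermark interval matters for event-time timers, here is a deliberately simplified plain-Java model. It assumes watermarks are emitted only at multiples of a fixed interval and that an event-time timer fires with the first watermark at or after its timestamp. WatermarkDelaySketch and firingWatermark are hypothetical names, and the model ignores everything else a real runtime does.

```java
final class WatermarkDelaySketch {
    // Simplified model (an assumption of this sketch): watermarks exist only at
    // multiples of intervalMs, and a timer fires with the first watermark that
    // is >= its timestamp. Expects non-negative timestamps and intervalMs > 0.
    static long firingWatermark(long timerTimestamp, long intervalMs) {
        long remainder = timerTimestamp % intervalMs;
        return remainder == 0
                ? timerTimestamp
                : timerTimestamp - remainder + intervalMs;
    }
}
```

Under this model, a timer set for 10,050 with a 200 ms interval fires with the watermark at 10,200, so the firing can lag the requested time by up to one interval.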

In addition, if you simply want to check the processing-time behaviour of your operator (not the whole pipeline), then you could make use of the
OneInputStreamTaskTestHarness or its keyed variant. This allows you to provide your own processing time provider and thus lets you test
processing-time behaviour deterministically.
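
The idea of supplying your own processing time can be sketched without any Flink classes. The code below is plain Java and is not the harness API: ManualTimerService and ManualTimerDemo are hypothetical names, used only to show that a test-controlled clock lets a 14-day timer fire exactly when the test advances time, never earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a test-controlled processing-time service.
final class ManualTimerService {
    private long now;
    // Timers ordered by firing time; several callbacks may share a timestamp.
    private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();

    ManualTimerService(long startTime) {
        this.now = startTime;
    }

    long currentProcessingTime() {
        return now;
    }

    void registerProcessingTimeTimer(long time, Runnable callback) {
        timers.computeIfAbsent(time, t -> new ArrayList<>()).add(callback);
    }

    // Moves the clock and fires, in order, every timer due at or before newTime.
    void setProcessingTime(long newTime) {
        now = newTime;
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, List<Runnable>> due = timers.pollFirstEntry();
            due.getValue().forEach(Runnable::run);
        }
    }
}

final class ManualTimerDemo {
    // Registers a cleanup timer 14 days out and records whether it has fired
    // right after registration, one millisecond early, and exactly on time.
    static List<Boolean> run() {
        long fourteenDays = 14L * 24 * 60 * 60 * 1000;
        ManualTimerService service = new ManualTimerService(0L);
        boolean[] cleaned = {false};
        service.registerProcessingTimeTimer(
                service.currentProcessingTime() + fourteenDays,
                () -> cleaned[0] = true);

        List<Boolean> observed = new ArrayList<>();
        observed.add(cleaned[0]);
        service.setProcessingTime(fourteenDays - 1);
        observed.add(cleaned[0]);
        service.setProcessingTime(fourteenDays);
        observed.add(cleaned[0]);
        return observed;
    }
}
```

Because the test owns the clock, no real waiting is needed and the result does not depend on how fast the test machine is.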

Cheers,
Kostas



Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

avilevi
Thanks, I'll check it out. I was a bit confused by the ingestion time being null in tests, but all is OK now. I appreciate the help.

Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

Kostas Kloudas-2
Hi Avi,

Good to hear that!

Cheers,
Kostas
