Watermark not working as expected with TimeCharacteristic.EventTime


sundy


Hi, I have a problem with Flink and need your help.

I tried to use TimeCharacteristic.EventTime, but the sink function is never executed.

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ObjectMapper jsonMapper = new ObjectMapper();

        Properties properties = new Properties();
        // String brokers = "172.27.138.8:9092";
        String brokers = "localhost:9092";
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("group.id", "test_fink");
        String topic = "stream_test";

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<BitRate> myConsumer =
            new FlinkKafkaConsumer010<>(topic, new BitRate.BitRateDeserializtionSchema(), properties);

        DataStream<BitRate> stream = env.addSource(myConsumer)
            .assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
        DataStream<BitRate> reduceItems = stream
            .keyBy(a -> a.gameId)
            .timeWindow(Time.seconds(10))
            .reduce((a, b) -> a.add(b));

        reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", (tuple) -> {
            try {
                tuple.end();
                System.out.println(tuple.rate + "\t" + tuple.user);
                return jsonMapper.writeValueAsBytes(tuple);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return "".getBytes();
            }
        }));

        env.execute("Flink Streaming Java API Skeleton");
    }
}


Here is the CustomWatermarkEmitter. I tried increasing the lag, but that did not work either.

public class CustomWatermarkEmitter implements AssignerWithPeriodicWatermarks<BitRate> {

    private long currentMax = 0;
    private long lag = 3600 * 1000; // did not work, even though the lag is very large

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        long atLeastTime = currentMax - lag;
        System.out.println("water mark" + atLeastTime);
        return new Watermark(atLeastTime < 0 ? 0 : atLeastTime);
    }

    @Override
    public long extractTimestamp(BitRate bitRate, long l) {
        currentMax = Long.max(bitRate.eventTime, currentMax);
        return bitRate.eventTime;
    }
}


Here is the BitRate entity. The logs are generated in real time; a sample log line is `4281_783_1520047769115`.


    public BitRate(long eventTime, long gameId, long rate, long user) {
        this.eventTime = eventTime;

        this.gameId = gameId;
        this.rate = rate;
        this.user = user;
        this.startTs = System.currentTimeMillis();
        this.type = 0;
    }

    public void end() {
        this.endTs = System.currentTimeMillis();
    }

    public BitRate add(BitRate b) {
        System.out.println("Add:" + b.rate);
        this.rate += b.rate;
        this.user += b.user;
        return this;
    }


Re: Watermark not working as expected with TimeCharacteristic.EventTime

Xingcan Cui
Hi,

for periodically generated watermarks, you should use `ExecutionConfig.setAutoWatermarkInterval()` to set an interval.

Hope that helps.

Best,
Xingcan



Re: Watermark not working as expected with TimeCharacteristic.EventTime

sundy
Hi, thanks for your reply.

I searched Stack Overflow and found someone with the same problem.

Following your advice, I tried this:

 env.getConfig().setAutoWatermarkInterval(3 * 1000);

Now getCurrentWatermark is called every 3 seconds, but still no result comes out.
From the output (for example 'water mark1520049229163') I can see that the add method is called, but there is still no result from the sink function.






Re: Watermark not working as expected with TimeCharacteristic.EventTime

Hequn Cheng
Hi sundy,

1. Some partitions of your Kafka input have no data. The window operator's watermark is the minimum of the watermarks of all its inputs, so if one of its inputs never delivers data, the window is never triggered. You can set the parallelism of your job to 1 to avoid this problem (PS: this may already be fixed, but it is worth a try).
2. There is only one record in the input. In this case the window cannot be triggered either. Think of it as time having stopped. To trigger the window, you need to read more data so that the watermark passes the window end.
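
The first point can be illustrated with a small plain-Java sketch. It is not Flink code: `combinedWatermark` and `windowCanFire` are hypothetical helper names and the input values are made up. It only models the rule described above (an operator with several inputs advances to the minimum of their watermarks) and assumes that a window fires once that minimum reaches the window's last timestamp (end - 1).

```java
final class MinWatermarkSketch {

    // Hypothetical helper: an operator's watermark is the smallest
    // watermark reported by any of its inputs.
    static long combinedWatermark(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // Assumption for this sketch: a window [start, end) may fire once the
    // combined watermark has reached end - 1.
    static boolean windowCanFire(long windowEnd, long... inputWatermarks) {
        return combinedWatermark(inputWatermarks) >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 20_000L; // made-up window end in milliseconds

        // Three inputs are past the window end, one never saw any data.
        System.out.println(windowCanFire(windowEnd, 25_000L, 0L, 30_000L, 27_000L)); // false

        // With a single input nothing holds the watermark back.
        System.out.println(windowCanFire(windowEnd, 25_000L)); // true
    }
}
```

In this model a single idle input is enough to keep the window closed, which is why reducing the job to one input (parallelism 1) is suggested as a quick check.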

Hope it helps you.
Best, Hequn



Re: Watermark not working as expected with TimeCharacteristic.EventTime

sundy
Hi Hequn Cheng,

I finally found the problem and, thanks to your advice, worked out how to define the watermark correctly. Thank you very much.

The problem is that I set the watermark to

    waterMark = maxEventTime - lag

and the time window is 10 seconds, but I generated the test records so quickly that all 10000 records fall within a single window duration (my bad).
So Flink is waiting for newer records to advance the watermark and close the window.
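
The arithmetic behind this can be sketched in plain Java. This is not Flink code and the helper names are hypothetical. The sample timestamp and the one-hour lag come from this thread; the 5-second burst is made up. The sketch assumes 10-second tumbling windows aligned to the epoch and that a window fires once the watermark reaches its last timestamp (end - 1), which is my understanding of the default event-time trigger rather than something stated in the thread.

```java
final class WindowFiringSketch {

    static final long WINDOW_SIZE_MS = 10_000L;

    // Start of the tumbling window that contains ts (assumes ts >= 0, no offset).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Last timestamp that still belongs to that window, i.e. end - 1.
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MS - 1;
    }

    // Same rule as in the thread: max event time minus lag, never below 0.
    static long watermark(long maxEventTime, long lagMs) {
        return Math.max(0L, maxEventTime - lagMs);
    }

    // Assumption: the window of eventTs fires once the watermark reaches end - 1.
    static boolean fires(long eventTs, long maxEventTime, long lagMs) {
        return watermark(maxEventTime, lagMs) >= windowMaxTimestamp(eventTs);
    }

    public static void main(String[] args) {
        long first = 1520047769115L;       // sample timestamp from the thread
        long lag = 3600 * 1000L;           // lag used in the thread
        long lastOfBurst = first + 5_000L; // hypothetical: burst ends 5 s later

        // The whole burst is far too recent for a one-hour lag.
        System.out.println(fires(first, lastOfBurst, lag)); // false

        // Smallest event time that lets the first window fire.
        long needed = windowMaxTimestamp(first) + lag;
        System.out.println(needed);                         // 1520051369999
        System.out.println(fires(first, needed, lag));      // true
    }
}
```

Under these assumptions the first window only closes after a record arrives whose event time is about one hour (the lag) past the window end, so a burst of test records sent within a few seconds never produces output on its own.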

One more question: I set 'env.setParallelism(1)' and run the code in IDEA (mini Flink cluster), so why is getCurrentWatermark called in 4 different threads?
And when is getCurrentWatermark called: after the keyBy operation or after the source operation?



On 3 Mar 2018, at 15:28, Hequn Cheng <[hidden email]> wrote:

Hi sundy,

The default parallelism is 4, and it seems that your configuration did not take effect. Try 'env.setParallelism(1)' to set the job parallelism.
For watermarks, you can refer to [1] and [2].

PS: Others cannot see your reply because you replied only to me. Try replying to all so that others can help you too :-)


On Sat, Mar 3, 2018 at 3:03 PM, sundy <[hidden email]> wrote:

Hi Hequn Cheng,

Thanks for your advice. I think I found the problem, but I don't know why it happens.

First, let me describe my setup. I run the code in IDEA with a mini Flink cluster, I set the env parallelism to 1, and the Kafka topic 'stream_test' has one partition. I then send 10000 records stamped with the current time to Kafka; the format is '4281_783_1520059217832', where the last field is the eventTime.

I added this debug code to print the thread ID.

<PastedGraphic-1.png>


The watermark is printed by 4 threads in each period (3 seconds), for example:

50:water mark:0
52:water mark:0
51:water mark:1520056427871
49:water mark:0

So this results in a watermark of 0. But why does it happen with a parallelism of 1?
Maybe it is caused by the keyBy operation? I am new to Flink and would like to know how to set the watermark correctly.
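
One way to read this output, assuming the job really ran with four parallel copies of the watermark emitter and only one of them received records from the single Kafka partition, is that each copy keeps its own currentMax. The plain-Java sketch below copies the emitter's logic without any Flink types; the `Emitter` class, the `watermarks` helper and the choice of which copy receives data are hypothetical.

```java
final class PerInstanceWatermarkSketch {

    // Plain-Java copy of the emitter logic from the thread (no Flink types).
    static final class Emitter {
        private long currentMax = 0;
        private final long lag;

        Emitter(long lag) {
            this.lag = lag;
        }

        void onEvent(long eventTime) {
            currentMax = Math.max(eventTime, currentMax);
        }

        long currentWatermark() {
            return Math.max(0L, currentMax - lag);
        }
    }

    // Hypothetical setup: `instances` parallel copies, every record goes to copy `busy`.
    static long[] watermarks(int instances, int busy, long lag, long... eventTimes) {
        Emitter[] emitters = new Emitter[instances];
        for (int i = 0; i < instances; i++) {
            emitters[i] = new Emitter(lag);
        }
        for (long t : eventTimes) {
            emitters[busy].onEvent(t);
        }
        long[] result = new long[instances];
        for (int i = 0; i < instances; i++) {
            result[i] = emitters[i].currentWatermark();
        }
        return result;
    }

    public static void main(String[] args) {
        // Four copies, one hour lag, two records delivered to copy 2 only.
        long[] w = watermarks(4, 2, 3600 * 1000L, 1520059217832L, 1520059217900L);
        for (int i = 0; i < w.length; i++) {
            System.out.println(i + ":water mark:" + w[i]);
        }
        // Copies 0, 1 and 3 print 0; copy 2 prints 1520055617900.
    }
}
```

If that reading is right, the copies that never see a record keep reporting 0, and a downstream window that takes the minimum over all of them stays at 0 as well.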


  
 



Re: Watermark not working as expected with TimeCharacteristic.EventTime

Hequn Cheng
Hi sundy,

It is strange that your configuration does not take effect. Do you set the parallelism somewhere else? You could refer to the Kafka test case [1]: line 229 of that test sets the parallelism to 1, and it works fine.
Hope it helps you.




Re: Watermark not working as expected with TimeCharacteristic.EventTime

sundy
In reply to this post by sundy
Thanks a lot. Calling env.setParallelism(1) before the source is defined works (I had set it just before env.execute, so it did not take effect).
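
The ordering effect can be mimicked with a toy model. This is not Flink's API: `ToyEnv` and `defineOperator` are hypothetical, and the model simply assumes, as the observation above suggests, that an operator keeps whatever default parallelism is current at the moment it is defined. The default of 4 is the value mentioned earlier in this thread.

```java
final class ParallelismOrderSketch {

    // Toy stand-in for an execution environment. This is NOT Flink's class.
    static final class ToyEnv {
        private int defaultParallelism = 4; // default mentioned in the thread

        void setParallelism(int parallelism) {
            defaultParallelism = parallelism;
        }

        // Assumption: an operator captures the default that is current
        // when it is defined and is not updated afterwards.
        int defineOperator() {
            return defaultParallelism;
        }
    }

    // Parallelism set only after the source was defined (just before execute).
    static int sourceParallelismWhenSetLate() {
        ToyEnv env = new ToyEnv();
        int source = env.defineOperator();
        env.setParallelism(1); // too late for the source
        return source;
    }

    // Parallelism set before the source is defined.
    static int sourceParallelismWhenSetFirst() {
        ToyEnv env = new ToyEnv();
        env.setParallelism(1);
        return env.defineOperator();
    }

    public static void main(String[] args) {
        System.out.println(sourceParallelismWhenSetLate());  // 4
        System.out.println(sourceParallelismWhenSetFirst()); // 1
    }
}
```

In the job from the first post this corresponds to calling env.setParallelism(1) right after getExecutionEnvironment(), before addSource.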
 