http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-use-Lo-level-Joins-API-tp29218p29219.html
Hi Yuta,
Have you set a default value for the state? If the state has no default value and a record from stream2 arrives first for a given key, the state has not been set yet for that key, so value() returns null.
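One way to avoid depending on arrival order is to keep one state per input and emit only once both sides have been seen for a key. Here is a minimal sketch of that logic in plain Java, not Flink code. The class LowLevelJoinSketch and its two maps are hypothetical stand-ins for a CoProcessFunction with two keyed ValueState fields, and the join result (string concatenation) is assumed from the MyCPF snippet quoted below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a two-input keyed join; no Flink dependency.
// Each map stands in for one keyed ValueState<String> (one slot per key).
class LowLevelJoinSketch {
    private final Map<String, String> leftState = new HashMap<>();
    private final Map<String, String> rightState = new HashMap<>();

    // Plays the role of processElement1: remember v1, join if v2 was already seen.
    List<String> processElement1(String key, String v1) {
        leftState.put(key, v1);
        return joinIfComplete(key);
    }

    // Plays the role of processElement2: remember v2, join if v1 was already seen.
    List<String> processElement2(String key, String v2) {
        rightState.put(key, v2);
        return joinIfComplete(key);
    }

    // Emits v1 + v2 only when both sides are present for this key.
    private List<String> joinIfComplete(String key) {
        List<String> out = new ArrayList<>();
        String v1 = leftState.get(key);
        String v2 = rightState.get(key);
        if (v1 != null && v2 != null) {
            out.add(v1 + v2);
        }
        return out;
    }
}
```

In an actual CoProcessFunction, the two maps would presumably become two ValueState fields obtained in open() via getRuntimeContext().getState(...), and the results would go to out.collect(...) instead of a returned list.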
Best,
Yun
------------------------------------------------------------------
Send Time: 2019 Aug. 7 (Wed.) 08:56
Subject: How to use Low-level Joins API
Hi,
I am trying to use low-level joins.
According to the docs, the way to do this is to create a state and access
it from both streams, but I can't get it to work.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html
This is a snippet of my code.
It seems that processElement1 and processElement2 have different
ValueStates, so v1 in processElement2 is always null.
---
stream1.connect(stream2).keyBy(0, 0).process(new MyCPF());

public class MyCPF extends CoProcessFunction {
    ValueState data;

    processElement1(v1) {
        data.update(v1);
    }

    processElement2(v2) {
        v1 = data.value(); // v1 is always null
        out.collect(v1 + v2);
    }

    open() {
        data = getRuntimeContext().getState(descriptor);
    }
}
---
Could you tell me the correct way to do low-level joins, and send me some
sample code if you have any?
--
Thank you
Yuta