Re: How to use Lo-level Joins API

Posted by Yuta Morisawa on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-use-Lo-level-Joins-API-tp29218p29223.html

Hi Victor

 > Is it possible stream1 and stream2 don't have common keys?  You may
verify this by logging out the key of current processed element.

I misunderstood the usage.
I thought stream1 and 2 have different contexts and they can access both
state stores each other.
But actually, processElement1 and 2 must have the same key to access the
same state store.

Now I can join two streams successfully.
Thank you your comment.

Best,
Yuta



On 2019/08/07 13:49, Victor Wong wrote:

> Hi Yuta,
>
>> I made sure the 'ValueState data' has data from stream1 with the IDE
> debugger but in spite of that, processElement2 can't access it.
>
> Since `processElement1` and `processElement2`  use the same `Context`, I think there is no state access issue.
> Is it possible stream1 and stream2 don't have common keys?  You may verify this by logging out the key of current processed element.
>
> Best,
> Victor
>
> On 2019/8/7, 10:56 AM, "Yuta Morisawa" <[hidden email]> wrote:
>
>      Hi Yun
>      
>      Thank you for replying.
>       >        Have you set a default value for the state ?
>      Actually, the constructor of the ValueStateDescriptor with default value
>      is deprecated so I don't set it.
>      
>      The problem occurs when the stream1 comes first.
>      I made sure the 'ValueState data' has data from stream1 with the IDE
>      debugger but in spite of that, processElement2 can't access it.
>      
>      On 2019/08/07 11:43, Yun Gao wrote:
>      > Hi Yuta,
>      >        Have you set a default value for the state ? If the state did not
>      > have a default value and the records from stream2 comes first for a
>      > specific key, then the state would never be set with a value, thus the
>      > return value will be null.
>      >
>      > Best,
>      > Yun
>      >
>      >
>      >     ------------------------------------------------------------------
>      >     From:Yuta Morisawa <[hidden email]>
>      >     Send Time:2019 Aug. 7 (Wed.) 08:56
>      >     To:user <[hidden email]>
>      >     Subject:How to use Lo-level Joins API
>      >
>      >     Hi
>      >
>      >     I am trying to use low-level joins.
>      >     According to the doc, the way is creating a state and access it from
>      >     both streams, but I can't.
>      >
>      >     https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html
>      >
>      >     This is a snippet of my code.
>      >     It seems that the processElement1,2 have different ValueStates so that
>      >     v1 in processElement2 is always null.
>      >
>      >     ---
>      >     stream1.connect(stream2).keyBy(0,0).process(new MyCPF());
>      >
>      >     public class MyCPF extends CoProcessFunction{
>      >        ValueState data;
>      >
>      >        processElement1(v1){
>      >          data.update(v1);
>      >        }
>      >
>      >        processElement2(v2){
>      >          v1 = data.value() // v1 is always null
>      >          out.collect(v1 + v2)
>      >        }
>      >
>      >        open(){
>      >          data = getRuntimeContext().getState(descriptor);
>      >        }
>      >
>      >     }
>      >     ---
>      >
>      >     Can you tell me the collect way of the low-level joins and send me a
>      >     sample code if you have?
>      >
>      >     --
>      >     Thank you
>      >     Yuta
>      >
>      >
>      
>      --
>      --------------------------------------------
>      <KDDI総合研究所 ビジョン>
>      Challenge for the future 豊かな未来への挑戦
>      --------------------------------------------
>      Tomorrow, Together  KDDI
>      ---------------------------------------------
>         〒356-8502
>       埼玉県ふじみ野市大原2丁目1番15号
>       株式会社 KDDI総合研究所(KDDI Research, Inc.)
>       コネクティッドカー1G
>       森澤 雄太
>       mail [hidden email]
>       tel  070-3871-8883
>      ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>       この電子メールおよび添付書類は、名宛人のための
>       特別な秘密情報を含んでおります。
>       そのため、名宛人以外の方による利用は認められて
>       おりません。
>       名宛人以外の方による通信内容公表、複写、転用等
>       は厳禁であり、違法となることがあります。
>       万が一、何らかの誤りによりこの電子メールを名宛
>       人以外の方が受信された場合は、お手数でも、直ち
>       に発信人にお知らせ頂くと同時に、当メールを削除
>       下さいますようお願い申し上げます。
>      
>      
>

--
--------------------------------------------
<KDDI総合研究所 ビジョン>
Challenge for the future 豊かな未来への挑戦
--------------------------------------------
Tomorrow, Together  KDDI
---------------------------------------------
   〒356-8502
 埼玉県ふじみ野市大原2丁目1番15号
 株式会社 KDDI総合研究所(KDDI Research, Inc.)
 コネクティッドカー1G
 森澤 雄太
 mail [hidden email]
 tel  070-3871-8883
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 この電子メールおよび添付書類は、名宛人のための
 特別な秘密情報を含んでおります。
 そのため、名宛人以外の方による利用は認められて
 おりません。
 名宛人以外の方による通信内容公表、複写、転用等
 は厳禁であり、違法となることがあります。
 万が一、何らかの誤りによりこの電子メールを名宛
 人以外の方が受信された場合は、お手数でも、直ち
 に発信人にお知らせ頂くと同時に、当メールを削除
 下さいますようお願い申し上げます。