How to use the Low-level Joins API


Yuta Morisawa
Hi

I am trying to use low-level joins.
According to the docs, the approach is to create a state and access it
from both streams, but I can't get it to work.

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html

This is a snippet of my code.
It seems that processElement1 and processElement2 see different
ValueStates, so v1 in processElement2 is always null.

---
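import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Element types are assumed here: both streams carry Tuple2<String, String>, key in field 0.
stream1.connect(stream2).keyBy(0, 0).process(new MyCPF());

public class MyCPF
    extends CoProcessFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
  private transient ValueState<String> data;

  @Override
  public void open(Configuration parameters) {
    // The descriptor name "data" is a placeholder; the original post did not show it.
    data = getRuntimeContext().getState(new ValueStateDescriptor<>("data", String.class));
  }

  @Override
  public void processElement1(Tuple2<String, String> v1, Context ctx, Collector<String> out)
      throws Exception {
    data.update(v1.f1);
  }

  @Override
  public void processElement2(Tuple2<String, String> v2, Context ctx, Collector<String> out)
      throws Exception {
    String v1 = data.value(); // v1 is always null
    out.collect(v1 + v2.f1);
  }
}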
---

Can you tell me the correct way to do low-level joins, and send me some
sample code if you have any?

--
Thank you
Yuta

Re: How to use the Low-level Joins API

Yun Gao
Hi Yuta,

Have you set a default value for the state? If the state has no default value and a record from stream2 arrives first for a given key, the state has not been set yet, so the returned value will be null.
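
One common way to avoid depending on arrival order is to buffer both sides and emit only once both are present for a key. The sketch below is a plain-Java model of that idea, not Flink code: the two `HashMap`s stand in for two keyed `ValueState`s, and the class and method names (`TwoSidedJoinModel`, `emitIfBothPresent`, `output`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a join that tolerates either arrival order; no Flink classes are used. */
final class TwoSidedJoinModel {
    private final Map<String, String> leftByKey = new HashMap<>();  // latest stream1 value per key
    private final Map<String, String> rightByKey = new HashMap<>(); // latest stream2 value per key
    private final List<String> output = new ArrayList<>();

    /** Models processElement1: remember the stream1 value, then try to join. */
    void processElement1(String key, String v1) {
        leftByKey.put(key, v1);
        emitIfBothPresent(key);
    }

    /** Models processElement2: remember the stream2 value, then try to join. */
    void processElement2(String key, String v2) {
        rightByKey.put(key, v2);
        emitIfBothPresent(key);
    }

    // Emits v1 + v2 only when both sides have been seen for this key.
    private void emitIfBothPresent(String key) {
        String v1 = leftByKey.get(key);
        String v2 = rightByKey.get(key);
        if (v1 != null && v2 != null) {
            output.add(v1 + v2);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        TwoSidedJoinModel join = new TwoSidedJoinModel();
        join.processElement2("k", "B"); // stream2 first: nothing to join yet
        join.processElement1("k", "A"); // stream1 arrives: the pair is emitted
        System.out.println(join.output()); // prints [AB]
    }
}
```

In a real job the two maps would be two state handles obtained in `open()`, and a null read would simply mean "the other side has not arrived yet for this key".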

Best,
Yun


------------------------------------------------------------------
From: Yuta Morisawa <[hidden email]>
Send Time: 2019 Aug. 7 (Wed.) 08:56
To: user <[hidden email]>
Subject: How to use the Low-level Joins API


Re: How to use the Low-level Joins API

Yuta Morisawa
Hi Yun

Thank you for replying.
> Have you set a default value for the state?
Actually, the ValueStateDescriptor constructor that takes a default value
is deprecated, so I don't set one.

The problem occurs when the element from stream1 arrives first.
I confirmed with the IDE debugger that the 'ValueState data' holds the
data from stream1, but in spite of that, processElement2 can't access it.

On 2019/08/07 11:43, Yun Gao wrote:

> Hi Yuta,
>        Have you set a default value for the state ? If the state did not
> have a default value and the records from stream2 comes first for a
> specific key, then the state would never be set with a value, thus the
> return value will be null.
>
> Best,
> Yun

--
--------------------------------------------
<KDDI Research Vision>
Challenge for the future (taking on the challenge of a richer future)
--------------------------------------------
Tomorrow, Together  KDDI
---------------------------------------------
   2-1-15 Ohara, Fujimino-shi, Saitama 356-8502, Japan
 KDDI Research, Inc.
 Connected Car 1G
 Yuta Morisawa
 mail [hidden email]
 tel  070-3871-8883
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 This e-mail and any attachments contain confidential
 information intended solely for the addressee.
 Use by anyone other than the addressee is therefore
 not permitted.
 Disclosure, copying, or diversion of the contents by
 anyone other than the addressee is strictly prohibited
 and may be unlawful.
 If you have received this e-mail in error, please
 notify the sender immediately and delete this message.

Re: How to use the Low-level Joins API

Victor Wong
Hi Yuta,

> I made sure the 'ValueState data' has data from stream1 with the IDE
> debugger but in spite of that, processElement2 can't access it.

Since `processElement1` and `processElement2` use the same `Context`, I think there is no state access issue.
Is it possible that stream1 and stream2 don't have any keys in common? You can verify this by logging the key of the element currently being processed.
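
Once the keys from both inputs have been logged, the comparison itself can be as simple as a set intersection. The following is a small sketch in plain Java; `CommonKeysCheck`, `commonKeys`, and the sample key values are made up for illustration and are not taken from the thread.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Diagnostic helper: which keys were seen on both inputs? */
final class CommonKeysCheck {
    /** Returns the keys that occur in both collections, sorted for stable output. */
    static Set<String> commonKeys(Collection<String> keys1, Collection<String> keys2) {
        Set<String> common = new TreeSet<>(keys1);
        common.retainAll(keys2); // keep only keys that also appear in the second input
        return common;
    }

    public static void main(String[] args) {
        // Hypothetical keys, as they might appear in the logs of each input.
        List<String> fromStream1 = Arrays.asList("user-1", "user-2");
        List<String> fromStream2 = Arrays.asList("user-2", "USER-1");
        // "user-1" and "USER-1" are different keys, so only "user-2" can ever be joined.
        System.out.println(commonKeys(fromStream1, fromStream2)); // prints [user-2]
    }
}
```

An empty result would suggest that the two inputs are keyed on fields that never hold equal values, which matches the symptom of state that always reads as null.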

Best,
Victor

On 2019/8/7, 10:56 AM, "Yuta Morisawa" <[hidden email]> wrote:

    Hi Yun
   
    Thank you for replying.
     >        Have you set a default value for the state ?
    Actually, the constructor of the ValueStateDescriptor with default value
    is deprecated so I don't set it.
   
    The problem occurs when the stream1 comes first.
    I made sure the 'ValueState data' has data from stream1 with the IDE
    debugger but in spite of that, processElement2 can't access it.
   

Re: How to use the Low-level Joins API

Yuta Morisawa
Hi Victor

> Is it possible stream1 and stream2 don't have common keys?  You may
> verify this by logging out the key of current processed element.

I misunderstood the usage.
I thought stream1 and stream2 had different contexts and could each
access the other's state store.
But actually, the elements passed to processElement1 and processElement2
must have the same key to access the same state.
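
To make that scoping concrete, here is a minimal plain-Java model of it, written under the assumption that keyed state behaves like one slot per key. It does not use Flink classes: the `HashMap` stands in for a keyed `ValueState`, and `KeyedStateModel` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of keyed state; it does not use any Flink classes. */
final class KeyedStateModel {
    // One slot per key, standing in for a keyed ValueState<String>.
    private final Map<String, String> stateByKey = new HashMap<>();

    /** Models processElement1: store the stream1 value under the element's key. */
    void processElement1(String key, String v1) {
        stateByKey.put(key, v1);
    }

    /** Models processElement2: read the slot that belongs to this element's key only. */
    String processElement2(String key, String v2) {
        String v1 = stateByKey.get(key); // null if stream1 never sent this key
        return v1 == null ? null : v1 + v2;
    }

    public static void main(String[] args) {
        KeyedStateModel join = new KeyedStateModel();
        join.processElement1("a", "left-");
        System.out.println(join.processElement2("a", "right")); // same key: prints left-right
        System.out.println(join.processElement2("b", "right")); // other key: prints null
    }
}
```

Both methods share one state object, yet an element only ever sees the slot for its own key, which is why elements with mismatched keys never observe each other's values.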

Now I can join the two streams successfully.
Thank you for your comment.

Best,
Yuta



On 2019/08/07 13:49, Victor Wong wrote:

> Hi Yuta,
>
>> I made sure the 'ValueState data' has data from stream1 with the IDE
> debugger but in spite of that, processElement2 can't access it.
>
> Since `processElement1` and `processElement2`  use the same `Context`, I think there is no state access issue.
> Is it possible stream1 and stream2 don't have common keys?  You may verify this by logging out the key of current processed element.
>
> Best,
> Victor