Flink SQL : Interval Outer/Left Join not working as expected

Aneesha Kaushal-2
Hi, 

I am doing a simple POC using Flink SQL and I am facing some issues with Interval Join. 

Use case: I have two Kafka streams, and I want to use a Flink SQL interval join to remove rows from stream 1 (abandoned_user_visits) that are also present in stream 2 (orders) within some time interval.

Data:
1) Abandoned user visits. Sample data:
{"key1": "123", "email": "[hidden email]", "abandoned_pids": [674378611, 1754171520], "ts": "2021-03-18 11:00:00.208"}
{"key1": "234", "email": "[hidden email]", "abandoned_pids": [1942367711], "ts": "2021-03-18 11:45:00.208"}
{"key1": "123", "email": "[hidden email]", "abandoned_pids": [1754171520], "ts": "2021-03-18 12:00:00.208"}
{"key1": "234", "email": "[hidden email]", "abandoned_pids": [1942367711], "ts": "2021-03-18 12:45:00.208"}

2) User order stream
{"key1": "234", "email": "[hidden email]", "pids": [1754171520], "ts": "2021-03-18 11:55:00.208"}
{"key1": "123", "email": "[hidden email]", "pids": [674378611, 1754171520], "ts": "2021-03-18 12:10:00.208"}

When I push the above records to Kafka and select from the view below, I get a result that is effectively an INNER join (not an OUTER join).
I even tried posting just one record to stream (1) and no record to stream (2), expecting that record to be emitted, but nothing was emitted. Interestingly, when I use processing time instead of event time, I get the results as expected.

Tables and Views used: 
CREATE TABLE abandoned_visits (
        key1 STRING
      , email STRING
      , ts TIMESTAMP(3)
      , abandoned_pids ARRAY<BIGINT>
      , WATERMARK FOR ts AS ts
)
WITH (
  'connector' = 'kafka',
  'topic' = 'abandoned-visits', 
  'properties.bootstrap.servers' = '...', 
  'format' = 'json'
);

 
CREATE TABLE orders (
        key1 STRING
      , email STRING
      , ts TIMESTAMP(3)
      , pids ARRAY<BIGINT>
      , WATERMARK FOR ts AS ts
)
WITH (
    'connector' = 'kafka',
    'topic'     = 'orders',
    'properties.bootstrap.servers' = '...',
    'format'    = 'json'
);


CREATE VIEW abandoned_visits_with_no_orders AS
  SELECT
      av.key1
    , av.email
    , av.abandoned_pids
    , FLOOR(av.ts TO MINUTE)    AS visit_timestamp
    , FLOOR(o.ts TO MINUTE)     AS order_timestamp
    , o.email                   AS order_email
  FROM abandoned_visits av
  FULL OUTER JOIN orders o
  ON  av.key1 = o.key1
  AND av.email = o.email
  AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30' MINUTE
--  WHERE
--    o.email IS NULL                                  // Commented this out so as to get something in result
;
  
Result: 
select * from abandoned_visits_with_no_orders;  

This gives a result the same as an inner join. It doesn't have rows with NULL order data.
I would appreciate any help. 

Thanks,
Aneesha

Re: Flink SQL : Interval Outer/Left Join not working as expected

Benchao Li-2
Hi Aneesha,

The interval join operator outputs the rows padded with NULL only once the watermark confirms that no more matching data can arrive.
There is also an optimization for reducing state access, which may add more time before these rows are output.
In your case, the delay is almost 30 + 30 = 60 min. Did you wait that long before checking the output?
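
One way to check whether the watermark is actually advancing is to print it next to each row. The following is a minimal sketch, not something taken from this thread: it assumes the abandoned_visits table defined in the first message and a Flink release that provides the built-in CURRENT_WATERMARK function, which may not be available in the version used here. The column names current_wm and end_of_join_window are made up for illustration.

```sql
-- Sketch only: relies on the abandoned_visits table from the first message and
-- on CURRENT_WATERMARK being available in the Flink version in use (assumption).
-- current_wm is the watermark seen when the row is processed; it is NULL until
-- a first watermark has been emitted.
-- end_of_join_window is the latest order timestamp that could still match the
-- visit under the +30 minute bound of the join condition.
SELECT
      key1
    , ts
    , CURRENT_WATERMARK(ts)         AS current_wm
    , ts + INTERVAL '30' MINUTE     AS end_of_join_window
FROM abandoned_visits;
```

If current_wm stops moving once the test records run out, visits whose end_of_join_window lies beyond it cannot be released with NULL order columns yet. Running the same kind of query against orders is worthwhile too, because a two-input operator such as the join advances only with the slower of its two input watermarks.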

Aneesha Kaushal <[hidden email]> wrote on Thu, Mar 18, 2021 at 11:29 PM:

--

Best,
Benchao Li

Re: Flink SQL : Interval Outer/Left Join not working as expected

Aneesha Kaushal-2
Hello Benchao, 

Thanks for clarifying!
The issue was that I sent very few records, so I could not observe how the watermark was progressing. Today, after trying with a continuous stream, I was able to get the results.
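
With the watermark advancing, the filter that was commented out in the original view can presumably be restored to keep only the visits without an order. A sketch under stated assumptions: it reuses the two tables from the first message, swaps the FULL OUTER JOIN for a LEFT JOIN because only unmatched visits are wanted, and has not been run against the setup described here.

```sql
-- Sketch only: visits with no matching order within +/- 30 minutes.
-- Matched rows always carry a non-NULL o.email (the join condition requires
-- av.email = o.email), so o.email IS NULL selects the unmatched visits.
SELECT
      av.key1
    , av.email
    , av.abandoned_pids
    , FLOOR(av.ts TO MINUTE)    AS visit_timestamp
FROM abandoned_visits av
LEFT JOIN orders o
  ON  av.key1 = o.key1
  AND av.email = o.email
  AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30' MINUTE
WHERE o.email IS NULL;
```

As explained in the reply above, each such row appears only after the watermark has passed the end of that visit's join window, so results trail the input by roughly the interval size or more.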


On Fri, Mar 19, 2021 at 5:24 PM Benchao Li <[hidden email]> wrote:

Re: Flink SQL : Interval Outer/Left Join not working as expected

Benchao Li-2
Glad to hear that you resolved the issue.

Aneesha Kaushal <[hidden email]> wrote on Fri, Mar 19, 2021 at 10:49 PM:

--

Best,
Benchao Li