Re: Can I do a lookup in FlinkSQL?

zhangjun
You can try using a UDTF (user-defined table function).



------------------ Original ------------------
From: srikanth flink <[hidden email]>
Date: Mon, Sep 16, 2019 9:23 PM
To: dev <[hidden email]>, user <[hidden email]>, user-ml <[hidden email]>
Subject: Re: Can I do a lookup in FlinkSQL?

Hi there,

I'm working with streaming in FlinkSQL. I've created two tables: one backed
by a dynamic stream and the other by periodic updates.
I would like to keep the periodic table static (it is refreshed with new data
every day or so by flushing the old data), so that at any point in time the
static table contains the latest set of data.
With the dynamic table being populated with stream data, could I do a lookup
on a column of the static table to find out whether a value exists?

This is what I have done:
dynamic table: sourceKafka
static table: badips

I'm trying to build a list using the ROW() function and then, from the
dynamic table, look up whether the value exists in that list.
Query: INSERT INTO sourceKafkaMalicious select s.* from sourceKafka as s
where s.`source.ip` OR s.`destination.ip` IN (select ROW(ip) from badips);
Response:
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Values passed to IN
operator must have compatible types

Is it possible to solve my use case? If so, where am I going wrong?

Thanks
Srikanth
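
A minimal sketch of how the query above might be rewritten, offered as an untested suggestion rather than a confirmed fix. Two things in the original predicate look problematic. First, IN binds more tightly than OR, so s.`source.ip` is used on its own as a boolean operand instead of being compared against the list. Second, ROW(ip) wraps each value in a row type, which is the likely cause of the "compatible types" error when it is compared with a plain column. The sketch assumes that badips.ip has the same type as the two IP columns of sourceKafka, and that the planner in use accepts IN sub-queries on a streaming table.

```sql
-- Sketch only: assumes badips.ip has the same type as `source.ip` and
-- `destination.ip`, and that IN sub-queries are supported by the planner
-- for streaming queries.
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s
WHERE s.`source.ip` IN (SELECT ip FROM badips)           -- test each column separately
   OR s.`destination.ip` IN (SELECT ip FROM badips);     -- select the column itself, not ROW(ip)
```

For streaming queries an IN sub-query is typically executed as a join, so the state it keeps can grow over time; whether two IN sub-queries combined with OR are accepted may also depend on the Flink version and planner.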

Zhenghua Gao

On Mon, Sep 16, 2019 at 9:23 PM srikanth flink <[hidden email]> wrote: