Python UDF filter problem

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Python UDF filter problem

László Ciople
Hello,
I am trying to use Flink v1.11.2 with Python and the Table API to read and write back messages to kafka topics. I am trying to filter messages based on the output of a udf which returns a boolean. It seems that Flink ignores the WHERE clause in my queries and every input message is received in the output topic.
The input table is declared in SQL:
--sql
CREATE TABLE teams_event (
`payload` ROW(
`createdDateTime` STRING,
`body` ROW(
`content` STRING
),
`from` ROW(
`user` ROW(
`displayName` STRING
)
),
`channelIdentity` ROW(
`channelId` STRING
)
)
) WITH (
'connector' = 'kafka',
'topic' = 'xdr.ms_teams2.events.messages',
'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
'properties.group.id' = 'teams_profanity_filter',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'json.fail-on-missing-field' = 'false',
'json.timestamp-format.standard' = 'ISO-8601'
)
"""
The output table is also declared in sql:
--sql
CREATE TABLE teams_profanity_event (
`createdAt` STRING,
`censoredMessage` STRING,
`username` STRING,
`channelId` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'internal.alerts.teams.rude_employees2',
'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
'format' = 'json'
)
I have declared two udfs and registered them in the table environment
@udf(input_types=[
DataTypes.ROW([
DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
DataTypes.FIELD("body", DataTypes.ROW([
DataTypes.FIELD("content", DataTypes.STRING())
])),
DataTypes.FIELD("from", DataTypes.ROW([
DataTypes.FIELD("user", DataTypes.ROW([
DataTypes.FIELD("displayName", DataTypes.STRING())
]))
])),
DataTypes.FIELD("channelIdentity", DataTypes.ROW([
DataTypes.FIELD("channelId", DataTypes.STRING())
]))
])],
result_type=DataTypes.BOOLEAN())
def contains_profanity(payload):
message_content = payload[1][0]
found_profanity = profanity.contains_profanity(message_content)
logger.info(f'Message "{message_content}" contains profanity: {found_profanity}')
return found_profanity


@udf(input_types=[
DataTypes.ROW([
DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
DataTypes.FIELD("body", DataTypes.ROW([
DataTypes.FIELD("content", DataTypes.STRING())
])),
DataTypes.FIELD("from", DataTypes.ROW([
DataTypes.FIELD("user", DataTypes.ROW([
DataTypes.FIELD("displayName", DataTypes.STRING())
]))
])),
DataTypes.FIELD("channelIdentity", DataTypes.ROW([
DataTypes.FIELD("channelId", DataTypes.STRING())
]))
])],
result_type=DataTypes.STRING())
def censor_profanity(payload):
message_content = payload[1][0]
censored_message = profanity.censor(message_content)
logger.info(f'Censored message: "{censored_message}"')
return censored_message
The filtering of the messages and insertion into the sink is declared with SQL:
--sql
INSERT INTO teams_profanity_event (
SELECT `payload`.`createdDateTime`,
censor_profanity(`payload`),
`payload`.`from`.`user`.`displayName`,
`payload`.`channelIdentity`.`channelId`
FROM teams_event
WHERE contains_profanity(`payload`)
)
Am I doing something wrong? It seems that the contains_profanity udf is not used in the pipeline:
image.png
Thank you in advance!
Reply | Threaded
Open this post in threaded view
|

Re: Python UDF filter problem

Xingbo Huang
Hi,

This problem has been fixed[1] in 1.12.0,1.10.3,1.11.3, but release-1.11.3 and release-1.12.0 have not been released yet (VOTE has been passed). I run your job in release-1.12, and the plan is correct.


[1] https://issues.apache.org/jira/browse/FLINK-19675

Best,
Xingbo

László Ciople <[hidden email]> 于2020年12月8日周二 下午5:21写道:
Hello,
I am trying to use Flink v1.11.2 with Python and the Table API to read and write back messages to kafka topics. I am trying to filter messages based on the output of a udf which returns a boolean. It seems that Flink ignores the WHERE clause in my queries and every input message is received in the output topic.
The input table is declared in SQL:
--sql
CREATE TABLE teams_event (
`payload` ROW(
`createdDateTime` STRING,
`body` ROW(
`content` STRING
),
`from` ROW(
`user` ROW(
`displayName` STRING
)
),
`channelIdentity` ROW(
`channelId` STRING
)
)
) WITH (
'connector' = 'kafka',
'topic' = 'xdr.ms_teams2.events.messages',
'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
'properties.group.id' = 'teams_profanity_filter',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'json.fail-on-missing-field' = 'false',
'json.timestamp-format.standard' = 'ISO-8601'
)
"""
The output table is also declared in sql:
--sql
CREATE TABLE teams_profanity_event (
`createdAt` STRING,
`censoredMessage` STRING,
`username` STRING,
`channelId` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'internal.alerts.teams.rude_employees2',
'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
'format' = 'json'
)
I have declared two udfs and registered them in the table environment
@udf(input_types=[
DataTypes.ROW([
DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
DataTypes.FIELD("body", DataTypes.ROW([
DataTypes.FIELD("content", DataTypes.STRING())
])),
DataTypes.FIELD("from", DataTypes.ROW([
DataTypes.FIELD("user", DataTypes.ROW([
DataTypes.FIELD("displayName", DataTypes.STRING())
]))
])),
DataTypes.FIELD("channelIdentity", DataTypes.ROW([
DataTypes.FIELD("channelId", DataTypes.STRING())
]))
])],
result_type=DataTypes.BOOLEAN())
def contains_profanity(payload):
message_content = payload[1][0]
found_profanity = profanity.contains_profanity(message_content)
logger.info(f'Message "{message_content}" contains profanity: {found_profanity}')
return found_profanity


@udf(input_types=[
DataTypes.ROW([
DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
DataTypes.FIELD("body", DataTypes.ROW([
DataTypes.FIELD("content", DataTypes.STRING())
])),
DataTypes.FIELD("from", DataTypes.ROW([
DataTypes.FIELD("user", DataTypes.ROW([
DataTypes.FIELD("displayName", DataTypes.STRING())
]))
])),
DataTypes.FIELD("channelIdentity", DataTypes.ROW([
DataTypes.FIELD("channelId", DataTypes.STRING())
]))
])],
result_type=DataTypes.STRING())
def censor_profanity(payload):
message_content = payload[1][0]
censored_message = profanity.censor(message_content)
logger.info(f'Censored message: "{censored_message}"')
return censored_message
The filtering of the messages and insertion into the sink is declared with SQL:
--sql
INSERT INTO teams_profanity_event (
SELECT `payload`.`createdDateTime`,
censor_profanity(`payload`),
`payload`.`from`.`user`.`displayName`,
`payload`.`channelIdentity`.`channelId`
FROM teams_event
WHERE contains_profanity(`payload`)
)
Am I doing something wrong? It seems that the contains_profanity udf is not used in the pipeline:
image.png
Thank you in advance!
Reply | Threaded
Open this post in threaded view
|

Re: Python UDF filter problem

László Ciople
Awesome, thanks! 

On Tue, Dec 8, 2020 at 11:55 AM Xingbo Huang <[hidden email]> wrote:
Hi,

This problem has been fixed[1] in 1.12.0,1.10.3,1.11.3, but release-1.11.3 and release-1.12.0 have not been released yet (VOTE has been passed). I run your job in release-1.12, and the plan is correct.


[1] https://issues.apache.org/jira/browse/FLINK-19675

Best,
Xingbo

László Ciople <[hidden email]> 于2020年12月8日周二 下午5:21写道:
Hello,
I am trying to use Flink v1.11.2 with Python and the Table API to read and write back messages to kafka topics. I am trying to filter messages based on the output of a udf which returns a boolean. It seems that Flink ignores the WHERE clause in my queries and every input message is received in the output topic.
The input table is declared in SQL:
--sql
CREATE TABLE teams_event (
`payload` ROW(
`createdDateTime` STRING,
`body` ROW(
`content` STRING
),
`from` ROW(
`user` ROW(
`displayName` STRING
)
),
`channelIdentity` ROW(
`channelId` STRING
)
)
) WITH (
'connector' = 'kafka',
'topic' = 'xdr.ms_teams2.events.messages',
'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
'properties.group.id' = 'teams_profanity_filter',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'json.fail-on-missing-field' = 'false',
'json.timestamp-format.standard' = 'ISO-8601'
)
"""
The output table is also declared in sql:
--sql
CREATE TABLE teams_profanity_event (
`createdAt` STRING,
`censoredMessage` STRING,
`username` STRING,
`channelId` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'internal.alerts.teams.rude_employees2',
'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
'format' = 'json'
)
I have declared two udfs and registered them in the table environment
@udf(input_types=[
DataTypes.ROW([
DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
DataTypes.FIELD("body", DataTypes.ROW([
DataTypes.FIELD("content", DataTypes.STRING())
])),
DataTypes.FIELD("from", DataTypes.ROW([
DataTypes.FIELD("user", DataTypes.ROW([
DataTypes.FIELD("displayName", DataTypes.STRING())
]))
])),
DataTypes.FIELD("channelIdentity", DataTypes.ROW([
DataTypes.FIELD("channelId", DataTypes.STRING())
]))
])],
result_type=DataTypes.BOOLEAN())
def contains_profanity(payload):
message_content = payload[1][0]
found_profanity = profanity.contains_profanity(message_content)
logger.info(f'Message "{message_content}" contains profanity: {found_profanity}')
return found_profanity


@udf(input_types=[
DataTypes.ROW([
DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
DataTypes.FIELD("body", DataTypes.ROW([
DataTypes.FIELD("content", DataTypes.STRING())
])),
DataTypes.FIELD("from", DataTypes.ROW([
DataTypes.FIELD("user", DataTypes.ROW([
DataTypes.FIELD("displayName", DataTypes.STRING())
]))
])),
DataTypes.FIELD("channelIdentity", DataTypes.ROW([
DataTypes.FIELD("channelId", DataTypes.STRING())
]))
])],
result_type=DataTypes.STRING())
def censor_profanity(payload):
message_content = payload[1][0]
censored_message = profanity.censor(message_content)
logger.info(f'Censored message: "{censored_message}"')
return censored_message
The filtering of the messages and insertion into the sink is declared with SQL:
--sql
INSERT INTO teams_profanity_event (
SELECT `payload`.`createdDateTime`,
censor_profanity(`payload`),
`payload`.`from`.`user`.`displayName`,
`payload`.`channelIdentity`.`channelId`
FROM teams_event
WHERE contains_profanity(`payload`)
)
Am I doing something wrong? It seems that the contains_profanity udf is not used in the pipeline:
image.png
Thank you in advance!