pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中

刘亚坤
使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?

场景:使用pyflink通过filter进行条件过滤后插入到sink中,
比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
<span class="fa fa-minus-square-o" onclick="hide(this)" style="box-sizing: border-box; display: inline-block; font-variant-numeric: normal; font-variant-east-asian: normal; font-stretch: normal; font-size: 15.52px; line-height: 1; font-family: FontAwesome; text-rendering: auto; -webkit-font-smoothing: antialiased; color: rgb(74, 85, 96); white-space: pre-line; cursor: pointer;">{
    "logType":"syslog",
    "message":"sla;flkdsjf"
}
<span class="fa fa-minus-square-o" onclick="hide(this)" style="box-sizing: border-box; display: inline-block; font-variant-numeric: normal; font-variant-east-asian: normal; font-stretch: normal; font-size: 15.52px; line-height: 1; font-family: FontAwesome; text-rendering: auto; -webkit-font-smoothing: antialiased; color: rgb(74, 85, 96); white-space: pre-line; cursor: pointer;">{
    "logType":"alarm",
    "message":"sla;flkdsjf"
}
      t_env.from_path("source")\
          .filter("logType=syslog")\
          .insert_into("sink1")
有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗:
if logType=="syslog":
   insert_into(sink1)
elif logType=="alarm":
   insert_into(sink2)

如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:

      t_env.from_path("source")\
          .filter("logType=syslog")\
          .insert_into("sink1")\
          .filter("logType=alarm")\
          .insert_into("sink2")
请各位大牛指点,感谢



Reply | Threaded
Open this post in threaded view
|

Re:pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中

刘亚坤


测试使用如下结构:
table= t_env.from_path("source")

if table.filter("logType=syslog"):
table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
table.filter("logType=alarm").insert_into("sink2")


我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??




在 2020-06-19 10:08:25,"jack" <[hidden email]> 写道: >使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中? > > >场景:使用pyflink通过filter进行条件过滤后插入到sink中, >比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: >{ > "logType":"syslog", > "message":"sla;flkdsjf" >} >{ > "logType":"alarm", > "message":"sla;flkdsjf" >} > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1") >有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗: >if logType=="syslog": > insert_into(sink1) >elif logType=="alarm": > insert_into(sink2) > > >如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下: > > > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1")\ > .filter("logType=alarm")\ > .insert_into("sink2") >请各位大牛指点,感谢 > > > > >
Reply | Threaded
Open this post in threaded view
|

Re: pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中

jincheng sun
您好,jack:

Table API  不用 if/else 直接用类似逻辑即可:

val t1 = table.filter('x  > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1)
t2.insert_into("sink2")


Best,
Jincheng



jack <[hidden email]> 于2020年6月19日周五 上午10:35写道:


测试使用如下结构:
table= t_env.from_path("source")

if table.filter("logType=syslog"):
table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
table.filter("logType=alarm").insert_into("sink2")


我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??




在 2020-06-19 10:08:25,"jack" <[hidden email]> 写道: >使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中? > > >场景:使用pyflink通过filter进行条件过滤后插入到sink中, >比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: >{ > "logType":"syslog", > "message":"sla;flkdsjf" >} >{ > "logType":"alarm", > "message":"sla;flkdsjf" >} > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1") >有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗: >if logType=="syslog": > insert_into(sink1) >elif logType=="alarm": > insert_into(sink2) > > >如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下: > > > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1")\ > .filter("logType=alarm")\ > .insert_into("sink2") >请各位大牛指点,感谢 > > > > >
Reply | Threaded
Open this post in threaded view
|

Re:Re: pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中

刘亚坤

您好,jincheng老师,我已经验证了您提供的这种分开处理的逻辑,可以解决我的问题,非常感谢您的解惑


Best,
Jack



在 2020-06-22 14:28:04,"jincheng sun" <[hidden email]> 写道:

您好,jack:

Table API  不用 if/else 直接用类似逻辑即可:

val t1 = table.filter('x  > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1)
t2.insert_into("sink2")


Best,
Jincheng



jack <[hidden email]> 于2020年6月19日周五 上午10:35写道:


测试使用如下结构:
table= t_env.from_path("source")

if table.filter("logType=syslog"):
table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
table.filter("logType=alarm").insert_into("sink2")


我测试了下,好像table.filter("logType=syslog").insert_into("sink1")生效,下面的elif不生效,原因是table.filter("logType=syslog")或者table.where在做条件判断的同时已经将数据进行过滤,走不到下面的分支??




在 2020-06-19 10:08:25,"jack" <[hidden email]> 写道: >使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中? > > >场景:使用pyflink通过filter进行条件过滤后插入到sink中, >比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: >{ > "logType":"syslog", > "message":"sla;flkdsjf" >} >{ > "logType":"alarm", > "message":"sla;flkdsjf" >} > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1") >有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗: >if logType=="syslog": > insert_into(sink1) >elif logType=="alarm": > insert_into(sink2) > > >如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下: > > > t_env.from_path("source")\ > .filter("logType=syslog")\ > .insert_into("sink1")\ > .filter("logType=alarm")\ > .insert_into("sink2") >请各位大牛指点,感谢 > > > > >