Flink-JDBC JDBCUpsertTableSink keyFields Problem

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink-JDBC JDBCUpsertTableSink keyFields Problem

Polarisary
Hi
When I use flink-jdbc JDBCUpsertTableSink for sink to mysql, the isAppendOnly is modified to ture, and keyFields is modified to null by StreamExecSink, but i want to upsert,
Does this related to sql?

the stack as follows:
at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
at task.Device.main(Device.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)

Hope to reply!
many thanks

Reply | Threaded
Open this post in threaded view
|

Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

张万新
Yes it's related to your sql, flink checks the plan of your sql to judge whether your job is append only or has updates. If your job is append only, that means no result need to be updated.

If you still have problems, please post your sql and complete error message to help people understand your use case.

Polarisary <[hidden email]> 于2019年11月13日周三 下午6:43写道:
Hi
When I use flink-jdbc JDBCUpsertTableSink for sink to mysql, the isAppendOnly is modified to ture, and keyFields is modified to null by StreamExecSink, but i want to upsert,
Does this related to sql?

the stack as follows:
at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
at task.Device.main(Device.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)

Hope to reply!
many thanks

Reply | Threaded
Open this post in threaded view
|

Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

Polarisary
My sql is regular insert like “insert into sink_table select c1,c2,c3 from source_table”, 
I want to know which case it will judge to append only? Does it has doc for this?

Many thanks!





在 2019年11月14日,上午10:05,张万新 <[hidden email]> 写道:

Yes it's related to your sql, flink checks the plan of your sql to judge whether your job is append only or has updates. If your job is append only, that means no result need to be updated.

If you still have problems, please post your sql and complete error message to help people understand your use case.

Polarisary <[hidden email]> 于2019年11月13日周三 下午6:43写道:
Hi
When I use flink-jdbc JDBCUpsertTableSink for sink to mysql, the isAppendOnly is modified to ture, and keyFields is modified to null by StreamExecSink, but i want to upsert,
Does this related to sql?

the stack as follows:
at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
at task.Device.main(Device.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)

Hope to reply!
many thanks


Reply | Threaded
Open this post in threaded view
|

Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

张万新
A typical use case that will genreate updates (meaning not append only) is a non-widown groupy-by aggregation, like "select user, count(url) from clicks group by user".

You can refer to the flink doc at https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html  

Polarisary <[hidden email]> 于2019年11月14日周四 下午3:35写道:
My sql is regular insert like “insert into sink_table select c1,c2,c3 from source_table”, 
I want to know which case it will judge to append only? Does it has doc for this?

Many thanks!





在 2019年11月14日,上午10:05,张万新 <[hidden email]> 写道:

Yes it's related to your sql, flink checks the plan of your sql to judge whether your job is append only or has updates. If your job is append only, that means no result need to be updated.

If you still have problems, please post your sql and complete error message to help people understand your use case.

Polarisary <[hidden email]> 于2019年11月13日周三 下午6:43写道:
Hi
When I use flink-jdbc JDBCUpsertTableSink for sink to mysql, the isAppendOnly is modified to ture, and keyFields is modified to null by StreamExecSink, but i want to upsert,
Does this related to sql?

the stack as follows:
at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
at task.Device.main(Device.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)

Hope to reply!
many thanks


Reply | Threaded
Open this post in threaded view
|

Re: Flink-JDBC JDBCUpsertTableSink keyFields Problem

Jingsong Li
Hi Polarisary:

Maybe I see what you mean. You want to use the upsert mode for an append stream without keyFields.
In fact, both isAppend and keyFields are set automatically by the planner framework. You can't control them.
So yes, it is related to sql, only upsert stream can be inserted into sink with upsert mode.

Now whether the JDBCUpsertTableSink is an upsert mode is only controlled by keyFields. So if you really want to support this scenario, you need to make a small change to the JDBCUpsertTableSink:
@Override
public void setKeyFields(String[] keys) {
if (keys != null) {
this.keyFields = keys;
}
}
And you need set your keyFields to JDBCUpsertTableSink.
You can have a try.

Best,
Jingsong Lee

On Thu, Nov 14, 2019 at 4:44 PM 张万新 <[hidden email]> wrote:
A typical use case that will genreate updates (meaning not append only) is a non-widown groupy-by aggregation, like "select user, count(url) from clicks group by user".

You can refer to the flink doc at https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html  

Polarisary <[hidden email]> 于2019年11月14日周四 下午3:35写道:
My sql is regular insert like “insert into sink_table select c1,c2,c3 from source_table”, 
I want to know which case it will judge to append only? Does it has doc for this?

Many thanks!





在 2019年11月14日,上午10:05,张万新 <[hidden email]> 写道:

Yes it's related to your sql, flink checks the plan of your sql to judge whether your job is append only or has updates. If your job is append only, that means no result need to be updated.

If you still have problems, please post your sql and complete error message to help people understand your use case.

Polarisary <[hidden email]> 于2019年11月13日周三 下午6:43写道:
Hi
When I use flink-jdbc JDBCUpsertTableSink for sink to mysql, the isAppendOnly is modified to ture, and keyFields is modified to null by StreamExecSink, but i want to upsert,
Does this related to sql?

the stack as follows:
at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
at task.Device.main(Device.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)

Hope to reply!
many thanks




--
Best, Jingsong Lee