Hi,
I have below use case
Insert bounded data into dynamic table(upsert-kafka) using Flink 1.12 on yarn, but yarn application is still running when insert job finished, and yarn container is not released.
I try to use BatchTableEnvironment, but “Primary key and unique key are not supported yet”; i try to use StreamingExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but it not works.
Please help to offer some advice.
Regards
```
[test case code]
val (senv, btenv) = FlinkSession.getOrCreate()
val table = btenv.fromValues(
Row.ofKind(RowKind.INSERT, "1"),
Row.ofKind(RowKind.INSERT, "2")).select("f0")
btenv.createTemporaryView("bound", table)
btenv.executeSql(s"create table if not exists test_result(" +
"id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
s"'connector'='kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}'," +
s"'key.format'='json','value.format'='json')")
btenv.executeSql("insert into test_result select f0 from bound")
```