I have below use case

Insert bounded data into dynamic table(upsert-kafka) using Flink 1.12 on yarn, but  yarn application is still running when insert job finished, and yarn container is not released.

I try to use BatchTableEnvironment, but “Primary key and unique key are not supported yet; i try to use StreamingExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but it not works.

Please help to offer some advice. 



[test case code]

val (senv, btenv) = FlinkSession.getOrCreate()
val table = btenv.fromValues(
Row.ofKind(RowKind.INSERT, "1"),
Row.ofKind(RowKind.INSERT, "2")).select("f0")

btenv.createTemporaryView("bound", table)
btenv.executeSql(s"create table if not exists test_result(" +
"id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
s"'connector'='kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}'," +
btenv.executeSql("insert into test_result select f0 from bound")



11 posts
Hi, vtygoss

You can check out the official demo[1]

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings

val tEnv = TableEnvironment.create(setting)


