(no subject)


(no subject)

vtygoss


Hi,

I have the following use case:


I insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on YARN, but the YARN application is still running after the insert job has finished, and the YARN containers are not released.


I tried BatchTableEnvironment, but it fails with “Primary key and unique key are not supported yet”. I also tried StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but that does not work either.


Any advice would be appreciated.


Regards



```
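// Test case code
import org.apache.flink.types.{Row, RowKind}

// FlinkSession is a helper that is not shown here; it returns the stream and table environments.
val (senv, btenv) = FlinkSession.getOrCreate()

// Bounded input: two insert-only rows with a single string column (f0).
val table = btenv.fromValues(
  Row.ofKind(RowKind.INSERT, "1"),
  Row.ofKind(RowKind.INSERT, "2")).select("f0")
btenv.createTemporaryView("bound", table)

// Sink table keyed by id, using the upsert-kafka connector named in the description.
btenv.executeSql("create table if not exists test_result(" +
  "id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
  s"'connector'='upsert-kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}'," +
  "'key.format'='json','value.format'='json')")
btenv.executeSql("insert into test_result select f0 from bound")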
```


 





Re:

Jake
Hi, vtygoss


You can check out the official demo [1], which creates the TableEnvironment in batch mode:

```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

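// Build the settings for batch execution (swap in inStreamingMode() for streaming).
val settings = EnvironmentSettings
    .newInstance()
    //.inStreamingMode()
    .inBatchMode()
    .build()

// Pass the settings built above (the value is named `settings`).
val tEnv = TableEnvironment.create(settings)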
```
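
As a rough sketch, this is how the batch-mode environment might be applied to the test case from the question. The object name `BoundedInsertSketch`, the helper `sinkDdl`, and the default broker address `localhost:9092` are made up for illustration. The sketch also assumes that the upsert-kafka sink is accepted by the batch planner in your Flink version, which I have not verified on 1.12. Calling `await()` on the returned `TableResult` blocks until the insert job has finished, so the client only exits once the bounded job is done.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object BoundedInsertSketch {

  // Hypothetical helper: builds the sink DDL from the question for a given broker list.
  def sinkDdl(bootstrapServers: String): String =
    s"""CREATE TABLE IF NOT EXISTS test_result (
       |  id STRING,
       |  PRIMARY KEY (id) NOT ENFORCED
       |) WITH (
       |  'connector' = 'upsert-kafka',
       |  'topic' = 'test_result',
       |  'properties.bootstrap.servers' = '$bootstrapServers',
       |  'key.format' = 'json',
       |  'value.format' = 'json'
       |)""".stripMargin

  def main(args: Array[String]): Unit = {
    // Assumption: the broker list is passed as the first argument.
    val bootstrapServers = args.headOption.getOrElse("localhost:9092")

    // Batch-mode table environment, as in the demo above.
    val settings = EnvironmentSettings.newInstance().inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    tEnv.executeSql(sinkDdl(bootstrapServers))

    // Submit the bounded insert and block until the job has finished.
    tEnv.executeSql("INSERT INTO test_result VALUES ('1'), ('2')").await()
  }
}
```

If the YARN application still stays up after the job completes, the deployment mode (session vs. per-job) is probably the next thing to look at, since a session cluster keeps running independently of the jobs submitted to it.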

Regards




On May 19, 2021, at 18:01, vtygoss <[hidden email]> wrote:

