flink sql jmh failure

flink sql jmh failure

jie mei
Hi, Community

I ran a JMH benchmark task and got the error below. The benchmark uses Flink SQL to consume data from the datagen connector (10,000,000 rows) and write it to ClickHouse. Below is part of the log; the complete log is in the attached file.

My JMH benchmark code is as follows:
@Benchmark
@Threads(1)
@Fork(1)
public void sinkBenchmark() throws IOException {

    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment
        .getExecutionEnvironment();
    streamEnv.enableCheckpointing(60000);

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

    // create clickhouse table
    new ClickHouseTableBuilder(tableEnv,
        parseSchema("clickhouse_sink_table.sql"))
        .database("benchmark")
        .table("bilophus_sink_benchmark")
        .address("jdbc:clickhouse://localhost:8123")
        .build();

    // create mock data table
    tableEnv.executeSql(
        parseSchema("clickhouse_source_table.sql") +
            "WITH (" +
            "'connector' = 'datagen'," +
            "'number-of-rows' = '10000000')");

    tableEnv.executeSql(
        "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
}
Running command and exec-maven-plugin configuration:

mvn clean package -DskipTests
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.6.0</version>
  <executions>
    <execution>
      <id>test-benchmarks</id>
      <phase>test</phase>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <skip>false</skip>
    <classpathScope>test</classpathScope>
    <executable>java</executable>
    <arguments>
      <argument>-Xmx6g</argument>
      <argument>-classpath</argument>
      <classpath/>
      <argument>org.openjdk.jmh.Main</argument>
      <!-- shouldFailOnError -->
      <argument>-foe</argument>
      <argument>true</argument>
      <!-- speed up tests -->
      <argument>-f</argument>
      <argument>1</argument>
      <argument>-i</argument>
      <argument>1</argument>
      <argument>-wi</argument>
      <argument>0</argument>
      <argument>-rf</argument>
      <argument>csv</argument>
      <argument>.*</argument>
    </arguments>
  </configuration>
</plugin>
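
For reference, the same JMH options passed on the command line above (-foe true, -f 1, -i 1, -wi 0, -rf csv) can also be set through the JMH Runner API. A minimal sketch (the class name and the ".*" include pattern are only illustrative):

import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class BenchmarkRunner {
    public static void main(String[] args) throws Exception {
        Options opts = new OptionsBuilder()
                .include(".*")                      // which benchmarks to run
                .shouldFailOnError(true)            // -foe true
                .forks(1)                           // -f 1
                .measurementIterations(1)           // -i 1
                .warmupIterations(0)                // -wi 0
                .resultFormat(ResultFormatType.CSV) // -rf csv
                .jvmArgs("-Xmx6g")                  // heap for the forked benchmark JVM
                .build();
        new Runner(opts).run();
    }
}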

Non-finished threads:

Thread[Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (1/6),5,Flink Task Threads]
 at sun.misc.Unsafe.park(Native Method)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:748)

Thread[flink-akka.actor.default-dispatcher-8,5,main]
 at sun.misc.Unsafe.park(Native Method)
 at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thread[flink-akka.actor.default-dispatcher-2,5,main]
 at sun.misc.Unsafe.park(Native Method)
 at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thread[Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (4/6),5,Flink Task Threads]
 at sun.misc.Unsafe.park(Native Method)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:748)


--

Best Regards
Jeremy Mei

Re: flink sql jmh failure

Yik San Chan
Hi Jie,

I am curious which library you use to get the ClickHouseTableBuilder.

On Wed, Mar 24, 2021 at 8:41 PM jie mei <[hidden email]> wrote:

Re: flink sql jmh failure

jie mei
Hi, Yik San 

I use a library written by myself and am trying to verify its performance.


Yik San Chan <[hidden email]> wrote on Wed, Mar 24, 2021 at 9:07 PM:

--

Best Regards
Jeremy Mei

Re: flink sql jmh failure

Guowei Ma
Hi,
I am not an expert on JMH, but it seems that this is not an error. From the log it looks like the job has not finished:
the data source is still reading data when JMH finishes.

Thread[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (3/6),5,Flink Task Threads]
  at org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
  at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
  at org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
  at org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
  at org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
  at org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
  at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
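
As a minimal sketch of how the benchmark could block until the INSERT job terminates (assuming Flink 1.12's TableResult#await(); this is not part of the original code), the last statement of sinkBenchmark() could become:

import org.apache.flink.table.api.TableResult;

// executeSql only submits the INSERT job and returns immediately;
// await() blocks until the bounded job reaches a terminal state, so the
// JMH measurement would cover the whole job instead of just the submission.
TableResult result = tableEnv.executeSql(
    "INSERT INTO CLICKHOUSE_SINK_BENCHMARK SELECT '2020-12-12', * FROM CLICKHOUSE_SOURCE_BENCHMARK");
result.await(); // the benchmark method then needs to declare InterruptedException and ExecutionException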

Best,
Guowei


On Wed, Mar 24, 2021 at 9:56 PM jie mei <[hidden email]> wrote:

Re: flink sql jmh failure

jie mei
Hi, Guowei

Yeah, I think so too. There is currently no way to trigger a checkpoint and wait for it to finish, so I will do the benchmark with a lower-level API.
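
For reference, a rough sketch of what a lower-level, DataStream-based benchmark could look like (assuming Flink 1.12; the bounded source and the DiscardingSink are only stand-ins for the datagen table and the ClickHouse sink):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.openjdk.jmh.annotations.Benchmark;

@Benchmark
public void dataStreamSinkBenchmark() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000);

    env.fromSequence(1, 10_000_000)       // bounded source, stand-in for the datagen table
       .addSink(new DiscardingSink<>());  // stand-in for the ClickHouse sink

    // Unlike TableEnvironment#executeSql, execute() blocks until the job
    // reaches a terminal state, so JMH measures the whole job.
    env.execute("clickhouse-sink-benchmark");
}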


Guowei Ma <[hidden email]> wrote on Thu, Mar 25, 2021 at 4:59 PM:


--

Best Regards
Jeremy Mei