Hi Community,

I ran a JMH benchmark task and got the error below. The benchmark uses Flink SQL to consume data from the data-gen connector (10_000_000) and write it to ClickHouse. Below is part of the log; the complete log is in the attached file.

My JMH benchmark code is as below:

@Benchmark

Running command:

mvn clean package -DskipTests

<plugin>

Non-finished threads:

Thread[Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (1/6),5,Flink Task Threads]
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
  at java.lang.Thread.run(Thread.java:748)

Thread[flink-akka.actor.default-dispatcher-8,5,main]
  at sun.misc.Unsafe.park(Native Method)
  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thread[flink-akka.actor.default-dispatcher-2,5,main]
  at sun.misc.Unsafe.park(Native Method)
  at akka.dispatch.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thread[Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (4/6),5,Flink Task Threads]
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:298)
  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
  at java.lang.Thread.run(Thread.java:748)

Best Regards
Jeremy Mei

Attachment: log (2M)
|
Hi Jie,

I am curious: which library do you use to get the ClickHouseTableBuilder?

On Wed, Mar 24, 2021 at 8:41 PM jie mei <[hidden email]> wrote:
|
Hi Yik San,

I use a library I wrote myself, and I am trying to verify its performance.

On Wed, Mar 24, 2021 at 9:07 PM Yik San Chan <[hidden email]> wrote:
Best Regards Jeremy Mei |
Hi,

I am not a JMH expert, but this does not look like an error. From the log, it looks like the job has not finished: the data source is still reading data when JMH finishes.

Thread[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, CLICKHOUSE_SOURCE_BENCHMARK]], fields=[first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Calc(select=[_UTF-16LE'2020-12-12' AS EXPR$0, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) -> Sink: Sink(table=[default_catalog.default_database.CLICKHOUSE_SINK_BENCHMARK], fields=[dt, first_bigint, second_bigint, first_int, second_int, first_float, second_float, first_double, second_double, first_string, second_string]) (3/6),5,Flink Task Threads]
  at org.apache.flink.table.data.binary.BinaryStringData.fromString(BinaryStringData.java:82)
  at org.apache.flink.table.data.StringData.fromString(StringData.java:52)
  at org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:171)
  at org.apache.flink.table.factories.DataGenTableSourceFactory$1.next(DataGenTableSourceFactory.java:168)
  at org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:320)
  at org.apache.flink.table.factories.DataGenTableSourceFactory$RowGenerator.next(DataGenTableSourceFactory.java:277)
  at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

Best,
Guowei

On Wed, Mar 24, 2021 at 9:56 PM jie mei <[hidden email]> wrote:
|
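The diagnosis above (the benchmark method returns while the submitted job is still running) can be illustrated with a small sketch that uses only the JDK. This is not the original benchmark code, which is not shown in the thread. The class `AwaitJobSketch` and the methods `submitJob` and `submitAndAwait` are hypothetical names, and the `CompletableFuture` is only a stand-in for whatever completion handle the real job submission returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class AwaitJobSketch {

    /**
     * Hypothetical stand-in for an asynchronous job submission: the call
     * returns immediately while a worker thread keeps "writing" rows.
     */
    static CompletableFuture<Long> submitJob(ExecutorService pool, long rows) {
        AtomicLong written = new AtomicLong();
        return CompletableFuture.supplyAsync(() -> {
            for (long i = 0; i < rows; i++) {
                written.incrementAndGet(); // pretend to write one row to the sink
            }
            return written.get();
        }, pool);
    }

    /**
     * What a benchmark method would need to do so that the measured time
     * covers the whole job: block until the job reports completion.
     */
    static long submitAndAwait(ExecutorService pool, long rows) {
        return submitJob(pool, rows).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            long done = submitAndAwait(pool, 1_000_000L);
            System.out.println("rows written before the method returned: " + done);
        } finally {
            pool.shutdown(); // no worker threads are left behind
        }
    }
}
```

If the method returned the result of `submitJob` without calling `join()`, the worker could still be running when the caller moved on, which matches the "Non-finished threads" report. How to block on a real Flink job depends on the Flink version and the API in use, so that part is deliberately left out here.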
Hi Guowei,

Yeah, I think so too. There is currently no way to trigger a checkpoint and watch for it to finish, so I will do the benchmark with a lower-level API.

On Thu, Mar 25, 2021 at 4:59 PM Guowei Ma <[hidden email]> wrote:
Best Regards Jeremy Mei |