<Need help> The processing of Flink SQL is slow, how to accelerate it?


wydhcws@gmail.com

Hi all, I wrote some indicator-calculation programs with Flink SQL: the data is read from Kafka, processed by Flink SQL, and written to InfluxDB. When the Kafka log production rate is about 10k messages/min at night, the program has no problem, but when the rate climbs to about 100k/min during the day, the KafkaConsumer can no longer keep up. I am using Flink 1.9, and the Flink SQL layer seems to be the slow part. The tumbling window is 5 minutes and the job currently uses two slots; I tried increasing the parallelism, but it had no effect. Is there any solution for this? Thank you, everyone.
The code is shown below:
val windowWidth = 5

//stream config
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000 * 60 * 5) // checkpoint every 5 minutes
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000 * 10) // 10-minute checkpoint timeout
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
// env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//table config
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.getConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(10))

tEnv.registerFunction("TimeFormatJava", new TimeFormatJava())
tEnv.registerFunction("TimeFormatUDF", TimeFormatUDF)


//Kafka Source
val kafkaProperties: Properties = new Properties
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId)
kafkaProperties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"5000")
kafkaProperties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,"500")


val topics: util.List[String] = new util.ArrayList[String]

for (topic <- kafkaTopics.split(SPLIT)) {
  topics.add(topic)
}

val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema, kafkaProperties)

val driverSearchDstream: DataStream[DriverSearch] = env.addSource(kafkaConsumer.setStartFromLatest()).map(msg => {
  // Each message is a "yyyy/MM/dd HH:mm:ss" timestamp prefix followed by a JSON payload.
  val info: String = msg.substring(msg.indexOf("{"), msg.length)
  val createTime = msg.substring(0, 19)
  val timeStamp = getLongTime(createTime)

  val json = JSON.parseObject(info)
  DriverSearch(
    json.getString("driverId") + "_" + timeStamp,
    json.getString("driverId"),
    json.getIntValue("searchType"),
    timeStamp
  )
}).setParallelism(2)

val driverSearchDstreamWithEventTime: DataStream[DriverSearch] = driverSearchDstream.assignTimestampsAndWatermarks(
  // Event time with up to 10 seconds of out-of-orderness.
  new BoundedOutOfOrdernessTimestampExtractor[DriverSearch](org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)) {
    override def extractTimestamp(element: DriverSearch): Long = element.timestamp
  }
)

// Debug only: this map prints every record and runs as a separate dangling operator.
driverSearchDstream.map(info => println(info + " time:" + System.currentTimeMillis()))

val table: Table = tEnv.fromDataStream(driverSearchDstreamWithEventTime, 'rowKey, 'driverId, 'searchType, 'timestamp.rowtime as 'w)

val sql1: String =
s"""
select TimeFormatJava(TUMBLE_END(w, INTERVAL '$windowWidth' MINUTE),8) as time_end,
searchType,
count(distinct driverId) as typeUv,
count(distinct rowKey) as typePv
from $table
group by TUMBLE(w, INTERVAL '$windowWidth' MINUTE),searchType
""".stripMargin

val resultTable1: Table = tEnv.sqlQuery(sql1)

val typeMap= immutable.Map(1->"1-goWorkSearch",2->"2-offWorkSearch",3->"3-nearbySearch",4->"4-temporarySearch",5->"5-commonSearch",6->"6-multiplySearch")

val influxStream: DataStream[InfluxDBPoint] = tEnv.toAppendStream[Row](resultTable1).map {
  row => {
    val typeName: String = typeMap(row.getField(1).asInstanceOf[Int])
    val point = new InfluxDBPoint("Carpool_Search_Pv_Uv", row.getField(0).asInstanceOf[Long]) // window end; the UDF applies a +8 hour offset
    val fields = new util.HashMap[String, Object]()
    val tags = new util.HashMap[String, String]()
    fields.put("typeUv", row.getField(2))
    fields.put("typePv", row.getField(3))
    point.setFields(fields)
    tags.put("typeName", typeName)
    point.setTags(tags)
    point
  }
}

// Debug only: prints every point before it goes to the sink.
influxStream.map {
  point => {
    println("influxPoint:" + point.getFields + "=="
      + point.getTags + "==" + point.getMeasurement
      + "==" + point.getTimestamp + " time:" + System.currentTimeMillis())
  }
}


val influxDBConfig = InfluxDBConfig.builder("http://host:8086", "admin", "admin", "aimetric").build


influxStream.addSink(new InfluxDBSink(influxDBConfig))

env.execute()

}


def getLongTime(str: String) = {
  val format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
  val time: Long = format.parse(str).getTime
  time
}
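For reference, a minimal sketch of the two parallelism knobs involved here (the value 4 and the operator name are illustrative, and the effective source parallelism is capped by the number of Kafka partitions):

// Job-wide default parallelism; it also applies to the window aggregation generated by Flink SQL.
env.setParallelism(4)

// Per-operator override on the DataStream part of the pipeline.
val source: DataStream[String] = env
  .addSource(kafkaConsumer.setStartFromLatest())
  .setParallelism(4)    // cannot usefully exceed the topic's partition count
  .name("kafka-source") // illustrative operator name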

Re: Re: <Need help> The processing of Flink SQL is slow, how to accelerate it?

wydhcws@gmail.com
Dear Danny Chan,
Thanks for your reply. In fact I have tried many things: adjusting the parallelism (I increased it to 4), tuning the KafkaConsumer parameters, and changing the window size from 5 min to 1 min, but none of them worked. As for the InfluxDB sink, the window only fires every 5 minutes, so the write rate is very low and it should not be the bottleneck (another job whose source is canal binlog uses the same sink and writes without problems; moreover, a local test shows the job consumes a little data at the beginning and then stops consuming). The back-pressure view suggests the bottleneck is the Flink SQL layer, which eventually leads to the consumer's Kafka coordinator being marked dead. The Kafka log rate is about 1.8k records/s; is that too much data for Flink SQL? Is there any tuning method for Flink SQL (maybe I need the Blink planner)? Or should I rewrite the job with the DataStream API or Spark?
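For what it's worth, switching this job to the Blink planner in Flink 1.9 only changes how the table environment is created; a minimal sketch (option names should be verified against the docs of the exact Flink version in use):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Flink 1.9 ships both the old planner and the Blink planner; the Blink planner is selected via EnvironmentSettings.
val blinkSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tEnv = StreamTableEnvironment.create(env, blinkSettings)

// Optional Blink-planner tuning, e.g. splitting COUNT(DISTINCT ...) into a two-level aggregation.
// Assumption: option name taken from the Blink planner docs; verify it for the version in use,
// and note it targets unbounded group aggregations, so it may not affect this windowed query.
tEnv.getConfig.getConfiguration
  .setString("table.optimizer.distinct-agg.split.enabled", "true")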

Best
wydhcws
[Screenshot: Flink web UI, back-pressure tab. Job RUNNING for about 14h33m (started 2019-12-02 19:11:25), parallelism 4. The chained source task (Source: Custom Source -> Map -> Timestamps/Watermarks -> from: (driverId, searchType, w) -> time attribute) shows Back Pressure Status: HIGH on every subtask; the downstream task is the groupBy(searchType) tumbling group-window aggregation computing COUNT(DISTINCT driverId) AS typeUv and COUNT(DISTINCT rowKey) AS typePv. Bytes received ~248 MB.]

Consumption is too slow and the Kafka coordinator is marked dead.


Local test:
In the production environment, it also consumes a little data at the beginning and then consumption stops.

 
Date: 2019-12-02 18:15
Subject: Re: <Need help> The processing of Flink SQL is slow, how to accelerate it?
Dear wydhcws,

Usually reading data from the source is not the bottleneck; most probably it is InfluxDB that cannot consume the data fast enough. The whole program then goes into back pressure, and the source stops consuming because its buffers are full.

Did you try to increase the parallelism of the sink tasks?

Best,
Danny Chan
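For the record, a minimal sketch of what raising the sink parallelism would look like on the pipeline above (the value 4 and the operator name are only illustrative, and the value is bounded by the available task slots):

// Give the InfluxDB sink its own, higher parallelism instead of inheriting the job default.
influxStream
  .addSink(new InfluxDBSink(influxDBConfig))
  .name("influxdb-sink")
  .setParallelism(4)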
Re: Re: <Need help> The processing of Flink SQL is slow, how to accelerate it?

Arvid Heise-3
Hi wydhcws,

Could you please check whether you have enough Kafka partitions?
I recently answered a similar question on SO where the main culprit was a lack of Kafka partitions [1].
If you provide a bit more detail on that, I can help more.

However, looking at the back pressure, it looks like the processing or the sink is the main source of lag. Did you ever run without the println map? That is definitely an obvious bottleneck.

By the way, there should be no huge difference between Flink SQL and the streaming API. The only difference is that you can't customize windows in Flink SQL yet.
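A minimal sketch of taking the per-record println operators out of the production job while keeping them for local runs (the debug flag is purely illustrative):

// Hypothetical debug switch; in production the per-record printing operators are never built.
val debug = false
if (debug) {
  // DataStream.print() is Flink's built-in console sink; it replaces the dangling println maps.
  driverSearchDstreamWithEventTime.print()
  influxStream.print()
}
influxStream.addSink(new InfluxDBSink(influxDBConfig))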

