I am using Flink SQL to do some complicated calculations and have run into some very difficult problems in the process, so I would like to ask everyone for help.

My goal is to build a data stream with exactly accurate results, in line with the core ideas of the book "Streaming Systems", which is what I am trying to put into practice. I use Kafka to receive the MySQL binlog as the data source, register it as multiple tables, and then run complex SQL calculations that join these tables. I found that Flink does not provide an upsert implementation, so I added a last_value(xxx), last_value(xxx) .. group by (id) operation on each Kafka data source to ensure the consistency of the final result. This works, and I understand that it caches a dynamic table, resulting in a large state (about 3 GB), but it seems to introduce some other very strange problems, summarized as follows:

1. When the SQL is very complicated, checkpointing is clearly turned on, but the web interface shows no checkpoints at all, not a single one.
2. While running, the program frequently crashes, always with one of the following errors:

(1) the assigned slot id_xxxxx was removed
(2) the heartbeat with taskmanager was timeout

I have used slotSharingGroup to split tasks into different slots wherever possible, but these two errors still occur often and bring the program down. I have no clue about them; if anyone can help, I would really appreciate it.

Added: I receive data from 4 Kafka topics; the maximum data volume is more than 20 million records. My startup command is as follows:

Flink1.6/bin/flink run -m yarn-cluster -ytm 23240 -yn 3 -ys 2 -ynm xxxx -yqu xxxx -c xxxxxxx xxx.jar ./test.conf
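For clarity, the dedup step on each source looks roughly like the sketch below (table and column names are made up for illustration; tEnv is the StreamTableEnvironment, and LAST_VALUE is a user-defined aggregate whose implementation I can share):

    // Shape of the per-source dedup query; keeps only the latest row per id.
    // Because this is a non-windowed GROUP BY, Flink keeps state for every id,
    // which is where the ~3 GB of state comes from.
    val latest = tEnv.sqlQuery(
      """
        |SELECT id,
        |       LAST_VALUE(col_a) AS col_a,
        |       LAST_VALUE(col_b) AS col_b
        |FROM binlog_source
        |GROUP BY id
      """.stripMargin)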
Hi,
it is very difficult to spot the problem with the little information you gave us. Maybe you can show us a simplified SQL query and the implementation of the `LAST_VALUE` function?

An initial guess would be that you are running out of memory, such that YARN kills your task managers. If you are sure that your state size remains constant, you could also try a state backend that spills to disk. Have you tried the RocksDB state backend? Did you configure a state retention time?

Regards,
Timo
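P.S. For reference, configuring both suggestions looks roughly like this in Flink 1.6 (the checkpoint URI is a placeholder and the retention values are just examples):

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // RocksDB keeps state on local disk; the second argument enables
    // incremental checkpoints.
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))

    val tEnv = TableEnvironment.getTableEnvironment(env)

    // State for a key that has been idle for 12 to 24 hours is dropped.
    // Pass qConfig when emitting results, e.g. tEnv.toRetractStream(table, qConfig).
    val qConfig = tEnv.queryConfig
      .withIdleStateRetentionTime(Time.hours(12), Time.hours(24))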
Hi Timo,
The LAST_VALUE function simply groups by id and then takes the latest row of data for each primary key. I was inspired by this answer: https://stackoverflow.com/questions/48554999/apache-flink-how-to-enable-upsert-mode-for-dynamic-tables

Its implementation is also very simple:

    import java.lang.{String => JString}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.functions.AggregateFunction

    // Accumulator: holds the most recently seen value for a key.
    class Middle2 extends Serializable {
      private val serialVersionUID = 3L
      var mid: String = "none"
    }

    class StringLastValueFunc extends AggregateFunction[JString, Middle2] {

      override def createAccumulator(): Middle2 = new Middle2

      // Called for every input row; keeps the latest non-empty value.
      def accumulate(acc: Middle2, iValue: String): Unit = {
        if (iValue != null && iValue.nonEmpty) {
          acc.mid = iValue
        }
      }

      override def getValue(acc: Middle2): JString = acc.mid

      override def getResultType: TypeInformation[JString] = Types.STRING
    }

I don't think I should set a state expiration time, because the data for each primary key can change at any time. I am already using the RocksDB backend with incremental checkpoints enabled.
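For completeness, this is roughly how I register and consume it; the source here is a stand-in for one of my Kafka binlog tables, and the field names are simplified:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    object LastValueUsage {
      def main(args: Array[String]): Unit = {
        val env  = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Stand-in for a Kafka binlog source: a stream of (id, name) change rows.
        val changes = env.fromElements(("1", "a"), ("1", "b"), ("2", "c"))
        tEnv.registerDataStream("user_source", changes, 'id, 'name)

        tEnv.registerFunction("LAST_VALUE", new StringLastValueFunc)

        val latest = tEnv.sqlQuery(
          "SELECT id, LAST_VALUE(name) AS name FROM user_source GROUP BY id")

        // A non-windowed GROUP BY emits updates, so read it as a retract stream.
        tEnv.toRetractStream[(String, String)](latest).print()
        env.execute("last-value-usage")
      }
    }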
I have found out what keeps the checkpoint from being triggered. It is related to the IN operator in Flink SQL. This query triggers checkpoints normally:

select name,age from user where id in
(5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,597840)

But with one more element in the list, checkpoints are no longer triggered:

(5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,597840,123456)

Is this a bug?
Hi,
yes, this was unintended behavior that got fixed in Flink 1.7. See https://issues.apache.org/jira/browse/FLINK-10474

Regards,
Timo
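P.S. The background, as far as I can tell, is that once the IN list reaches the planner's translation threshold (20 literals in Calcite's default configuration), it is rewritten into a join with a VALUES table, and that plan is what blocked checkpointing. Until you can upgrade, one possible (untested) workaround is to keep every IN list below that threshold, e.g. by splitting it:

    // Untested sketch: two shorter IN lists combined with OR, each staying
    // below the 20-literal threshold so no join with VALUES is generated.
    val result = tEnv.sqlQuery(
      """
        |SELECT name, age FROM user
        |WHERE id IN (5102, 597816, 597830, 597817, 597818, 597819, 597805, 27, 597820, 597821)
        |   OR id IN (597822, 597823, 597825, 597826, 597827, 597828, 597839, 597831, 597840, 123456)
      """.stripMargin)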
Hi Timo,
First of all, thank you very much; I have solved the problems.

Regarding the overly large state, I set the global parallelism of the program to 7, which solved my problem very well, and checkpoints are now very fast. But I would like to ask: is there a way to set the parallelism for each operator (translated from the SQL statement) instead of a single global setting?
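For reference, this is all I changed:

    // Global default parallelism; every operator generated from the SQL
    // inherits it. The CLI equivalent would be: bin/flink run -p 7 ...
    env.setParallelism(7)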
Unfortunately, setting the parallelism per SQL operator is not supported right now. We are currently thinking about a way of offering fine-grained control over the properties of SQL operators, but this is in an early design phase and might take a while.

Regards,
Timo
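P.S. A partial workaround, sketched under the assumption that resultTable is the output of tEnv.sqlQuery(...) with two string fields: the operators inside the SQL query keep the global default, but any transformation applied after converting the result back to a DataStream can be given its own parallelism.

    // Operators translated from SQL keep the global default (7 here);
    // this downstream map gets an explicit parallelism of its own.
    val stream = tEnv.toRetractStream[(String, String)](resultTable)
    stream.map(_._2).setParallelism(2)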