I am using the IntervalJoin function to join two streams within a 10-minute interval, as shown below:

    labelStream.intervalJoin(adLogStream)
        .between(Time.milliseconds(0), Time.milliseconds(600000))
        .process(new processFunction())
        .sink(kafkaProducer)

labelStream and adLogStream are streams of protobuf classes, keyed by a Long id.

Our two input streams are huge. After running for about 30 minutes, the output to Kafka slowly drops, like this:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/1.png>

When the output started dropping, I ran jstack and pstack several times and got these:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/2.png>
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/3.png>

It seems the program is stuck in RocksDB's seek. I also found that some of RocksDB's SST files are accessed slowly by iteration.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1713/4.png>

I have tried several things:

1) Reduce the input volume by half. This works well.
2) Replace labelStream and adLogStream with simple Strings, so that the number of records does not change. This works well.
3) Use PredefinedOptions such as SPINNING_DISK_OPTIMIZED and SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails.
4) Use newer versions of rocksdbjni. This still fails.

Can anyone give me some suggestions? Thank you very much.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Jiangang,
The IntervalJoin is actually the DataStream-level implementation of the SQL time-windowed join [1]. To ensure the completeness of the join results, we have to cache all the records (from both sides) in the most recent time interval. That may lead to state backend problems when huge streams flood in.

One benefit of SQL is that the optimizer helps to reduce the join inputs as much as possible (e.g., via predicate pushdown), but in DataStream programs that has to be done manually. Thus, I suggest that you 1) try increasing the parallelism (and the number of nodes if possible); 2) filter out some records or reduce the number of fields in advance.

Best,
Xingcan

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins

> On Nov 21, 2018, at 2:06 AM, liujiangang <[hidden email]> wrote:
> [...]
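To make the buffering cost described above concrete, here is a minimal sketch of an interval join for a single key. It is a simplified model, not Flink's actual implementation: the class name `IntervalJoinModel`, its methods, and the string-concatenated output are all hypothetical, and the watermark is assumed to mean "no record with a smaller timestamp will arrive".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified single-key model of interval-join buffering (hypothetical, not Flink code).
class IntervalJoinModel {
    // Buffered records per side, ordered by event timestamp in milliseconds.
    private final TreeMap<Long, List<String>> left = new TreeMap<>();
    private final TreeMap<Long, List<String>> right = new TreeMap<>();
    private final long lower; // lower bound, relative to the left timestamp
    private final long upper; // upper bound, relative to the left timestamp

    IntervalJoinModel(long lowerMs, long upperMs) {
        this.lower = lowerMs;
        this.upper = upperMs;
    }

    // Buffer a left record and join it with right records in [ts + lower, ts + upper].
    List<String> addLeft(long ts, String value) {
        left.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (List<String> bucket : right.subMap(ts + lower, true, ts + upper, true).values()) {
            for (String r : bucket) {
                out.add(value + "|" + r);
            }
        }
        return out;
    }

    // Buffer a right record and join it with left records in [ts - upper, ts - lower].
    List<String> addRight(long ts, String value) {
        right.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (List<String> bucket : left.subMap(ts - upper, true, ts - lower, true).values()) {
            for (String l : bucket) {
                out.add(l + "|" + value);
            }
        }
        return out;
    }

    // Drop records that can no longer find a partner once the watermark has passed.
    void onWatermark(long watermark) {
        left.headMap(watermark - upper, false).clear();
        right.headMap(watermark + lower, false).clear();
    }

    // Number of records currently held in the buffers.
    int bufferedCount() {
        int n = 0;
        for (List<String> b : left.values()) n += b.size();
        for (List<String> b : right.values()) n += b.size();
        return n;
    }
}
```

Every arriving record triggers a range scan over the other side's buffer, and a record stays buffered until the watermark moves past its join interval. With large records and a 10-minute interval, both the buffer size and the cost of each range scan grow accordingly, which is why filtering records or dropping fields before the join helps.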
Thank you very much. I have some more details. Each record is 20 KB. The parallelism is 500 and each TaskManager has 10 GB of memory. The memory is sufficient, and I think the parallelism is high enough. Only the intervalJoin thread is above 100% CPU, because of RocksDB's seek. I am confused about why RocksDB's seek takes so long but returns no result. I don't know how to debug RocksDB in Flink.
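As a rough sanity check on these numbers, the amount of state one input contributes can be estimated as rate × record size × retention time. The sketch below is only back-of-the-envelope arithmetic: the 20 KB record size, the 10-minute interval and the parallelism of 500 come from this thread, while the input rate is a purely hypothetical value (the thread does not state it) and keys are assumed to be evenly distributed.

```java
// Back-of-the-envelope estimate of interval-join state size (hypothetical helper).
class JoinStateEstimate {
    // Bytes buffered for one input: records/s * bytes/record * seconds retained.
    static long bufferedBytes(long recordsPerSecond, long recordBytes, long retentionSeconds) {
        return recordsPerSecond * recordBytes * retentionSeconds;
    }

    // Share of the state held by one parallel subtask, assuming evenly distributed keys.
    static long bytesPerSubtask(long totalBytes, int parallelism) {
        return totalBytes / parallelism;
    }
}
```

For example, with an assumed rate of 10,000 records per second, `bufferedBytes(10_000, 20 * 1024, 600)` gives 122,880,000,000 bytes (about 123 GB) for one input, and `bytesPerSubtask(..., 500)` gives 245,760,000 bytes (about 246 MB) per subtask. Skewed keys would concentrate more of that state on a few subtasks.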
Hi,
Are your RocksDB instances running on local SSDs or on something like EBS? I have previously seen cases where this happened because some EBS quota was exhausted and the performance got throttled.

Best,
Stefan

> On 22. Nov 2018, at 09:51, liujiangang <[hidden email]> wrote:
> [...]
This is not my case. Thank you.
Btw how did you make sure that it is stuck in the seek call and that the trace does not show different invocations of seek? This can indicate that seek is slow, but is not yet proof that you are stuck.
> On 22. Nov 2018, at 13:01, liujiangang <[hidden email]> wrote:
>
> This is not my case. Thank you.
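One way to tell a single stuck call apart from many slow calls is to time each invocation rather than rely on stack samples. The following is a minimal sketch of such a timer; the class `CallTimer` and its methods are hypothetical helpers, not part of Flink or RocksDB, and the class is not thread-safe, so it assumes one instance per thread.

```java
import java.util.function.Supplier;

// Hypothetical helper that counts calls and records how many exceed a latency threshold.
class CallTimer {
    private final long thresholdNanos;
    private long calls;
    private long slowCalls;
    private long maxNanos;

    CallTimer(long thresholdMillis) {
        this.thresholdNanos = thresholdMillis * 1_000_000L;
    }

    // Run the given call, measure its wall-clock duration, and update the counters.
    <T> T time(Supplier<T> call) {
        long start = System.nanoTime();
        try {
            return call.get();
        } finally {
            long elapsed = System.nanoTime() - start;
            calls++;
            if (elapsed > maxNanos) {
                maxNanos = elapsed;
            }
            if (elapsed >= thresholdNanos) {
                slowCalls++;
            }
        }
    }

    long calls() { return calls; }
    long slowCalls() { return slowCalls; }
    long maxMillis() { return maxNanos / 1_000_000L; }
}
```

If the call counter keeps increasing while the slow-call counter and the maximum latency also grow, the operation is slow rather than blocked; if the call counter stops increasing, a single invocation is not returning.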
Yes, you are right. I added logging to record the time of each seek and found that it is sometimes very slow. Then I used RocksDB's files to test locally, and the same problem appears. It is very weird that RocksDB's seek iterates over the data one by one. So far I have added a block cache for RocksDB. Seeks are faster than before, but the problem is not completely solved. The added code is below:

    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.ColumnFamilyOptions;

    public ColumnFamilyOptions createColumnOptions() {
        // Previously: return new ColumnFamilyOptions();
        // Configure a 1 GiB block cache for the block-based table format.
        BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
        blockBasedTableConfig.setBlockCacheSize(1024 * 1024 * 1024);
        // Apply the table configuration to the column family options.
        ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
        columnFamilyOptions.setTableFormatConfig(blockBasedTableConfig);
        return columnFamilyOptions;
    }