Hi,
I have a source function with parallelism = 1 that sends out records ordered by event time. These records are then rebalanced to the next operator, which has parallelism > 1. I observed that within each subtask of the 2nd operator, the order of the messages is not maintained. Is this behaviour expected? If so, is there any way to avoid it, or at least reduce it?

For example, at the end of this post are my logback logs for the 2nd operator. The event time (dt) being processed by the same thread [Thread-34] jumped from 15/12/2017 to 01/01/2018, then jumped back to 15/12/2017.

I have high back-pressure on that 2nd operator because the operator after it is slow. There is also high back-pressure on the 1st operator, which makes my problem more severe (the amount of out-of-order data is high). If I could throttle the 1st operator when back-pressure is high, I could mitigate the problem, but I could not find any guide on how to do that.

2019-10-30 05:30:43.548 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-12/part-00119-2dd7fe37-5e1b-4bc7-8bc4-fc632b419ac0
2019-10-30 05:30:51.239 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-13/part-00001-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:06.537 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-13/part-00083-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:13.611 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-13/part-00159-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:20.826 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-14/part-00041-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:28.487 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-14/part-00121-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:35.806 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00001-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:42.739 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:49.861 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:31:55.834 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:32:02.097 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:32:06.452 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:11.379 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-02/part-00077-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:16.103 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-02/part-00147-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:21.025 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-16/part-00039-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:25.758 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-03/part-00043-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:30.156 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-03/part-00123-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:34.169 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-16/part-00121-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:39.462 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-04/part-00001-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:43.551 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-04/part-00085-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:48.100 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-04/part-00166-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:52.629 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-17/part-00001-491d8c85-7eb2-48c7-af06-501934f65a83
2019-10-30 05:32:57.834 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-05/part-00045-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:01.943 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-05/part-00113-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:06.871 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-17/part-00082-491d8c85-7eb2-48c7-af06-501934f65a83

Could you please help?

Thanks.
Averell
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
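Regarding throttling the source under back-pressure: in a pipeline with bounded buffers, a slow consumer already throttles the producer, because the producer blocks once the buffer between them is full; the buffer size only bounds how far ahead the producer can run. The Java sketch below is a simplified model of that idea using a plain `ArrayBlockingQueue` between two threads. It is not Flink's network stack, and the names `BackPressureDemo` and `maxProducerLead` are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BackPressureDemo {

    // Sends `items` records from a fast producer to a slow consumer through a bounded
    // buffer and returns the largest number of records the producer was ever ahead by.
    static int maxProducerLead(int capacity, int items, long consumerDelayMillis)
            throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger consumed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    buffer.take();                      // waits while the buffer is empty
                    consumed.incrementAndGet();
                    Thread.sleep(consumerDelayMillis);  // simulates a slow downstream operator
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int maxLead = 0;
        for (int i = 0; i < items; i++) {
            buffer.put(i);                              // blocks while the buffer is full: this is the throttling
            maxLead = Math.max(maxLead, (i + 1) - consumed.get());
        }
        consumer.join();
        return maxLead;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("capacity 4:  max lead = " + maxProducerLead(4, 50, 2));
        System.out.println("capacity 32: max lead = " + maxProducerLead(32, 50, 2));
    }
}
```

With a capacity of 4 the producer can never be more than 5 records ahead (the extra one is the record the consumer has just taken); with 32 it runs roughly 32 ahead. In this model, a smaller buffer means less data in flight, which limits how much data a downstream stage could re-order.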
Hi Averell,

If I understand correctly, the job graph is A (parallelism = 1) --> B (parallelism > 1). In that case, I think the records arriving at subtask B_i should keep the same relative order in which A sent them out. Could you provide more details on the topology? Are there only these two operators? And could you also describe how you check the message order in B_i?

Best,
Yun
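Yun's reasoning can be illustrated with a small Java model: a single upstream sender hands records to several FIFO channels in turn. Each receiver gets a subsequence of the original stream, so its relative order is preserved. This is a hypothetical simulation (`RoundRobinOrderDemo` is not a Flink class); Flink's own rebalance partitioner may start at a different channel, which does not affect the argument.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinOrderDemo {

    // Hands records to `parallelism` FIFO channels in turn, as a single upstream
    // subtask does conceptually when the stream is rebalanced.
    static List<List<String>> rebalance(List<String> records, int parallelism) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        int next = 0;
        for (String record : records) {
            channels.get(next).add(record);      // appended at the tail: each channel is FIFO
            next = (next + 1) % parallelism;
        }
        return channels;
    }

    // True if no element is smaller than its predecessor
    // (string comparison is chronological for yyyy-MM-dd dates).
    static boolean isSorted(List<String> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1).compareTo(xs.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> fromA = List.of("2017-12-12", "2017-12-13", "2017-12-14", "2017-12-15",
                "2018-01-01", "2018-01-02", "2018-01-03");
        List<List<String>> channels = rebalance(fromA, 3);
        for (int i = 0; i < channels.size(); i++) {
            System.out.println("B_" + i + ": " + channels.get(i) + " in order: " + isSorted(channels.get(i)));
        }
    }
}
```

Here B_0 receives [2017-12-12, 2017-12-15, 2018-01-03], and every channel reports `in order: true`. If a subtask nevertheless observes out-of-order data, the re-ordering must happen after the records reach it.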
Hi Yun,

My job graph is: (A: 1) -(rebalance)-> (B: 32) -(hash)-> (C: 32). A lists files and forwards them to B as FileInputSplits. B parses those files and shuffles the data records to C as keyed streams. C is the slowest operator in the graph, and A is the fastest.

I relied on the slf4j/logback logs to reach that conclusion. There is one log entry for each context.collect() call in A, and one log entry whenever B opens a new FileInputSplit (B is Flink's ContinuousFileReaderOperator). My logback configuration is:

<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>

The logs from A showed the messages in order (by dt in my case). However, the logs from B showed that the order was lost (please refer to the logs in my first message). I assume that each logback %thread corresponds to exactly one B_i.

Thanks and regards,
Averell

(In reply to Yun Gao's message of Fri, Nov 1, 2019 at 1:32 PM.)
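The order check described above (following dt per logback %thread) can be automated. A minimal Java sketch, assuming log lines formatted like the ones in this thread, i.e. a `[thread]` token followed later by a `/dt=yyyy-MM-dd/` path segment; `LogOrderChecker` and `findRegressions` are hypothetical names. It reports, per thread, every line whose dt is earlier than the latest dt already seen on that thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LogOrderChecker {

    // Captures the thread name from "[Thread-34]" and the date from "/dt=2017-12-15/".
    private static final Pattern LINE =
            Pattern.compile("\\[(?<thread>[^\\]]+)\\].*?/dt=(?<dt>\\d{4}-\\d{2}-\\d{2})/");

    // Returns, per thread, the lines whose dt is earlier than the latest dt seen so far on that thread.
    static Map<String, List<String>> findRegressions(List<String> lines) {
        Map<String, String> latestDt = new HashMap<>();
        Map<String, List<String>> regressions = new HashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) {
                continue;                                   // not an "Opening file" line with a dt
            }
            String thread = m.group("thread");
            String dt = m.group("dt");
            String latest = latestDt.get(thread);
            if (latest != null && dt.compareTo(latest) < 0) {
                regressions.computeIfAbsent(thread, t -> new ArrayList<>()).add(line);
            } else {
                latestDt.put(thread, dt);                   // ISO dates compare chronologically as strings
            }
        }
        return regressions;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "2019-10-30 05:31:42.739 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139",
                "2019-10-30 05:31:49.861 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6",
                "2019-10-30 05:32:02.097 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139");
        findRegressions(lines).forEach((thread, late) ->
                late.forEach(line -> System.out.println(thread + " went back in time: " + line)));
    }
}
```

On these three lines taken from the logs above, only the 2017-12-15/part-00161 entry is reported, since it follows a 2018-01-01 entry on the same thread.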
In reply to this post by Yun Gao
Hi Yun,
I found the cause of the issue. ContinuousFileReaderOperator (my operator B) uses a PriorityQueue that keeps its buffer of incoming splits sorted by modTime, so my records were re-ordered. However, I don't understand the reason for using a PriorityQueue instead of an ordinary FIFO queue.

Thanks.
Averell
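To illustrate how a priority queue ordered by modification time can re-order input that arrives in dt order, here is a Java sketch contrasting it with a FIFO queue. The `Split` class and its modification times are hypothetical, chosen so that the 2018 files are older than the 2017 ones; this is one scenario consistent with the logs, not a measurement. A priority queue can only re-order elements that sit in the buffer at the same time, which would fit the observation that the problem grows with back-pressure: the more splits pile up in B, the larger the window that gets re-sorted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

class SplitQueueOrderDemo {

    // Hypothetical stand-in for a file split: its dt partition and its file's modification time.
    static final class Split {
        final String dt;
        final long modTime;

        Split(String dt, long modTime) {
            this.dt = dt;
            this.modTime = modTime;
        }
    }

    // Splits arriving in dt order; the modification times are made up so that
    // the 2018 files are older than the 2017 ones.
    static List<Split> sampleArrivals() {
        return List.of(
                new Split("2017-12-15", 300L),
                new Split("2017-12-16", 400L),
                new Split("2018-01-01", 100L),
                new Split("2018-01-02", 200L));
    }

    // Puts every split into the queue first (as if they had piled up under
    // back-pressure), then drains it and returns the dt values in poll order.
    static List<String> drain(Queue<Split> queue, List<Split> arrivals) {
        queue.addAll(arrivals);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().dt);
        }
        return order;
    }

    static List<String> fifoOrder(List<Split> arrivals) {
        return drain(new ArrayDeque<>(), arrivals);
    }

    static List<String> modTimeOrder(List<Split> arrivals) {
        Comparator<Split> byModTime = Comparator.comparingLong(s -> s.modTime);
        return drain(new PriorityQueue<>(byModTime), arrivals);
    }

    public static void main(String[] args) {
        System.out.println("FIFO queue:    " + fifoOrder(sampleArrivals()));
        System.out.println("modTime queue: " + modTimeOrder(sampleArrivals()));
    }
}
```

Running `main` prints `[2017-12-15, 2017-12-16, 2018-01-01, 2018-01-02]` for the FIFO queue and `[2018-01-01, 2018-01-02, 2017-12-15, 2017-12-16]` for the modTime-ordered queue, the same kind of jump forward and back seen for [Thread-34].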