Preserving (best effort) messages order between operators

Averell
Hi,

I have a source function with parallelism = 1 that sends out records ordered
by event time. These records are then rebalanced to the next operator, which
has parallelism > 1. I observed that within each subtask of the 2nd
operator, the order of the messages is not maintained. Is this behaviour
expected? If so, is there any way to avoid it, or at least reduce it?

For example, my logback logs for the 2nd operator are at the end of this post. The event time (dt) processed by the same thread [Thread-34] jumped from 15/12/2017 to 01/01/2018, then jumped back to 15/12/2017.

There is high back-pressure on the 2nd operator because the operator after
it is slow. There is also high back-pressure on the 1st operator, which makes
the problem more severe (the degree of out-of-order is high). If I could
throttle the 1st operator when back-pressure is high, I could mitigate the
problem, but I could not find any guide on how to do that.
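On throttling: back-pressure is usually built on bounded buffers, so a fast producer blocks as soon as the buffer between it and a slow consumer is full. Flink propagates back-pressure in a similar spirit through its bounded network buffers. The plain-Java sketch below is only an analogy, not Flink code, and the class and method names are hypothetical. It uses an `ArrayBlockingQueue`: the producer is throttled automatically by `put()`, can run at most `capacity` records ahead of the consumer, and the record order is preserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Toy model of back-pressure (hypothetical names, not Flink's implementation):
 * a fast producer hands records to a slow consumer through a bounded buffer.
 */
class BoundedBufferModel {

    private static final int END = -1; // hypothetical end-of-stream marker

    static final class Result {
        final List<Integer> consumed; // records in the order the consumer saw them
        final int maxBuffered;        // largest buffer size observed by the producer

        Result(List<Integer> consumed, int maxBuffered) {
            this.consumed = consumed;
            this.maxBuffered = maxBuffered;
        }
    }

    static Result run(int records, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int r = buffer.take();
                    if (r == END) {
                        break;
                    }
                    consumed.add(r);
                    Thread.sleep(1); // simulate a slow downstream operator
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int maxBuffered = 0;
        try {
            for (int i = 0; i < records; i++) {
                buffer.put(i); // blocks while the buffer is full: this is the throttling
                maxBuffered = Math.max(maxBuffered, buffer.size());
            }
            buffer.put(END);
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return new Result(consumed, maxBuffered);
    }
}
```

The buffer capacity is the knob here: a smaller buffer keeps the producer closer to the consumer, at the cost of less slack for bursts.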

2019-10-30 05:30:43.548 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-12/part-00119-2dd7fe37-5e1b-4bc7-8bc4-fc632b419ac0
2019-10-30 05:30:51.239 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00001-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:06.537 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00083-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:13.611 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00159-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:20.826 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00041-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:28.487 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00121-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:35.806 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00001-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:42.739 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:49.861 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:31:55.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:32:02.097 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:32:06.452 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:11.379 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00077-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:16.103 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00147-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:21.025 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00039-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:25.758 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00043-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:30.156 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00123-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:34.169 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00121-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:39.462 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00001-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:43.551 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00085-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:48.100 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00166-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:52.629 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00001-491d8c85-7eb2-48c7-af06-501934f65a83
2019-10-30 05:32:57.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00045-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:01.943 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00113-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:06.871 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00082-491d8c85-7eb2-48c7-af06-501934f65a83



Could you please help?

Thanks.
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Preserving (best effort) messages order between operators

Yun Gao
     Hi Averell,

If I understand correctly, the job graph is A (parallelism = 1) --> B (parallelism > 1). In that case, I think the records received by each subtask B_i should be in the same order as they were sent out from A. Could you provide more details on the topology? Are there only these two operators? And could you also describe how you check the message order in B_i?
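The reasoning can be checked with a small model: with a single upstream subtask, a round-robin (rebalance) edge appends each record to one of n FIFO channels, so every downstream subtask receives its share in the upstream order. This is a plain-Java sketch with a hypothetical class name, not Flink's `RebalancePartitioner` (which may start at a different channel); only the per-channel FIFO property matters.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a rebalance edge from ONE upstream subtask (hypothetical). */
class RebalanceModel {

    static List<List<Integer>> rebalance(List<Integer> upstream, int parallelism) {
        List<List<Integer>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        int next = 0;
        for (int record : upstream) {
            channels.get(next).add(record); // FIFO append: order within a channel is kept
            next = (next + 1) % parallelism; // round-robin to the next channel
        }
        return channels;
    }

    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }
}
```

If the upstream list is sorted by event time, every channel stays sorted, so any reordering observed inside B_i would have to come from B itself.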

   Best,
   Yun

------------------------------------------------------------------
From:Averell <[hidden email]>
Send Time:2019 Oct. 31 (Thu.) 12:55
To:user <[hidden email]>
Subject:Preserving (best effort) messages order between operators

Re: Preserving (best effort) messages order between operators

Averell
Hi Yun,

My job graph is: (A: 1) -(rebalance)-> (B: 32) -(hash)-> (C: 32). A lists files and forwards them to B as FileInputSplits. B parses those files and shuffles the data records to C as a keyed stream.
C is the slowest operator in the graph; A is the fastest.
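For the hash edge from B to C, a similar model shows what is and is not preserved: all records with the same key that one B_i sends reach the same C subtask in the order B_i sent them, but each C subtask also merges input from all 32 B subtasks. The key-to-subtask formula below (`hashCode` modulo parallelism) is a hypothetical simplification; Flink's actual assignment goes through key groups and differs. `KeyedEdgeModel` and `Rec` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Simplified model of a keyed (hash) edge from ONE sender (hypothetical). */
class KeyedEdgeModel {

    /** Hypothetical record: a key plus the position at which the sender emitted it. */
    static final class Rec {
        final String key;
        final int seq;

        Rec(String key, int seq) {
            this.key = key;
            this.seq = seq;
        }
    }

    /** Simplified key-to-subtask mapping; not Flink's key-group formula. */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(Objects.hashCode(key), parallelism);
    }

    /** Routes the records of one sender to downstream channels, FIFO per channel. */
    static List<List<Rec>> route(List<Rec> sent, int parallelism) {
        List<List<Rec>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
        for (Rec r : sent) {
            channels.get(subtaskFor(r.key, parallelism)).add(r);
        }
        return channels;
    }
}
```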

I based that conclusion on the slf4j/logback logs. There is one log entry for each context.collect() call in A, and one log entry whenever B opens a new FileInputSplit (B is Flink's ContinuousFileReaderOperator).
My logback configuration is:
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>

The logs from A showed the messages in order (by dt in my case). However, the logs from B showed that the message order was lost (please refer to the logs below). I assume that each logback %thread corresponds to exactly one B_i.
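The order check on such logs can be automated with a small scanner. The sketch below assumes the "Opening file /dt=YYYY-MM-DD/..." message format shown in this thread; the class name `DtOrderChecker` is hypothetical. It follows one thread name and reports each place where dt goes backwards. ISO-8601 dates compare correctly as plain strings, which keeps the comparison simple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Reports dt regressions for one thread in "Opening file /dt=..." log lines (hypothetical). */
class DtOrderChecker {

    // Captures the thread name inside [...] and the dt=YYYY-MM-DD partition value.
    private static final Pattern LINE =
            Pattern.compile("\\[(?<thread>[^\\]]+)\\].*?/dt=(?<dt>\\d{4}-\\d{2}-\\d{2})/");

    /** Returns "previous -> current" for each place where dt decreases, in log order. */
    static List<String> regressions(List<String> logLines, String thread) {
        List<String> out = new ArrayList<>();
        String prev = null;
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (!m.find() || !m.group("thread").equals(thread)) {
                continue; // not a matching line, or a different thread
            }
            String dt = m.group("dt");
            // yyyy-MM-dd strings sort lexicographically in date order.
            if (prev != null && dt.compareTo(prev) < 0) {
                out.add(prev + " -> " + dt);
            }
            prev = dt;
        }
        return out;
    }
}
```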

Thanks and regards,
Averell

2019-10-30 05:30:43.548 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-12/part-00119-2dd7fe37-5e1b-4bc7-8bc4-fc632b419ac0
2019-10-30 05:30:51.239 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00001-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:06.537 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00083-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:13.611 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-13/part-00159-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:20.826 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00041-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:28.487 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-14/part-00121-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:35.806 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00001-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:42.739 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:49.861 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:31:55.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:32:02.097 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:32:06.452 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:11.379 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00077-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:16.103 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-02/part-00147-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:21.025 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00039-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:25.758 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00043-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:30.156 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-03/part-00123-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:34.169 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-16/part-00121-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:39.462 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00001-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:43.551 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00085-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:48.100 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-04/part-00166-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:52.629 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00001-491d8c85-7eb2-48c7-af06-501934f65a83
2019-10-30 05:32:57.834 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00045-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:01.943 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2018-01-05/part-00113-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:06.871 [Thread-34] INFO  com.myco.myFIF  - Opening file /dt=2017-12-17/part-00082-491d8c85-7eb2-48c7-af06-501934f65a83


On Fri, Nov 1, 2019 at 1:32 PM Yun Gao <[hidden email]> wrote:
     Hi Averell,

If I understand correctly, the job graph is A (parallelism = 1) --> B (parallelism > 1). In that case, I think the records received by each subtask B_i should be in the same order as they were sent out from A. Could you provide more details on the topology? Are there only these two operators? And could you also describe how you check the message order in B_i?

   Best,
   Yun

Re: Preserving (best effort) messages order between operators

Averell
In reply to this post by Yun Gao
Hi Yun,

I found the cause of the issue.
The ContinuousFileReaderOperator (my operator B) uses a PriorityQueue
that keeps its buffer sorted by modTime, which is why my records were
re-ordered. I don't understand the reason for using a PriorityQueue
instead of an ordinary queue, though.
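This finding can be illustrated with plain `java.util` collections. When a backlog of splits builds up in a buffer ordered by modification time, the splits leave in modTime order, whereas a FIFO queue returns them in the order they arrived from A. `Split`, its fields and the timestamps below are hypothetical, and this is not the operator's actual code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

/** Compares draining a backlog through a modTime-ordered queue vs. a FIFO queue. */
class SplitQueueOrder {

    /** Hypothetical stand-in for a file split: a path plus its file modification time. */
    static final class Split {
        final String path;
        final long modTime;

        Split(String path, long modTime) {
            this.path = path;
            this.modTime = modTime;
        }
    }

    /** Enqueues all splits in arrival order, then drains the queue and returns the paths. */
    static List<String> drain(Queue<Split> queue, List<Split> arrivals) {
        queue.addAll(arrivals);
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll().path);
        }
        return out;
    }

    /** Priority queue that always hands out the split with the smallest modTime. */
    static Queue<Split> byModTime() {
        return new PriorityQueue<>(Comparator.comparingLong((Split s) -> s.modTime));
    }

    /** Plain FIFO queue: splits come out in arrival order. */
    static Queue<Split> fifo() {
        return new ArrayDeque<>();
    }
}
```

If the files were written in a different order than their dt values, the modTime-ordered queue drains the backlog in write order rather than dt order, which matches the kind of dt jumps seen in the logs above.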

Thanks.
Averell


