Hi all,
I am using Flink with Bahir's Apache ActiveMQ connector. However, it's quite dated and has many limitations: most notably, the source supports only BytesMessages, does not support parallelism, and has a bug that is fixed only in a snapshot version.

So I started implementing my own SourceFunction (still with a parallelism of only 1) based on AMQSource. I want it to support Flink's checkpointing and to work with ActiveMQ acks. AMQSource uses an ordinary HashMap to store the Messages to be acked in the broker, and this is where my question arises.

Is the HashMap safe to use here?

Please correct me if I'm wrong, but my understanding is that the `run` method is executed in one thread and `acknowledgeIDs` in another, so there is a possibility of a race between the threads (even if we assume all the message ids are unique).

Also, do you know of any ActiveMQ-specific (or, more generally, JMS) connectors that are more up to date and do not have the issues mentioned above?

Thanks,
Oskar

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,
Regarding your last question: sorry, I don’t know of any ActiveMQ connectors.

I’m not sure exactly how you are implementing your SourceFunction. Generally speaking, the `run()` method is executed in one thread, and other operations like checkpointing and timers (if any) are executed from another thread. To synchronise between those, the user is expected to acquire the checkpoint lock in the `run()` method, as documented in [1].

Note that if you want your state (your HashMap) to actually be checkpointed, it must either already be defined as Flink managed state (like the `ListState` in the example in [1]), or you must copy the contents of your `HashMap` to Flink managed state during the `snapshotState` call.

Note 2: also keep in mind that we are in the process of reimplementing the source interfaces [2], and Flink 1.11 will probably offer a new and better API for that (SourceReader instead of SourceFunction).

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Hi Piotr,

I'm not sure about this part: "Note that if you want your state (your HashMap) to actually be checkpointed, it must either already be defined as Flink managed state (like `ListState` in the example [1]), or you must copy the contents of your `HashMap` to Flink managed state during the `snapshotState` call."

From [1] we can read: "Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State." Oskar was asking about ActiveMQ and not Kafka, but I guess the rule applies here as well. The ActiveMQ connector he is using is this one [2].

His question actually boils down to one thing, regarding this class [3]: is it thread safe to use a HashMap, and not a ConcurrentHashMap, for `unacknowledgedMessages` in [3]?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
[2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/
[3] https://github.com/apache/bahir-flink/blob/master/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
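For illustration, here is a minimal, Flink-free sketch of the pattern from Piotr's first reply. The source thread records unacknowledged message ids in a plain `HashMap` while holding a shared checkpoint lock. `snapshotState` then copies the map's contents into a list that stands in for Flink managed state such as `ListState`. The class `SnapshotSketch` and its methods are hypothetical. A real implementation would get the lock from `SourceContext#getCheckpointLock()` and write the ids into an actual `ListState` obtained in `initializeState`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the snapshotState advice; not Flink/Bahir code.
class SnapshotSketch {
    // Stands in for the lock returned by SourceContext#getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Message id -> payload; only accessed while holding checkpointLock.
    private final Map<String, String> unacknowledged = new HashMap<>();

    // One iteration of run(): a real source would also call ctx.collect(payload)
    // inside this same synchronized block.
    void emit(String messageId, String payload) {
        synchronized (checkpointLock) {
            unacknowledged.put(messageId, payload);
        }
    }

    // snapshotState(): copy the HashMap's ids into a list that plays the role
    // of Flink managed state (for example a ListState<String>).
    List<String> snapshotState() {
        synchronized (checkpointLock) {
            return new ArrayList<>(unacknowledged.keySet());
        }
    }

    // initializeState() on restore: rebuild the bookkeeping from the copied ids.
    // Payloads are not part of the snapshot in this model, so they are unknown.
    void restore(List<String> ids) {
        synchronized (checkpointLock) {
            unacknowledged.clear();
            for (String id : ids) {
                unacknowledged.put(id, null);
            }
        }
    }

    int pendingCount() {
        synchronized (checkpointLock) {
            return unacknowledged.size();
        }
    }

    public static void main(String[] args) {
        SnapshotSketch source = new SnapshotSketch();
        source.emit("ID:1", "a");
        source.emit("ID:2", "b");
        List<String> state = source.snapshotState();
        source.emit("ID:3", "c"); // not part of the earlier snapshot
        SnapshotSketch restored = new SnapshotSketch();
        restored.restore(state);
        System.out.println(state.size() + " " + restored.pendingCount()); // prints "2 2"
    }
}
```

Because the copy is taken under the same lock as `emit`, no id can be added to the map halfway through the copy. The returned list is independent of the live `HashMap`.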
Hi,
Thanks for sharing the code pointers.

> His question actually boils down to one thing, regarding this class [3].

Yes, it’s safe, because the map is used in only two places:

1. Under the manually acquired checkpoint lock, from the source thread.
2. From the `acknowledgeIDs` method, which is executed from `notifyCheckpointComplete`, which is also synchronised on the same lock.

Piotrek
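Piotr's answer can be checked with a small plain-Java model. Like the previous sketch, it is hypothetical and not the Bahir or Flink code. One thread plays the source and adds ids to a plain `HashMap` under the lock. Another thread plays the checkpointing side: it calls `snapshotState` and `notifyCheckpointComplete`, and the latter calls a simplified `acknowledgeIDs` on the same lock. The method names are borrowed from the thread for readability, but the signatures are not Flink's or Bahir's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two access paths Piotr lists; not Flink/Bahir code.
class AckSketch {
    // Stands in for the lock returned by SourceContext#getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Plain HashMap, like AMQSource's unacknowledgedMessages; guarded by checkpointLock.
    private final Map<String, Integer> unacknowledged = new HashMap<>();
    // Ids captured by each snapshot, waiting for that checkpoint to complete.
    private final Map<Long, List<String>> pendingCheckpoints = new HashMap<>();
    private int acknowledged = 0;

    // Path 1: the source thread, under the manually acquired checkpoint lock.
    void emit(int seq) {
        synchronized (checkpointLock) {
            // A real source would call ctx.collect(...) here as well.
            unacknowledged.put("ID:" + seq, seq);
        }
    }

    // Checkpoint side: remember which ids this checkpoint covers.
    void snapshotState(long checkpointId) {
        synchronized (checkpointLock) {
            pendingCheckpoints.put(checkpointId, new ArrayList<>(unacknowledged.keySet()));
        }
    }

    // Path 2: completion callback, which calls acknowledgeIDs on the same lock.
    void notifyCheckpointComplete(long checkpointId) {
        synchronized (checkpointLock) {
            List<String> ids = pendingCheckpoints.remove(checkpointId);
            if (ids != null) {
                acknowledgeIDs(ids);
            }
        }
    }

    // Simplified stand-in; must only be called while holding checkpointLock.
    private void acknowledgeIDs(List<String> ids) {
        for (String id : ids) {
            // A real source would ack the JMS message here.
            if (unacknowledged.remove(id) != null) {
                acknowledged++;
            }
        }
    }

    // Runs both paths on separate threads; returns {acknowledged, still pending}.
    static int[] simulate(int messages, int checkpoints) throws InterruptedException {
        AckSketch source = new AckSketch();
        Thread sourceThread = new Thread(() -> {
            for (int i = 0; i < messages; i++) {
                source.emit(i);
            }
        });
        Thread checkpointThread = new Thread(() -> {
            for (long cp = 1; cp <= checkpoints; cp++) {
                source.snapshotState(cp);
                source.notifyCheckpointComplete(cp);
            }
        });
        sourceThread.start();
        checkpointThread.start();
        sourceThread.join();
        checkpointThread.join();
        // One last checkpoint after the source has stopped emitting.
        source.snapshotState(checkpoints + 1L);
        source.notifyCheckpointComplete(checkpoints + 1L);
        synchronized (source.checkpointLock) {
            return new int[] {source.acknowledged, source.unacknowledged.size()};
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = simulate(10_000, 100);
        // prints "10000 acknowledged, 0 pending"
        System.out.println(result[0] + " acknowledged, " + result[1] + " pending");
    }
}
```

Every access to the map happens inside `synchronized (checkpointLock)`, so the `HashMap` never sees concurrent access, and the final counts do not depend on thread interleaving. A `ConcurrentHashMap` would make individual map operations safe. On its own, however, it would not make "emit a record and record its id" atomic with respect to a checkpoint; the shared lock provides that.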