ActiveMQ connector

OskarM
Hi all,

I am using Flink with Bahir's Apache ActiveMQ connector. However, it's quite
dated and has many limitations: most notably, the source supports only
BytesMessages, does not support parallelism, and has a bug that is only fixed
in a snapshot version.

So I started implementing my own SourceFunction (still with a parallelism of
only 1) based on AMQSource.
I want it to support Flink's checkpointing and to work with ActiveMQ
acknowledgements.
AMQSource uses an ordinary HashMap to store the messages that still have to be
acknowledged to the broker, and this is where my question arises.

Is the HashMap safe to use here?

Please correct me if I'm wrong, but my understanding is that the `run()`
method is executed in one thread and `acknowledgeIDs()` in another, so there
is a possibility of a race condition (even if we assume all the message IDs
are unique).

Also, do you know of any more up-to-date ActiveMQ-specific (or general JMS)
connectors I could use that do not have the issues mentioned above?

Thanks,
Oskar



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ActiveMQ connector

Piotr Nowojski-3
Hi,

Regarding your last question, sorry, I don’t know of any ActiveMQ connectors.

I’m not sure exactly how you are implementing your SourceFunction. Generally speaking, the `run()` method is executed in one thread, and other operations like checkpointing and timers (if any) are executed from another thread. To synchronise between them, the user is expected to acquire the checkpoint lock in the `run()` method, as documented in [1].
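A minimal sketch of that locking discipline, written against the Java standard library only (no Flink classes) so it runs stand-alone. It is a model of the pattern, not the AMQSource code: `checkpointLock` stands in for `ctx.getCheckpointLock()`, `emit()` plays the role of the loop body in `run()`, `acknowledge()` the role of `acknowledgeIDs()`, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model, NOT Flink code: `checkpointLock` stands in for
// ctx.getCheckpointLock(); emit() plays the role of the loop body in run(),
// acknowledge() the role of acknowledgeIDs(). All names are hypothetical.
public class CheckpointLockSketch {
    private final Object checkpointLock = new Object();
    // A plain HashMap suffices because every access below holds checkpointLock.
    private final Map<String, String> unacknowledgedMessages = new HashMap<>();
    private int emitted = 0;

    // Source thread: emit a record and remember it as unacknowledged, atomically.
    public void emit(String messageId, String payload) {
        synchronized (checkpointLock) {
            emitted++; // stand-in for ctx.collect(payload)
            unacknowledgedMessages.put(messageId, payload);
        }
    }

    // Other thread: acknowledge whichever of the given IDs are currently pending.
    public int acknowledge(List<String> messageIds) {
        int acked = 0;
        synchronized (checkpointLock) {
            for (String id : messageIds) {
                if (unacknowledgedMessages.remove(id) != null) {
                    acked++; // stand-in for acknowledging the JMS message
                }
            }
        }
        return acked;
    }

    public int pendingCount() {
        synchronized (checkpointLock) {
            return unacknowledgedMessages.size();
        }
    }

    public int emittedCount() {
        synchronized (checkpointLock) {
            return emitted;
        }
    }

    // Runs a producer and an acknowledger concurrently; returns {emitted, acked, pending}.
    public static int[] runDemo(int n) {
        CheckpointLockSketch source = new CheckpointLockSketch();
        List<String> allIds = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            allIds.add("id-" + i);
        }
        int[] acked = {0};
        Thread producer = new Thread(() -> {
            for (String id : allIds) {
                source.emit(id, "payload-" + id);
            }
        });
        Thread acker = new Thread(() -> {
            // Keep sweeping until every ID has been acknowledged exactly once.
            while (acked[0] < n) {
                acked[0] += source.acknowledge(allIds);
            }
        });
        producer.start();
        acker.start();
        try {
            producer.join();
            acker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {source.emittedCount(), acked[0], source.pendingCount()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runDemo(1_000))); // prints [1000, 1000, 0]
    }
}
```

Removing both `synchronized` blocks turns this into exactly the race described in the question. Swapping in a ConcurrentHashMap instead would make individual map operations safe, but it would not make "emit and record" one atomic step with respect to a checkpoint, which is what the shared lock provides.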

Note that if you want your state (your HashMap) to actually be checkpointed, it must either already be defined as Flink managed state (like the `ListState` in the example [1]), or you must copy the content of your `HashMap` to Flink managed state during the `snapshotState` call.
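A minimal sketch of the second option (copying a plain map into managed state at snapshot time), again using only the Java standard library so it runs on its own. This is an assumption-laden model, not Flink's API: `managedState` is an ordinary `List` standing in for a Flink `ListState`, `snapshotState()` and `restore()` merely mirror the roles of `CheckpointedFunction`'s `snapshotState` and `initializeState`, and plain strings stand in for message IDs and payloads.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only sketch, NOT Flink code: `managedState` is a plain List standing in
// for a Flink ListState; snapshotState()/restore() only mirror the roles of
// CheckpointedFunction's snapshotState()/initializeState().
public class SnapshotSketch {
    // Working copy in a plain field: Flink would not checkpoint this by itself.
    private final Map<String, String> unacknowledgedMessages = new HashMap<>();
    // Hypothetical stand-in for Flink managed operator state.
    private final List<Map.Entry<String, String>> managedState = new ArrayList<>();

    public void record(String messageId, String payload) {
        unacknowledgedMessages.put(messageId, payload);
    }

    // Replace the managed state's content with a copy of the map's current entries.
    public List<Map.Entry<String, String>> snapshotState() {
        managedState.clear();
        for (Map.Entry<String, String> e : unacknowledgedMessages.entrySet()) {
            managedState.add(new AbstractMap.SimpleImmutableEntry<String, String>(e.getKey(), e.getValue()));
        }
        return new ArrayList<>(managedState); // what would survive a failure
    }

    // On restore, rebuild the in-memory map from the snapshotted entries.
    public static SnapshotSketch restore(List<Map.Entry<String, String>> state) {
        SnapshotSketch restored = new SnapshotSketch();
        for (Map.Entry<String, String> e : state) {
            restored.unacknowledgedMessages.put(e.getKey(), e.getValue());
        }
        return restored;
    }

    public Map<String, String> pending() {
        return new HashMap<>(unacknowledgedMessages);
    }

    public static void main(String[] args) {
        SnapshotSketch source = new SnapshotSketch();
        source.record("id-1", "a");
        source.record("id-2", "b");
        List<Map.Entry<String, String>> checkpoint = source.snapshotState();
        source.record("id-3", "c"); // recorded after the snapshot, so not in it
        SnapshotSketch restored = SnapshotSketch.restore(checkpoint);
        System.out.println(new TreeMap<>(restored.pending())); // prints {id-1=a, id-2=b}
    }
}
```

The point of the demo is the last line: only what was copied into the managed state during the snapshot comes back after a restore; anything that lived only in the `HashMap` field is lost.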

Note 2: also keep in mind that we are in the process of reimplementing the source interfaces [2], and Flink 1.11 will probably offer a new and better API for that (SourceReader instead of SourceFunction).

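Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

On 29 Jan 2020, at 13:08, OskarM wrote: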


Re: ActiveMQ connector

KristoffSC
Hi Piotr,
I'm not sure about:
"Note that if you want your state (your HashMap) to actually be
checkpointed, it must either already be defined as Flink managed state
(like the `ListState` in the example [1]), or you must copy the content of
your `HashMap` to Flink managed state during the `snapshotState` call."

In [1] we can read:
"Each parallel instance of the Kafka consumer maintains a map of topic
partitions and offsets as its Operator State."

Oskar was asking about ActiveMQ and not Kafka, but I guess the rule applies
here as well. The ActiveMQ connector he is using is this one [2].

His question actually boils down to one thing regarding this class [3]:
is using a HashMap rather than a ConcurrentHashMap for
`unacknowledgedMessages` thread safe?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
[2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/
[3] https://github.com/apache/bahir-flink/blob/master/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java

Re: ActiveMQ connector

Piotr Nowojski-3
Hi,

Thanks for sharing the code pointers.

> His question actually boils down to one thing regarding this class [3]:
> is using a HashMap rather than a ConcurrentHashMap for
> `unacknowledgedMessages` thread safe?

Yes, it’s safe, because it’s accessed in only two places, and both hold the same checkpoint lock:

1. From the source thread, under the manually acquired checkpoint lock.
2. From the `acknowledgeIDs` method, which is executed from `notifyCheckpointComplete`, which is also synchronised on the same lock.
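To make the two access paths concrete, here is a simplified model in plain Java (no Flink or Bahir classes). Assumptions, not taken from the thread: message IDs are grouped by the checkpoint that covers them and acknowledged once a checkpoint with an equal or higher ID completes; `snapshot()`, `emit()` and the other names are hypothetical, and `checkpointLock` stands in for `ctx.getCheckpointLock()`. It illustrates why both paths are safe with a plain `HashMap`; it is not the AMQSource implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Simplified, stdlib-only model of the two access paths; NOT the Bahir/Flink code.
// The per-checkpoint bookkeeping and all method names are illustrative assumptions.
public class AckOnCheckpointSketch {
    private final Object checkpointLock = new Object(); // stands in for ctx.getCheckpointLock()
    private final Map<String, String> unacknowledgedMessages = new HashMap<>();
    private List<String> idsSinceLastCheckpoint = new ArrayList<>();
    // IDs grouped by the checkpoint that covers them, oldest first.
    private final ArrayDeque<Map.Entry<Long, List<String>>> pendingCheckpoints = new ArrayDeque<>();
    private final List<String> acknowledged = new ArrayList<>();

    // Path 1: source thread, checkpoint lock acquired manually.
    public void emit(String messageId, String payload) {
        synchronized (checkpointLock) {
            unacknowledgedMessages.put(messageId, payload);
            idsSinceLastCheckpoint.add(messageId);
        }
    }

    // When a checkpoint is taken, the IDs emitted since the previous one
    // become covered by it.
    public void snapshot(long checkpointId) {
        synchronized (checkpointLock) {
            pendingCheckpoints.addLast(
                    new AbstractMap.SimpleImmutableEntry<Long, List<String>>(checkpointId, idsSinceLastCheckpoint));
            idsSinceLastCheckpoint = new ArrayList<>();
        }
    }

    // Path 2: like notifyCheckpointComplete -> acknowledgeIDs, on the same lock.
    public void notifyCheckpointComplete(long checkpointId) {
        synchronized (checkpointLock) {
            while (!pendingCheckpoints.isEmpty()
                    && pendingCheckpoints.peekFirst().getKey() <= checkpointId) {
                for (String id : pendingCheckpoints.pollFirst().getValue()) {
                    if (unacknowledgedMessages.remove(id) != null) {
                        acknowledged.add(id); // stand-in for acknowledging the JMS message
                    }
                }
            }
        }
    }

    public List<String> acknowledged() {
        synchronized (checkpointLock) {
            return new ArrayList<>(acknowledged);
        }
    }

    public TreeSet<String> pendingIds() {
        synchronized (checkpointLock) {
            return new TreeSet<>(unacknowledgedMessages.keySet());
        }
    }

    public static void main(String[] args) {
        AckOnCheckpointSketch source = new AckOnCheckpointSketch();
        source.emit("id-a", "a");
        source.emit("id-b", "b");
        source.snapshot(1L);
        source.emit("id-c", "c");
        source.snapshot(2L);
        source.notifyCheckpointComplete(1L);
        System.out.println(source.acknowledged() + " " + source.pendingIds()); // prints [id-a, id-b] [id-c]
    }
}
```

Because every read and write of `unacknowledgedMessages` happens inside `synchronized (checkpointLock)`, the two threads never touch the map at the same time, so the non-concurrent `HashMap` is sufficient.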


Piotrek

On 30 Jan 2020, at 16:53, KristoffSC wrote:
