Watermark generation issues with File sources in Flink 1.11.1


Watermark generation issues with File sources in Flink 1.11.1

parti
Hi,

When migrating a Stream API based Flink application from 1.9.2 to 1.11.1, watermark generation has issues with the file source alone. It works well with the Kafka source.

With 1.9.2 a custom watermark generator implementing AssignerWithPeriodicWatermarks was used. Starting with 1.11.1 it is deprecated and is to be replaced with WatermarkStrategy (which combines both WatermarkGenerator and TimestampAssigner).

With Flink 1.11.1, both of the above options (i.e. the old AssignerWithPeriodicWatermarks and the new WatermarkStrategy) work perfectly well with the Kafka source, but with the file source neither of them works. The watermark assigner never advances the watermark, so stateful operators never clear their state, leading to erroneous results and continuously increasing memory usage.

The same code works well with the Kafka source. Is this a known issue? If so, is any fix planned shortly?
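For reference, the periodic-watermark contract we rely on can be sketched framework-free (plain Java; illustrative only, not Flink's actual implementation): the generator tracks the maximum event timestamp seen, and the periodically emitted watermark is that maximum minus the allowed out-of-orderness. If the watermark never advances, downstream event-time timers never fire and window state is never cleared, which is exactly the symptom above.

```java
// Framework-free sketch of a periodic bounded-out-of-orderness watermark.
// Names are hypothetical; this only models the contract, not Flink internals.
public class WatermarkSketch {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness;

    public WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called per event: only ever moves the observed maximum forward.
    public void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    // Called periodically: everything at or before this value is considered
    // complete, so event-time timers fire and state up to it can be cleared.
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(0);
        wm.onEvent(1_000L);
        wm.onEvent(3_000L);
        wm.onEvent(2_000L); // a late event does not move the watermark back
        System.out.println(wm.currentWatermark()); // 3000
    }
}
```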

A side note (probably a candidate for a separate email, but I will write it here): even checkpoints have not worked with the file source since 1.9.2, and this is still a problem with 1.11.1. Is the file source with the streaming API not a priority in Flink development? If so, we can rethink our sources.

Thanks & regards,
Arti

Re: Watermark generation issues with File sources in Flink 1.11.1

Till Rohrmann
Hi Arti,

thanks for sharing this feedback with us. The WatermarkStrategy has been introduced quite recently and might have some rough edges. I am pulling in Aljoscha and Klou, who have worked on this feature and might be able to help you. To better understand your problem, it would be great if you could share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.

For the file source, the Flink community has recently introduced a new source abstraction which will also support checkpoints for file sources once the file source connector has been migrated to the new interfaces. The community is currently working on it.

Cheers,
Till

On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <[hidden email]> wrote:

Re: Watermark generation issues with File sources in Flink 1.11.1

parti
Hi Till,

Thank you for your quick response. Both the AssignerWithPeriodicWatermarks and WatermarkStrategy I am using are very simple ones.

Code for AssignerWithPeriodicWatermarks:
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomEventTimeWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyPojo> {

    private final long maxOutOfOrderness = 0;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
        long timestamp = myPojo.getInitiationTime().toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

Code for WatermarkStrategy :
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyPojo> watermarkStrategy =
        WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner((event, timestamp) -> event.getInitiationTime().toEpochMilli());
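For completeness, this strategy is attached to the stream roughly as follows (a sketch; "input" is a placeholder for whatever DataStream<MyPojo> the job produces from its source, not a name from our actual job):

```java
// Illustrative wiring only; "input" stands in for the source stream.
DataStream<MyPojo> withTimestampsAndWatermarks =
        input.assignTimestampsAndWatermarks(watermarkStrategy);
```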

Thanks & regards,
Arti


On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <[hidden email]> wrote:

Re: Watermark generation issues with File sources in Flink 1.11.1

Aljoscha Krettek
Hi Arti,

what exactly do you mean by "checkpoints do not work"? Are there
exceptions being thrown? How are you writing your file-based sources,
what API methods are you using?

Best,
Aljoscha

On 20.08.20 16:21, Arti Pande wrote:



Re: Watermark generation issues with File sources in Flink 1.11.1

parti
Hi Aljoscha,

By "checkpoints do not work" I mean that ever since Flink 1.9.2, and still in 1.11.1, when using the file source the source operator (I am guessing the split enumerator or metadata reader) finishes immediately after starting (and assigning the splits to the split readers). Hence, when the first checkpoint is triggered, it sees the state of the first operator, i.e. the source, as finished and therefore does not do any checkpointing. That is what you can see in the logs and also on the Flink UI for checkpoints. It assumes that the pipeline is about to finish shortly and aborts the checkpoint.

This, along with the watermark generation problems, makes it difficult to use the file source in production.
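For what it's worth, one hedged workaround sketch, assuming the job can tolerate continuous-monitoring semantics: reading with FileProcessingMode.PROCESS_CONTINUOUSLY keeps the directory-monitoring task running instead of finishing after the initial scan, so checkpoints are not aborted because of a finished source. The path and format below are placeholders, not from our actual job.

```java
// Placeholder path/format. Note PROCESS_CONTINUOUSLY re-reads a file
// entirely whenever it is modified, so it changes processing semantics.
TextInputFormat format = new TextInputFormat(new Path("/data/input"));
DataStream<String> lines = env.readFile(
        format,
        "/data/input",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10_000L); // directory re-scan interval in milliseconds
```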


On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek <[hidden email]> wrote:


Re: Watermark generation issues with File sources in Flink 1.11.1

David Anderson-3
Arti,

The problem with watermarks and the file source operator will be fixed in 1.11.2 [1]. This bug was introduced in 1.10.0 and isn't related to the new WatermarkStrategy API.

[1] https://issues.apache.org/jira/browse/FLINK-19109

David

On Wed, Sep 9, 2020 at 2:52 PM Arti Pande <[hidden email]> wrote:


Re: Watermark generation issues with File sources in Flink 1.11.1

Aljoscha Krettek
Thanks David! This saved me quite some time.

Aljoscha

On 09.09.20 19:58, David Anderson wrote:

> [1] https://issues.apache.org/jira/browse/FLINK-19109