why operator not chained?

why operator not chained?

Steven Wu

I have this DAG screenshot from the Flink UI.
image.png
I am wondering why the middle "icebergsink-writer" operator is not chained with the first operator chain.
Or, as an equivalent question, why is the forward partitioner used here?

The first operator chain consists entirely of map functions after the source. The last two operators are added like this:
--------------------------
dataStream
    .transform("icebergsink-writer", TypeInformation.of(SomeClass.class), writer)
    .addSink(committer)
    .name("icebergsink-committer")
    .uid("icebergsink-committer")
    .setParallelism(1);

Thanks,
Steven

Re: why operator not chained?

Guowei Ma
Hi, Steven

1. The `icebergsink-writer` operator is not chained with the first operator chain because its ChainingStrategy is null or HEAD. You can verify this by printing writer.getChainingStrategy().
2. Two connected operators use the FORWARD partitioner when they have the same parallelism and the user has not specified a partitioner.
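
The two points above can be sketched together as a small, self-contained model. This is not Flink's actual code: the class `ChainingSketch`, its enums, and the parallelism value are hypothetical names chosen for illustration, and the real check also considers conditions left out here (for example slot sharing groups and whether chaining is enabled at all). The REBALANCE fallback for unequal parallelism is an assumption about the default behavior. The sketch is only meant to show that a FORWARD edge is necessary for chaining but not sufficient, which is why the writer can sit behind a forward partitioner and still start its own chain.

```java
/** Simplified, hypothetical model of the two rules above; not Flink's actual code. */
final class ChainingSketch {

    /** Mirrors the names of the ChainingStrategy values discussed in this thread. */
    enum Strategy { ALWAYS, NEVER, HEAD }

    /** Partitioner chosen when the user does not specify one (assumed defaults). */
    enum Partitioner { FORWARD, REBALANCE }

    /** Rule 2: FORWARD for equal parallelism, otherwise (assumed) REBALANCE. */
    static Partitioner defaultPartitioner(int upstreamParallelism, int downstreamParallelism) {
        return upstreamParallelism == downstreamParallelism
                ? Partitioner.FORWARD
                : Partitioner.REBALANCE;
    }

    /**
     * Rule 1: the downstream operator joins the upstream chain only if its own
     * strategy is ALWAYS. A null strategy models "not set".
     */
    static boolean isChainable(Strategy upstream, Strategy downstream,
                               Partitioner partitioner,
                               int upstreamParallelism, int downstreamParallelism) {
        return downstream == Strategy.ALWAYS
                && (upstream == Strategy.HEAD || upstream == Strategy.ALWAYS)
                && partitioner == Partitioner.FORWARD
                && upstreamParallelism == downstreamParallelism;
    }

    public static void main(String[] args) {
        int p = 4; // hypothetical parallelism shared by the map chain and the writer
        Partitioner edge = defaultPartitioner(p, p);
        System.out.println("partitioner = " + edge);
        System.out.println("writer with HEAD chains: "
                + isChainable(Strategy.ALWAYS, Strategy.HEAD, edge, p, p));
        System.out.println("writer with ALWAYS chains: "
                + isChainable(Strategy.ALWAYS, Strategy.ALWAYS, edge, p, p));
    }
}
```

In this model, a writer whose strategy is HEAD (or unset) is never merged into the preceding map chain even though the edge between them is FORWARD; switching the writer to ALWAYS is what makes the pair chainable.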

Best,
Guowei


(In reply to Steven Wu <[hidden email]>, Saturday, November 23, 2019 at 5:17 AM.)

Re: why operator not chained?

Steven Wu
Guowei, thanks a lot. It looks like the default chainingStrategy is HEAD. Let me try this:

writer.setChainingStrategy(ChainingStrategy.ALWAYS);

(In reply to Guowei Ma <[hidden email]>, Sat, Nov 23, 2019 at 7:17 PM.)