SideOutput Issue

SideOutput Issue

Navneeth Krishnan
Hi All,

I'm having trouble creating side outputs. I have two input sources (both from Kafka) that are connected and fed into a CoProcessFunction. Inside the CoProcessFunction, the main data stream emits a POJO, and processElement2 sets up a periodic timer that emits the side output. When I start the job I get the exception below. Am I doing something wrong?

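I implemented the side output like this (POJO stands for my record type):

processElement2
    ctx.output(sideOutputTag, pojo);

Job
    // the same tag instance is used inside the CoProcessFunction
    final OutputTag<POJO> sideOutputTag = new OutputTag<POJO>("side-output") {};
    dataStream.getSideOutput(sideOutputTag).print();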


2018-04-03 10:18:38.821 [Co-Flat Map (4/8)] INFO  org.apache.flink.runtime.taskmanager.Task  - Co-Flat Map (4/8) (20b92b7a8cdd1e63963886de0895882c) switched from CREATED to DEPLOYING.
2018-04-03 10:18:38.821 [Co-Process (1/8)] INFO  org.apache.flink.runtime.taskmanager.Task  - Co-Process (1/8) (fd8f971eea2e103e340d2955b384eaa3) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

2018-04-03 10:18:38.880 [Co-Process (7/8)] INFO  org.apache.flink.runtime.taskmanager.Task  - Co-Process (7/8) (a86274f9ac49b71f00d218a1533cbd51) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

Thanks

Re: SideOutput Issue

Chesnay Schepler
Hi,

Which version of Flink are you using?

Could you provide a reproducing example? I tried to reproduce the issue from the information you provided with the following code, but it runs fine for me:

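    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // wrapper class name is illustrative
    public class SideOutputTest {

        // anonymous subclass ({}) so Flink can capture the tag's generic type
        private static final OutputTag<String> tag = new OutputTag<String>("test"){};

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> text1 = env.fromElements("foo");
            DataStream<String> text2 = env.fromElements("bar");

            SingleOutputStreamOperator<String> process = text1.connect(text2)
                .process(new CoProcessFunction<String, String, String>() {
                    @Override
                    public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
                        // no-op: only the topology matters for this test
                    }

                    @Override
                    public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
                        // no-op: only the topology matters for this test
                    }
                });

            process.getSideOutput(tag).print();

            // execute program
            env.execute("Streaming WordCount");
        }
    }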
On 03.04.2018 19:55, Navneeth Krishnan wrote:

Re: SideOutput Issue

Chesnay Schepler
We were able to reproduce the issue.
It is caused by calling both getSideOutput() and split() on the same DataStream, a combination that Flink does not handle properly.
As a workaround, you can add a no-op map function before the split() call.

I've filed FLINK-9141.
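Below is a minimal sketch of that workaround. It assumes the Flink 1.4-era DataStream API, where DataStream.split() and SplitStream still exist. Those were later deprecated and removed in favour of side outputs. The class name SplitWorkaroundSketch, the LengthSelector, the "long"/"short" split names and the "side-output" tag id are all hypothetical. The side output is read directly from the process operator. split() is called on an identity map placed after it, so the two are never applied to the same DataStream:

```java
import java.util.Collections;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical class: sketch of the FLINK-9141 workaround (Flink 1.4-era API).
public class SplitWorkaroundSketch {

    // Hypothetical tag; the anonymous subclass lets Flink capture the element type.
    public static final OutputTag<String> SIDE = new OutputTag<String>("side-output") {};

    // Hypothetical selector: routes each record to the "long" or "short" split.
    public static class LengthSelector implements OutputSelector<String> {
        @Override
        public Iterable<String> select(String value) {
            return Collections.singletonList(value.length() > 3 ? "long" : "short");
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<String> processed = env.fromElements("foo", "foobar")
            .connect(env.fromElements("bar"))
            .process(new CoProcessFunction<String, String, String>() {
                @Override
                public void processElement1(String value, Context ctx, Collector<String> out) {
                    out.collect(value);        // main output
                }

                @Override
                public void processElement2(String value, Context ctx, Collector<String> out) {
                    ctx.output(SIDE, value);   // side output
                }
            });

        // The side output is taken from the process operator itself.
        processed.getSideOutput(SIDE).print();

        // Workaround: an identity map means split() is applied to a different
        // DataStream than the one getSideOutput() was called on.
        SplitStream<String> split = processed
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value;              // no-op
                }
            })
            .split(new LengthSelector());

        split.select("long").print();
        split.select("short").print();

        env.execute("side output + split workaround");
    }
}
```

The identity map is written as an anonymous MapFunction rather than a lambda. This is a precaution against the lambda type-extraction problems some Flink versions have with javac-compiled lambdas. Calling processed.split(...) directly, alongside processed.getSideOutput(SIDE), is the combination reported above as failing.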

On 04.04.2018 12:21, Chesnay Schepler wrote: