Any testing issues when using StreamTableEnvironment.createTemporaryView?

Dan
Summary
I'm hitting an error in a test that uses createTemporaryView to convert a Protobuf input stream into a Flink Table API table. I'm not sure how to debug the "SourceConversion$5.processElement(Unknown Source)" frame. Is this generated code? How can I debug it?

Any help would be appreciated.  Thanks! - Dan

Details
My current input is a protocol buffer stream. I register it as a Table API view using createTemporaryView. The code is hacky, and I want to get some tests in place before cleaning it up.

KeyedStream<BatchLog, String> batchLogStream =
        env.<BatchLog>fromElements(BatchLog.class, new LogGenerator.BatchLogIterator().next())
                .keyBy((logRequest) -> logRequest.getUserId());

tableEnv.createTemporaryView(
        "input_user",
        batchLogStream.flatMap(new ToUsers()),
        $("userId"),
        $("timeEpochMillis"),
        $("userTime").rowtime());

This appears to work in my prototype (maybe serialization is broken).  In a Flink test, I hit the following error.

org.apache.flink.runtime.taskmanager.Task: Flat Map -> Map -> SourceConversion(table=[default.mydb.input_user], fields=[userId, timeEpochMillis, userTime]) -> Calc(select=[userId, timeEpochMillis]) -> StreamingFileWriter (2/7) (ae67114dd4175c6fd87063f73706c8ec) switched from RUNNING to FAILED.
java.lang.NullPointerException
        at SourceConversion$5.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
        at ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:18)
        at ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:11)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.base/java.lang.Thread.run(Thread.java:834)


I wasn't able to find this exact stacktrace when looking on Google.
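
On the "Is this generated code?" question: the JDK prints "Unknown Source" for any stack frame whose class carries no source file name, which is typical of classes generated and compiled at runtime. The sketch below uses only the JDK and does not touch Flink; the class name UnknownSourceFrameDemo and its two helper methods are made up for illustration.

```java
public class UnknownSourceFrameDemo {

    // Builds a stack frame for a class that has no source file information
    // (fileName == null, negative line number).
    static StackTraceElement frameWithoutSource(String className, String methodName) {
        return new StackTraceElement(className, methodName, null, -1);
    }

    // Heuristic check: true when the frame has no source file name.
    static boolean lacksSourceInfo(StackTraceElement frame) {
        return frame.getFileName() == null;
    }

    public static void main(String[] args) {
        StackTraceElement generated =
                frameWithoutSource("SourceConversion$5", "processElement");
        StackTraceElement regular = new StackTraceElement(
                "org.apache.flink.streaming.api.operators.StreamMap",
                "processElement", "StreamMap.java", 41);

        // The first frame has no file name, the second one does.
        System.out.println(generated + " lacksSourceInfo=" + lacksSourceInfo(generated));
        System.out.println(regular + " lacksSourceInfo=" + lacksSourceInfo(regular));
    }
}
```

A frame like this cannot be located by file and line, which matches the trace above: every other frame names a .java file, and only the SourceConversion$5 frame does not.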

Re: Any testing issues when using StreamTableEnvironment.createTemporaryView?

Dan
I figured out my issue. I needed to assign timestamps and watermarks (e.g. with assignTimestampsAndWatermarks) after the fromElements call. I could not figure out how the auto-generated code worked, so I hooked up a debugger and guessed at the issue.
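
One plausible reading of this fix, sketched without Flink: a field declared with .rowtime() takes its value from the timestamp attached to each stream record, and elements coming out of fromElements carry no timestamp until an assigner runs. If the conversion then reads that missing timestamp into a primitive long, Java's auto-unboxing throws a NullPointerException. The model below uses only the JDK. Event, Element, emit, assignTimestamp and toRowtime are hypothetical names, and the unboxing step is an assumption about the generated operator, not something taken from its source.

```java
import java.util.function.ToLongFunction;

public class RowtimeNullModel {

    // Hypothetical payload, standing in for the element emitted by ToUsers.
    static final class Event {
        final String userId;
        final long timeEpochMillis;

        Event(String userId, long timeEpochMillis) {
            this.userId = userId;
            this.timeEpochMillis = timeEpochMillis;
        }
    }

    // Hypothetical stand-in for a stream record: timestamp stays null until assigned.
    static final class Element {
        final Event value;
        final Long timestamp;

        Element(Event value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Models a source that emits elements without timestamps.
    static Element emit(Event event) {
        return new Element(event, null);
    }

    // Models a timestamp assigner: copies a field of the event into the record timestamp.
    static Element assignTimestamp(Element element, ToLongFunction<Event> extractor) {
        return new Element(element.value, extractor.applyAsLong(element.value));
    }

    // Models reading the rowtime from the record timestamp.
    // ASSUMPTION: the value is unboxed to a primitive long, so null fails here.
    static long toRowtime(Element element) {
        long rowtime = element.timestamp; // NullPointerException when timestamp is null
        return rowtime;
    }

    public static void main(String[] args) {
        Element bare = emit(new Event("user-1", 1602223740000L));
        try {
            toRowtime(bare);
        } catch (NullPointerException expected) {
            System.out.println("no timestamp assigned: NullPointerException");
        }
        Element stamped = assignTimestamp(bare, event -> event.timeEpochMillis);
        System.out.println("timestamp assigned: rowtime=" + toRowtime(stamped));
    }
}
```

In Flink terms, assignTimestamp plays the role of the assignTimestampsAndWatermarks call mentioned above, applied to the stream before it is passed to createTemporaryView.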

In reply to Dan Hill's original message of Thu, Oct 8, 2020, 11:09 PM (above).