Never terminating test ...

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Never terminating test ...

Avi Levi-2

I have the following test. the problem is it doesn't end ... meaning it doesn't reach the assertion point. What am I doing wrong?

"kinesis consumer" should "consume message from kinesis stream" in {
    import ExecutionContext.Implicits.global
    val sampleData = Seq("a", "b", "c")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new FlinkKinesisConsumer[String](
     "SampleStream", new SimpleStringSchema, consumerConfig))
    .addSink(new TestSink[String])

Future(createSampleDataStream(sampleData)) //publish to kinesis stream
env.execute()
TestSink.values should contain theSameElementsAs (sampleData) //not executed 
 }
Reply | Threaded
Open this post in threaded view
|

Re: Never terminating test ...

Chesnay Schepler
My guess would be that the consumer does not stop running once it exhausted the kinesis stream. Which makes sense since this isn't a batch job.
(Wouldn't want the source to shut down just because it happened to catch up with your input ;) )

On 12/14/2020 8:09 AM, Avi Levi wrote:

I have the following test. the problem is it doesn't end ... meaning it doesn't reach the assertion point. What am I doing wrong?

"kinesis consumer" should "consume message from kinesis stream" in {
    import ExecutionContext.Implicits.global
    val sampleData = Seq("a", "b", "c")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new FlinkKinesisConsumer[String](
     "SampleStream", new SimpleStringSchema, consumerConfig))
    .addSink(new TestSink[String])

Future(createSampleDataStream(sampleData)) //publish to kinesis stream
env.execute()
TestSink.values should contain theSameElementsAs (sampleData) //not executed 
 }