My guess would be that the consumer
does not stop running once it exhausted the kinesis stream. Which
makes sense since this isn't a batch job.
(Wouldn't want the source to shut down
just because it happened to catch up with your input ;) )
On 12/14/2020 8:09 AM, Avi Levi wrote:
I
have the following test. the problem is it doesn't end ...
meaning it doesn't reach the assertion point. What am I doing
wrong?
"kinesis consumer" should "consume message from kinesis stream" in {
import ExecutionContext.Implicits.global
val sampleData = Seq("a", "b", "c")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.addSource(new FlinkKinesisConsumer[String](
"SampleStream", new SimpleStringSchema, consumerConfig))
.addSink(new TestSink[String])
Future(createSampleDataStream(sampleData)) //publish to kinesis stream
env.execute()
TestSink.values should contain theSameElementsAs (sampleData) //not executed
}