Hi all,
I've been using the KafkaSource API, as opposed to the classic consumer, and things have been going well. I configured my source so that it can be used in either a streaming or a bounded mode, with the bounded approach aimed specifically at improving testing (unit/integration). I've noticed that when I attempt to run a test, the pipeline never acknowledges the "end" of the stream in the bounded context; it just runs forever and never makes it to my assert. Does anything look glaringly wrong with how the source is being defined?

object KafkaEventSource { ... }

I can verify that the generated source has its boundedness set properly and that all of the configuration options are correct.
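A minimal sketch of what a source along these lines might look like, assuming the standard flink-connector-kafka builder API; the broker address, topic, group id, and the bounded flag are illustrative placeholders, not the code from the original message:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

object KafkaEventSource {

    // Builds a source that is unbounded by default, or bounded at the offsets
    // that are current when the job starts (the testing case described above).
    fun create(brokers: String, topic: String, groupId: String, bounded: Boolean): KafkaSource<String> {
        val builder = KafkaSource.builder<String>()
            .setBootstrapServers(brokers)
            .setTopics(topic)
            .setGroupId(groupId)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(SimpleStringSchema())

        if (bounded) {
            // With stopping offsets set, the source finishes once every partition
            // has reached the offsets captured at startup, so the job can terminate.
            builder.setBounded(OffsetsInitializer.latest())
        }

        return builder.build()
    }
}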
My test itself is fairly simple and can be broken down as follows:

@Test @JvmStatic ...

I can verify in the logs that my sink is being executed and writing to the appropriate database; however, the job itself never finishes. I've tried it using a single Kafka partition as well as multiple partitions, and I've even commented out the logic related to writing to the database. It still just seems to run ... forever. Any recommendations? Perhaps there's a bad configuration or setting that isn't being used as intended?

Thanks,
Rion
Following up on this issue, I realized my initial problem was that my test case only contained a single message to send through the pipeline. This resulted in the earliest offset also being the latest and things didn’t exactly work as expected. Once I added several other messages and sent them through, the pipeline appeared to run as expected.
However, the use of “bounded” seems to be fickle in terms of test cases. Since an exception is thrown once the bound is reached, I can typically just wrap my test execution within a try/catch and simply apply my assertion afterwards. This sometimes results in passing tests, but in other cases it seems that the bound is reached before the pipeline has finished processing the messages it had seen thus far, which yields a failing test. I don't know if this is a bug or intentional, but I'm not aware of a workaround that could “force” the pipeline to finish processing all of the messages from the topic once the bound is reached. I've tried sending “flush records” through the topic, however since there are multiple partitions, it's not guaranteed that the pipeline will read those last.

This is purely a testing problem, as a production job would be streaming and unbounded, however I'd love to have a reliable integration test or a pattern that I could use to guarantee the processing of a finite set of data via a KafkaSource (i.e. send a finite set of records to Kafka, read from the topic, process all records, and apply an assertion after processing). Any ideas/recommendations/workarounds would be greatly welcome and I'd be happy to share my specific code / use cases if needed.

Thanks much,
Rion
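A rough sketch of the try/catch arrangement described above; buildPipeline, countRowsInTargetTable, and EXPECTED_COUNT are hypothetical placeholders rather than anything from the original messages, and the pattern inherits the timing problem noted above:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.junit.Assert.assertEquals
import org.junit.Test

class BoundedKafkaPipelineTest {

    @Test
    fun processesAllRecordsFromTheBoundedSource() {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()

        // Hypothetical helper: wires the bounded KafkaEventSource and the database sink into env.
        buildPipeline(env, bounded = true)

        try {
            env.execute("bounded-kafka-test")
        } catch (e: Exception) {
            // Swallow whatever is thrown when the bound is reached so the assertion below still runs.
        }

        // Hypothetical helper: queries the table the sink writes to.
        assertEquals(EXPECTED_COUNT, countRowsInTargetTable())
    }

    private fun buildPipeline(env: StreamExecutionEnvironment, bounded: Boolean) {
        TODO("build the KafkaEventSource-based pipeline against env")
    }

    private fun countRowsInTargetTable(): Long {
        TODO("query the database the sink writes to")
    }

    companion object {
        private const val EXPECTED_COUNT = 5L
    }
}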
Hey Rion,

We solved this by using the usual, unbounded streams and the Awaitility library to express the conditions that end the test, for example the presence of particular data in a table. IMO this type of testing has the advantage that you won't get behavior that diverges from production, as you have experienced.

Regards,
Maciej
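A sketch of what this Awaitility-based approach might look like, assuming the streaming job is started in the background so the test thread can poll for its effects; buildPipeline, countRowsInTargetTable, and EXPECTED_COUNT are hypothetical placeholders, and cancelling the job afterwards is left to whatever test harness is in use (e.g. a MiniCluster):

import java.time.Duration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.awaitility.Awaitility.await
import org.junit.Assert.assertEquals
import org.junit.Test
import kotlin.concurrent.thread

class UnboundedKafkaPipelineTest {

    @Test
    fun eventuallyWritesAllRecordsToTheTable() {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()

        // Hypothetical helper: wires the normal, unbounded KafkaEventSource and the sink into env.
        buildPipeline(env, bounded = false)

        // Run the streaming job on a background thread so it keeps consuming while we poll.
        thread(start = true) { env.execute("unbounded-kafka-test") }

        // The test ends when the expected data shows up in the sink, not when the source hits a bound.
        await().atMost(Duration.ofSeconds(30)).untilAsserted {
            assertEquals(EXPECTED_COUNT, countRowsInTargetTable())
        }
    }

    private fun buildPipeline(env: StreamExecutionEnvironment, bounded: Boolean) {
        TODO("build the KafkaEventSource-based pipeline against env")
    }

    private fun countRowsInTargetTable(): Long {
        TODO("query the database the sink writes to")
    }

    companion object {
        private const val EXPECTED_COUNT = 5L
    }
}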