Handling Bounded Sources with KafkaSource


Handling Bounded Sources with KafkaSource

Rion Williams
Hi all,

I've been using the KafkaSource API as opposed to the classic consumer and things have been going well. I configured my source such that it could be used in either a streaming or bounded mode, with the bounded approach specifically aimed at improving testing (unit/integration).

I've noticed that when I attempt to run a test, the pipeline never seems to acknowledge the "end" of the stream in the bounded context; it just runs forever and never reaches my assertion.

Does anything look glaringly wrong with how the source is being defined?
object KafkaEventSource {

    fun withParameters(parameters: ParameterTool): KafkaSource<Event> {
        val schemaRegistryUrl = parameters.getRequired("schema.registry.url")

        val builder = KafkaSource.builder<Event>()
            .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
            .setGroupId(parameters.getRequired("group.id"))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setProperty("schema.registry.url", schemaRegistryUrl)
            .setTopics(parameters.getRequired("topic"))
            .setDeserializer(EventDeserializer(schemaRegistryUrl))

        // For tests: stop at the latest offsets seen at startup instead of streaming forever
        if (parameters.getBoolean("bounded", false)) {
            builder.setBounded(OffsetsInitializer.latest())
        }

        return builder.build()
    }
}
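
For context, the bounded behavior is toggled purely through the parameters in my tests; a hypothetical illustration of how that could be wired up (the values here are placeholders, not my real configuration):

    val parameters = ParameterTool.fromMap(mapOf(
        "bootstrap.servers" to "localhost:9092",            // placeholder test broker address
        "schema.registry.url" to "http://localhost:8081",   // placeholder test schema registry
        "group.id" to "integration-test",
        "topic" to "events",
        "application" to "entity-identification-test",
        "bounded" to "true"                                  // flips the source into bounded mode
    ))
    val source = KafkaEventSource.withParameters(parameters)
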
I can verify that the generated source has its boundedness set properly and that all of the configuration options are correct.

My test itself is fairly simple and can be broken down as follows:
  1. Inject records into a Kafka Topic
  2. Initialize my Flink job using all of my testing parameters
  3. Apply my assertion (in this case verifying that a JdbcSink wrote to a specific database)
@Test
fun `Example`() {
    // Arrange
    val events = getTestEvents()
    sendToKafka(events, parameters)

    // Act
    EntityIdentificationJob.run(parameters)

    // Assert
    val users = queryCount("SELECT * FROM users", connection)
    assertEquals(1, users)
}
The job itself reads from the source, runs a process function that routes records into multiple side outputs, and writes each output to a distinct JdbcSink based on its type:

@JvmStatic
fun main(args: Array<String>) {
    val parameters = loadParams(args)
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()

    // Read from Kafka
    val entities = stream
        .fromSource(KafkaEventSource.withParameters(parameters), WatermarkStrategy.noWatermarks(), "kafka")
        .process(IdentifyEntitiesFunction())

    // Write out each tag to its respective sink
    for (entityType in EntityTypes.all) {
        entities
            .getSideOutput(entityType)
            .addSink(PostgresEntitySink.withEntity(entityType.typeInfo, parameters))
    }

    stream.execute(parameters.getRequired("application"))
}
I can verify in the logs that my sink is being executed and writing to the appropriate database; however, the job itself never finishes. I've tried it with a single Kafka partition as well as multiple partitions, and even commented out the logic related to writing to the database. It still just seems to run forever.

Any recommendations? Perhaps there's a bad configuration or setting that isn't being used as intended?

Thanks,

Rion


Re: Handling Bounded Sources with KafkaSource

Rion Williams
Following up on this issue, I realized my initial problem was that my test case only contained a single message to send through the pipeline. This resulted in the earliest offset also being the latest and things didn’t exactly work as expected. Once I added several other messages and sent them through, the pipeline appeared to run as expected.

However, the use of “bounded” seems to be fickle in test cases. Since an exception is thrown once the bound is reached, I can typically just wrap my test execution within a try/catch and apply my assertion afterwards.
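
Roughly, the wrapped test looks something like the sketch below (reusing the helpers from my earlier example; the catch is deliberately broad because I don't want the test to depend on the exact exception type the bounded source surfaces):

    @Test
    fun `Bounded example`() {
        // Arrange: seed the topic with a known set of events
        sendToKafka(getTestEvents(), parameters)

        // Act: run the job and swallow the exception raised once the bounded
        // source reaches the latest offsets
        try {
            EntityIdentificationJob.run(parameters)
        } catch (expected: Exception) {
            // expected: the bound has been reached
        }

        // Assert: check the sink's effects after the job has stopped
        assertEquals(1, queryCount("SELECT * FROM users", connection))
    }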

This occasionally results in passing tests, but in other runs the bound appears to be reached before the messages seen thus far have been processed, which yields a failing test. I don’t know if this is a bug or intentional, but I’m not aware of a workaround that could “force” the pipeline to finish processing all of the messages from the topic once the bound is reached. I’ve tried sending “flush records” through to the topic; however, since there are multiple partitions, it’s not guaranteed that the pipeline will read those last.

This is purely a testing problem, as a production job would be streaming and unbounded; however, I’d love to have a reliable integration test or pattern I could use to guarantee the processing of a finite set of data via a KafkaSource (i.e. send a finite set of records to Kafka, read from the topic, process all records, and apply assertions after processing).

Any ideas/recommendations/workarounds would be greatly welcome and I’d be happy to share my specific code / use-cases if needed.

Thanks much,

Rion 


Re: Handling Bounded Sources with KafkaSource

Maciej Obuchowski
Hey Rion,

We solved this issue by using the usual unbounded streams together with the Awaitility library to express the conditions that end the test, for example particular data appearing in a table.

IMO this type of testing has the advantage that you won't get behavior that diverges from production, as you have experienced.
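
For illustration, a rough sketch of how this could look with your example (just a sketch, not our exact setup: it assumes the job is started on a background thread, reuses your sendToKafka/queryCount helpers, and leaves job/cluster teardown out):

    import org.awaitility.Awaitility.await
    import java.util.concurrent.TimeUnit
    import kotlin.concurrent.thread

    @Test
    fun `Unbounded example with Awaitility`() {
        // Arrange: seed the topic with the test events
        sendToKafka(getTestEvents(), parameters)

        // Act: start the (unbounded) streaming job on a background thread so the
        // test thread stays free to poll the sink database
        thread(start = true) {
            EntityIdentificationJob.run(parameters)
        }

        // Assert: wait until the expected row appears in the sink table
        await()
            .atMost(60, TimeUnit.SECONDS)
            .pollInterval(500, TimeUnit.MILLISECONDS)
            .until { queryCount("SELECT * FROM users", connection) == 1 }

        // Teardown (cancelling the job / stopping the mini cluster) is omitted here
    }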

Regards,
Maciej


