I have some interesting results with my test code


I have some interesting results with my test code

Kevin Kwon
Hi guys, I've recently been experimenting with an end-to-end testing environment using Kafka and Flink (1.11).

I've set up the infrastructure with Docker Compose: a single Kafka broker, Flink (1.11), and MinIO for checkpoint storage.

Here's the test scenario:

1. Send 1000 messages, each with a manually assigned timestamp that increases by 100 milliseconds per loop iteration (the first and last messages are 100 seconds apart). The topic I'm writing to has 3 partitions. Below is the test message producer, using Confluent's Python SDK:
from time import time  # time() is used below for the base timestamp

order_producer = get_order_producer()  # returns a schema-registry-aware Confluent producer (defined elsewhere)
current_timestamp = int(round(time() * 1000))
for i in range(0, 1000):
    order_producer.produce(
        topic="order",
        key={"key": i % 100},
        value={
            "id": 1000,
            "customerId": i % 10,
            "timestamp": current_timestamp + i * 100
        }
    )
    order_producer.flush()  # flush inside the loop, i.e. once per message

2. Flink runs a SQL query on this stream and publishes the result back to a Kafka topic that also has 3 partitions. Below is the SQL code:
SELECT
  o.id,
  COUNT(*),
  TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
FROM
  order o
GROUP BY
  o.id,
  TUMBLE(o.ts, INTERVAL '5' SECONDS)
So I expect the sum of all the counts in the result to equal 1000 (1000 messages spaced 100 ms apart span 100 seconds, i.e. about 20 tumbling windows of 5 seconds), but a lot of messages seem to be missing: the counts only add up to 797, as shown below. I can't figure out why. I'm using event time for the environment.

[Attachment: Screenshot 2020-11-02 at 23.35.23.png]

Below is the configuration code. First, the Kafka consumer setup:
private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", kafkaBrokers)
  properties.setProperty("group.id", "awesome_order")

  val kafkaConsumer = new FlinkKafkaConsumer[Order](
    "order",
    ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
      classOf[Order],
      kafkaSchemaRegistry
    ),
    properties
  )
  kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
  kafkaConsumer.setStartFromGroupOffsets()
  // event-time timestamps taken from the Order payload, with 100 ms bounded out-of-orderness
  kafkaConsumer.assignTimestampsAndWatermarks {
    WatermarkStrategy
      .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
      .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
        override def extractTimestamp(order: Order, recordTimestamp: Long): Long =
          order.getTimestamp
      })
  }
  kafkaConsumer
}
Afterwards, I:
1. create a temp view from this source data stream,
2. perform the SQL query on it,
3. append the result to a processed data stream,
4. attach that stream to the Kafka sink (a rough sketch of this wiring is shown below).
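
Since I can't paste the exact code from my project, here is a minimal sketch of that wiring; env, tableEnv and aggregationSql (the SQL string from above) are placeholders, and the field mapping / conversions may differ slightly in the real code.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val orderStream: DataStream[Order] = env.addSource(initOrderConsumer())

// 1. register the source stream as a temporary view, exposing the event time as rowtime attribute 'ts
tableEnv.createTemporaryView("order", orderStream, 'id, 'customerId, 'ts.rowtime)

// 2. run the windowed aggregation
val resultTable = tableEnv.sqlQuery(aggregationSql)

// 3. convert the result back to an append-only data stream
val processedStream: DataStream[ProcessedModel] = resultTable.toAppendStream[ProcessedModel]

// 4. attach the Kafka sink
processedStream.addSink(initProcessedModelProducer())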

And here's the Kafka producer setup:
private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
  val properties: Properties = new Properties()
  properties.put("bootstrap.servers", kafkaBrokers)
  // transactional timeout for the exactly-once sink; must not exceed the broker's transaction.max.timeout.ms
  properties.put("transaction.timeout.ms", "60000")

  val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
    "processed_model",
    ConfluentRegistryAvroSerializationSchema.forSpecific(
      classOf[ProcessedModel],
      "processed_model-value",
      kafkaSchemaRegistry
    ),
    properties,
    null,                   // no custom partitioner
    Semantic.EXACTLY_ONCE,  // transactional, exactly-once delivery
    5                       // Kafka producer pool size
  )
  kafkaProducer
}


Side Note
Another interesting observation: if I flush "after" publishing all the events, the processed events don't seem to arrive at the sink at all, even though the source is still populated normally in Flink. It's as if no progress is made after the messages arrive at the source.
order_producer = get_order_producer()
current_timestamp = int(round(time() * 1000))
for i in range(0, 1000):
    order_producer.produce(
        topic="order",
        key={"key": i % 100},
        value={
            "id": 1000,
            "customerId": i % 10,
            "timestamp": current_timestamp + i * 100
        }
    )
# If I flush AFTER the loop like this, no processed data reaches the Flink sink,
# even though the events themselves arrive at the Flink source without any problem.
order_producer.flush()

Re: I have some interesting results with my test code

Kevin Kwon
Looks like the event time that I've specified in the consumer is not being respected. Does the timestamp assigner actually work in Kafka consumers?
.withTimestampAssigner(new SerializableTimestampAssigner[Order] {
  override def extractTimestamp(order: Order, recordTimestamp: Long): Long =
    order.getTimestamp
})

Re: I have some interesting results with my test code

rmetzger0
Hi Kevin,
thanks a lot for posting this problem. 
I'm adding Jark to the thread; he or another committer working on Flink SQL may be able to provide some insights.

Re: I have some interesting results with my test code

Jark Wu-3
Hi Kevin, 

Could you share the code showing how you register the FlinkKafkaConsumer as a table?

Regarding your initialization of the FlinkKafkaConsumer, I would recommend setStartFromEarliest() to guarantee it consumes all the records in the partitions.
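For example, a one-line sketch of that change, applied to the initOrderConsumer() shown earlier:

  // read every partition from the beginning; committed group offsets are ignored in this startup mode
  kafkaConsumer.setStartFromEarliest()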

Regarding flush(): it seems it is inside the for loop, so it is not flushing after publishing ALL events?
I'm not experienced with the flush() API; could this method block, so that the following events can't be published to Kafka?

Best,
Jark

Re: I have some interesting results with my test code

Jark Wu-3
Great to hear it works!

`setStartFromGroupOffsets()` [1] starts reading partitions from the consumer group's committed offsets in the Kafka brokers (the group.id setting in the consumer properties). If no committed offsets can be found for a partition, the 'auto.offset.reset' setting from the properties is used, and the default value of 'auto.offset.reset' is latest [2].

I think that's why `setStartFromGroupOffsets()` doesn't consume all the events.
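
As a sketch (assuming the properties and consumer from your initOrderConsumer()), an alternative would be to keep setStartFromGroupOffsets() but change the fallback:

  // fall back to the earliest offset when no committed offset exists for a partition
  properties.setProperty("auto.offset.reset", "earliest")
  kafkaConsumer.setStartFromGroupOffsets()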


On Fri, 6 Nov 2020 at 07:04, Kevin Kwon <[hidden email]> wrote:
Hi Jark, setStartFromEarliest() actually worked. It's strange, since my test is stateless (complete teardown of all Docker containers) and the consumer creates the topic once it starts consuming it. I was assuming setStartFromGroupOffsets() would let the consumer consume from the beginning anyway. I'll share the code if I have any further problems, since I can't just copy-paste code created inside my company.

Thanks though! I appreciate your help
