Stateful Function Ingress issues

Stateful Function Ingress issues

Jessy Ping
Hi all,

I am trying to consume data from Azure Event Hubs using the StateFun Kafka ingress, and I am getting the following error:

java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress requires a UTF-8 key set for each record.

My data producer sends records to the Event Hub without any key. The same data can be consumed without issues by a regular Flink application.

The error is raised from RoutableProtobufKafkaIngressDeserializer.requireNonNullKey().



Re: Stateful Function Ingress issues

Jessy Ping
Hi all,

Why is this check important, and how can I resolve this for Event Hubs? The check in question:

    private byte[] requireNonNullKey(byte[] key) {
        if (key == null) {
            IngressType tpe = ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE;
            throw new IllegalStateException(
                "The "
                    + tpe.namespace()
                    + "/"
                    + tpe.type()
                    + " ingress requires a UTF-8 key set for each record.");
        }
        return key;
    }

Thanks 
Jessy

Re: Stateful Function Ingress issues

Timothy Bess
Hi Jessy,

I had this issue as well, here's the resolution. I ended up forking the version of StateFun I was using and changing the null check to default to an empty string, but I'm going to switch to the solution Igal suggested.

Thanks,

Tim
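
For readers following along, the "default to empty string" idea described above might look roughly like the sketch below. This is not the code from the fork mentioned in the message; the class and method names (NullKeyFallback, keyOrEmpty, keyAsUtf8) are hypothetical and only illustrate replacing the exception with a fallback value.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of StateFun: sketches a null-key fallback
// in place of the IllegalStateException thrown by requireNonNullKey().
final class NullKeyFallback {
    private NullKeyFallback() {}

    // Returns the key unchanged, or an empty byte array when the record has no key.
    static byte[] keyOrEmpty(byte[] key) {
        return key == null ? new byte[0] : key;
    }

    // Decodes the (possibly defaulted) key as UTF-8; a missing key becomes "".
    static String keyAsUtf8(byte[] key) {
        return new String(keyOrEmpty(key), StandardCharsets.UTF_8);
    }
}
```

One consequence to keep in mind, assuming the key is used as the destination address: every keyless record would then be routed with the same empty key, so they would all land on the same destination.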


Re: Stateful Function Ingress issues

Igal Shilman
Hello Jessy,

Currently, the StateFun Kafka ingress interprets the key of each record as the destination address, so you have to attach a key to each record to use that specific ingress.

If this is not an option for you, you can consider @Tim's suggestion or create a JIRA issue with a feature request, which we will be happy to follow up on if enough people are interested :-)

Cheers,
Igal
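
On the producer side, attaching a key comes down to choosing an identifier for each record and encoding it as UTF-8 bytes. The sketch below is only an illustration under assumptions: RecordKeys, utf8Key, and the entityId parameter are hypothetical names, and which field of your data makes a sensible key depends on your application.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for a producer: builds the UTF-8 key bytes for a record.
final class RecordKeys {
    private RecordKeys() {}

    // entityId is an assumed application-level identifier (for example a device
    // or user id). Records sharing an id get the same key, and so the same
    // destination address on the StateFun side.
    static byte[] utf8Key(String entityId) {
        if (entityId == null || entityId.isEmpty()) {
            throw new IllegalArgumentException("entityId must be a non-empty string");
        }
        return entityId.getBytes(StandardCharsets.UTF_8);
    }
}
```

With the Kafka Java client, the resulting bytes would be passed as the key argument of the ProducerRecord(topic, key, value) constructor, using a byte-array key serializer; if you send through another Event Hubs client, the way to set the key will differ.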


