Row arity of from does not match serializers.

srikanth flink
My Flink job reads from a Kafka stream and does some processing.

Code snippet:
DataStream<Row> flatternedDnsStream = filteredNonNullStream.rebalance()
        .map(node -> {
            JsonNode source = node.path("source");
            JsonNode destination = node.path("destination");
            JsonNode dns = node.path("dns");
            JsonNode event = node.path("event");
            JsonNode client = node.path("client");
            JsonNode organization = node.path("organization");
            JsonNode timestamp_received = node.path("timestamp_received");
            JsonNode transaction = node.path("transaction");
            JsonNode timestamp = node.path("@timestamp");
            JsonNode message = node.path("message");
            JsonNode network = node.path("network");
            JsonNode ecs = node.path("ecs");

            return Row.of(String.join(",", getParsed(ecs)), String.join(",", getParsed(source)),
                    String.join(",", getParsed(destination)), String.join(",", getParsed(event)),
                    String.join(",", getParsed(organization)), String.join(",", getParsed(timestamp_received)),
                    String.join(",", getParsed(client)), String.join(",", getParsed(transaction)),
                    String.join(",", getParsed(timestamp)), String.join(",", getParsed(message)),
                    String.join(",", getParsed(dns)), String.join(",", getParsed(network)));
        }).returns(rowTypeDNS);

public static final RowTypeInfo rowTypeDNS = new RowTypeInfo(Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING());

private static List<String> getParsed(JsonNode node) {
    List<String> list = new ArrayList<>();
    Iterator<Entry<String, JsonNode>> it = node.fields();
    iterateAndExtract(it, list);
    return list;
}

private static void iterateAndExtract(Iterator<Entry<String, JsonNode>> it, List<String> list) {
    while (it.hasNext()) {
        Entry<String, JsonNode> e = it.next();
        if (!e.getValue().isContainerNode()) {
            list.add(e.getValue().asText());
            continue;
        }

        iterateAndExtract(e.getValue().fields(), list);
    }
}

The job fails with the following error:
java.lang.RuntimeException: Row arity of from does not match serializers.
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:86)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

Please help me understand the error in detail.

Thanks
Srikanth

Re: Row arity of from does not match serializers.

Fabian Hueske-2
Hi,

The inline lambda MapFunction produces a Row with 12 String fields (12 calls to String.join()).
You use the RowTypeInfo rowTypeDNS to declare the return type of the lambda MapFunction. However, rowTypeDNS is defined with many more String fields.
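
To make the field count concrete, here is a minimal plain-Java sketch (no Flink or Jackson involved) of why joining the leaf values yields one field per JSON section, however many leaves each section holds. The class name, the helper toRowFields and the sample values are made up for illustration and are not part of the posted job.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: models the String.join(",", getParsed(...)) step.
final class JoinedFieldCountSketch {

    /** Produces one joined string per section, so the row width equals the number of sections. */
    static String[] toRowFields(Map<String, List<String>> sections) {
        return sections.values().stream()
                .map(leaves -> String.join(",", leaves))
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Made-up sample data: three sections holding seven leaf values in total.
        Map<String, List<String>> sections = new LinkedHashMap<>();
        sections.put("source", Arrays.asList("10.0.0.1", "53124"));
        sections.put("destination", Arrays.asList("10.0.0.2", "53"));
        sections.put("dns", Arrays.asList("example.org", "A", "NOERROR"));

        String[] fields = toRowFields(sections);
        System.out.println(fields.length); // 3: one field per section, not one per leaf
        System.out.println(fields[2]);     // example.org,A,NOERROR
    }
}
```

With the 12 sections read in the lambda, the same reasoning gives a Row of 12 fields.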

The exception tells you that the number of fields returned by the function is not equal to the number of fields that were declared by rowTypeDNS.
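
As a rough model of that check, the following plain-Java sketch compares the number of values in a row against the number of declared field types and throws the same message when they differ. It is a simplified stand-in written for this thread, not Flink's actual RowSerializer code; the class and method names are hypothetical, and the figure of 73 declared fields is my count of the posted snippet.

```java
import java.util.Arrays;

// Hypothetical stand-in for the arity check; not Flink code.
final class ArityCheckSketch {

    /** Models a declared row type: only the number of declared fields matters here. */
    static String[] declareStringFields(int count) {
        String[] types = new String[count];
        Arrays.fill(types, "STRING");
        return types;
    }

    /** Copies the row only if its width matches the declared type, like the failing copy step. */
    static Object[] copy(Object[] row, String[] declaredTypes) {
        if (row.length != declaredTypes.length) {
            throw new RuntimeException("Row arity of from does not match serializers.");
        }
        return Arrays.copyOf(row, row.length);
    }

    public static void main(String[] args) {
        Object[] row = new Object[12]; // the lambda returns 12 joined strings
        Arrays.fill(row, "a,b,c");

        // Declaring 12 fields matches the row, so the copy succeeds.
        copy(row, declareStringFields(12));
        System.out.println("12 declared fields: ok");

        // Declaring more fields than the row carries triggers the exception.
        try {
            copy(row, declareStringFields(73));
        } catch (RuntimeException e) {
            System.out.println("73 declared fields: " + e.getMessage());
        }
    }
}
```

In the real job, one way to resolve this would be to declare rowTypeDNS with exactly 12 String fields; the other would be to emit one Row field per extracted leaf value so that the row matches the longer declaration. Which of the two fits depends on the schema you want downstream.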

Hope this helps,
Fabian

On Thu, 5 Dec 2019 at 20:35, srikanth flink <[hidden email]> wrote:
My Flink job reads from a Kafka stream and does some processing.
