Hello,

While trying to use the PyFlink DataStream API in Flink 1.13, I have encountered an error regarding list types. I am trying to read data from a Kafka topic that contains events in JSON format. For example:

{
    "timestamp": 1614259940,
    "harvesterID": "aws-harvester",
    "clientID": "aws-client-id",
    "deviceID": "aws-devid",
    "payload": {
        "Version": {
            "PolicyVersion": {
                "Document": {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Action": "ec2:*",
                            "Effect": "Allow",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "elasticloadbalancing:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "cloudwatch:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "autoscaling:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "iam:CreateServiceLinkedRole",
                            "Resource": "*",
                            "Condition": {
                                "StringEquals": {
                                    "iam:AWSServiceName": []
                                }
                            }
                        }
                    ]
                },
                "VersionId": "v5",
                "IsDefaultVersion": true,
                "CreateDate": "2018-11-27 02:16:56+00:00"
            },
            "ResponseMetadata": {
                "RequestId": "6d32c946-1273-4bc5-b465-e5549dc4f515",
                "HTTPStatusCode": 200,
                "HTTPHeaders": {
                    "x-amzn-requestid": "6d32c946-1273-4bc5-b465-e5549dc4f515",
                    "content-type": "text/xml",
                    "content-length": "2312",
                    "vary": "accept-encoding",
                    "date": "Thu, 25 Feb 2021 15:32:18 GMT"
                },
                "RetryAttempts": 0
            }
        },
        "Policy": {
            "Policy": {
                "PolicyName": "AmazonEC2FullAccess",
                "PolicyId": "ANPAI3VAJF5ZCRZ7MCQE6",
                "Arn": "arn:aws:iam::aws:policy/AmazonEC2FullAccess",
                "Path": "/",
                "DefaultVersionId": "v5",
                "AttachmentCount": 2,
                "PermissionsBoundaryUsageCount": 0,
                "IsAttachable": true,
                "Description": "Provides full access to Amazon EC2 via the AWS Management Console.",
                "CreateDate": "2015-02-06 18:40:15+00:00",
                "UpdateDate": "2018-11-27 02:16:56+00:00"
            },
            "ResponseMetadata": {
                "RequestId": "a7e9f175-a757-4215-851e-f3d001083631",
                "HTTPStatusCode": 200,
                "HTTPHeaders": {
                    "x-amzn-requestid": "a7e9f175-a757-4215-851e-f3d001083631",
                    "content-type": "text/xml",
                    "content-length": "866",
                    "date": "Thu, 25 Feb 2021 15:32:18 GMT"
                },
                "RetryAttempts": 0
            }
        }
    }
}

For these events I defined the following type information:

input_type = Types.ROW_NAMED(
    ['timestamp', 'harvesterID', 'clientID', 'deviceID', 'payload'],
    [
        Types.LONG(),    # timestamp
        Types.STRING(),  # harvesterID
        Types.STRING(),  # clientID
        Types.STRING(),  # deviceID
        Types.ROW_NAMED(  # Payload
            ['Version', 'Policy'],
            [
                Types.ROW_NAMED(  # Version
                    ['PolicyVersion', 'ResponseMetadata'],
                    [
                        Types.ROW_NAMED(  # PolicyVersion
                            ['Document', 'VersionId', 'IsDefaultVersion', 'CreateDate'],
                            [
                                Types.ROW_NAMED(  # Document
                                    ['Version', 'Statement'],
                                    [
                                        Types.STRING(),  # Version
                                        Types.LIST(  # Statement
                                            Types.ROW_NAMED(
                                                ['Action', 'Effect', 'Resource', 'Condition'],
                                                [
                                                    Types.STRING(),  # Action
                                                    Types.STRING(),  # Effect
                                                    Types.STRING(),  # Resource
                                                    Types.ROW_NAMED(  # Condition
                                                        ['StringEquals'],
                                                        [
                                                            Types.ROW_NAMED(  # StringEquals
                                                                ['iam:AWSServiceName'],
                                                                [
                                                                    Types.LIST(Types.STRING())  # iam:AWSServiceName
                                                                ])
                                                        ])
                                                ]))
                                    ]),
                                Types.STRING(),   # VersionId
                                Types.BOOLEAN(),  # IsDefaultVersion
                                Types.STRING()    # CreateDate
                            ]),
                        Types.ROW_NAMED(  # ResponseMetadata
                            ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                            [
                                Types.STRING(),  # RequestId
                                Types.INT(),     # HTTPStatusCode
                                Types.MAP(       # HTTPHeaders
                                    Types.STRING(),
                                    Types.STRING()),
                                Types.INT()      # RetryAttempts
                            ])
                    ]),
                Types.ROW_NAMED(  # Policy
                    ['Policy', 'ResponseMetadata'],
                    [
                        Types.ROW_NAMED(  # Policy
                            ['PolicyName', 'PolicyId', 'Arn', 'Path', 'DefaultVersionId',
                             'AttachmentCount', 'PermissionBoundaryUsageCount', 'IsAttachable',
                             'Description', 'CreateDate', 'UpdateDate'],
                            [
                                Types.STRING(),   # PolicyName
                                Types.STRING(),   # PolicyId
                                Types.STRING(),   # Arn
                                Types.STRING(),   # Path
                                Types.STRING(),   # DefaultVersionId
                                Types.INT(),      # AttachmentCount
                                Types.INT(),      # PermissionBoundaryUsageCount
                                Types.BOOLEAN(),  # IsAttachable
                                Types.STRING(),   # Description
                                Types.STRING(),   # CreateDate
                                Types.STRING()    # UpdateDate
                            ]),
                        Types.ROW_NAMED(  # ResponseMetadata
                            ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                            [
                                Types.STRING(),  # RequestId
                                Types.INT(),     # HTTPStatusCode
                                Types.MAP(       # HTTPHeaders
                                    Types.STRING(),
                                    Types.STRING()),
                                Types.INT()      # RetryAttempts
                            ])
                    ])
            ])
    ])

But when I submit the application to Flink, I receive the following error when it tries to read data from the Kafka topic:

java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to org.apache.flink.types.Row
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:102)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)

Best regards,
Laszlo
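[For context, a type like input_type above is typically wired into the Kafka source roughly as follows. This is only a minimal sketch: the connector jar path, topic, broker address, and group id are placeholders, not details from the original post.]

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# The Kafka connector jar must be available to the job; the path is a placeholder.
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

# The JSON deserialization schema is driven by the Row type defined above,
# which is where the nested LIST types end up being exercised.
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=input_type) \
    .build()

# Topic name, broker address, and group id below are placeholders.
consumer = FlinkKafkaConsumer(
    topics='aws-events',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'})

ds = env.add_source(consumer)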
			
Hi Laszlo,

It seems to be because the JSON format supports the object array type but does not support the list type. However, the object array type is still not available in the PyFlink DataStream API [1]. I have created a ticket as a follow-up. For now, I guess you could implement it yourself; you could take a look at the basic array [2] as an example.

Regards,
Dian
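[A rough sketch of the kind of wrapper suggested above, modelled on how the built-in array types are exposed. The Python class is hypothetical and not part of PyFlink; only the Java-side ObjectArrayTypeInfo.getInfoFor call it delegates to is a known Flink API, and it is not verified that such a wrapper interoperates with the rest of the Python DataStream machinery in 1.13.]

# Hypothetical sketch only: PyFlink does not ship this class. It delegates to
# the Java ObjectArrayTypeInfo, which the JSON format can handle for arrays
# of rows, unlike the list type.
from pyflink.common.typeinfo import TypeInformation
from pyflink.java_gateway import get_gateway


class ObjectArrayTypeInfo(TypeInformation):
    """Type information for arrays with a non-primitive element type."""

    def __init__(self, element_type: TypeInformation):
        super(ObjectArrayTypeInfo, self).__init__()
        self._element_type = element_type
        self._j_object_array_typeinfo = None  # cached Java TypeInformation

    def get_java_type_info(self):
        if self._j_object_array_typeinfo is None:
            jvm = get_gateway().jvm
            # org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo.getInfoFor
            # is an existing Java API.
            self._j_object_array_typeinfo = jvm.org.apache.flink.api.java.typeutils \
                .ObjectArrayTypeInfo.getInfoFor(
                    self._element_type.get_java_type_info())
        return self._j_object_array_typeinfo

    def __repr__(self):
        return 'ObjectArrayTypeInfo<%s>' % self._element_type


# Usage idea: in the schema above, the "Statement" field's
# Types.LIST(Types.ROW_NAMED([...], [...])) would become
# ObjectArrayTypeInfo(Types.ROW_NAMED([...], [...])).

Note that this only covers the type declaration handed to the Java side (e.g. for JsonRowDeserializationSchema); passing such arrays into Python user functions may still need coder support, which is what the follow-up ticket mentioned above would have to address.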