[info] 16:18:53.130 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Kafka consumer initialized [info] 16:18:53.406 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Initiating connection to node 0.0.0.0:9092 (id: -1 rack: null) [info] 16:18:53.421 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent [info] 16:18:53.423 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received [info] 16:18:53.424 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency [info] 16:18:53.430 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Created socket with SO_RCVBUF = 342972, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1 [info] 16:18:53.431 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Completed connection to node -1. Fetching API versions. [info] 16:18:53.431 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Initiating API versions fetch from node -1. [info] 16:18:53.461 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Initiating API versions fetch from node -1. [info] 16:18:53.461 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Using older server API v0 to send API_VERSIONS {} with correlation id 2 to node -1 [info] 16:18:53.574 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Recorded API versions for node -1: (Produce(0): 0 to 5 [usable: 5], Fetch(1): 0 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): 0 to 1 [usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], AlterReplicaLogDirs(34): 0 [usable: 0], DescribeLogDirs(35): 0 [usable: 0], SaslAuthenticate(36): 0 [usable: 0], CreatePartitions(37): 0 [usable: 0], CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): UNSUPPORTED) [info] 16:18:53.577 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Sending metadata request (type=MetadataRequest, topics=) to node 0.0.0.0:9092 (id: -1 rack: null) [info] 16:18:53.591 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Using older server API v5 to send METADATA {topics=[],allow_auto_topic_creation=true} with correlation id 3 to node -1 [info] 16:18:53.603 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Using older server API v5 to send METADATA {topics=[parcel_event_update],allow_auto_topic_creation=true} with correlation id 0 to node -1 [info] 16:18:53.628 [Source: Custom Source (1/1)] INFO org.apache.kafka.clients.Metadata - Cluster ID: UqNuwLlMTyu4KMKniZ4q-Q [info] 16:18:53.628 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = UqNuwLlMTyu4KMKniZ4q-Q, nodes = [4c34977feb35:9092 (id: 1001 rack: null)], partitions = [], controller = 4c34977feb35:9092 (id: 1001 rack: null)) [info] 16:18:53.765 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Topic metadata fetch included errors: {parcel_event_update=LEADER_NOT_AVAILABLE} [info] 16:18:53.867 [Source: Custom Source (1/1)] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Initiating connection to node 4c34977feb35:9092 (id: 1001 rack: null) [info] 16:18:53.902 [Source: Custom Source (1/1)] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=pp_flink_shipment_processor] Error connecting to node 4c34977feb35:9092 (id: 1001 rack: null) [info] java.io.IOException: Can't resolve address: 4c34977feb35:9092 [info] at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) [info] at org.apache.kafka.common.network.Selector.connect(Selector.java:214) [info] at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [info] at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265) [info] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:485) [info] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)