 
	
					
		
	
					| Hi, Is there way to get the kafka timestamp in deserialization schema? All records are written to kafka with timestamp and I would like to set that timestamp to every record that is ingested. Thanks. | 
| hi, Is that what you mean?See : https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145 Best Ben 
 | 
 
	
					
		
	
					| Hi Navneeth,Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if you configure EventTime for an application [1]. Since Flink treats record timestamps as meta data, they are not directly accessible by most functions. You can implement a ProcessFunction [2] to access the timestamp of a record via the ProcessFunction's Context object. Best, Fabian 2018-03-30 7:45 GMT+02:00 Ben Yan <[hidden email]>: 
 | 
| 
		Hi Fabian. 	If I use ProcessFunction , I can get it! But I want to know  that how to get Kafka timestamp in like flatmap and map methods of datastream using scala programming language. Thanks! Best Ben 
 | 
 
	
					
		
	
					| You must use a ProcessFunction for
      this, the timestamps are not exposed in any way to map/flatmap
      functions. On 10.04.2018 12:29, Ben Yan wrote: Hi Fabian. 
 | 
| 
				In reply to this post by Ben Yan
			 
		Hi Fabian:         I think it would be better without such a limitation.I want to consult another problem. When I use BucketingSink(I use aws s3), the filename of a few files after checkpoint still hasn't changed, resulting in the underline prefix of the final generation of a small number of files. After analysis, it is found that it is due to the eventually consistent  of S3.Are there any better solutions available?thanks Best Ben 	<a href="https://issues.apache.org/jira/browse/FLINK-8794?jql=text ~ "BucketingSink"" class="">https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22  
 | 
| 
 
 
 | 
| Free forum by Nabble | Edit this page | 
 
	

 
	
	
		
