Timestamp Issue with OutputTags

Timestamp Issue with OutputTags

Priyanka Kalra A

Hi Team,

 

We are generating multiple side-output tags and using the default processing-time characteristic on a non-keyed stream. The class XXXX$YYY extends ProcessFunction<I, O> and implements the processElement method. When we send valid data, the job fails with the error "Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null".

 

  • Why is Flink not able to read the timestamp?
  • Why does it not fall back to the system time as the processing time?

 

Complete stack trace for reference:

java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:70) ~[kafka-clients-0.11.0.2.jar:?]
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:93) ~[kafka-clients-0.11.0.2.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:652) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:97) ~[flink-connector-kafka-0.11_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at com.eee.dd.ccc.aaa.processing.XXXX$YYY.processElement(XXXX.java:166)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.runFetchLoop(Kafka010Fetcher.java:151) ~[flink-connector-kafka-0.10_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:765) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:757) ~[flink-connector-kafka-base_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]

 

 

Your help with this would be deeply appreciated!

 

 

Thanks & Regards,

Priyanka Kalra


Re: Timestamp Issue with OutputTags

taher koitawala-2
Can you please share your code?


RE: Timestamp Issue with OutputTags

Priyanka Kalra A

Below is the code:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.avro.generic.GenericRecord; // assuming Avro's GenericRecord
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class OutputTagProcessingFunction extends ProcessFunction<GenericRecord, GenericRecord> {

    private static final long serialVersionUID = 1L;

    // Side-output tags, created lazily and cached by tag name.
    private final HashMap<String, OutputTag<GenericRecord>> outputMap = new HashMap<>();
    private final List<String> tagList;

    public OutputTagProcessingFunction(List<String> tagList) {
        this.tagList = tagList;
    }

    @Override
    public void processElement(final GenericRecord value, Context ctx, Collector<GenericRecord> out) throws Exception {
        // Flatten the (possibly comma-separated) tag entries into a unique set.
        Set<String> tagSet = new HashSet<>();
        for (String tag : tagList) {
            tagSet.addAll(Arrays.asList(tag.split(",")));
        }

        // Emit the record to the side output for each tag, creating the
        // OutputTag on first use; the anonymous subclass ({}) captures the
        // element type for Flink's type extraction.
        for (String tag : tagSet) {
            outputMap.putIfAbsent(tag, new OutputTag<GenericRecord>(tag) {});
            ctx.output(outputMap.get(tag), value);
        }
    }
}

 

The exception is thrown at the ctx.output(outputMap.get(tag), value) line (XXXX.java:166 in the stack trace above).
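
For reference, this is roughly how such a function would be wired into a job and how one side output could be read back; the stream variable and the "errors" tag name below are purely illustrative:

// Apply the function to the (non-keyed) stream; the main output flows on
// through 'main', and the side outputs are attached to it.
SingleOutputStreamOperator<GenericRecord> main =
        stream.process(new OutputTagProcessingFunction(tagList));

// getSideOutput needs an OutputTag equal to the one used inside the
// function (same id, matching element type), so the tag names must be
// known when the job graph is built.
DataStream<GenericRecord> errors =
        main.getSideOutput(new OutputTag<GenericRecord>("errors") {});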

 

 

Regards,

Priyanka


Re: Timestamp Issue with OutputTags

taher koitawala-2
Hi Priyanka,
      I see that you are generating dynamic output tags. AFAIK, that dynamic tagging is what is causing the issue; I don't think tags can be added after the operators are already running.

Can you try with a static, named tag that is declared final, and output the data that way?

Added Till
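
For example, a minimal sketch of that static approach; the tag name and element type are illustrative:

// Declared final and up front, so the tag exists before the job runs and
// can also be referenced by getSideOutput when building the job.
public static final OutputTag<GenericRecord> MY_TAG =
        new OutputTag<GenericRecord>("my-tag") {};

@Override
public void processElement(GenericRecord value, Context ctx, Collector<GenericRecord> out) {
    ctx.output(MY_TAG, value); // route the record to the static side output
}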


Re: Timestamp Issue with OutputTags

Till Rohrmann
Hi Priyanka,

Could you tell us which Flink version you are using? Seeing the complete Flink job could also be helpful. The only explanation I have at the moment is that you might have set FlinkKafkaProducer011.setWriteTimestampToKafka(true). If so, you have to set the TimeCharacteristic to EventTime, because setWriteTimestampToKafka only works with event time.

Cheers,
Till
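
For context: -9223372036854775808 is Long.MIN_VALUE, the placeholder Flink uses for records that carry no timestamp. Under processing time no event-time timestamps are assigned, so setWriteTimestampToKafka(true) ends up handing that placeholder to Kafka's ProducerRecord, which rejects it. A minimal sketch of the two ways out, assuming env, topic, schema, and props are defined elsewhere:

// Option 1: stay on processing time and stop writing record timestamps
// to Kafka.
FlinkKafkaProducer011<GenericRecord> producer =
        new FlinkKafkaProducer011<>(topic, schema, props);
producer.setWriteTimestampToKafka(false);

// Option 2: switch the job to event time, so records carry real
// timestamps by the time they reach the sink (the Kafka 0.10+ consumer
// then attaches the timestamps from the Kafka records).
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);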


RE: Timestamp Issue with OutputTags

Priyanka Kalra A

Hi Till,

 

I’m using Flink version 1.11.2.

 

Yes, FlinkKafkaProducer011.setWriteTimestampToKafka(true) was set, and it was causing the issue.

 

Thank you for your help!

 

 

Regards,

Priyanka

 
