Reset Kafka Consumer using Flink Consumer 10 API

sohimankotia
Hi,

I am trying to replay Kafka logs from a specific offset, but I am not able to make it work.


Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

My code:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.*;


public class ReplayTest {

        public static void main(String[] args) throws Exception {

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);

                final FlinkKafkaConsumer010<String> kafkaSource = getKafkaSource();
                final DataStreamSource<String> in = env.addSource(kafkaSource);

                in.addSink(new PrintSinkFunction<>());
                in.addSink(getKafkaSink());

                env.execute();


        }

        private static FlinkKafkaConsumer010<String> getKafkaSource() {
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "localhost:9092");
                properties.setProperty("zookeeper.connect", "localhost:8081");
                properties.setProperty("group.id", "test11");
                final FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties);
                HashMap<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
                specificStartOffsets.put(new KafkaTopicPartition("test", 0), 2L);
                consumer.restoreState(specificStartOffsets);
                return consumer;
        }


        private static FlinkKafkaProducer010<String> getKafkaSink() {
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "localhost:9092");
                return new FlinkKafkaProducer010<>("test2", new SimpleStringSchema(), properties);
        }


}


I am using <flink.version>1.2.1</flink.version> for all Flink dependencies.

When I run the code in the IDE or on a local Flink setup, I get:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
        at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
        at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: java.lang.IllegalStateException: The runtime context has not been initialized.
        at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.restoreState(FlinkKafkaConsumerBase.java:388)
        at in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.getKafkaSource(ReplayTest.java:50)
        at in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.main(ReplayTest.java:27)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        ... 13 more


Thanks and Regards
Sohanvir

Re: Reset Kafka Consumer using Flink Consumer 10 API

rmetzger0
Hi,

it seems from the stack trace that you are calling the restoreState() method yourself in ReplayTest.getKafkaSource(ReplayTest.java:50):

        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.restoreState(FlinkKafkaConsumerBase.java:388)
        at in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.getKafkaSource(ReplayTest.java:50)


This is an internal method that the system calls on the TaskManagers when restoring state.
You are calling it on the client while submitting the Flink job, before the consumer has been given a runtime context. That's why you get "The runtime context has not been initialized."
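To illustrate why the call fails on the client, here is a self-contained stand-in modeled on the check behind the exception in the stack trace: a rich function's getRuntimeContext() throws an IllegalStateException until the framework has injected the context on the TaskManager. The classes FakeRuntimeContext and FakeRichFunction are hypothetical simplifications for illustration, not Flink's actual code.

```java
// Hypothetical stand-in, NOT Flink code: mimics how a rich function's runtime
// context only becomes available after the framework injects it on the TaskManager.
public class RuntimeContextDemo {

    // Hypothetical stand-in for Flink's RuntimeContext.
    static final class FakeRuntimeContext {
        final int subtaskIndex;

        FakeRuntimeContext(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }
    }

    // Hypothetical, simplified stand-in for AbstractRichFunction.
    static class FakeRichFunction {
        private FakeRuntimeContext runtimeContext;

        // In the real system the framework calls this on the TaskManager before the function runs.
        void setRuntimeContext(FakeRuntimeContext ctx) {
            this.runtimeContext = ctx;
        }

        FakeRuntimeContext getRuntimeContext() {
            if (runtimeContext == null) {
                throw new IllegalStateException("The runtime context has not been initialized.");
            }
            return runtimeContext;
        }
    }

    // "Client side": nothing has been injected yet, so reading the context fails.
    // Returns the exception message, or null if no exception was thrown.
    static String callTooEarly() {
        FakeRichFunction fn = new FakeRichFunction();
        try {
            fn.getRuntimeContext();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // "TaskManager side": the context is injected first, so reading it succeeds.
    static int callAfterInjection() {
        FakeRichFunction fn = new FakeRichFunction();
        fn.setRuntimeContext(new FakeRuntimeContext(0));
        return fn.getRuntimeContext().subtaskIndex;
    }

    public static void main(String[] args) {
        System.out.println(callTooEarly());       // The runtime context has not been initialized.
        System.out.println(callAfterInjection()); // 0
    }
}
```

Calling restoreState() from main() corresponds to callTooEarly(): the consumer object exists, but nothing has injected a context into it yet.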


(In reply to sohimankotia, Wed, Aug 23, 2017 at 7:34 AM.)
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reset-Kafka-Consumer-using-Flink-Consumer-10-API-tp15077.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Reset Kafka Consumer using Flink Consumer 10 API

sohimankotia
Thanks for the reply, Robert.

How do I specify the start position of a FlinkKafkaConsumer010?

The methods described in the documentation, e.g. setStartFromSpecificOffsets, are not present in FlinkKafkaConsumer010.
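The reference link above points to the Flink 1.3 documentation, while the project uses Flink 1.2.1. As far as I know, setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>) (together with setStartFromEarliest(), setStartFromLatest() and setStartFromGroupOffsets()) was added to FlinkKafkaConsumerBase in Flink 1.3, so it is missing from the 1.2.1 FlinkKafkaConsumer010. Below is a minimal sketch assuming the dependencies are upgraded to Flink 1.3 (the SimpleStringSchema import path is the 1.3 one). Topic, partition, offset and group id are taken from the question; the class name ReplayFromOffsetSketch and the helper startOffsets are hypothetical. zookeeper.connect is dropped because only the Kafka 0.8 consumer uses it. Unlike restoreState(), the setter only records configuration on the client, so it is safe to call before env.execute().

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; // 1.3 package

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Sketch assuming Flink 1.3+; class and helper names are hypothetical.
public class ReplayFromOffsetSketch {

    // Hypothetical helper: maps one topic partition to the offset to start reading from.
    static Map<KafkaTopicPartition, Long> startOffsets(String topic, int partition, long offset) {
        Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new KafkaTopicPartition(topic, partition), offset);
        return offsets;
    }

    static FlinkKafkaConsumer010<String> kafkaSource() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test11");
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties);
        // Only stores the desired start position; the source applies it when it starts on the TaskManager.
        consumer.setStartFromSpecificOffsets(startOffsets("test", 0, 2L));
        return consumer;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(kafkaSource()).addSink(new PrintSinkFunction<>());
        env.execute("replay-from-offset-sketch");
    }
}
```

According to the 1.3 documentation, these start-position settings do not apply when a job is restored from a failure or a savepoint; in that case the offsets stored in Flink's state are used instead.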