Hi,
I am trying to replay kafka logs from specific offset . But I am not able to make it work . Using Ref : https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration My Code : import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import java.util.*; public class ReplayTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); final FlinkKafkaConsumer010<String> kafkaSource = getKafkaSource(); final DataStreamSource<String> in = env.addSource(kafkaSource); in.addSink(new PrintSinkFunction<>()); in.addSink(getKafkaSink()); env.execute(); } private static FlinkKafkaConsumer010<String> getKafkaSource() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("zookeeper.connect", "localhost:8081"); properties.setProperty("group.id", "test11"); final FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties); HashMap<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); specificStartOffsets.put(new KafkaTopicPartition("test", 0), 2L); consumer.restoreState(specificStartOffsets); return consumer; } private static FlinkKafkaProducer010<String> getKafkaSink() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); return new FlinkKafkaProducer010<>("test2", new SimpleStringSchema(), properties); } } I am using <flink.version>1.2.1</flink.version> for all flink dependencies . When I am running code on IDE or local flink set up , I am getting org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073) at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116) Caused by: java.lang.IllegalStateException: The runtime context has not been initialized. at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.restoreState(FlinkKafkaConsumerBase.java:388) at in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.getKafkaSource(ReplayTest.java:50) at in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.main(ReplayTest.java:27) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) ... 13 more Thanks and Regards Sohanvir |
Hi, it seems from the stack trace, that you are calling the restoreState() method yourself in ReplayTest. org.apache.flink.streaming. atin.dailyhunt.cis.enrichment. This method is actually an internal method called by the system on the task managers when restoring state. You are calling the method on the client when submitting the Flink job. That's why "The runtime context has not been initialized.". On Wed, Aug 23, 2017 at 7:34 AM, sohimankotia <[hidden email]> wrote: Hi, |
Thanks for Reply Robert .
How do I specify start position of consumer for FlinkKafkaConsumer010? Because methods e.g. setStartFromSpecificOffsets sepecified in documentation ,are not present in FlinkKafkaConsumer010. |
Free forum by Nabble | Edit this page |