kafka integration issue

9 messages
kafka integration issue

Alex Rovner
Hello Flinkers!

The program below produces the following error when running locally. I am building the program with Maven against 0.10.0 and running in streaming-only local mode ("start-local-streaming.sh"). I have verified that Kafka and the topic are working properly using the kafka-console-*.sh scripts. What am I doing wrong? Any help would be appreciated.

Caused by: java.lang.NumberFormatException: For input string: ""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:601)
    at java.lang.Long.valueOf(Long.java:803)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.getOffsetFromZooKeeper(ZookeeperOffsetHandler.java:125)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.seekFetcherToInitialOffsets(ZookeeperOffsetHandler.java:88)


def main(args: Array[String]) {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("group.id", "test")

  val stream = env
    .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))

  val counts = stream.map(f => f.split(","))

  print(counts)

  env.execute()
}
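The verification Alex describes can be sketched with Kafka's console scripts. This is a sketch, not from the original mail: it assumes a broker on localhost:9092, ZooKeeper on localhost:2181, and the topic name "topic" from the program above, with flags matching the 0.8-era tools this thread is using.

```shell
# Produce one hypothetical test record into the topic (broker address
# and topic name taken from the program above):
echo "a,b,c" | kafka-console-producer.sh --broker-list localhost:9092 --topic topic

# Read everything back; seeing the record confirms the broker, ZooKeeper,
# and topic are wired up (the 0.8-era consumer tracks offsets via ZooKeeper):
kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic --from-beginning
```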
--
Alex Rovner
Director, Data Engineering 


Reply | Threaded
Open this post in threaded view
|

Re: kafka integration issue

Till Rohrmann
Hi Alex,

This is a bug in the `0.10` release. Is it possible for you to switch to version `1.0-SNAPSHOT`? With that version, the error should no longer occur.

Cheers,
Till

On Tue, Jan 5, 2016 at 1:31 AM, Alex Rovner <[hidden email]> wrote:

Re: kafka integration issue

Alex Rovner
Thanks, Till, for the info. I tried switching to 1.0-SNAPSHOT and am now facing another error:

Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:413)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)


On Tue, Jan 5, 2016 at 3:54 AM Till Rohrmann <[hidden email]> wrote:
--
Alex Rovner
Director, Data Engineering 


Re: kafka integration issue

rmetzger0
I think the problem is that you only set the version of the Kafka connector to 1.0-SNAPSHOT, not the rest of the Flink dependencies.
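Robert's suggestion amounts to managing every Flink artifact through one shared version property, so the connector and the core API cannot drift apart. A minimal sketch follows; the pom fragment is hypothetical (not from Alex's attached project) and the artifact names are illustrative for that era.

```shell
# Hypothetical pom fragment: one flink.version property, referenced by
# every Flink dependency, so nothing can stay pinned at 0.10.0.
cat > /tmp/flink-deps-sketch.xml <<'EOF'
<properties>
  <flink.version>1.0-SNAPSHOT</flink.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
EOF

# Sanity check: both Flink artifacts reference the shared property.
grep -cF '<version>${flink.version}</version>' /tmp/flink-deps-sketch.xml   # prints 2
```

The same idea can be checked on a real project with `mvn dependency:tree`, looking for any org.apache.flink artifact that resolves to a different version than the rest.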

On Tue, Jan 5, 2016 at 6:18 PM, Alex Rovner <[hidden email]> wrote:

Re: kafka integration issue

Alex Rovner
I believe I have set the version uniformly, unless I am overlooking something in the pom. Attaching my project.

I have tried building with both "mvn clean package" and "mvn clean package -Pbuild-jar" and I get the same exception.

I am running my app with the following command:

~/git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/bin/flink run -c com.magnetic.KafkaWordCount ~/git/flink-poc/target/flink-poc-1.0-SNAPSHOT.jar


On Tue, Jan 5, 2016 at 12:47 PM Robert Metzger <[hidden email]> wrote:

--
Alex Rovner
Director, Data Engineering 



Attachment: flink-poc.zip (80K)

Re: kafka integration issue

rmetzger0
Hi Alex,

How recent is your Flink 1.0-SNAPSHOT build? Maybe the code on the (local) cluster (in /git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/) is not up to date?

I just tried it locally, and the job seems to execute:

./bin/flink run /home/robert/Downloads/flink-poc/target/flink-poc-1.0-SNAPSHOT.jar
org.apache.flink.streaming.api.scala.DataStream@436bc360
01/05/2016 21:44:09 Job execution switched to status RUNNING.
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to SCHEDULED 
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to DEPLOYING 
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to RUNNING 

By the way, in order to print the stream, you have to call counts.print() instead of print(counts).






On Tue, Jan 5, 2016 at 9:35 PM, Alex Rovner <[hidden email]> wrote:

Re: kafka integration issue

Stephan Ewen
"java.lang.NoSuchMethodError" in Java virtually always means that the code is compiled against a different version than executed.

The version in "~/git/flink/" must be slightly outdated. Can you pull the latest update of the 1.0-SNAPSHOT master and rebuild the code?

Stephan
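The rebuild Stephan is asking for can be sketched as follows. The paths follow the commands Alex posted earlier in the thread; the Maven flags are the usual ones and are an assumption, not from the original mail.

```shell
# Refresh the checkout so the local cluster and the job are built
# from the same 1.0-SNAPSHOT commit:
cd ~/git/flink
git pull

# Rebuild Flink, including the artifacts in the local ~/.m2 repository
# that the job's pom resolves 1.0-SNAPSHOT against; tests are skipped
# here only to shorten the build:
mvn clean install -DskipTests

# Then rebuild the job against the fresh snapshot and resubmit it:
cd ~/git/flink-poc
mvn clean package
```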

On Tue, Jan 5, 2016 at 9:48 PM, Robert Metzger <[hidden email]> wrote:

Re: kafka integration issue

Alex Rovner
Updating to the latest version worked. Thanks! My git repo was less than a day old :-(

On Wed, Jan 6, 2016 at 4:54 AM Stephan Ewen <[hidden email]> wrote:
"java.lang.NoSuchMethodError" in Java virtually always means that the code is compiled against a different version than executed.

The version in "~/git/flink/" must be slightly outdated. Can you pull the latest update of the 1.0-SNAPSHOT master and rebuild the code?

Stephan

On Tue, Jan 5, 2016 at 9:48 PM, Robert Metzger <[hidden email]> wrote:
Hi Alex,

How recent is your Flink 1.0-SNAPSHOT build? Maybe the code on the (local) cluster (in /git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/) is not up to date?

I just tried it locally, and the job seems to execute:

./bin/flink run /home/robert/Downloads/flink-poc/target/flink-poc-1.0-SNAPSHOT.jar
org.apache.flink.streaming.api.scala.DataStream@436bc3601/05/2016 21:44:09 Job execution switched to status RUNNING.
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to SCHEDULED 
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to DEPLOYING 
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to RUNNING 

By the way, in order to print the stream, you have to call counts.print() instead of print(counts).






On Tue, Jan 5, 2016 at 9:35 PM, Alex Rovner <[hidden email]> wrote:
I believe I have set the version uniformly, unless I am overlooking something in the pom. Attaching my project.

I have tried building with both "mvn clean package" and "mvn clean package -Pbuild-jar" and I get the same exception.

I am running my app with the following command:

~/git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/bin/flink run -c com.magnetic.KafkaWordCount ~/git/flink-poc/target/flink-poc-1.0-SNAPSHOT.jar


On Tue, Jan 5, 2016 at 12:47 PM Robert Metzger <[hidden email]> wrote:
I think the problem is that you only set the version of the Kafka connector to 1.0-SNAPSHOT, not for the rest of the Flink dependencies.

On Tue, Jan 5, 2016 at 6:18 PM, Alex Rovner <[hidden email]> wrote:
Thanks Till for the info. I tried switching to 1.0-SNAPSHOT and now facing another error:

Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:413)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)


On Tue, Jan 5, 2016 at 3:54 AM Till Rohrmann <[hidden email]> wrote:
Hi Alex,

this is a bug in the `0.10` release. Is it possible for you to switch to version `1.0-SNAPSHOT`. With this version, the error should no longer occur.

Cheers,
Till

On Tue, Jan 5, 2016 at 1:31 AM, Alex Rovner <[hidden email]> wrote:
Hello Flinkers!

The below program produces the following error when running locally. I am building the program using maven, using 0.10.0 and running in streaming only local mode "start-local-streaming.sh".  I have verified that kafka and the topic is working properly by using kafka-console-*.sh scripts. What am I doing wrong? Any help would be appreciated it.

Caused by: java.lang.NumberFormatException: For input string: ""

at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

at java.lang.Long.parseLong(Long.java:601)

at java.lang.Long.valueOf(Long.java:803)

at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.getOffsetFromZooKeeper(ZookeeperOffsetHandler.java:125)

at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.seekFetcherToInitialOffsets(ZookeeperOffsetHandler.java:88)


def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

val stream = env
.addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))

val counts = stream.map(f=>f.split(","))

print(counts)

env.execute()
}
--
Alex Rovner
Director, Data Engineering 



--
Alex Rovner
Director, Data Engineering 



--
Alex Rovner
Director, Data Engineering 




--
Alex Rovner
Director, Data Engineering 


Re: kafka integration issue

Stephan Ewen
Wow, okay, you must have hit exactly the point in time when the update was pushed ;-)

On Wed, Jan 6, 2016 at 2:18 PM, Alex Rovner <[hidden email]> wrote: