Only a single message processed

Only a single message processed

Niclas Hedhman
Hi,
I am pretty new to Flink, and I like what I see and have started to build my first application using it.
I must be missing something very fundamental. I have a FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap functions, and terminated with the standard CassandraSink. I have try..catch in all my own maps/filters. The first message in the queue is processed after start-up, but any additional messages are ignored, i.e. they never reach the first map(). They are swallowed (consumed but not forwarded).

I suspect that this is some type of de-duplication going on, caused by the (test) producer of these messages. The producer provides a different value in each message, but there is no "key" being passed to the KafkaProducer.

Is that required? And if so, why? Can I tell Flink or Flink's KafkaConsumer to ingest all messages, and not try to de-duplicate them?

Thanks

--
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java

Re: Only a single message processed

Fabian Hueske-2
Hi Niclas,

Flink's Kafka consumer should not apply any deduplication. AFAIK, such a "feature" is not implemented.
Do you produce into the topic that you want to read, or is the data in the topic static?
If you do not produce into the topic while the consuming application is running, this might be an issue with the start position of the consumer [1].

Best, Fabian


2018-02-18 8:14 GMT+01:00 Niclas Hedhman <[hidden email]>:

Re: Only a single message processed

Niclas Hedhman

So, the producer is currently run manually from the command line, one message at a time.
Kafka's tooling (different consumer group) shows that a message is added each time.

Since my last post, I have also added a UUID as the key, and that didn't make a difference, so you are likely correct that no de-duplication is involved.


There is only a single partition on the topic, so it shouldn't be a partitioning issue.

I also noticed:
1. If I send a message while the consumer topology is running (after the first message has been processed), that message is only processed after a restart.

2. If I send many messages while the consumer is running and then do many restarts, only a single one of those messages is processed. No idea what happens to the others.

I am utterly confused.

And digging in the internals is not for the faint-hearted, but kafka.poll() frequently returns with empty records.

Will continue debugging that tomorrow...


Niclas

On Feb 18, 2018 18:50, "Fabian Hueske" <[hidden email]> wrote:

Re: Only a single message processed

Xingcan Cui
Hi Niclas,

About the second point you mentioned, was the processed message a random one or a fixed one? 

The default startup mode for FlinkKafkaConsumer is StartupMode.GROUP_OFFSETS; maybe you could try StartupMode.EARLIEST while debugging. Also, before that, you may try fetching the messages with the Kafka console consumer tool to see whether they can all be consumed.

Besides, I wonder if you could provide the code for your Flink pipeline. That'll be helpful.

Best,
Xingcan



On 18 Feb 2018, at 7:52 PM, Niclas Hedhman <[hidden email]> wrote:



Re: Only a single message processed

Niclas Hedhman

(Sorry for the incoherent order and ramblings. I am writing this as I am trying to sort out what is going on...)

1. It is the first message to be processed in the Kafka topic. If I set the offset manually, it will pick up the message at that point, process it, and ignore all following messages.

2. Yes, the Kafka console consumer tool is spitting out the messages without problem. Btw, they are plain Strings, well, serialized JSON objects.

3. Code is a bit messy, but I have copied out the relevant parts below.

I also noticed that a LOT of exceptions are thrown ("breakpoint on any exception"), mostly ClassCastException, classes not found and NoSuchMethodException, but nothing that bubbles up out of the internals. Is this a side effect of Scala on the JVM, or just the normal JVM class-loading sequence (no wonder it is so slow)? Is that expected?

I have tried to use both the ObjectMapper from Jackson proper and the shaded ObjectMapper in Flink. No difference.

Recap: when I position the Kafka consumer at the 8th message from the end, only that message is consumed; the remaining 7 are ignored/swallowed.


Ok, so I think I have traced this down to something happening in the CassandraSink. There is an Exception being thrown somewhere, which I can see because the finally clause of Kafka09Fetcher.runFetchLoop() is called.

Found it (hours later in debugging), on this line (Flink 1.4.1)
org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258
which contains

future.addListener(callbackListener, executor); // IDEA says 'future' is of type DefaultResultSetFuture

throws an Exception without stepping into the addListener() method. Nothing catches the Exception (and I don't want to go down the rabbit hole of building from source), so I can't really say which Exception is being thrown. IDEA doesn't seem to report it, and the catch clauses in OperatorChain.pushToOperator() (ClassCastException and Exception) are in the call stack but don't catch it, which could suggest a java.lang.Error. NoClassDefFoundError comes to mind, since there are SO MANY class-loading exceptions going on all the time.
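To illustrate why an Error would slip past those catch clauses, here is a minimal, self-contained sketch. It does not use Flink at all: `ErrorEscapesCatch` and `runGuarded` are hypothetical names, and the `NoClassDefFoundError` is thrown by hand purely as a stand-in. It only shows the general Java rule that `catch( Exception e )` does not cover `java.lang.Error`; it does not confirm what was actually thrown in the pipeline above.

```java
public class ErrorEscapesCatch
{
    /**
     * Runs the task inside handlers that only cover ClassCastException and
     * Exception, mirroring the shape of the catch clauses described above.
     */
    public static String runGuarded( Runnable task )
    {
        try
        {
            task.run();
            return "completed";
        }
        catch( ClassCastException e )
        {
            return "caught ClassCastException";
        }
        catch( Exception e )
        {
            return "caught Exception";
        }
    }

    public static void main( String[] args )
    {
        // A RuntimeException is a subclass of Exception, so it is caught.
        System.out.println( runGuarded( () -> { throw new IllegalStateException( "boom" ); } ) );

        // An Error is NOT a subclass of Exception, so it propagates to the caller.
        try
        {
            runGuarded( () -> { throw new NoClassDefFoundError( "simulated" ); } );
        }
        catch( Throwable t )
        {
            System.out.println( "escaped: " + t.getClass().getName() );
        }
    }
}
```

Only a handler for Throwable (or Error) sees the second case, which would be consistent with a failure that passes through handlers for Exception without being reported.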

Hold on a second... There are TWO com.datastax.driver.core.DefaultResultSetFuture types on the classpath: one from the Cassandra client that I declared, and one from inside the flink-connector-cassandra_2.11 artifact...
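A quick way to check for this kind of duplicate is to ask the class loader for every location that provides the class file. The following is a rough sketch using only the JDK; `ClasspathDuplicates`, `toResourceName` and `locate` are hypothetical names. It assumes it is run with the same classpath as the job, and it would not detect copies that have been relocated to a different package.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class ClasspathDuplicates
{
    /** Converts a fully qualified class name into the resource path of its .class file. */
    public static String toResourceName( String className )
    {
        return className.replace( '.', '/' ) + ".class";
    }

    /** Returns every location from which the given class loader can see the class file. */
    public static List<URL> locate( String className, ClassLoader loader )
    {
        try
        {
            return Collections.list( loader.getResources( toResourceName( className ) ) );
        }
        catch( IOException e )
        {
            throw new UncheckedIOException( e );
        }
    }

    public static void main( String[] args )
    {
        // Defaults to the class named in this thread; pass another name as the first argument.
        String className = args.length > 0 ? args[ 0 ] : "com.datastax.driver.core.DefaultResultSetFuture";
        List<URL> locations = locate( className, ClasspathDuplicates.class.getClassLoader() );
        System.out.println( className + " found in " + locations.size() + " location(s)" );
        for( URL url : locations )
        {
            System.out.println( "  " + url );
        }
    }
}
```

More than one location for the same class name means that two artifacts ship the same type, and which copy gets loaded depends on classpath order.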

So will it work if I remove my own dependency declaration and that's it?


YEEEEESSSSS!!! Finally.....


SOLVED!

-o-o-o-o-o-
public static void main( String[] args )
    throws Exception
{
    cli = CommandLine.populateCommand( new ServerCliOptions(), args );
    initializeCassandra( cli );
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setMaxParallelism( 32768 );
    // createPollDocPipeline( env );
    createAdminPipeline( env );
    env.execute( "schedule.poll" );
}



private static void createAdminPipeline( StreamExecutionEnvironment env )
{
    try
    {
        // Source: admin commands arrive from Kafka as JSON strings.
        FlinkKafkaConsumer011<String> adminSource = createKafkaAdminSource();
        SplitStream<AdminCommand> adminStream =
            env.addSource( adminSource )
               .name( "scheduler.admin" )
               .map( value -> {
                   try
                   {
                       return mapper.readValue( value, AdminCommand.class );
                   }
                   catch( Throwable e )
                   {
                       LOG.error( "Unexpected error deserializing AdminCommand", e );
                       return null;
                   }
               } )
               .name( "admin.command.read" )
               // Route each command to the output named after its action.
               .split( value -> singletonList( value.action() ) );

        SingleOutputStreamOperator<Tuple3<List<String>, String, String>> insertStream =
            adminStream.select( AdminCommand.CMD_SCHEDULE_INSERT )
                       .map( new GetPollDeclaration() )
                       .name( "scheduler.admin.insert" )
                       .map( new PollDeclarationToTuple3Map() )
                       .name( "scheduler.pollDeclToTuple3" )
                       .filter( tuple -> tuple != null );

        SingleOutputStreamOperator<Tuple3<List<String>, String, String>> deleteStream =
            adminStream.select( AdminCommand.CMD_SCHEDULE_DELETE )
                       .map( new GetPollDeclaration() )
                       .name( "scheduler.admin.delete" )
                       .map( new PollDeclarationToTuple3Map() )
                       .name( "scheduler.pollDeclToTuple3" )
                       .filter( tuple -> tuple != null );

        // One Cassandra sink per action, each with its own CQL statement.
        CassandraSink.addSink( insertStream )
                     .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
                     .setQuery( String.format( INSERT_SCHEDULE, cli.cassandraKeyspace ) )
                     .build();

        CassandraSink.addSink( deleteStream )
                     .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
                     .setQuery( String.format( DELETE_SCHEDULE, cli.cassandraKeyspace ) )
                     .build();
    }
    catch( Throwable e )
    {
        String message = "Unable to start Scheduling Admin";
        LOG.error( message );
        throw new RuntimeException( message, e );
    }
}

private static class GetPollDeclaration
    implements MapFunction<AdminCommand, PollDeclaration>
{
    private static final Logger LOG = LoggerFactory.getLogger( GetPollDeclaration.class );

    @Override
    public PollDeclaration map( AdminCommand command )
        throws Exception
    {
        try
        {
            if( command == null )
            {
                return null;
            }
            return (PollDeclaration) command.value();
        }
        catch( Throwable e )
        {
            LOG.error( "Unable to cast command data to PollDeclaration", e );
            return null;
        }
    }
}

private static class PollDeclarationToTuple3Map
    implements MapFunction<PollDeclaration, Tuple3<List<String>, String, String>>
{
    @Override
    public Tuple3<List<String>, String, String> map( PollDeclaration decl )
        throws Exception
    {
        try
        {
            if( decl == null )
            {
                return null;
            }
            return new Tuple3<>( singletonList( mapper.writeValueAsString( decl ) ), decl.zoneId + ":" + decl.schedule, decl.url );
        }
        catch( Throwable e )
        {
            // This step serializes the declaration; it does not cast it.
            LOG.error( "Unable to convert PollDeclaration to Tuple3", e );
            return null;
        }
    }
}

Flink dependencies:
flink : [
    [group: "org.apache.flink", name: "flink-core", version: flinkVersion],
    [group: "org.apache.flink", name: "flink-java", version: flinkVersion],
    [group: "org.apache.flink", name: "flink-connector-cassandra_2.11", version: flinkVersion],
    [group: "org.apache.flink", name: "flink-connector-kafka-0.11_2.11", version: flinkVersion],
    [group: "org.apache.flink", name: "flink-queryable-state-runtime_2.11", version: flinkVersion],
    [group: "org.apache.flink", name: "flink-streaming-java_2.11", version: flinkVersion],
    [group: "org.apache.flink", name: "flink-streaming-scala_2.11", version: flinkVersion]
],




On Sun, Feb 18, 2018 at 8:11 PM, Xingcan Cui <[hidden email]> wrote:

Re: Only a single message processed

Fabian Hueske-2
Hi Niclas,

Glad that you got it working!
Thanks for sharing the problem and solution.

Best, Fabian

2018-02-19 9:29 GMT+01:00 Niclas Hedhman <[hidden email]>:
