Unit tests failing, losing stream contents


Unit tests failing, losing stream contents

David B. Ciar

Hi everyone,


I've been trying to write unit tests for my data stream bolts (map, flatMap, apply, etc.); however, the results I've been getting are strange. The test code is here (running with ScalaTest and sbt):


https://gist.github.com/dbciar/7469adfea9e6442cdc9568aed07095ff


It runs the stream processing environment once for each check. For one of the checks (output below) I get an "IllegalStateException: Factory has already been initialized", whose cause I'm not sure of, while for the rest I get an IndexOutOfBoundsException.


The index exception is strange, as the failing index positions are within the number of input tuples to the stream, so it is as if some are being lost, or the assert is running before the stream has finished processing and adding objects to the rawStreamOutput: Iterator[RawObservation] object.
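
For reference, each check has roughly this shape (a simplified sketch, not the exact gist code; `rawLines`, `parseObservation`, `RawObservation` and `isWellFormed` are stand-ins for my actual names):

import scala.collection.JavaConverters._
import org.apache.flink.contrib.streaming.DataStreamUtils
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment()

// bounded input: seven test tuples fed into the stream
val observations: DataStream[RawObservation] =
  env.fromElements(rawLines: _*).map(parseObservation _)

// collect the stream output into an iterator, then run the job
val rawStreamOutput: Iterator[RawObservation] =
  DataStreamUtils.collect(observations.javaStream).asScala

env.execute("parse-check")

// assert on the collected elements by position
val results = rawStreamOutput.toIndexedSeq
assert(results(3).isWellFormed)  // this is where IndexOutOfBoundsException: 3 appears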


Any pointers on what might be happening would be appreciated. Also, if anyone has suggestions on how to incorporate the Redis server in this check, I'd welcome them: I had to comment out the server definition and run the server separately to get to the current position.


The two types of exception are first this:


[info] - Do well-formed observations parse OK? *** FAILED ***
[info]   java.lang.IllegalStateException: Factory has already been initialized
[info]   at org.apache.flink.core.memory.MemorySegmentFactory.initializeFactory(MemorySegmentFactory.java:132)
[info]   at org.apache.flink.runtime.taskmanager.TaskManager$.parseTaskManagerConfiguration(TaskManager.scala:2055)
[info]   at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1802)
[info]   at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:142)
[info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
[info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.immutable.Range.foreach(Range.scala:160)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

The rest are as follows, with the index being the position I expected to access in the rawStreamOutput Iterator[RawObservation] object:


[info]   java.lang.IndexOutOfBoundsException: 3
[info]   at scala.collection.immutable.Vector.checkRangeConvert(Vector.scala:132)
[info]   at scala.collection.immutable.Vector.apply(Vector.scala:122)
[info]   at ProcessingBoltTest$$anonfun$5$$anon$7.<init>(ProcessingBoltTest.scala:93)
[info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
[info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)



Thanks,

David



This message (and any attachments) is for the recipient only. NERC is subject to the Freedom of Information Act 2000 and the contents of this email and any reply you make may be disclosed by NERC unless it is exempt from release under the Act. Any material supplied to NERC may be stored in an electronic records management system.

Re: Unit tests failing, losing stream contents

Maximilian Michels
Hi David,

You're starting two executions at the same time (in different
threads). Here's why:

Execution No 1
DataStreamUtils.collect(..) starts a Thread which executes your job
and collects stream elements. It runs asynchronously. The collect(..)
method returns after starting the thread.

Execution No 2
env.execute() also executes your job.

Now these two race against each other, causing all kinds of strange behavior.

In general, the use of DataStreamUtils is discouraged. You should rather define a sink to write your data into a file, or directly verify behavior as part of your Flink job (e.g. in a map function).
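
For example, here is a minimal sketch of the sink approach (CollectSink is a placeholder name, and RawObservation is assumed from your code; the static buffer is only suitable for tests):

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// sink instances are serialized and shipped to the workers, so the
// buffer lives in a companion object shared within the test JVM
object CollectSink {
  val values = new java.util.concurrent.ConcurrentLinkedQueue[RawObservation]()
}

class CollectSink extends SinkFunction[RawObservation] {
  override def invoke(value: RawObservation): Unit = {
    CollectSink.values.add(value)
  }
}

// in the test: attach the sink, run the job to completion, then assert
observations.addSink(new CollectSink)
env.execute("collect-sink-test")
assert(CollectSink.values.size == 7)

Note this only works with a local execution environment, where the sink runs in the same JVM as the test.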

Cheers,
Max

On Thu, Aug 11, 2016 at 5:47 PM, Ciar, David B. <[hidden email]> wrote:

> [quoted text snipped]

Re: Unit tests failing, losing stream contents

Stephan Ewen
Hi David!

I would guess that the first exception happens once in a while, as part of a rare race condition. As Max said, two executions happen simultaneously.
We should fix that race condition, though.

The second exception looks like it is purely part of your application code.

Greetings,
Stephan

On Fri, Aug 12, 2016 at 11:37 AM, Maximilian Michels <[hidden email]> wrote:
[quoted text snipped]


Re: Unit tests failing, losing stream contents

David B. Ciar

Hello Maximilian, Stephan,


Thanks for the help/information. For the race condition, that makes sense - I'll stop using DataStreamUtils. FYI, it is still listed on the documentation page:


https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#iterator-data-sink


For the second exception, I thought it was because I was trying to access objects in the iterator corresponding to the input from `env.fromElements`. As there were seven inputs, I expected seven outputs, and I assumed they would have been processed before the `.collect` method returned: even though it's a stream environment, the input to the stream is a bounded collection. Knowing now that collect() returns as soon as it has started the thread, I can see why the exception is thrown.


I'll try Maximilian's advice and write to a file.
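
Something along these lines is what I have in mind (a sketch only; the output path is a placeholder):

import scala.io.Source
import org.apache.flink.core.fs.FileSystem.WriteMode

val outPath = "/tmp/raw-observation-test-output.txt"

// write the stream to a file with a single writer, then run the job;
// execute() blocks until the (bounded) job has finished
observations.writeAsText(outPath, WriteMode.OVERWRITE).setParallelism(1)
env.execute("file-sink-test")

// the file is complete once execute() returns, so assert on its contents
val lines = Source.fromFile(outPath).getLines().toVector
assert(lines.size == 7)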


Thanks,

David


From: Stephan Ewen <[hidden email]>
Sent: 12 August 2016 10:39:16
To: [hidden email]
Subject: Re: Unit tests failing, losing stream contents
 
[quoted text snipped]



Re: Unit tests failing, losing stream contents

David B. Ciar

Hello again Stephan,


I tried modifying the way I call the stream job, while still using DataStreamUtils.collect(), and I found that it behaves as expected when suites are run individually or when a small number of suites are run as a group.


My different suites work when calling `test` within sbt with a small number of suites; however, I've got to the point where a larger number of suites always leads to around 2 or 3 being aborted due to the `Factory has already been initialized` exception below.


The modified code for one of the suites is here so you can see the outline:


https://gist.github.com/dbciar/247c4513980c76bf0dd214fcbcc45545


You said in your reply that the `Factory` exception indicated something to fix, and here it's happening in (what appears to me to be) a slightly different situation, where I _think_ one stream environment instance isn't being stopped before another is started?
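
In case it helps anyone hitting the same thing, the workaround I'm trying in the meantime is to stop sbt running suites concurrently in the same JVM (a sketch, assuming the collisions come from suites starting mini clusters concurrently; if the static state persists across suites, per-suite forking via sbt's Tests.Group would be needed instead):

// build.sbt
parallelExecution in Test := false  // run test suites one at a time
fork in Test := true                // run tests in a JVM separate from sbt's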


Either way, I thought it might be of interest,


Cheers,

David






[info] QCBlockLogic_MetaValue *** ABORTED ***
[info]   java.lang.RuntimeException: Failed to receive next element: Receiving stream failed: Factory has already been initialized
[info]   at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:114)
[info]   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
[info]   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[info]   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
[info]   at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:732)
[info]   at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:708)
[info]   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
[info]   at scala.collection.AbstractIterator.to(Iterator.scala:1336)
[info]   at scala.collection.TraversableOnce$class.toIndexedSeq(TraversableOnce.scala:300)
[info]   ...
[info]   Cause: java.lang.Exception: Receiving stream failed: Factory has already been initialized
[info]   at org.apache.flink.contrib.streaming.SocketStreamIterator.readNextFromStream(SocketStreamIterator.java:168)
[info]   at org.apache.flink.contrib.streaming.SocketStreamIterator.hasNext(SocketStreamIterator.java:112)
[info]   at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
[info]   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
[info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
[info]   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
[info]   at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:732)
[info]   at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:708)
[info]   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
[info]   at scala.collection.AbstractIterator.to(Iterator.scala:1336)
[info]   ...
[info]   Cause: java.lang.IllegalStateException: Factory has already been initialized
[info]   at org.apache.flink.core.memory.MemorySegmentFactory.initializeFactory(MemorySegmentFactory.java:132)
[info]   at org.apache.flink.runtime.taskmanager.TaskManager$.parseTaskManagerConfiguration(TaskManager.scala:2055)
[info]   at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1802)
[info]   at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:142)
[info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
[info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.immutable.Range.foreach(Range.scala:160)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)




From: Stephan Ewen <[hidden email]>
Sent: 12 August 2016 10:39
To: [hidden email]
Subject: Re: Unit tests failing, losing stream contents
 
[quoted text snipped]

