Cassandra Connector in Scala

Eamon Kavanagh
Hey Mailing List,

I'm trying to use the Cassandra connector that came out recently (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html) in Scala but I'm having trouble with types when I use CassandraSink.addSink(in: DataStream). 

If I don't specify the type, the compiler can't seem to infer it, and if I do specify it, I still get a type mismatch error. The compile error is:

error: type arguments [(String, String, Int),Any] do not conform to method addSink's type parameter bounds [IN,T <: org.apache.flink.api.java.tuple.Tuple]

Is this a Scala issue?  Should I switch over to Java?


Thanks!
Eamon

Re: Cassandra Connector in Scala

Jamie Grier
This looks like a simple type mismatch.  It's impossible to help with this without seeing your code, though.  Can you post it here?  Thanks.

-Jamie


On Sun, Jun 19, 2016 at 3:17 PM, Eamon Kavanagh <[hidden email]> wrote:
Hey Mailing List,

I'm trying to use the Cassandra connector that came out recently (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html) in Scala but I'm having trouble with types when I use CassandraSink.addSink(in: DataStream). 

If I don't specify the type, the compiler can't seem to infer it, and if I do specify it, I still get a type mismatch error. The compile error is:

error: type arguments [(String, String, Int),Any] do not conform to method addSink's type parameter bounds [IN,T <: org.apache.flink.api.java.tuple.Tuple]

Is this a Scala issue?  Should I switch over to Java?


Thanks!
Eamon



--

Jamie Grier
data Artisans, Director of Applications Engineering


Re: Cassandra Connector in Scala

Eamon Kavanagh
Hey Jamie,

Here's a simple example that I modeled on the GitHub example (https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java). Let me know if I'm doing something silly.



import com.datastax.driver.core.Cluster.Builder
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder

class Test extends App {

  val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"

  val iter = Iterator(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val source = env.fromCollection(iter)

  CassandraSink.addSink(source)
    .setQuery(INSERT)
    .setClusterBuilder(new ClusterBuilder() {
      override def buildCluster(builder: Builder) {
        builder.addContactPoint("127.0.0.1").build()
      }
    })
    .build()

  env.execute("WriteTupleIntoCassandra")
}


On Mon, Jun 20, 2016 at 10:53 PM, Jamie Grier <[hidden email]> wrote:
This looks like a simple type mismatch.  It's impossible to help with this without seeing your code, though.  Can you post it here?  Thanks.

-Jamie



Re: Cassandra Connector in Scala

rmetzger0
Thank you for reporting the issue.
We are very happy if people try out new code before the release. Please keep testing our Cassandra connector and report errors or usability issues.

I was not able to compile your code using Scala 2.10. However, I got the version below running (I basically changed the iterator into a List), and I didn't get any type-related errors:

import com.datastax.driver.core.Cluster
import org.apache.flink.streaming.api.scala._
import collection.JavaConverters._
import com.datastax.driver.core.Cluster.Builder
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.cassandra.{ClusterBuilder, CassandraSink}

object MLQ {
  def main(args: Array[String]) {
    val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"

    val list = List(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromCollection(list.asJava)

    CassandraSink.addSink(source)
      .setQuery(INSERT)
      .setClusterBuilder(new ClusterBuilder() {
        override def buildCluster(builder: Builder): Cluster = {
          builder.addContactPoint("127.0.0.1").build()
        }
      })
      .build()

    env.execute("WriteTupleIntoCassandra")
  }
}

This is the output I got (which I think is perfectly fine, given that I don't have a Cassandra cluster running):

11:41:33,513 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class scala.Tuple2 must have a default constructor to be used as a POJO.
11:41:34,573 INFO  org.apache.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
11:41:34,975 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster.
11:41:35,141 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
11:41:35,155 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-01be4415-af52-4def-856e-94626e3f4f22
11:41:35,156 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:40771 - max concurrent requests: 50 - max backlog: 1000
11:41:35,162 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory     - Using job manager savepoint state backend.
11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist           - Started memory archivist akka://flink/user/archive_1
11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Starting JobManager at akka://flink/user/jobmanager_1.
11:41:35,171 INFO  org.apache.flink.runtime.jobmanager.JobManager                - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID None.
11:41:35,175 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
11:41:35,180 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-885000089] - leader session null
11:41:35,182 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
11:41:35,185 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Temporary file directory '/tmp': total 7 GB, usable 7 GB (100.00% usable)
11:41:35,397 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
11:41:35,398 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Limiting managed memory to 1191 MB, memory will be allocated lazily.
11:41:35,400 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /tmp/flink-io-85973c2a-1672-4d1a-9985-03546acc4900 for spill files.
11:41:35,407 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /tmp/flink-dist-cache-2775b3c6-ca0f-43a2-8432-623c8c99880c
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor at akka://flink/user/taskmanager_1#2005524421.
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager data connection information: localhost.localdomain (dataPort=34395)
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager has 8 task slot(s).
11:41:35,827 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 85/240/3541 MB, NON HEAP: 27/28/-1 MB (used/committed/max)]
11:41:36,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
11:41:36,026 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - TaskManager ResourceID{resourceId='0d1a407780ffa127acdd6b036c4867a8'} has registered.
11:41:36,028 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 6ef2a28a61851c9639ad74a7b3ba4cc8. Current number of registered hosts is 1. Current number of alive task slots is 8.
11:41:36,034 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
11:41:36,039 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be localhost/127.0.0.1:40771. Starting BLOB cache.
11:41:36,040 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-295ac2e2-91af-4a9b-a761-60fbd1a86b78
11:41:36,042 INFO  org.apache.flink.metrics.MetricRegistry                       - No metrics reporter configured, exposing metrics via JMX
11:41:36,051 INFO  org.apache.flink.runtime.client.JobClientActor                - Received job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c).
11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor                - Could not submit job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c), because there is no connection to a JobManager.
11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager null.
11:41:36,054 INFO  org.apache.flink.runtime.client.JobClientActor                - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-885000089].
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor                - Connected to new JobManager akka://flink/user/jobmanager_1.
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor                - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c) and wait for progress
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor                - Upload jar files to job manager akka://flink/user/jobmanager_1.
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor                - Submit job to the job manager akka://flink/user/jobmanager_1.
11:41:36,057 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Submitting job 502e42477e7a0161c3e678dcabbe1b0c (WriteTupleIntoCassandra).
11:41:36,095 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using restart strategy NoRestartStrategy for 502e42477e7a0161c3e678dcabbe1b0c.
11:41:36,125 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job 502e42477e7a0161c3e678dcabbe1b0c (WriteTupleIntoCassandra).
11:41:36,125 INFO  org.apache.flink.runtime.client.JobClientActor                - Job was successfully submitted to the JobManager akka://flink/user/jobmanager_1.
11:41:36,126 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Job execution switched to status RUNNING.
06/21/2016 11:41:36 Job execution switched to status RUNNING.
11:41:36,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5) switched from CREATED to SCHEDULED
11:41:36,127 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to SCHEDULED 
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to SCHEDULED 
11:41:36,128 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 502e42477e7a0161c3e678dcabbe1b0c (WriteTupleIntoCassandra) changed to RUNNING.
11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5) switched from SCHEDULED to DEPLOYING
11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Collection Source (1/1) (attempt #0) to localhost
11:41:36,130 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to DEPLOYING 
11:41:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef) switched from CREATED to SCHEDULED
11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef) switched from SCHEDULED to DEPLOYING
11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (1/8) (attempt #0) to localhost
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from CREATED to SCHEDULED
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from SCHEDULED to DEPLOYING
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (2/8) (attempt #0) to localhost
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def) switched from CREATED to SCHEDULED
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def) switched from SCHEDULED to DEPLOYING
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (3/8) (attempt #0) to localhost
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01) switched from CREATED to SCHEDULED
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01) switched from SCHEDULED to DEPLOYING
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (4/8) (attempt #0) to localhost
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to DEPLOYING 
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6) switched from CREATED to SCHEDULED
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to SCHEDULED 
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to DEPLOYING 
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to SCHEDULED 
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to DEPLOYING 
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to SCHEDULED 
11:41:36,137 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6) switched from SCHEDULED to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (5/8) (attempt #0) to localhost
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to SCHEDULED 
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1) switched from CREATED to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to SCHEDULED 
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1) switched from SCHEDULED to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (6/8) (attempt #0) to localhost
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to SCHEDULED 
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62) switched from CREATED to SCHEDULED
11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to DEPLOYING 
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62) switched from SCHEDULED to DEPLOYING
11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to SCHEDULED 
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (7/8) (attempt #0) to localhost
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f) switched from CREATED to SCHEDULED
11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f) switched from SCHEDULED to DEPLOYING
11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (8/8) (attempt #0) to localhost
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to SCHEDULED 
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to DEPLOYING 
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to SCHEDULED 
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to DEPLOYING 
11:41:36,155 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Source: Collection Source (1/1)
11:41:36,157 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Collection Source (1/1)
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Collection Source (1/1) [DEPLOYING]
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (1/8)
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (1/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (2/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (2/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (1/8) [DEPLOYING]
11:41:36,164 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (2/8) [DEPLOYING]
11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (1/8) switched to RUNNING
11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (2/8) switched to RUNNING
11:41:36,169 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Collection Source (1/1) switched to RUNNING
11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (3/8)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (4/8)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (3/8)
11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (3/8) [DEPLOYING]
11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (3/8) switched to RUNNING
11:41:36,173 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (4/8)
11:41:36,175 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (4/8) [DEPLOYING]
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (4/8) switched to RUNNING
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (5/8)
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (5/8)
11:41:36,177 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (5/8) [DEPLOYING]
11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (5/8) switched to RUNNING
11:41:36,177 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,178 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,178 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,179 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (6/8)
11:41:36,179 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,179 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,180 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (7/8)
11:41:36,181 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (8/8)
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (7/8)
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (7/8) [DEPLOYING]
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (7/8) switched to RUNNING
11:41:36,183 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,183 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,184 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (6/8)
11:41:36,185 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (8/8)
11:41:36,186 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef) switched from DEPLOYING to RUNNING
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (6/8) [DEPLOYING]
11:41:36,187 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to RUNNING 
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (6/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to RUNNING 
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (8/8) [DEPLOYING]
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (8/8) switched to RUNNING
11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def) switched from DEPLOYING to RUNNING
11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01) switched from DEPLOYING to RUNNING
11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to RUNNING 
11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to RUNNING 
11:41:36,190 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,190 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5) switched from DEPLOYING to RUNNING
11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1) switched from DEPLOYING to RUNNING
11:41:36,194 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6) switched from DEPLOYING to RUNNING
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to RUNNING 
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to RUNNING 
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to RUNNING 
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to RUNNING 
11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from DEPLOYING to RUNNING
11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f) switched from DEPLOYING to RUNNING
11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to RUNNING 
11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to RUNNING 
11:41:36,196 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62) switched from DEPLOYING to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to RUNNING 
11:41:36,196 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to RUNNING 
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Collection Source (1/1) switched to FINISHED
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Collection Source (1/1)
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Collection Source (3ab880aed927f4375ec55fcd76c05fb5)
11:41:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5) switched from RUNNING to FINISHED
11:41:36,207 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to FINISHED 
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to FINISHED 
11:41:36,373 INFO  com.datastax.driver.core.NettyUtil                            - Found Netty's native epoll transport in the classpath, using it
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,516 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,896 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,896 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (3/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,898 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (6/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (4/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,904 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,906 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (5/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (7/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (8/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (1/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,911 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (2/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (5/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (2/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (7/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (8/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (1/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (3/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (4/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (6/8)
11:41:43,920 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (da635c97adfd28fdd8b7ff1553a653a6)
11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (5b11c48da29711e75b8c1dee3491b9b1)
11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (5dc9a026f642fe50c29cbd895152cf01)
11:41:43,922 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (2695a57c8a28412e62d58f7dbe379def)
11:41:43,924 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (d5d15f4eeb628ecc89751aa22bc9fbef)
11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (696224c4e158a7925b2d8ae7fc17991f)
11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (2fcf053fb92f9a4eccd75781a15b5d62)
11:41:43,926 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (31e4ff30d1360f1e4c10090c0a59a498)
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1) switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6) switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def) switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef) switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01) switched from RUNNING to FAILED
11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62) switched from RUNNING to FAILED
06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f) switched from RUNNING to FAILED
11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from RUNNING to CANCELING
11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,929 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(1/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(1/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,929 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 3ab880aed927f4375ec55fcd76c05fb5
11:41:43,929 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Job execution switched to status FAILING.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
06/21/2016 11:41:43 Job execution switched to status FAILING.
11:41:43,931 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from CANCELING to CANCELED
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,931 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 502e42477e7a0161c3e678dcabbe1b0c (WriteTupleIntoCassandra) changed to FAILING.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,932 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(4/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(4/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,932 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(7/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(7/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,932 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(2/8) switched to CANCELING 
06/21/2016 11:41:43 Sink: Cassandra Sink(2/8) switched to CANCELING 
11:41:43,932 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(8/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(8/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,933 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution d5d15f4eeb628ecc89751aa22bc9fbef
11:41:43,933 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(2/8) switched to CANCELED 
06/21/2016 11:41:43 Sink: Cassandra Sink(2/8) switched to CANCELED 
11:41:43,933 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 502e42477e7a0161c3e678dcabbe1b0c (WriteTupleIntoCassandra) changed to FAILED.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,933 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Job execution switched to status FAILED.
06/21/2016 11:41:43 Job execution switched to status FAILED.
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 2695a57c8a28412e62d58f7dbe379def
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 5dc9a026f642fe50c29cbd895152cf01
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution da635c97adfd28fdd8b7ff1553a653a6
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 5b11c48da29711e75b8c1dee3491b9b1
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 2fcf053fb92f9a4eccd75781a15b5d62
11:41:43,934 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 696224c4e158a7925b2d8ae7fc17991f
11:41:43,937 INFO  org.apache.flink.runtime.client.JobClientActor                - Terminate JobClientActor.
11:41:43,937 INFO  org.apache.flink.runtime.client.JobClient                     - Job execution failed
11:41:43,937 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-885000089].
11:41:43,937 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Stopping FlinkMiniCluster.
11:41:43,940 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Stopping JobManager akka://flink/user/jobmanager_1.
11:41:43,940 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Stopping TaskManager akka://flink/user/taskmanager_1#2005524421.
11:41:43,941 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Disassociating from JobManager
11:41:43,942 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
11:41:43,945 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:40771
11:41:43,947 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager removed spill file directory /tmp/flink-io-85973c2a-1672-4d1a-9985-03546acc4900
11:41:43,948 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Task manager akka://flink/user/taskmanager_1 is completely shut down.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

Process finished with exit code 1


On Tue, Jun 21, 2016 at 6:36 AM, Eamon Kavanagh <[hidden email]> wrote:
Hey Jamie,

Here's a simple example that I modeled off of the github example (https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java).  Let me know if I'm doing something silly.



import com.datastax.driver.core.Cluster.Builder
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder

class Test extends App {

  val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"

  val iter = Iterator(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val source = env.fromCollection(iter)

  CassandraSink.addSink(source)
    .setQuery(INSERT)
    .setClusterBuilder(new ClusterBuilder() {
      override def buildCluster(builder: Builder) {
        builder.addContactPoint("127.0.0.1").build()
      }
    })
    .build()

  env.execute("WriteTupleIntoCassandra")
}




Re: Cassamdra Connector in Scala

Aljoscha Krettek
Hi,
I think the root problem is that the CassandraSink methods that can work with tuples accept the Tuple type that comes with Flink and not the Scala Tuple types. If I'm not mistaken Robert is using the Flink Tuple types in his example, that's why it works.
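
To illustrate the point above, here is a minimal sketch of converting a stream of Scala tuples into Flink's Java tuple type (org.apache.flink.api.java.tuple.Tuple2) before handing it to a sink that is bounded by `T <: org.apache.flink.api.java.tuple.Tuple`. The object name, the `FlinkTuple2` alias and the sample data are made up for illustration, and `javaStream` is assumed to be available on the Scala DataStream in the Flink version you are using. I have not run this against the Cassandra connector, so treat it as a starting point only.

```scala
import org.apache.flink.api.java.tuple.{Tuple2 => FlinkTuple2}
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; not part of Flink or the connector.
object TupleConversionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // A Scala DataStream whose elements are scala.Tuple2 values.
    val scalaTuples: DataStream[(String, Int)] =
      env.fromElements(("a", 1), ("b", 2), ("c", 3))

    // Convert every element into Flink's own Java Tuple2, which extends
    // org.apache.flink.api.java.tuple.Tuple (the bound in the compile error).
    val flinkTuples: DataStream[FlinkTuple2[String, Integer]] =
      scalaTuples.map(t => new FlinkTuple2[String, Integer](t._1, Int.box(t._2)))

    // Assumption: javaStream exposes the wrapped Java DataStream, which is
    // the kind of stream the Java-based connector classes operate on.
    flinkTuples.javaStream.print()

    env.execute("TupleConversionSketch")
  }
}
```

The stream printed here carries Flink tuples rather than Scala tuples, so its element type satisfies the `Tuple` bound quoted in the original error message.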

Cheers,
Aljoscha 

On Tue, 21 Jun 2016 at 11:54 Robert Metzger <[hidden email]> wrote:
Thank you for reporting the issue.
We are very happy if people try out new code before the release. Please keep testing our Cassandra connector and report errors or usability issues.

I was not able to compile your code using Scala 2.10. However, I got the following version running (I basically changed the Iterator into a List), and I didn't get any type-related exceptions.

import com.datastax.driver.core.Cluster
import org.apache.flink.streaming.api.scala._
import collection.JavaConverters._
import com.datastax.driver.core.Cluster.Builder
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.cassandra.{ClusterBuilder, CassandraSink}

object MLQ {
  def main(args: Array[String]) {

    val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"

    val list = List(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromCollection(list.asJava)

    CassandraSink.addSink(source)
      .setQuery(INSERT)
      .setClusterBuilder(new ClusterBuilder() {
        override def buildCluster(builder: Builder): Cluster = {
          builder.addContactPoint("127.0.0.1").build()
        }
      })
      .build()

    env.execute("WriteTupleIntoCassandra")
  }
}
What I got is the following (which I think is perfectly fine, given that I don't have a Cassandra cluster running):

11:41:33,513 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class scala.Tuple2 must have a default constructor to be used as a POJO.
11:41:34,573 INFO  org.apache.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
11:41:34,975 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster.
11:41:35,141 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
11:41:35,155 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-01be4415-af52-4def-856e-94626e3f4f22
11:41:35,156 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:40771 - max concurrent requests: 50 - max backlog: 1000
11:41:35,162 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory     - Using job manager savepoint state backend.
11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist           - Started memory archivist akka://flink/user/archive_1
11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Starting JobManager at akka://flink/user/jobmanager_1.
11:41:35,171 INFO  org.apache.flink.runtime.jobmanager.JobManager                - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID None.
11:41:35,175 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
11:41:35,180 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-885000089] - leader session null
11:41:35,182 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
11:41:35,185 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Temporary file directory '/tmp': total 7 GB, usable 7 GB (100.00% usable)
11:41:35,397 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
11:41:35,398 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Limiting managed memory to 1191 MB, memory will be allocated lazily.
11:41:35,400 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /tmp/flink-io-85973c2a-1672-4d1a-9985-03546acc4900 for spill files.
11:41:35,407 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /tmp/flink-dist-cache-2775b3c6-ca0f-43a2-8432-623c8c99880c
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor at akka://flink/user/taskmanager_1#2005524421.
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager data connection information: localhost.localdomain (dataPort=34395)
11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager has 8 task slot(s).
11:41:35,827 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 85/240/3541 MB, NON HEAP: 27/28/-1 MB (used/committed/max)]
11:41:36,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
11:41:36,026 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - TaskManager ResourceID{resourceId='0d1a407780ffa127acdd6b036c4867a8'} has registered.
11:41:36,028 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 6ef2a28a61851c9639ad74a7b3ba4cc8. Current number of registered hosts is 1. Current number of alive task slots is 8.
11:41:36,034 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
11:41:36,039 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be localhost/127.0.0.1:40771. Starting BLOB cache.
11:41:36,040 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-295ac2e2-91af-4a9b-a761-60fbd1a86b78
11:41:36,042 INFO  org.apache.flink.metrics.MetricRegistry                       - No metrics reporter configured, exposing metrics via JMX
11:41:36,051 INFO  org.apache.flink.runtime.client.JobClientActor                - Received job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c).
11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor                - Could not submit job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c), because there is no connection to a JobManager.
11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager null.
11:41:36,054 INFO  org.apache.flink.runtime.client.JobClientActor                - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-885000089].
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor                - Connected to new JobManager akka://flink/user/jobmanager_1.
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor                - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c) and wait for progress
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor                - Upload jar files to job manager akka://flink/user/jobmanager_1.
11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor                - Submit job to the job manager akka://flink/user/jobmanager_1.
11:41:36,057 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Submitting job 502e42477e7a0161c3e678dcabbe1b0c (WriteTupleIntoCassandra).
11:41:36,095 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using restart strategy NoRestartStrategy for 502e42477e7a0161c3e678dcabbe1b0c.
11:41:36,125 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job 502e42477e7a0161c3e678dcabbe1b0c (WriteTupleIntoCassandra).
11:41:36,125 INFO  org.apache.flink.runtime.client.JobClientActor                - Job was successfully submitted to the JobManager akka://flink/user/jobmanager_1.
11:41:36,126 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Job execution switched to status RUNNING.
06/21/2016 11:41:36 Job execution switched to status RUNNING.
11:41:36,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5) switched from CREATED to SCHEDULED
11:41:36,127 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to SCHEDULED 
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to SCHEDULED 
11:41:36,128 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 502e42477e7a0161c3e678dcabbe1b0c (WriteTupleIntoCassandra) changed to RUNNING.
11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5) switched from SCHEDULED to DEPLOYING
11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Collection Source (1/1) (attempt #0) to localhost
11:41:36,130 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to DEPLOYING 
11:41:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef) switched from CREATED to SCHEDULED
11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef) switched from SCHEDULED to DEPLOYING
11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (1/8) (attempt #0) to localhost
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from CREATED to SCHEDULED
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from SCHEDULED to DEPLOYING
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (2/8) (attempt #0) to localhost
11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def) switched from CREATED to SCHEDULED
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def) switched from SCHEDULED to DEPLOYING
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (3/8) (attempt #0) to localhost
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01) switched from CREATED to SCHEDULED
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01) switched from SCHEDULED to DEPLOYING
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (4/8) (attempt #0) to localhost
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to DEPLOYING 
11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6) switched from CREATED to SCHEDULED
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to SCHEDULED 
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to DEPLOYING 
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to SCHEDULED 
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to DEPLOYING 
11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to SCHEDULED 
11:41:36,137 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6) switched from SCHEDULED to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (5/8) (attempt #0) to localhost
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to SCHEDULED 
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1) switched from CREATED to SCHEDULED
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to SCHEDULED 
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1) switched from SCHEDULED to DEPLOYING
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (6/8) (attempt #0) to localhost
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to DEPLOYING 
11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to SCHEDULED 
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62) switched from CREATED to SCHEDULED
11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to DEPLOYING 
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62) switched from SCHEDULED to DEPLOYING
11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to SCHEDULED 
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (7/8) (attempt #0) to localhost
11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f) switched from CREATED to SCHEDULED
11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f) switched from SCHEDULED to DEPLOYING
11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Sink: Cassandra Sink (8/8) (attempt #0) to localhost
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to SCHEDULED 
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to DEPLOYING 
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to SCHEDULED 
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to SCHEDULED 
11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to DEPLOYING 
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to DEPLOYING 
11:41:36,155 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Source: Collection Source (1/1)
11:41:36,157 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Collection Source (1/1)
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Collection Source (1/1) [DEPLOYING]
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (1/8)
11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (1/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (2/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (2/8)
11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (1/8) [DEPLOYING]
11:41:36,164 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (2/8) [DEPLOYING]
11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (1/8) switched to RUNNING
11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (2/8) switched to RUNNING
11:41:36,169 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Collection Source (1/1) switched to RUNNING
11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (3/8)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (4/8)
11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (3/8)
11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (3/8) [DEPLOYING]
11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (3/8) switched to RUNNING
11:41:36,173 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (4/8)
11:41:36,175 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (4/8) [DEPLOYING]
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (4/8) switched to RUNNING
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (5/8)
11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (5/8)
11:41:36,177 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (5/8) [DEPLOYING]
11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (5/8) switched to RUNNING
11:41:36,177 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,178 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,178 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,179 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (6/8)
11:41:36,179 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,179 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,180 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (7/8)
11:41:36,181 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Sink: Cassandra Sink (8/8)
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (7/8)
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (7/8) [DEPLOYING]
11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (7/8) switched to RUNNING
11:41:36,183 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,183 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,184 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (6/8)
11:41:36,185 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Sink: Cassandra Sink (8/8)
11:41:36,186 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef) switched from DEPLOYING to RUNNING
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (6/8) [DEPLOYING]
11:41:36,187 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to RUNNING 
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (6/8) switched to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to RUNNING 
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Sink: Cassandra Sink (8/8) [DEPLOYING]
11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (8/8) switched to RUNNING
11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def) switched from DEPLOYING to RUNNING
11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01) switched from DEPLOYING to RUNNING
11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to RUNNING 
11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to RUNNING 
11:41:36,190 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
11:41:36,190 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5) switched from DEPLOYING to RUNNING
11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1) switched from DEPLOYING to RUNNING
11:41:36,194 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6) switched from DEPLOYING to RUNNING
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to RUNNING 
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to RUNNING 
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to RUNNING 
11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to RUNNING 
11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from DEPLOYING to RUNNING
11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f) switched from DEPLOYING to RUNNING
11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to RUNNING 
11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to RUNNING 
11:41:36,196 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62) switched from DEPLOYING to RUNNING
06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to RUNNING 
11:41:36,196 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to RUNNING 
06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to RUNNING 
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Collection Source (1/1) switched to FINISHED
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Collection Source (1/1)
11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Collection Source (3ab880aed927f4375ec55fcd76c05fb5)
11:41:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5) switched from RUNNING to FINISHED
11:41:36,207 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to FINISHED 
06/21/2016 11:41:36 Source: Collection Source(1/1) switched to FINISHED 
11:41:36,373 INFO  com.datastax.driver.core.NettyUtil                            - Found Netty's native epoll transport in the classpath, using it
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,516 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:41,515 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error while closing session.
java.lang.NullPointerException
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,896 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,896 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (3/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,898 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (6/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (4/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,904 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,906 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (5/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (7/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed. 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (8/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (1/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,911 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Cassandra Sink (2/8) switched to FAILED with exception.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (5/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (2/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (7/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (8/8)
11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (1/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (3/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (4/8)
11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Sink: Cassandra Sink (6/8)
11:41:43,920 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (da635c97adfd28fdd8b7ff1553a653a6)
11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (5b11c48da29711e75b8c1dee3491b9b1)
11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (5dc9a026f642fe50c29cbd895152cf01)
11:41:43,922 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (2695a57c8a28412e62d58f7dbe379def)
11:41:43,924 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (d5d15f4eeb628ecc89751aa22bc9fbef)
11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (696224c4e158a7925b2d8ae7fc17991f)
11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (2fcf053fb92f9a4eccd75781a15b5d62)
11:41:43,926 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Sink: Cassandra Sink (31e4ff30d1360f1e4c10090c0a59a498)
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1) switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6) switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def) switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef) switched from RUNNING to FAILED
11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01) switched from RUNNING to FAILED
11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62) switched from RUNNING to FAILED
06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f) switched from RUNNING to FAILED
11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498) switched from RUNNING to CANCELING
11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor                - 06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)

06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED 
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
<span cla