package flinkConnector; import java.util.ArrayList; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.cassandra.CassandraSink; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Cluster.Builder; //import com.datastax.driver.core.utils.UUIDs; public class FlinkCassandraConnector { private static final ArrayList messages = new ArrayList<>(20); static { for (long i = 180; i <= 190; i++) { messages.add("cassandra-" + i); } } public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream> stringStream = env.fromCollection(messages).map(new MapFunction>(){ public Tuple1 map(String value) throws Exception { return Tuple1.of(value ); } }); try { ClusterBuilder clusterObj = ClusterBuilderProvider.getClusterBuilder(); CassandraSink.addSink(stringStream) .setQuery("INSERT INTO test.message (body) values (?);").setClusterBuilder(clusterObj).build(); String SELECT_QUERY = "SELECT body FROM test.message;"; DataStream> inputDS = env .createInput(new CassandraInputFormat>(SELECT_QUERY, clusterObj), TupleTypeInfo.of(new TypeHint>() { })); inputDS.print(); env.execute(); } catch (Exception e) { e.printStackTrace(); } } }