Stream in loop and not getting to sink (Parquet writer )

classic Classic list List threaded Threaded
18 messages Options
Reply | Threaded
Open this post in threaded view
|

Stream in loop and not getting to sink (Parquet writer )

avilevi
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi

Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Rafi Aroch
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi

Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

avilevi
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi

Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

vipul singh
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

avilevi
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Kostas Kloudas
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Kostas Kloudas
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

avilevi
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Kostas Kloudas

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

avilevi
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Kostas Kloudas
Hi Avi,

At a first glance I am not seeing anything wrong with your code. 
Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed? 
And also can you check the logs at Job and Task Manager for anything suspicious?

Unfortunately, we do not allow specifying encoding and other parameters to your writer, which is an omission 
on our part and this should be fixed. Could you open a JIRA for that?

If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters
class.

Cheers,
Kostas


On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <[hidden email]> wrote:
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Kostas Kloudas
And for a Java example which is actually similar to your pipeline, 
you can check the ParquetStreamingFileSinkITCase.



On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

At a first glance I am not seeing anything wrong with your code. 
Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed? 
And also can you check the logs at Job and Task Manager for anything suspicious?

Unfortunately, we do not allow specifying encoding and other parameters to your writer, which is an omission 
on our part and this should be fixed. Could you open a JIRA for that?

If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters
class.

Cheers,
Kostas


On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <[hidden email]> wrote:
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

avilevi
Thanks looks good.
Do you know a way to use PaquetWriter or ParquetAvroWriters with a BucketingSink file ? something like :
val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <[hidden email]> wrote:
And for a Java example which is actually similar to your pipeline, 
you can check the ParquetStreamingFileSinkITCase.



On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

At a first glance I am not seeing anything wrong with your code. 
Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed? 
And also can you check the logs at Job and Task Manager for anything suspicious?

Unfortunately, we do not allow specifying encoding and other parameters to your writer, which is an omission 
on our part and this should be fixed. Could you open a JIRA for that?

If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters
class.

Cheers,
Kostas


On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <[hidden email]> wrote:
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Kostas Kloudas
Hi Avi,

The ParquetAvroWriters cannot be used with the BucketingSink.

In fact the StreamingFIleSink is the "evolution" of the BucketingSink and it supports
all the functionality that the BucketingSink supports.

Given this, why not using the StreamingFileSink?

On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <[hidden email]> wrote:
Thanks looks good.
Do you know a way to use PaquetWriter or ParquetAvroWriters with a BucketingSink file ? something like :
val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <[hidden email]> wrote:
And for a Java example which is actually similar to your pipeline, 
you can check the ParquetStreamingFileSinkITCase.



On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

At a first glance I am not seeing anything wrong with your code. 
Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed? 
And also can you check the logs at Job and Task Manager for anything suspicious?

Unfortunately, we do not allow specifying encoding and other parameters to your writer, which is an omission 
on our part and this should be fixed. Could you open a JIRA for that?

If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters
class.

Cheers,
Kostas


On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <[hidden email]> wrote:
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

avilevi
Thanks Kostas. I will definitely look into that. but is the StreamingFileSink also support setting the batch size by size and/or by time interval like bucketing sink ?

On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

The ParquetAvroWriters cannot be used with the BucketingSink.

In fact the StreamingFIleSink is the "evolution" of the BucketingSink and it supports
all the functionality that the BucketingSink supports.

Given this, why not using the StreamingFileSink?

On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <[hidden email]> wrote:
Thanks looks good.
Do you know a way to use PaquetWriter or ParquetAvroWriters with a BucketingSink file ? something like :
val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <[hidden email]> wrote:
And for a Java example which is actually similar to your pipeline, 
you can check the ParquetStreamingFileSinkITCase.



On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

At a first glance I am not seeing anything wrong with your code. 
Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed? 
And also can you check the logs at Job and Task Manager for anything suspicious?

Unfortunately, we do not allow specifying encoding and other parameters to your writer, which is an omission 
on our part and this should be fixed. Could you open a JIRA for that?

If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters
class.

Cheers,
Kostas


On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <[hidden email]> wrote:
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Kostas Kloudas
Hi Avi,

For Bulk Formats like Parquet, unfortunately, we do not support setting the batch size.
The part-files roll on every checkpoint. This is a known limitation and there are plans to 
alleviate it in the future.

Setting the batch size (among other things) is supported for RowWise formats.

Cheers,
Kostas

On Sun, Dec 2, 2018 at 9:29 PM Avi Levi <[hidden email]> wrote:
Thanks Kostas. I will definitely look into that. but is the StreamingFileSink also support setting the batch size by size and/or by time interval like bucketing sink ?

On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

The ParquetAvroWriters cannot be used with the BucketingSink.

In fact the StreamingFIleSink is the "evolution" of the BucketingSink and it supports
all the functionality that the BucketingSink supports.

Given this, why not using the StreamingFileSink?

On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <[hidden email]> wrote:
Thanks looks good.
Do you know a way to use PaquetWriter or ParquetAvroWriters with a BucketingSink file ? something like :
val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <[hidden email]> wrote:
And for a Java example which is actually similar to your pipeline, 
you can check the ParquetStreamingFileSinkITCase.



On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

At a first glance I am not seeing anything wrong with your code. 
Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed? 
And also can you check the logs at Job and Task Manager for anything suspicious?

Unfortunately, we do not allow specifying encoding and other parameters to your writer, which is an omission 
on our part and this should be fixed. Could you open a JIRA for that?

If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters
class.

Cheers,
Kostas


On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <[hidden email]> wrote:
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

avilevi
Thanks Kostas,
Ok got it, so bucketingSink might not be a good choice here. can you please advice what will be the best approach ? I have heavy load of data that I consume from kafka that I want to process and put them in a file (doesn't have to be parquet) . I thought that StreamingFileSink might be a good choice but I guess I am doing something wrong there . if there is a good example for that - it will be great .

BR
Avi

On Mon, Dec 3, 2018 at 4:11 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

For Bulk Formats like Parquet, unfortunately, we do not support setting the batch size.
The part-files roll on every checkpoint. This is a known limitation and there are plans to 
alleviate it in the future.

Setting the batch size (among other things) is supported for RowWise formats.

Cheers,
Kostas

On Sun, Dec 2, 2018 at 9:29 PM Avi Levi <[hidden email]> wrote:
Thanks Kostas. I will definitely look into that. but is the StreamingFileSink also support setting the batch size by size and/or by time interval like bucketing sink ?

On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

The ParquetAvroWriters cannot be used with the BucketingSink.

In fact the StreamingFIleSink is the "evolution" of the BucketingSink and it supports
all the functionality that the BucketingSink supports.

Given this, why not using the StreamingFileSink?

On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <[hidden email]> wrote:
Thanks looks good.
Do you know a way to use PaquetWriter or ParquetAvroWriters with a BucketingSink file ? something like :
val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <[hidden email]> wrote:
And for a Java example which is actually similar to your pipeline, 
you can check the ParquetStreamingFileSinkITCase.



On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

At a first glance I am not seeing anything wrong with your code. 
Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed? 
And also can you check the logs at Job and Task Manager for anything suspicious?

Unfortunately, we do not allow specifying encoding and other parameters to your writer, which is an omission 
on our part and this should be fixed. Could you open a JIRA for that?

If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters
class.

Cheers,
Kostas


On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <[hidden email]> wrote:
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul
Reply | Threaded
Open this post in threaded view
|

Re: Stream in loop and not getting to sink (Parquet writer )

Kostas Kloudas
Hi Avi,

If Parquet is not a requirement then you can use the StreamingFileSink and write as plain text, if this is ok for you.
In this case, you can set the batch size and specify a custom RollingPolicy in general.

For example I would recommend to check [1] where you have, of course, to adjust the Encoder and the RollingPolicy. 


Cheers,
Kostas

On Mon, Dec 3, 2018 at 3:50 PM Avi Levi <[hidden email]> wrote:
Thanks Kostas,
Ok got it, so bucketingSink might not be a good choice here. can you please advice what will be the best approach ? I have heavy load of data that I consume from kafka that I want to process and put them in a file (doesn't have to be parquet) . I thought that StreamingFileSink might be a good choice but I guess I am doing something wrong there . if there is a good example for that - it will be great .

BR
Avi

On Mon, Dec 3, 2018 at 4:11 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

For Bulk Formats like Parquet, unfortunately, we do not support setting the batch size.
The part-files roll on every checkpoint. This is a known limitation and there are plans to 
alleviate it in the future.

Setting the batch size (among other things) is supported for RowWise formats.

Cheers,
Kostas

On Sun, Dec 2, 2018 at 9:29 PM Avi Levi <[hidden email]> wrote:
Thanks Kostas. I will definitely look into that. but is the StreamingFileSink also support setting the batch size by size and/or by time interval like bucketing sink ?

On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

The ParquetAvroWriters cannot be used with the BucketingSink.

In fact the StreamingFIleSink is the "evolution" of the BucketingSink and it supports
all the functionality that the BucketingSink supports.

Given this, why not using the StreamingFileSink?

On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <[hidden email]> wrote:
Thanks looks good.
Do you know a way to use PaquetWriter or ParquetAvroWriters with a BucketingSink file ? something like :
val bucketingSink = new BucketingSink[String]("/base/path")
bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <[hidden email]> wrote:
And for a Java example which is actually similar to your pipeline, 
you can check the ParquetStreamingFileSinkITCase.



On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

At a first glance I am not seeing anything wrong with your code. 
Did you verify that there are elements flowing in your pipeline and that checkpoints are actually completed? 
And also can you check the logs at Job and Task Manager for anything suspicious?

Unfortunately, we do not allow specifying encoding and other parameters to your writer, which is an omission 
on our part and this should be fixed. Could you open a JIRA for that?

If you want to know more about Flink's Parquet-Avro writer, feel free to have a look at the ParquetAvroWriters
class.

Cheers,
Kostas


On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <[hidden email]> wrote:
Thanks a lot Kostas, but the file not created . what am I doing wrong? 
BTW how can you set the encoding etc' in Flink's Avro - Parquet writer?
object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schema: Schema = new Schema.Parser().parse(schemaString)
val streamingSink = StreamingFileSink.forBulkFormat( path,
ParquetAvroWriters.forGenericRecord(schema))
.build()
env.enableCheckpointing(100)
val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", r.name)
genericReocrd.put("code", r.code.asString)
genericReocrd.put("ts", r.ts)
genericReocrd
}
stream.addSink { r =>
println(s"In Sink $r") //getting this line
streamingSink
}
env.execute()
}
Cheers
Avi

On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <[hidden email]> wrote:

Sorry, previously I got confused and I assumed you were using Flink's StreamingFileSink.

Could you try to use Flink's Avro - Parquet writer?

StreamingFileSink.forBulkFormat(
Path...(MY_PATH),
ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
.build()

Cheers,
Kostas

On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <[hidden email]> wrote:
Thanks.
yes, the env.execute is called and enabled checkpoints 
I think the problem is where to place the writer.close to flush the cache 
If I'll place on the sink after the write event e.g
addSink{
writer.write
writer.close
}
in this case only the first record will be included in the file but not the rest of the stream.


On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <[hidden email]> wrote:
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do you call env.execute()?

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <[hidden email]> wrote:
Hi Avi,

In the last snippet that you posted, you have not activated checkpoints.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet) where
the part file is rolled upon reception of a checkpoint and the part is finalised (i.e. "committed") when the checkpoint gets completed successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <[hidden email]> wrote:
Checkout this little App. you can see that the file is created but no data is written. even for a single record 
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
def now = System.currentTimeMillis()
val path = new Path(s"test-$now.parquet")
val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
val schema: Schema = new Schema.Parser().parse(schemaString)
val compressionCodecName = CompressionCodecName.SNAPPY
val config = ParquetWriterConfig()
val genericReocrd: GenericRecord = new GenericData.Record(schema)
genericReocrd.put("name", "test_b")
genericReocrd.put("code", "NoError")
genericReocrd.put("ts", 100L)
val stream = env.fromElements(genericReocrd)
val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
.withSchema(schema)
.withCompressionCodec(compressionCodecName)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withValidation(config.validating)
.build()

writer.write(genericReocrd)
stream.addSink { r =>
println(s"In Sink $r")
writer.write(r)
}
env.execute()
// writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <[hidden email]> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot()( since you are checkpointing hence this method will be called)

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <[hidden email]> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks , I will try to add it as you suggested. however it seems that the messages I repeating in the stream over and over even if I am pushing single message manually to the queue, that message will repeat infinity 

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <[hidden email]> wrote:
Hi Avi,

I can't see the part where you use  assignTimestampsAndWatermarks.
If this part in not set properly, it's possible that watermarks are not sent and nothing will be written to your Sink. 


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <[hidden email] wrote:
Hi, 

I am trying to implement Parquet Writer as SinkFunction. The pipeline consists of kafka as source and parquet file as a sink however it seems like the stream is repeating itself like endless loop and the parquet file is not written . can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private   val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
stream2.map { r =>
    println(s"MAPPING $r") //this output keeps repeating in a loop
    val genericReocrd: GenericRecord = new GenericData.Record(schema)
    genericReocrd.put("qname", r.qname)
    genericReocrd.put("rcode", r.rcode)
    genericReocrd.put("ts", r.ts)
    genericReocrd
  }.addSink(writer) 

Thanks for your help
Avi



--
Thanks,
Vipul