Understanding job flow


Understanding job flow

Vishwas Siravara
Hi guys,
I have a map job in which I want to encrypt certain keys. I initialize the encryptor in the main method and apply it in the map function. How is this encryptor shared when my job runs on multiple task managers with parallelism > 1?

Thanks,
Vishwas  

Re: Understanding job flow

Steven Nelson
The encryptor will be serialized and sent with the rest of your job graph when the job is submitted. If it’s not serializable, you get an error.
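To make that concrete, here is a minimal sketch (hypothetical Encryptor class and pipeline, not from this thread). The map closure captures the instance, so Flink serializes it into the job graph and ships a copy to every parallel task:

import org.apache.flink.streaming.api.scala._

// Hypothetical encryptor. It must be Serializable, otherwise Flink
// rejects the job when it tries to serialize the map closure below.
class Encryptor extends Serializable {
  def encrypt(s: String): String = s.reverse // placeholder logic
}

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val encryptor = new Encryptor // created once, in the main method
    env.fromElements("a", "b")
      .map(s => encryptor.encrypt(s)) // closure captures encryptor; serialized with the job
      .print()
    env.execute("closure-demo")
  }
}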


> On Aug 15, 2019, at 11:00 AM, Vishwas Siravara <[hidden email]> wrote:
>
> Hi guys,
> I have a map job in which I want to encrypt certain keys. I initialize the encryptor in the main method and apply it in the map function. How is this encryptor shared when my job runs on multiple task managers with parallelism > 1?
>
> Thanks,
> Vishwas

Re: Understanding job flow

Vishwas Siravara
I did not find this to be true. Here is my code snippet:

object DruidStreamJob extends Job with SinkFn {

  private[flink] val druidConfig = DruidConfig.current

  private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

  // TODO: Add this to the sbt JVM; this should be set in the sbt fork JVM. This is a hack.
  // System.setProperty("java.library.path", "/Users/vsiravar/workspace/aipcryptoclient/lib")
  //
  // import java.lang.reflect.Field
  //
  // val fieldSysPath: Field = classOf[ClassLoader].getDeclaredField("sys_paths")
  // fieldSysPath.setAccessible(true)
  // fieldSysPath.set(null, null)
  //
  // print(System.getProperty("java.library.path"))

  private[flink] val aipSimpleAPIEncryptor = new AipCryptoClient(
    ExecutionEnv.mockEncryption,
    ExecutionEnv.enableEncryption,
    ExecutionEnv.loadEncryptionSet)

  aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

  val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)

  val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

  // sds client initialization (the part discussed below); note that it
  // runs in the object body, not inside any method.
  private[flink] val sdsClient = SDSEncryptor(decryptMap, ExecutionEnv.mockDecryption)

  sdsClient.init()

  /**
   * Start streaming job execution.
   *
   * @param argMap
   */
  private[flink] def runJob(argMap: Map[String, String]): Unit = {
    val env = ExecutionEnv.executionEnv(argMap)
    this.source = ExecutionEnv.sourceTopics

    env.enableCheckpointing(1000)
    env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
    sourceAndSinkFn(env, source)
    env.execute(jobName = name)
  }

  /**
   * @inheritdoc
   * @param env
   * @param topics
   */
  override private[flink] def sourceAndSinkFn(
      env: StreamExecutionEnvironment,
      topics: List[String]) = {
    val dataStream = addSource(env)
    log.info("Subscribed to topics " + topics)

    val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {
      override def filter(value: GenericRecord): Boolean = {
        ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString) &
          ExecutionEnv.pcrList.contains(value.get("CMLS_DEST_PCR").toString)
      }
    })

    val result = filteredStream.map(record =>
      encryptWithAipCryptoClient(addTimeStamp(sdsClient.decrypt(applyValues(record)))))

    result.print()
    KafkaSink(result).sendToKafka
  }

  private[flink] def encryptWithAipCryptoClient(maptoEncrypt: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    aipSimpleAPIEncryptor.encrypt(maptoEncrypt.asInstanceOf[mutable.Map[String, AnyRef]].asJava)
    maptoEncrypt
  }

  private[flink] def applyValues(genericRecord: GenericRecord): mutable.Map[String, Any] = {
    collection.mutable.Map(genericRecord.getSchema.getFields.asScala
      .map(field =>
        field.schema().getType match {
          case Schema.Type.LONG =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Long]
          case Schema.Type.INT =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Int]
          case Schema.Type.DOUBLE =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Double]
          case Schema.Type.STRING =>
            field.name() -> genericRecord.get(field.name()).toString
          case _ =>
            field.name() -> genericRecord.get(field.name()).toString
        }): _*)
  }

  private[flink] def addTimeStamp(payload: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    try {
      if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
        return payload + ("timestamp" -> TimeUtility.convertDateStringToLong(
          payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
          payload("CMLS_AUTH_TIME").asInstanceOf[Int]))
      }
      payload + ("timestamp" -> System.currentTimeMillis())
    } catch {
      case e: Exception =>
        // was: "..." + e.printStackTrace(), which concatenates Unit; pass the exception instead
        errorLogger.error("Unable to obtain epoch time, using current system time", e)
        payload + ("timestamp" -> System.currentTimeMillis())
    }
  }

}

The initialization of the sds client (the `sdsClient.init()` call in the object body above) happens in the main thread, even before the job graph is created. However, when I run this code on a cluster with 3 task managers on different nodes, it is initialized again on each of the 3 nodes (task managers). I wonder why this happens.

Thanks,
Vishwas 

On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson <[hidden email]> wrote:
@transient or use a static factory.

In Scala we use a @transient lazy val with an initializer to do this.
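A minimal sketch of that pattern, assuming a hypothetical non-serializable CryptoClient:

import org.apache.flink.api.common.functions.MapFunction

// Hypothetical client that holds a connection and is not Serializable.
class CryptoClient(configPath: String) {
  def encrypt(s: String): String = s // placeholder logic
}

class EncryptFn extends MapFunction[String, String] {
  // @transient: the field is skipped when this function instance is
  // serialized into the job graph. lazy val: it is re-created on first
  // access, i.e. once per parallel task on each task manager.
  @transient private lazy val client = new CryptoClient("aip_crypto_config.properties")

  override def map(value: String): String = client.encrypt(value)
}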


On Aug 15, 2019, at 11:40 AM, Vishwas Siravara <[hidden email]> wrote:

Thanks Steven. Is there a way in which I can create a singleton instance on each task manager instead of serializing this object?

Thanks,
Vishwas 


Re: Understanding job flow

Victor Wong

Hi Vishwas,

 

Since `DruidStreamJob` is a Scala “object” and the initialization of your sds client is not inside any method, it runs every time `DruidStreamJob` is loaded (like a static block in Java).

Your task managers are different JVM processes, and `DruidStreamJob` has to be loaded in each of them, so the initialization of your sds client runs once per JVM.
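A stripped-down sketch of that behavior (hypothetical demo objects, nothing Flink-specific):

// A Scala object's body runs once per JVM, on first reference, like a
// Java static initializer. Every task manager JVM that references
// DruidStreamJob therefore re-runs the sds client initialization.
object InitDemo {
  println(s"InitDemo initialized in PID ${ProcessHandle.current().pid()}") // Java 9+
  val answer = 42
}

object Main extends App {
  println(InitDemo.answer) // first reference: prints the init line, then 42
  println(InitDemo.answer) // prints only 42; the body does not run again
}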

 

You can try putting the initialization inside the `runJob` method and passing the client down as a parameter.

But I wonder whether an sds client can be serialized at all, since this kind of client usually holds an HTTP connection, which cannot be serialized.
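Roughly what that would look like in a self-contained form (hypothetical SdsClient and pipeline; note the Serializable requirement):

import org.apache.flink.streaming.api.scala._

// Hypothetical client. Passing it down only works if it (and everything
// it holds) is Serializable, which is the caveat above.
class SdsClient extends Serializable {
  def decrypt(s: String): String = s // placeholder logic
}

object PassDownDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val sdsClient = new SdsClient // built once, in the method that drives the job
    buildPipeline(env, sdsClient) // passed down as a parameter
    env.execute("pass-down-demo")
  }

  def buildPipeline(env: StreamExecutionEnvironment, client: SdsClient): Unit =
    env.fromElements("x", "y")
      .map(s => client.decrypt(s)) // client is captured and serialized into the closure
      .print()
}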

 

Best,

Victor

 
