Re: Understanding job flow

Posted by Vishwas Siravara on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Understanding-job-flow-tp29435p29451.html

I did not find this to be true. Here is my code snippet. 

object DruidStreamJob extends Job with SinkFn {

// Everything in this section is part of the object's initializer: it runs once
// in every JVM that first touches DruidStreamJob, in the order written below.

private[flink] val druidConfig = DruidConfig.current

private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

// TODO: Add this to the sbt JVM options; it should be set in the sbt fork JVM. This is a hack.
// System.setProperty("java.library.path", "/Users/vsiravar/workspace/aipcryptoclient/lib")
//
// import java.lang.reflect.Field
//
// val fieldSysPath: Field = classOf[ClassLoader].getDeclaredField("sys_paths")
// fieldSysPath.setAccessible(true)
// fieldSysPath.set(null, null)
//
// print(System.getProperty("java.library.path"))

// Encryption client, built from the flags exposed by ExecutionEnv.
private[flink] val aipSimpleAPIEncryptor =
  new AipCryptoClient(
    ExecutionEnv.mockEncryption,
    ExecutionEnv.enableEncryption,
    ExecutionEnv.loadEncryptionSet
  )

// Must stay directly after construction: the client is initialized from this
// properties file before anything else in the object is set up.
aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

// General application logger, named after this object's runtime class.
val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)

// Dedicated logger for error reporting.
val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

// Decryption client built from the dictionary loaded above.
private[flink] val sdsClient = SDSEncryptor(decryptMap, ExecutionEnv.mockDecryption)

sdsClient.init()


/**
 * Builds the execution environment, enables checkpointing, wires the
 * source/sink pipeline and then executes the streaming job.
 *
 * @param argMap arguments handed to `ExecutionEnv.executionEnv` to create the environment
 */
private[flink] def runJob(argMap: Map[String, String]): Unit = {
  val streamEnv = ExecutionEnv.executionEnv(argMap)
  // Read the topics only after the environment has been created (same order as before).
  this.source = ExecutionEnv.sourceTopics

  // Checkpoint settings: interval passed to enableCheckpointing, and the
  // filesystem location used by the state backend.
  val checkpointInterval = 1000
  val checkpointLocation = "s3://vishwas.test1/checkpoints"

  streamEnv.enableCheckpointing(checkpointInterval)
  streamEnv.setStateBackend(new FsStateBackend(checkpointLocation))

  sourceAndSinkFn(streamEnv, source)
  streamEnv.execute(jobName = name)
}

/**
 * @inheritdoc
 *
 * Pipeline: source -> filter on message type and destination PCR ->
 * decrypt / add timestamp / encrypt -> print and Kafka sink.
 *
 * @param env    execution environment the source is attached to
 * @param topics topics being consumed; only used for the log line here
 */
override private[flink] def sourceAndSinkFn(
    env: StreamExecutionEnvironment,
    topics: List[String]) = {
  val dataStream = addSource(env)
  log.info("Subscribed to topics" + topics)

  val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {

    override def filter(value: GenericRecord): Boolean = {
      // `&&` (not `&`): once the message type has been rejected, the
      // destination PCR field is no longer read or converted, so a record
      // that is going to be dropped anyway cannot fail on that field.
      ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString) &&
        ExecutionEnv.pcrList.contains(value.get("CMLS_DEST_PCR").toString)
    }
  })

  // Per record: Avro record -> map -> decrypt -> add "timestamp" -> encrypt.
  val result = filteredStream.map(record =>
    encryptWithAipCryptoClient(addTimeStamp(sdsClient.decrypt(applyValues(record)))))

  result.print()
  KafkaSink(result).sendToKafka
}

/**
 * Passes the payload to the AIP crypto client and returns the same map instance.
 *
 * @param maptoEncrypt payload to hand to the encryptor
 * @return the `maptoEncrypt` instance that was passed in
 */
private[flink] def encryptWithAipCryptoClient(maptoEncrypt: mutable.Map[String, Any]): mutable.Map[String, Any] = {
  // `asJava` is a view over the Scala map, not a copy. The result of `encrypt`
  // is discarded, so it presumably encrypts values in place through this view
  // -- TODO confirm against AipCryptoClient.
  val javaView = maptoEncrypt.asInstanceOf[mutable.Map[String, AnyRef]].asJava
  aipSimpleAPIEncryptor.encrypt(javaView)
  maptoEncrypt
}

/**
 * Copies every field of an Avro record into a mutable map keyed by field name.
 *
 * LONG, INT and DOUBLE fields are cast to the matching Scala primitive; every
 * other schema type (STRING included) is stored as the value's `toString`.
 *
 * @param genericRecord record whose schema fields are copied
 * @return a new mutable map with one entry per schema field
 */
private[flink] def applyValues(genericRecord: GenericRecord): mutable.Map[String, Any] = {
  val entries = genericRecord.getSchema.getFields.asScala.map { field =>
    val key = field.name()
    val raw = genericRecord.get(key)
    field.schema().getType match {
      case Schema.Type.LONG   => key -> raw.asInstanceOf[Long]
      case Schema.Type.INT    => key -> raw.asInstanceOf[Int]
      case Schema.Type.DOUBLE => key -> raw.asInstanceOf[Double]
      // STRING and all remaining types share the same string conversion.
      case _                  => key -> raw.toString
    }
  }
  mutable.Map(entries: _*)
}

/**
 * Returns the payload extended with a "timestamp" entry.
 *
 * When "CMLS_CPD_ORIG_DT" is anything other than "19000101", the timestamp is
 * computed by `TimeUtility.convertDateStringToLong` from that date and
 * "CMLS_AUTH_TIME". Otherwise, or when the lookup/conversion throws, the
 * current system time is used.
 *
 * @param payload record fields; read only, the entry is added via `+`
 * @return the map produced by `payload + ("timestamp" -> ...)`
 */
private[flink] def addTimeStamp(payload: mutable.Map[String, Any]): mutable.Map[String, Any] = {
  try {
    if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
      payload + ("timestamp" -> TimeUtility.convertDateStringToLong(
        payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
        payload("CMLS_AUTH_TIME").asInstanceOf[Int]))
    } else {
      payload + ("timestamp" -> System.currentTimeMillis())
    }
  } catch {
    case e: Exception =>
      // Hand the exception to the logger so the stack trace ends up in the
      // error log. Concatenating `e.printStackTrace()` only appended "()" to
      // the message and printed the trace to stderr.
      errorLogger.error("Unable to obtain epoch time, using currentSystem time", e)
      payload + ("timestamp" -> System.currentTimeMillis())
  }
}

}

The code that initializes the SDS client (shown in green in the original message) runs in the main thread, even before the job graph is created. However, when I run this code on a cluster with 3 task managers on different nodes, it is initialized every time on all 3 nodes (task managers). I wonder why this happens.

Thanks,
Vishwas 

On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson <[hidden email]> wrote:
@transient or use a static factory.

In Scala we use a @transient lazy val with an initializer to do this

Sent from my iPhone

On Aug 15, 2019, at 11:40 AM, Vishwas Siravara <[hidden email]> wrote:

Thanks Steven. Is there a way I can create a singleton instance in each task manager instead of serializing this object?

Thanks,
Vishwas 

On Thu, Aug 15, 2019 at 11:28 AM Steven Nelson <[hidden email]> wrote:
The encryptor will be serialized and sent with the rest of your Job Graph when the job is submitted. If it’s not serializable you get an error.

Sent from my iPhone

> On Aug 15, 2019, at 11:00 AM, Vishwas Siravara <[hidden email]> wrote:
>
> Hi guys,
> I have a map job where I want to encrypt certain keys. I initialize the encryptor in the main method and apply it in the map function. How is this encryptor shared when I have my job running on multiple task managers with parallelism > 1?
>
> Thanks,
> Vishwas