Notes on integrating Spark with Spline
Spline consists of three components: the agent, the server, and the UI. This post covers only the agent integration.
Download
Pick the agent build that matches your Spark and Scala versions. There are two ways to get it:
Download a pre-built bundle directly: https://repo1.maven.org/maven2/za/co/absa/spline/agent/spark/
Or clone the source and build it locally (a build sketch follows): https://github.com/absaoss/spline-spark-agent
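If you build from source, the bundle is produced with Maven. A minimal sketch; the Scala/Spark profile names here are assumptions, so check the repo README for the exact ones your agent version supports:
git clone https://github.com/absaoss/spline-spark-agent.git
cd spline-spark-agent
# pick the Scala/Spark combination via Maven profiles (names per the README)
mvn clean package -DskipTests -Pscala-2.12,spark-3.1
# the bundle jar is produced under the bundle module's target/ directory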
Install the Spline agent
According to the official docs there are two ways to integrate the agent; here we use the first one, the codeless approach:
- Get the agent bundle from the download step above;
- Put spark-a.b-spline-agent-bundle_x.y.jar on the Spark lib path;
- Edit spark-defaults.conf:
vim $SPARK_HOME/conf/spark-defaults.conf
# Write the lineage messages to the application log
spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.lineageDispatcher=log
spark.spline.lineageDispatcher.log.level=INFO
spark.spline.lineageDispatcher.log.className=za.co.absa.spline.harvester.dispatcher.LoggingLineageDispatcher
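With the log dispatcher, the lineage JSON lands in the driver log under the dispatcher's logger name. On YARN you can pull it out like this (the application id is a placeholder):
yarn logs -applicationId application_1234567890_0001 | grep -i spline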
# Send the lineage messages to Kafka
spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.lineageDispatcher=kafka
spark.spline.lineageDispatcher.kafka.topic=spark_lineage_test
spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers=localhost:9092
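With the Kafka dispatcher, it is worth confirming that events actually arrive; a quick check with the stock Kafka console consumer, using the broker and topic configured above:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic spark_lineage_test --from-beginning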
# Combine several dispatchers
spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.lineageDispatcher=composite
spark.spline.lineageDispatcher.composite.dispatchers=log,kafka
spark.spline.lineageDispatcher.composite.className=za.co.absa.spline.harvester.dispatcher.CompositeLineageDispatcher
spark.spline.lineageDispatcher.composite.failOnErrors=false
spark.spline.lineageDispatcher.log.level=INFO
spark.spline.lineageDispatcher.log.className=za.co.absa.spline.harvester.dispatcher.LoggingLineageDispatcher
spark.spline.lineageDispatcher.kafka.topic=spark_lineage_test
spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers=localhost:9092
Only a few dispatchers are shown here; for the rest, see the GitHub README: https://github.com/absaoss/spline-spark-agent
At this point the Spline lineage JSON can be found in the Spark application log, the Kafka topic, or both, depending on the dispatcher you configured.
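Note that the agent captures lineage when a query writes data, so a purely read-only job produces no event. A minimal spark-shell sketch to trigger one (paths are placeholders); the trailing comment also shows, for reference, the programmatic integration mode mentioned earlier:
// spark-shell with the agent bundle on the classpath and the codeless config above.
// Spline emits a lineage event on the write action.
val df = spark.read.option("header", "true").csv("/tmp/spline_demo_in")  // placeholder input
df.write.mode("overwrite").parquet("/tmp/spline_demo_out")               // placeholder output
// The programmatic mode (the second option mentioned earlier) would instead be:
//   import za.co.absa.spline.harvester.SparkLineageInitializer._
//   spark.enableLineageTracking()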
A note on an error I hit during this integration that caused me a lot of pain: after the Spark app is submitted, Spline initializes itself automatically, and that initialization consumes roughly 500 MB of driver memory. I did not know this at the time. The error looked like this:
ERROR ApplicationMaster: User class threw exception: java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.<init>(AutoDiscoveryPluginRegistry.scala:51)
    at za.co.absa.spline.agent.SplineAgent$.create(SplineAgent.scala:66)
    at za.co.absa.spline.harvester.SparkLineageInitializer.createListener(SparkLineageInitializer.scala:162)
    at za.co.absa.spline.harvester.SparkLineageInitializer.$anonfun$createListener$6(SparkLineageInitializer.scala:139)
    at za.co.absa.spline.harvester.SparkLineageInitializer.withErrorHandling(SparkLineageInitializer.scala:176)
    at za.co.absa.spline.harvester.SparkLineageInitializer.createListener(SparkLineageInitializer.scala:138)
    at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.<init>(SplineQueryExecutionListener.scala:37)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2930)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2919)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$2(QueryExecutionListener.scala:90)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:90)
    at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:88)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.sql.util.ExecutionListenerManager.<init>(QueryExecutionListener.scala:88)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:336)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:336)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:364)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1175)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:698)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:662)
    at com.hs.sdi.utils.DeltaMulTableSdiJob.$anonfun$createDataFrame$1(DeltaMulTableSdiJob.scala:390)
    at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
    at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
    at com.hs.sdi.utils.DeltaMulTableSdiJob.createDataFrame(DeltaMulTableSdiJob.scala:387)
    at com.hs.sdi.utils.DeltaMulTableSdiJob.calculation(DeltaMulTableSdiJob.scala:480)
    at com.hs.sdi.DeltaMulTableSdiJobMain$.main(DeltaMulTableSdiJobMain.scala:67)
    at com.hs.sdi.DeltaMulTableSdiJobMain.main(DeltaMulTableSdiJobMain.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:739)
Caused by: io.github.classgraph.ClassGraphException: Uncaught exception during scan
    at io.github.classgraph.ClassGraph.scan(ClassGraph.java:1558)
    at io.github.classgraph.ClassGraph.scan(ClassGraph.java:1575)
    at io.github.classgraph.ClassGraph.scan(ClassGraph.java:1588)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry$.$anonfun$PluginClasses$2(AutoDiscoveryPluginRegistry.scala:96)
    at za.co.absa.commons.lang.ARM$.using(ARM.scala:30)
    at za.co.absa.commons.lang.ARM$ResourceWrapper.flatMap(ARM.scala:43)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry$.<init>(AutoDiscoveryPluginRegistry.scala:96)
    at za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry$.<clinit>(AutoDiscoveryPluginRegistry.scala)
    ... 53 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at nonapi.io.github.classgraph.fileslice.reader.ClassfileReader.<init>(ClassfileReader.java:141)
    at io.github.classgraph.ClasspathElementZip$1.openClassfile(ClasspathElementZip.java:409)
    at io.github.classgraph.Classfile.<init>(Classfile.java:1925)
    at io.github.classgraph.Scanner$ClassfileScannerWorkUnitProcessor.processWorkUnit(Scanner.java:741)
    at io.github.classgraph.Scanner$ClassfileScannerWorkUnitProcessor.processWorkUnit(Scanner.java:664)
    at nonapi.io.github.classgraph.concurrency.WorkQueue.runWorkLoop(WorkQueue.java:246)
    at nonapi.io.github.classgraph.concurrency.WorkQueue.access$000(WorkQueue.java:50)
    at nonapi.io.github.classgraph.concurrency.WorkQueue$1.call(WorkQueue.java:201)
    at nonapi.io.github.classgraph.concurrency.WorkQueue$1.call(WorkQueue.java:198)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
I combed through the official docs and all sorts of blog posts, and finally found the hint in a couple of GitHub issue threads: the ClassGraph component that Spline uses for classpath scanning needs a certain amount of memory, so I raised my Spark app's driver memory from 500m to 1g, which solved the problem.
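For reference, the fix was nothing more than bumping the driver memory at submit time; the jar name and master are placeholders:
# give Spline's ClassGraph scan enough driver-memory headroom
spark-submit \
  --master yarn \
  --driver-memory 1g \
  my-job.jar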