Apache Spark broadcast variables are not reused
I am facing weird behavior when using broadcast variables. Each time I use the broadcast variable, its content gets copied once to each node and is never reused.
Here is an example in spark-shell --master local[32] (granted, it's useless and stupid code, but it shows the behavior):
    case class Test(a: String)
    val test = Test("123")
    val bc = sc.broadcast(test)

    // On a 32-core machine, 33 copies of Test (expected).
    // The YourKit profiler shows 33 instances of the object (32 unreachable).
    sc.parallelize((1 to 100)).map(x => bc.value.a).count

    // Doing it again, the Test copies are not reused and are serialized again
    // (now 65 copies, 64 unreachable).
    sc.parallelize((1 to 100)).map(x => bc.value.a).count
In my case, the broadcast variable is several hundred megabytes and is composed of millions of small objects (a few HashMaps and Vectors).
Each time I run an operation on an RDD that uses it, several gigabytes of memory are wasted and the garbage collector becomes more and more of a bottleneck!
Is it by design to re-broadcast variables for each execution of a new closure, or is it a bug and the copies should be reused?
Why are they unreachable immediately after use?
Is it particular to spark-shell in local mode?
Note: I'm using spark-1.3.1-hadoop2.6.
Update 1: According to this post: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-share-a-nonserializable-variable-among-tasks-in-the-same-worker-node-td11048.html singleton objects don't work anymore on Spark 1.2.x+, so this kind of workaround does not work either:
    val bcModel = sc.broadcast(bigModel)

    object ModelCache {
      @transient lazy private val localModel = { bcModel.value }
      def getModel = localModel
    }

    sc.parallelize((1 to 100)).map(x => ModelCache.getModel.someValue)
Update 2: I've tried to reuse the accumulator pattern, without success:
    class VoidAccumulatorParams extends AccumulatorParam[BigModel] {
      override def addInPlace(r1: BigModel, r2: BigModel): BigModel = { r1 }
      override def zero(initialValue: BigModel): BigModel = { initialValue }
    }

    val acc = sc.accumulator(bigModel, "bigModel")(new VoidAccumulatorParams())
    sc.parallelize((1 to 100)).map(x => acc.localValue.someValue)
Update 3: It looks like the singleton object works when running the job with spark-submit instead of the Scala shell.
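For reference, here is a minimal sketch of what that looks like as a compiled application launched with spark-submit (BigModel, someValue, and the app/object names are placeholders, not my actual code). In a compiled jar the object is a real per-JVM singleton, so the cached copy is initialized once per JVM and reused by later tasks on that executor:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.broadcast.Broadcast

    // Placeholder for the real model (hundreds of MB in my case).
    case class BigModel(someValue: String)

    object ModelCache {
      // One cached copy per JVM; tasks running on the same executor reuse it.
      private var localModel: BigModel = _
      def getModel(bc: Broadcast[BigModel]): BigModel = synchronized {
        if (localModel == null) localModel = bc.value
        localModel
      }
    }

    object SingletonCacheApp {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("singleton-cache"))
        val bcModel = sc.broadcast(BigModel("123"))
        val n = sc.parallelize(1 to 100).map(x => ModelCache.getModel(bcModel).someValue).count()
        println(n)
        sc.stop()
      }
    }

In the shell, by contrast, objects are compiled as members of the REPL's wrapper classes, which is presumably why the same pattern behaves differently there.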
Take a look at the broadcast tests (BroadcastSuite.scala) to see what to expect.
When you run the first job, the object is serialized, cut into chunks, and the chunks are sent to the executors (through the BlockManager mechanism). The executors deserialize the object from the chunks and use it while processing tasks. When they finish, they discard the deserialized object, but the BlockManager keeps the chunks of serialized data cached.
For the second job the object does not need to be serialized and transmitted again. It is just deserialized from the cache and used.
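One rough way to probe this from the shell (a sketch for local mode only, where driver and executors share one JVM so identity hashes are comparable; it reuses the bc broadcast from the first snippet): collect the identity hash of the value each task sees. The number of distinct hashes per job shows how many deserialized copies the tasks saw, and the intersection shows whether any instance survives from one job to the next.

    // Sketch: count distinct deserialized instances of the broadcast value per job,
    // and check whether the two jobs share any instance. Hash collisions are
    // possible but unlikely enough for a rough check.
    val ids1 = sc.parallelize(1 to 100, 32).map(_ => System.identityHashCode(bc.value)).collect().distinct
    val ids2 = sc.parallelize(1 to 100, 32).map(_ => System.identityHashCode(bc.value)).collect().distinct
    println(s"job 1: ${ids1.length} distinct instance(s), job 2: ${ids2.length}")
    println(s"instances shared across the two jobs: ${ids1.intersect(ids2).nonEmpty}")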
Caveats: For one, this does not avoid the excessive GC. The other thing is, I tried to validate the above theory using mutable state (class Test(var a: String) extends Serializable) and mutating it between runs. To my surprise, the second run saw the mutated state! So I'm either completely wrong, or only wrong in local mode. I hope someone can tell which. (I'll try to test it further myself if I remember tomorrow.)
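For anyone who wants to repeat that experiment, here is a rough sketch of it (names are illustrative, and REPL class wrapping may affect serialization): broadcast a mutable object, mutate the driver-side instance between the two jobs, and check which value the second job reports.

    // If the second job still reports "first", the value was rebuilt from the
    // serialized chunks; if it reports "second", the tasks are reusing the
    // driver-side instance (plausible in local mode, where everything runs
    // in one JVM).
    class MutableTest(var a: String) extends Serializable

    val mt = new MutableTest("first")
    val bcMut = sc.broadcast(mt)

    sc.parallelize(1 to 100).map(_ => bcMut.value.a).collect().distinct   // first job

    mt.a = "second"   // mutate the driver-side copy between runs
    sc.parallelize(1 to 100).map(_ => bcMut.value.a).collect().distinct   // second job: which value does it see?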