Exception when using UDT in Spark DataFrame
I'm trying to create a user-defined type in Spark SQL, but I receive "com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType" when running my example. Has anyone made this work?

My code:
test("udt serialisation") { val points = seq(new examplepoint(1.3, 1.6), new examplepoint(1.3, 1.8)) val df = sparkcontextforstdout.context.parallelize(points).todf() } @sqluserdefinedtype(udt = classof[examplepointudt]) case class examplepoint(val x: double, val y: double) /** * user-defined type [[examplepoint]]. */ class examplepointudt extends userdefinedtype[examplepoint] { override def sqltype: datatype = arraytype(doubletype, false) override def pyudt: string = "pyspark.sql.tests.examplepointudt" override def serialize(obj: any): seq[double] = { obj match { case p: examplepoint => seq(p.x, p.y) } } override def deserialize(datum: any): examplepoint = { datum match { case values: seq[_] => val xy = values.asinstanceof[seq[double]] assert(xy.length == 2) new examplepoint(xy(0), xy(1)) case values: util.arraylist[_] => val xy = values.asinstanceof[util.arraylist[double]].asscala new examplepoint(xy(0), xy(1)) } } override def userclass: class[examplepoint] = classof[examplepoint] }
The useful part of the stack trace is this:
com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType
java.lang.ClassCastException: com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType
  at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)
  at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:254)
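For what it's worth, the cast appears to happen inside createDataFrame itself. Here is a rough sketch of the failing step, paraphrased from what the Spark 1.3 line numbers in the trace point at (not the literal source; internal APIs such as ScalaReflection may differ between versions):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// createDataFrame derives the schema of a Product type by reflection and
// unconditionally casts the result to StructType (paraphrased, Spark 1.3):
val schema = ScalaReflection.schemaFor[ExamplePoint].dataType.asInstanceOf[StructType]
// Because ExamplePoint is annotated with @SQLUserDefinedType, schemaFor returns
// the UDT instance (an ExamplePointUDT) rather than a StructType, and this cast
// is what throws the ClassCastException above.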
It seems that the UDT needs to be used inside another class to work (as the type of a field). One solution for using it directly is to wrap it in a Tuple1:
test("udt serialisation") { val points = seq(new tuple1(new examplepoint(1.3, 1.6)), new tuple1(new examplepoint(1.3, 1.8))) val df = sparkcontextforstdout.context.parallelize(points).todf() df.collect().foreach(println(_)) }