Exception when using UDT in Spark DataFrame -


I'm trying to create a user-defined type in Spark SQL, but I receive: com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType when using the example. Has anyone made this work?

my code:

// Reconstructed from the scraped (lowercased, single-line) listing: casing and
// formatting restored so the example compiles against the Spark SQL API.
test("udt serialisation") {
  val points = Seq(new ExamplePoint(1.3, 1.6), new ExamplePoint(1.3, 1.8))
  // NOTE(review): this is the failing call — parallelize(points).toDF() throws
  // the ClassCastException shown in the stack trace below.
  val df = sparkContextForStdout.context.parallelize(points).toDF()
}

// Binds the UDT to the case class so Spark SQL knows how to (de)serialise it.
@SQLUserDefinedType(udt = classOf[ExamplePointUDT])
case class ExamplePoint(val x: Double, val y: Double)

/**
 * User-defined type for [[ExamplePoint]].
 *
 * Stores a point as an array of two doubles (x, y) in the underlying
 * Spark SQL representation.
 */
class ExamplePointUDT extends UserDefinedType[ExamplePoint] {

  // Underlying SQL storage: a non-nullable array of doubles.
  override def sqlType: DataType = ArrayType(DoubleType, false)

  // Fully-qualified name of the matching PySpark UDT class.
  override def pyUDT: String = "pyspark.sql.tests.ExamplePointUDT"

  /** Converts an [[ExamplePoint]] into its catalyst representation (Seq(x, y)). */
  override def serialize(obj: Any): Seq[Double] = {
    obj match {
      case p: ExamplePoint =>
        Seq(p.x, p.y)
    }
  }

  /**
   * Converts a catalyst value back into an [[ExamplePoint]].
   * Accepts either a Scala Seq or a java.util.ArrayList, since Spark may
   * hand back either depending on the execution path.
   */
  override def deserialize(datum: Any): ExamplePoint = {
    datum match {
      case values: Seq[_] =>
        val xy = values.asInstanceOf[Seq[Double]]
        assert(xy.length == 2)
        new ExamplePoint(xy(0), xy(1))
      case values: util.ArrayList[_] =>
        val xy = values.asInstanceOf[util.ArrayList[Double]].asScala
        new ExamplePoint(xy(0), xy(1))
    }
  }

  override def userClass: Class[ExamplePoint] = classOf[ExamplePoint]
}

The useful part of the stack trace is this:

com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType java.lang.ClassCastException: com.ubs.ged.risk.stdout.spark.ExamplePointUDT cannot be cast to org.apache.spark.sql.types.StructType     at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)     at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:254)

It seems the UDT needs to be used inside of another class to work (as the type of a field). One solution for using it directly is to wrap it in a Tuple1:

  test("udt serialisation") {     val points = seq(new tuple1(new examplepoint(1.3, 1.6)), new tuple1(new examplepoint(1.3, 1.8)))     val df = sparkcontextforstdout.context.parallelize(points).todf()     df.collect().foreach(println(_))   } 

Comments

Popular posts from this blog

c++ - Difference between pre and post decrement in recursive function argument -

php - Nothing but 'run(); ' when browsing to my local project, how do I fix this? -

php - How can I echo out this array? -