

IllegalArgumentException while saving RDD after repartitionByCassandraReplica using the Cassandra Spark connector


Hi Folks:

I am working on a project to save a Spark DataFrame to Cassandra, and I am getting an exception saying the row size is not valid (see below). I traced the code in the connector, and it appears that the row size (3 below) differs from the column count (which turns out to be 1). I am trying to follow the example at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md, except that my customer table has two more fields besides the id used in the example. I think the example works because it has only one column (customer_id), but I need to save the additional fields. I've searched but have not found a resolution to this.
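The mismatch can be illustrated with a simplified, pure-Scala model of the check that fails in the trace below. This is a sketch, not the connector's code: the object RowWriterModel and its methods are hypothetical. The model assumes that repartitionByCassandraReplica asks its row writer for the partition key column only (customer_id). It also assumes that a DataFrame Row is read positionally, so it must supply exactly one value per selected column, while a case class is read by field name.

```scala
// Hypothetical, simplified model of a connector row writer. It is NOT the
// spark-cassandra-connector implementation; it only mirrors the size check
// that appears in the stack trace (SqlRowWriter.readColumnValues).
object RowWriterModel {

  // Columns the writer is asked to bind. Assumption: when partitioning by
  // replica, only the partition key of test.customer is selected.
  val selectedColumns: IndexedSeq[String] = Vector("customer_id")

  // Positional reading, as for a Spark SQL Row: the row must hold exactly
  // one value per selected column, otherwise the require fails.
  def readPositional(row: IndexedSeq[Any], columns: IndexedSeq[String]): IndexedSeq[Any] = {
    require(row.size == columns.size,
      s"Invalid row size: ${row.size} instead of ${columns.size}.")
    row
  }

  // Name-based reading, loosely like a case class: each selected column is
  // looked up by name, so fields that are not selected are ignored.
  def readByName(record: Map[String, Any], columns: IndexedSeq[String]): IndexedSeq[Any] =
    columns.map(name => record(name))

  def main(args: Array[String]): Unit = {
    val customer = Map[String, Any]("customer_id" -> "1", "order_id" -> 1, "value" -> 1)

    // Works: only customer_id is extracted from the record.
    println(readByName(customer, selectedColumns))

    // Fails with: requirement failed: Invalid row size: 3 instead of 1.
    val row = Vector[Any]("1", 1, 1)
    println(scala.util.Try(readPositional(row, selectedColumns)))
  }
}
```

If this model is right, converting the DataFrame to a typed Dataset of the Customer case class (for example customers.as[Customer].rdd) before calling repartitionByCassandraReplica should avoid the error, because case-class fields are looked up by name. That remains an assumption, not a confirmed fix.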

I am using Spark 2.3.0 and spark-cassandra-connector:2.3.0-s_2.11.

Just a little background: I tried saving the DataFrame to Cassandra directly and it works, but it is very slow. I've tried various combinations of batch size (rows), concurrent writes, etc. on the DataFrame write, and it is still very slow. So I am trying to see whether calling repartitionByCassandraReplica before saving to the Cassandra table will make it faster. If there are other options to make saving a DataFrame to Cassandra faster, please let me know.

Here is my scenario:

Cassandra table in keyspace test:
  create table customer (customer_id text primary key, order_id int, value int);
Spark shell commands: 
  import com.datastax.spark.connector._
  import org.apache.spark.sql.cassandra._
  case class Customer(customer_id:String, order_id:Int, value:Int)
  val customers = Seq(Customer("1",1,1),Customer("2",2,2)).toDF("customer_id","order_id","value")
  val customersRdd = customers.rdd.repartitionByCassandraReplica("test","customer")
  customersRdd.saveToCassandra("test","customer")

At this point I get the following exception:

java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 1.
at scala.Predef$.require(Predef.scala:224)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
at com.datastax.spark.connector.rdd.partitioner.TokenGenerator.getPartitionKeyBufferFor(TokenGenerator.scala:38)
at com.datastax.spark.connector.rdd.partitioner.ReplicaPartitioner.getPartition(ReplicaPartitioner.scala:70)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
18/07/18 10:27:51 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 4)
java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 1.

Thanks for your help.