The Correct Way to Connect Spark Shell to Cassandra
tl;dr
- using the cassandra connector in the spark-shell is fairly straightforward
- setting up the connection in a way that doens’t break the existing
sc
is not documented anywhere - the correct solution is to not call
sc.stop
but provide the cassandra host on startup of the shell
Spark and Cassandra
Apache Cassandra is a NoSQL distributed database that’s been gaining popularity recently. It’s also pretty high performance, scoring very high in a (not so) recent comparison of key-value stores (PDF) for different workloads. Among the contenders were HBase, Cassandra, Voldemort, Redis, VoltDB and MySQL, HBase tends to be the winner (by one to two orders of magnitude) when it comes to latency and Cassandra when it comes to throughput - depending on the number of nodes in cluster. A key-value store is nice, but it isn’t much use unless you have something doing reads and writes into it. That’s where spark
comes in.
Every data scientist’s[1] [2][3] favourite new toy spark
is a distributed in-memory data processing framework. Cassandra very helpfully comes with a spark
connector that allows you to pull data into spark as RDD
s or DataFrame
s directly from Cassandra.
Connection Issues
Connecting to a Cassandra host from spark
isn’t all that complicated, just import the connector and tell SparkConf
where to find the Cassandra host from and you’re off to the races. The Cassandra connector docs cover the basic usage pretty well. Aside from the bazillion different versions of the connector getting everything up and running is fairly straightforward.
Start the spark shell with the necessary Cassandra connector dependencies bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._
# connect to a local cassandra instance
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
# read in some data as a DataFrame
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "fooTable", "keyspace" -> "bar")).load.cache()
Lovely, you now have a DataFrame that acts just like any other spark
DataFrame. So far so good. Now let’s say you wanted to test something in the spark-shell
and pull in data from Cassandra. No problem, just do what you did before, except that you need to stop the existing SparkContext
that is created automagically when the shell starts up, before you can create a new one. This isn’t really documented anywhere, except sporadically on StackOverflow. The accepted answer is actually the wrong way to do this.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// DO NOT DO THIS
sc.stop // NOOOooo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._
// connect to a local cassandra instance
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// read in some data as a DataFrame
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "fooTable", "keyspace" -> "bar")).load.cache()
The SparkContext
created above will not function like the old SparkContext
created when the shell started up. This doesn’t actually have anything to do the Cassandra connector perse, it’s just that the setup for the Cassandra connector brings up this issue. To see the problem consider the following simplified code without the Cassandra connector.
1
2
3
4
5
6
7
8
9
10
11
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
sc.stop
val conf = sc.getConf
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(0, 1), (1, 10), (2, 15), (3, 7))
val map = Map(0->3, 1->2, 2->1, 3->0)
val BVmap = sc.broadcast(map)
rdd.map( r => BVmap.value(r._1))
The above doesn’t do anything particularly worthwhile, but it illustrates the problem. Because the SparkContext
was recreated the code will fail in the shell, due to sc
being not serialisable anymore.
The Correct Way
The solution is extremely simple, but suprisingly difficult to find. Instead of calling sc.stop
and then recreating the conf
with the Cassandra host details added, just add the Cassandra host details to the configuration defaults in $SPARK_HOME/conf/spark-defaults.conf
. Should you not have access to the default conf you can also provide the connection host in the call to spark-shell
bin/spark-shell --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10 --conf spark.cassandra.connection.host=127.0.0.1
This not being included in the official Cassandra connector documentation is bizarre.
misc
[3] I don’t like the term either but that’s what we seem to have settled for.