Data Operations with Spark and Cassandra

In this post, we will walk through several data operations you can perform with Apache Spark and Cassandra, along with steps you can follow to try them yourself.

We will cover three operations: using Spark to read data from one Cassandra table, transform it, and write it into another Cassandra table; deleting data from Cassandra tables using Spark; and importing data from one Cassandra cluster into another.

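Prerequisites

To follow along, you will need Docker (to run Cassandra) and a local download of Apache Spark. The commands below assume Spark 3.0.1 (spark-3.0.1-bin-hadoop2.7).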

Read, Transform, and Write

We will begin by starting a Docker container that runs the latest Apache Cassandra image.

docker run --name cassandra -p 9042:9042 -d cassandra:latest

Once it has completed setup, we will start the CQLSH terminal.

docker exec -it cassandra cqlsh 

We will create a keyspace and then create a table called spacecraft_journey_catalog.

CREATE KEYSPACE demo WITH REPLICATION={'class': 'SimpleStrategy', 'replication_factor': 1};

use demo ;

CREATE TABLE IF NOT EXISTS spacecraft_journey_catalog (
  	spacecraft_name text,
  	journey_id timeuuid,
  	start timestamp,
  	end timestamp,
  	active boolean,
  	summary text,
  	PRIMARY KEY ((spacecraft_name), journey_id)
) WITH CLUSTERING ORDER BY (journey_id desc);

Next, we need to insert some data into the newly created table. Copy the contents of this text file and paste them into your CQLSH terminal to insert 1000 records.

Now, we will start our local instance of Apache Spark. Open a new terminal or tab and cd into the Spark directory, e.g. cd spark-3.0.1-bin-hadoop2.7/

Once in the Spark directory, we will start a standalone cluster, beginning with the master.

./sbin/start-master.sh

Next, we will start a worker and point it at the master. To do this, we need the Spark master URL, which is shown at the top of the web UI at localhost:8080. (In Spark 3.1 and later, the script is named start-worker.sh.)

./sbin/start-slave.sh <Spark master URL>

Once the worker has started, you can confirm it by going to localhost:8080 and refreshing the page. You should now see a worker listed under the “Workers” section.

Now, we can start our Spark shell. We will use the standard Scala shell. Copy the template below, fill in the master URL, and paste it into the terminal running in the Spark directory.

./bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 \
--master <Spark master URL> \
--conf spark.cassandra.connection.host=127.0.0.1 \
--conf spark.cassandra.connection.port=9042 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

Note that we are also passing in the DataStax spark-cassandra-connector. This package is needed to expose Cassandra tables as Spark RDDs and Datasets/DataFrames, to write Spark RDDs and Datasets/DataFrames to Cassandra tables, and to execute arbitrary CQL queries in Spark applications.

Once the Spark shell has started, we will need to import a few packages and functions:
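For a standalone application rather than the shell, the same settings can be supplied when building the session. The following is a minimal sketch, assuming the connector JAR is already on the classpath (for example via --packages on spark-submit). The object name, application name, and master URL are hypothetical placeholders, not values from this walkthrough.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone entry point mirroring the spark-shell flags above.
object CassandraDataOps {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cassandra-data-ops")      // hypothetical application name
      .master("spark://localhost:7077")   // assumption: replace with your Spark master URL
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .config("spark.cassandra.connection.port", "9042")
      .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
      .getOrCreate()

    // Read, transform, and write steps would go here.

    spark.stop()
  }
}
```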

import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql._

There are two ways to load data from Cassandra into Spark so that we can transform it. The first is the load() method, and the second is a catalog. We will cover both.

load() Method

val df = spark.read.cassandraFormat("spacecraft_journey_catalog", "demo").load()
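cassandraFormat is shorthand supplied by the org.apache.spark.sql.cassandra import. As a sketch, the long-form equivalent and a quick sanity check might look like this in the same spark-shell session (dfLongForm is just an illustrative name):

```scala
// Assumes the `spark` session from the spark-shell started above.
// Long-form equivalent of cassandraFormat("spacecraft_journey_catalog", "demo").
val dfLongForm = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "demo", "table" -> "spacecraft_journey_catalog")).load()

// Column names and types as Spark sees them.
dfLongForm.printSchema()

// Row count; this should match the number of records inserted earlier.
println(dfLongForm.count())
```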

Once the data is loaded, we can transform it in two ways. The first table we made has a start time and an end time for each journey made by a spacecraft, but it does not have the duration. If we want the duration calculated and stored in another table, for display or BI purposes, we can do that with Spark.

The first method is to use select(), as shown below. We take the summary and journey_id columns from the DataFrame that holds the initial table's data, and use the datediff function on the end and start columns to calculate the number of days each journey took, naming the result duration_in_days.

val newDF = df.select($"summary", $"journey_id", datediff($"end", $"start") as "duration_in_days")

Another way to do this is to register the initial DataFrame as a temporary view so that we can query it with SQL:

df.createOrReplaceTempView("df")
val newDF = spark.sql("select summary, journey_id, datediff(end, start) as duration_in_days from df")
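To see what datediff returns without touching Cassandra, here is a small sketch on made-up rows. The journey names and timestamps are invented for illustration. datediff counts calendar days and ignores the time of day, so the two rows below should come out as 10 and 29.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, datediff}

// getOrCreate() reuses the spark-shell session if one is already running.
val localSpark = SparkSession.builder().master("local[1]").appName("datediff-sketch").getOrCreate()

// Made-up sample rows: (summary, start, end).
val sample = localSpark.createDataFrame(Seq(
  ("journey A", Timestamp.valueOf("2020-01-01 08:00:00"), Timestamp.valueOf("2020-01-11 20:00:00")),
  ("journey B", Timestamp.valueOf("2020-02-01 00:00:00"), Timestamp.valueOf("2020-03-01 00:00:00"))
)).toDF("summary", "start", "end")

// Same expression as in the transformation above, applied to the sample rows.
val sampleDurations = sample.select(col("summary"), datediff(col("end"), col("start")).as("duration_in_days"))
sampleDurations.show()
```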

Now we can take a look at the catalog method to generate newDF.

Catalog

Instead of loading the data with .load, we will set a config option in the shell, like below:

spark.conf.set(s"spark.sql.catalog.cassandra", "com.datastax.spark.connector.datasource.CassandraCatalog")

Now we can run SQL queries directly against our Cassandra data by referring to tables as catalog.keyspace.table, and generate newDF that way.

val newDF = spark.sql("select summary, journey_id, datediff(end, start) as duration_in_days from cassandra.demo.spacecraft_journey_catalog")
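With the catalog registered, the keyspaces and tables it exposes can also be listed from Spark SQL. This is a short sketch, assuming the catalog name cassandra set above and the same spark-shell session:

```scala
// Keyspaces visible through the catalog; demo should be among them.
spark.sql("SHOW NAMESPACES FROM cassandra").show()

// Tables in the demo keyspace.
spark.sql("SHOW TABLES FROM cassandra.demo").show()
```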

No matter which method you choose to generate newDF, we can now write it back into Cassandra as a new table.

Writing

To write to Cassandra, we first need to create a new table based on the DataFrame we just built in Spark. We will call the table duration_by_journey_summary, with summary as the partition key and journey_id as the clustering column.

newDF.createCassandraTable("demo", "duration_by_journey_summary", partitionKeyColumns = Some(Seq("summary")), clusteringKeyColumns = Some(Seq("journey_id")))

Now, we can write the dataframe into the newly created Cassandra table.

newDF.write.cassandraFormat("duration_by_journey_summary", "demo").mode("append").save()
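The write can also be checked from Spark itself. A minimal sketch, assuming newDF and the shell session from the steps above (written is an illustrative name). Because Cassandra writes are upserts, the counts would differ if several rows shared the same primary key.

```scala
import org.apache.spark.sql.cassandra._

// Read the new table back and compare row counts with the source DataFrame.
val written = spark.read.cassandraFormat("duration_by_journey_summary", "demo").load()
println(s"source rows: ${newDF.count()}, rows in Cassandra: ${written.count()}")
```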

You can then go to the CQLSH terminal and run the below to confirm that the data was correctly written to Cassandra.

SELECT * FROM demo.duration_by_journey_summary ;

That wraps up how to quickly do read, transform, and write data operations with Spark and Cassandra. Now, we will take a look at deleting data.

Deleting Data

We will delete data from Cassandra with Spark in two ways. The first deletes an entire partition, and the second deletes selected rows from a partition.

Delete Entire Partition

To delete an entire partition, we can run the following statement. The where function restricts the read to a single partition, and the resulting rows are passed to deleteFromCassandra, which deletes each of them.

sc.cassandraTable("demo", "duration_by_journey_summary").where("summary='ISS crew rotation.'").deleteFromCassandra("demo", "duration_by_journey_summary")
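The statement above reads every row in the partition and deletes them one by one. The connector's deleteFromCassandra also accepts a keyColumns argument, which should let the delete be keyed on the partition key alone. The sketch below assumes that parameter behaves as described in the connector documentation. It has not been run against this table.

```scala
import com.datastax.spark.connector._

// Assumption: keyColumns = SomeColumns("summary") issues a partition-level delete.
// Tuple1 wraps the single partition key value so it can be mapped to the summary column.
sc.parallelize(Seq(Tuple1("ISS crew rotation."))).deleteFromCassandra("demo", "duration_by_journey_summary", keyColumns = SomeColumns("summary"))
```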

To confirm that the partition is gone, run the statement below in the CQLSH terminal. It should return no rows.

SELECT * FROM demo.duration_by_journey_summary where summary='ISS crew rotation.' ;

Deleting Selected Rows Within A Partition

In this method, we will delete selected rows within a partition by adding additional conditions within the where function.

sc.cassandraTable("demo", "duration_by_journey_summary").where("summary='Bring supplies to space center' and duration_in_days<200").deleteFromCassandra("demo", "duration_by_journey_summary")
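Before running a delete like this, it can help to preview how many rows the filter matches. The sketch below uses the connector's where function with bind parameters, in the same spark-shell session (toDelete is an illustrative name):

```scala
import com.datastax.spark.connector._

// Same filter as the delete above, with the values passed as bind parameters.
val toDelete = sc.cassandraTable("demo", "duration_by_journey_summary").where("summary = ? and duration_in_days < ?", "Bring supplies to space center", 200)

// Number of rows the delete would remove.
println(toDelete.count())
```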

To confirm whether the data was deleted, run the statement below in the CQLSH terminal. There should be fewer than about 10 records left.

SELECT * from demo.duration_by_journey_summary where summary='Bring supplies to space center' ;

If there are still records with a duration of less than 200 days, we may need to change the data model. Remember when we created the table from newDF? We only used summary and journey_id in the primary key. To change this, we can drop the table using CQLSH as shown below:

DROP TABLE demo.duration_by_journey_summary ;

Then we can rerun the create table statement from Spark with one additional item: adding duration_in_days as a clustering column. The statement can be found below:

newDF.createCassandraTable("demo", "duration_by_journey_summary", partitionKeyColumns = Some(Seq("summary")), clusteringKeyColumns = Some(Seq("duration_in_days", "journey_id")))

Then we can rerun writing to the table using the below statement:

newDF.write.cassandraFormat("duration_by_journey_summary", "demo").mode("append").save()

Now, if we go back and try deleting the data again, we should see the changes.

sc.cassandraTable("demo", "duration_by_journey_summary").where("summary='Bring supplies to space center' and duration_in_days<200").deleteFromCassandra("demo", "duration_by_journey_summary")

And in CQLSH:

SELECT * from demo.duration_by_journey_summary where summary='Bring supplies to space center' ;

That wraps up a quick introduction to deleting Cassandra data using Spark. Now we will take a look at how to migrate data from one cluster to another.

Import data from one Cassandra cluster to another

We will cover how to migrate data between two Cassandra clusters in two ways: using catalogs and RDDs.

Catalog Method

Using the catalog method from DataStax’s spark-cassandra-connector, you can set up 2 different catalogs, one for each cluster, and write statements to transfer data between them. An example is provided below:

//Catalog Cass100 for Cluster at 127.0.0.100
spark.conf.set(s"spark.sql.catalog.cass100", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set(s"spark.sql.catalog.cass100.spark.cassandra.connection.host", "127.0.0.100")

//Catalog Cass200 for Cluster at 127.0.0.200
spark.conf.set(s"spark.sql.catalog.cass200", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set(s"spark.sql.catalog.cass200.spark.cassandra.connection.host", "127.0.0.200")

spark.sql("INSERT INTO cass200.ks.tab SELECT * from cass100.ks.tab")
//Or
spark.read.table("cass100.ks.tab").writeTo("cass200.ks.tab").append
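After the transfer, a quick row-count comparison across the two catalogs can act as a sanity check. This sketch assumes the cass100 and cass200 catalogs configured above. The target count can be higher than the source if the target table already held other rows.

```scala
// Count rows on each side of the transfer.
val sourceCount = spark.read.table("cass100.ks.tab").count()
val targetCount = spark.read.table("cass200.ks.tab").count()
println(s"source: $sourceCount, target: $targetCount")
```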

RDD Method

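With RDDs, each cluster gets its own CassandraConnector, and the implicit connector in scope decides which cluster a read or write goes to:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext

def twoClusterExample(sc: SparkContext) = {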
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks","table")
  }

  {
    implicit val c = connectorToClusterTwo
    rddFromClusterOne.saveToCassandra("ks","table")
  }

}

If you are trying to connect a non-SSL cluster to an SSL-enabled one, you can use this method:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "10.xxx.xxx.xx"))

val rddFromClusterOne = {
  // Sets connectorToClusterOne as the default connection for everything in this code block
  implicit val c = connectorToClusterOne
  sc.cassandraTable("keyspace1", "table1")
}

val connectorToClusterTwo = CassandraConnector(sc.getConf
  .set("spark.cassandra.connection.host", "10.xxx.xxx.xx")
  .set("spark.cassandra.auth.username", "<username>")
  .set("spark.cassandra.auth.password", "<password>")
  .set("spark.cassandra.connection.ssl.enabled", "true")
  .set("spark.cassandra.connection.ssl.trustStore.path", "/etc/dse/security/datatruststore-stage.jks")
  .set("spark.cassandra.connection.ssl.trustStore.password", "<password>"))

{
  // Sets connectorToClusterTwo as the default connection for everything in this code block
  implicit val c = connectorToClusterTwo
  rddFromClusterOne.saveToCassandra("keyspace1", "table1")
}

And with that, you can quickly get started with doing data operations with Spark and Cassandra.

Cassandra.Link

Cassandra.Link is a knowledge base that we created for all things Apache Cassandra. Our goal with Cassandra.Link was to not only fill the gap of Planet Cassandra but to bring the Cassandra community together. Feel free to reach out if you wish to collaborate with us on this project in any capacity.

We are a technology company that specializes in building business platforms. If you have any questions about the tools discussed in this post or about any of our services, feel free to send us an email!