Spark-Cassandra-Notes

Cassandra data computing with Apache Spark

Dataset Join

GitHub repository path: examples/dataset-join-01. Language: Scala 2.11.

Joining two Datasets

  • Since Spark 2.0, the Dataset API has been a high-level abstraction that gives a user-defined view of structured and semi-structured data. Datasets also tend to be more space-efficient than RDDs, because encoders store rows in a compact binary format.
  • Datasets are the typed version of DataFrames: a DataFrame is a Dataset (collection) of Rows, that is, Dataset[Row].
  • RDD, by contrast, is a low-level access interface that is better suited to unstructured data such as streams, or to cases where the expressiveness of arbitrary functional code is needed.
  • Datasets and DataFrames are built on top of [RDDs](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#resilient-distributed-datasets-rdds).
  • Like RDDs, Datasets and DataFrames are distributed collections.
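
The relationship between the three APIs described above can be sketched as follows. This is only a minimal illustration, assuming a local Spark 2.x session run as a script or in spark-shell; the `Person` case class and its sample rows are hypothetical and are not part of the example repository.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical record type, used only for this illustration
case class Person(id: Int, email: String)

val spark = SparkSession.builder()
  .appName("DatasetVsDataFrameSketch")
  .master("local[2]")
  .getOrCreate()
import spark.implicits._

// Typed Dataset: field names and types are checked at compile time
val dsTyped: Dataset[Person] =
  Seq(Person(1, "a@example.com"), Person(2, "b@example.com")).toDS()

// DataFrame is an alias for Dataset[Row]: columns are resolved at runtime
val df: DataFrame = dsTyped.toDF()

// The underlying RDD stays reachable from the Dataset
val rdd: RDD[Person] = dsTyped.rdd

// Typed access through a lambda versus untyped access by column name
val typedEmails: Array[String] = dsTyped.map(_.email).collect()
val untypedEmails: Array[String] = df.select("email").as[String].collect()
```

A misspelled field in `dsTyped.map(_.email)` fails at compile time, whereas a misspelled column name in `df.select("email")` only fails when the query is analyzed at runtime.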

Detailed

// datastax Cassandra Connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.cql.CassandraConnector

// spark sql libraries
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
// setting up Cassandra-ready spark session
val spark = SparkSession
              .builder()
              .appName("SparkCassandraApp")
              .config("spark.cassandra.connection.host", "localhost")
              .config("spark.cassandra.connection.port", "9042")
              .master("local[2]")
              .getOrCreate()
// establishing the database session
val connector = CassandraConnector(spark.sparkContext.getConf)
val session = connector.openSession()
// reading datasets to join
val dsPeople = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "mock_data", "keyspace" -> "examples"))
  .load()

val dsCars = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "mock_cars", "keyspace" -> "examples"))
  .load()
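// joining datasets
// for testing only: this could be optimized by filtering each dataset before joining them
// <=> is null-safe equality: unlike ===, it treats two null keys as equal
val dsJoin = dsPeople
  .join(dsCars, dsPeople("id") <=> dsCars("id_owner"))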

// printing records of people who own blue Honda cars
val dsBlueHonda = dsJoin
  .filter("color == 'Blue' and car_make == 'Honda'")
  .select("email", "id", "car_model", "drinker")
println("People with a Blue Honda:")
dsBlueHonda.show()
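// printing records of cars owned by people who drink daily :/
val dsDrinkers = dsJoin.filter("drinker == 'Daily'").select("email","id","car_make","car_model")
println("People who drink daily, and their cars:")
// show up to 200 rows without truncating long values
dsDrinkers.show(200,false)
// releasing the Cassandra session and stopping Spark
session.close()
spark.stop()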
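
The comment on the join notes that it could be optimized by filtering before joining. A minimal sketch of that idea follows, assuming small in-memory DataFrames as hypothetical stand-ins for the `mock_data` and `mock_cars` tables (the sample rows are invented; only the column names come from the code above), so that it runs without a Cassandra instance.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("FilterBeforeJoinSketch")
  .master("local[2]")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for examples.mock_data
val dsPeople = Seq(
  (1, "ann@example.com", "Daily"),
  (2, "bob@example.com", "Never")
).toDF("id", "email", "drinker")

// Hypothetical stand-in for examples.mock_cars
val dsCars = Seq(
  (1, "Honda", "Civic", "Blue"),
  (2, "Ford", "Focus", "Red")
).toDF("id_owner", "car_make", "car_model", "color")

// Narrow the cars side first so fewer rows take part in the join
val dsBlueHondaCars = dsCars
  .filter($"color" === "Blue" && $"car_make" === "Honda")

// Same null-safe join condition as above, applied to the reduced dataset
val dsBlueHonda = dsPeople
  .join(dsBlueHondaCars, dsPeople("id") <=> dsBlueHondaCars("id_owner"))
  .select("email", "id", "car_model", "drinker")

dsBlueHonda.show()
```

Spark's optimizer may already push such predicates below an inner join, so the gain from filtering by hand is not guaranteed; comparing `dsBlueHonda.explain()` for both variants shows whether the physical plans actually differ.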