Using Spark SQLContext, HiveContext & Spark Dataframes API with ElasticSearch, MongoDB & Cassandra

In this post we will show how to use the different SQL contexts for querying data with Spark.
We will begin with Spark SQL and follow up with HiveContext. In addition to this, we will conduct queries on various NoSQL databases and analyze the advantages / disadvantages of using them, so without further ado, let’s get started!

First of all we need to create a SparkContext whose configuration includes the options for connecting to Cassandra:
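A minimal sketch of what that setup might look like (the master, application name and Cassandra host below are placeholders to adapt to your own environment):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Placeholder values: adapt master, app name and Cassandra host to your cluster
    val conf = new SparkConf()
      .setAppName("spark-sql-nosql-example")
      .setMaster("local[*]")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)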

Spark SQLContext allows us to connect to different data sources to write or read data from them, but it has a limitation: when the program ends or the Spark shell is closed, all links to the data sources we have created are lost, since they are temporary and will not be available in the next session.

This limitation is solved with HiveContext, since it uses a MetaStore to store the information about those “external” tables. In our example, this MetaStore is MySQL. The configuration is included in a resource file (hive-site.xml) used by Hive. You can see the different properties, such as the user, in the GitHub project; be careful if you are setting environment variables such as HADOOP_USER_NAME and HADOOP_CONF_DIR.
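Assuming the SparkContext created above, a minimal sketch of creating the HiveContext (the MetaStore connection details themselves live in hive-site.xml, not in the code):

    import org.apache.spark.sql.hive.HiveContext

    // hive-site.xml (pointing to the MySQL MetaStore) must be visible to Spark,
    // e.g. in $SPARK_HOME/conf or in the directory referenced by HADOOP_CONF_DIR
    val hiveContext = new HiveContext(sc)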

At first glance it seems that everything is solved, but we have lost high availability. If you do not want to miss out on this, you can use XDContext with Stratio Crossdata, which is capable of storing the MetaStore in Zookeeper.

To use the data sources’ API we need to know how to create DataFrames. Two basic concepts are involved:

  1. Schema:

In Spark, a DataFrame is nothing more than an RDD composed of Rows together with a schema in which we indicate the name and type of each column of the Rows.

  2. RDD[Row]:

Each element of the RDD has to be a Row, which is a set of values. Normally we have to transform an RDD of another type to an RDD of Rows.

With all this we are able to create a DataFrame with either SQLContext or HiveContext:
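A minimal sketch putting both pieces together (the column names and sample values are made up for illustration; the same call works on hiveContext):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // 1. Schema: name and type of each column
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // 2. RDD[Row]: transform a plain RDD into an RDD of Rows
    val rowsRDD = sc.parallelize(Seq((1, "Ana"), (2, "Luis")))
      .map { case (id, name) => Row(id, name) }

    val df = sqlContext.createDataFrame(rowsRDD, schema)
    df.show()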

Another option to create DataFrames is using RDD[Case Class]. Each element of the RDD has to be a case class. Normally we have to transform an RDD of another type to an RDD of our case class.
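For example, with a hypothetical Person case class the schema is inferred from its fields:

    // Hypothetical case class for illustration
    case class Person(id: Int, name: String)

    val peopleRDD = sc.parallelize(Seq(Person(1, "Ana"), Person(2, "Luis")))

    // The schema (column names and types) is inferred from the case class
    val peopleDF = sqlContext.createDataFrame(peopleRDD)
    // or equivalently: import sqlContext.implicits._; peopleRDD.toDF()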

With simple configuration parameters we will be able to store any DataFrame we have created in Cassandra tables, Elasticsearch indexes or MongoDB collections, thanks to the different implementations that the DataStax, Elastic and Stratio Spark packages offer us.

MongoDB:
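A sketch of writing a DataFrame to a collection with the Stratio Spark-MongoDB data source (the provider name corresponds to the 0.11.x connector; host, database and collection are placeholders):

    import org.apache.spark.sql.SaveMode

    df.write
      .format("com.stratio.datasource.mongodb")   // Stratio Spark-MongoDB data source
      .mode(SaveMode.Overwrite)
      .options(Map(
        "host"       -> "localhost:27017",        // placeholder MongoDB host
        "database"   -> "highschool",             // placeholder database
        "collection" -> "students"))              // placeholder collection
      .save()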

Note: We can also insert items in a collection using the functions that the Stratio library offers us.

ElasticSearch:
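A sketch of writing a DataFrame to an index with the elasticsearch-hadoop data source (the node address and the index/type name are placeholders):

    import org.apache.spark.sql.SaveMode

    df.write
      .format("org.elasticsearch.spark.sql")  // Elastic's Spark SQL data source
      .mode(SaveMode.Append)
      .option("es.nodes", "localhost")        // placeholder Elasticsearch node
      .option("es.port", "9200")
      .save("highschool/students")            // placeholder index/type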

Note: We can also insert items in an index using the functions that the Elastic library offers us.

Cassandra:
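A sketch of writing a DataFrame to a table with the DataStax spark-cassandra-connector (the keyspace and table are placeholders and must already exist in Cassandra):

    import org.apache.spark.sql.SaveMode

    df.write
      .format("org.apache.spark.sql.cassandra")  // DataStax connector data source
      .mode(SaveMode.Append)
      .options(Map(
        "keyspace" -> "highschool",              // placeholder keyspace
        "table"    -> "students"))               // placeholder table
      .save()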

Now that we know how to write information to each of the NoSQL databases, let’s see how we can query and read from each of them:

 

1. Using the new DataFrames API functions (see the sketch after this list).

2. Creating physical tables and temporary external tables within the Spark SQLContext; this feature is experimental, and with HiveContext only the temporary table is created. To use this feature reliably you can use CrossdataContext (XDContext).

3. Using HiveContext, creating a link to the physical tables and storing it in Hive’s MetaStore.
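A sketch of options 1 and 3, assuming the contexts created earlier (the data source, option values and table names are placeholders):

    // 1. DataFrames API: read the collection into a DataFrame and register a temp table
    val studentsDF = sqlContext.read
      .format("com.stratio.datasource.mongodb")
      .options(Map(
        "host"       -> "localhost:27017",
        "database"   -> "highschool",
        "collection" -> "students"))
      .load()

    studentsDF.registerTempTable("students")
    sqlContext.sql("SELECT name FROM students WHERE id > 1").show()

    // 3. HiveContext: persist the link to the physical table in the MetaStore,
    //    so it survives across sessions (sketch of the DDL syntax)
    hiveContext.sql(
      """CREATE TABLE IF NOT EXISTS students_cassandra
        |USING org.apache.spark.sql.cassandra
        |OPTIONS (keyspace "highschool", table "students")""".stripMargin)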

In this way we have access to a SQL language with more functionality than each data source provides natively. For optimal access I recommend using Crossdata, as it optimizes the queries that can run natively on each of the three NoSQL databases.

 

The biggest advantage it offers, apart from executing queries in memory on a Spark cluster, is that we can do JOINs between the various NoSQL databases:
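A sketch of such a cross-source join, assuming the three hypothetical temp tables students (MongoDB), grades (Elasticsearch) and addresses (Cassandra) have already been registered:

    // Hypothetical temp tables, each backed by a different NoSQL database
    val joined = sqlContext.sql(
      """SELECT s.name, g.grade, a.city
        |FROM students s
        |JOIN grades g ON s.id = g.student_id
        |JOIN addresses a ON s.id = a.student_id""".stripMargin)

    joined.show()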

In the following link you can see all the code of the project on GitHub.

In order to run it, you need to have Elasticsearch, Cassandra and MongoDB installed and running.

Used versions:

* Scala 2.10.4

* Spark 1.5.2

* Spark-MongoDb 0.11.1

* Spark-ElasticSearch 2.2.0

* Spark-Cassandra 1.5.0

* Elasticsearch 1.7.2

* Cassandra 2.2.5

* MongoDB 3.0.7

I hope I have clarified the different ways to access and write data with Spark in each of the three major NoSQL databases.

2 Comments

  1. sampad says:

    Hi
    I am trying to retrieve Cassandra data using hiveContext. I am trying to create a hive table from cassandra data in this way.
    hiveContext.sql(s"""CREATE TABLE IF NOT EXISTS testCassandra(id STRING)
    |USING "org.apache.spark.sql.cassandra"
    |OPTIONS (
    | table 'cassandraclient', keyspace 'testkeyspace'
    | )
    """.stripMargin)
    But I failed.
    When I create the jar and run it through spark-submit, I got the following exception
    Exception in thread "main" org.apache.spark.sql.AnalysisException: missing EOF at 'USING' near ')';
    Please let me know how I can work on it and how to retrieve the data from the Cassandra datastore. You can mail me at my mail id.

  2. Antonio Gutiérrez Molinuevo says:

    Very good post! It's very interesting to be able to do joins between dataframes coming from different data sources!
