ETL with PySpark – MySQL 2 Cassandra

PySpark can be used as an ETL tool, replacing traditional tools like ODI or Informatica. Spark provides an ideal middle layer for writing code that gets the job done fast.

I will show you an example of how to move data from a MySQL database to a Cassandra DB with PySpark, run from a Jupyter Notebook.

ETL is the process of extracting, transforming, and loading data from one source system to another.

The data used: https://www.kaggle.com/currie32/crimes-in-chicago

  1. Prerequisites
    • Spark 3
    • MySQL Connector/J
    • Anaconda with Jupyter
  2. Import the CSV into the MySQL DB. I used phpMyAdmin – it’s simpler and faster through the interface. A scripted alternative is sketched below.
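If you prefer to script the import, a pandas-based load works as well. A minimal sketch, assuming pymysql is installed and the CSV file name matches your Kaggle download; the table name t_crimes is the one queried later from PySpark:

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical file name – use the CSV you downloaded from Kaggle
csv_path = "Chicago_Crimes_2012_to_2017.csv"

# Same database/user as in the PySpark job below; replace ???? with the real password
engine = create_engine("mysql+pymysql://kosmin:????@localhost:3306/kosmin")

# Load in chunks so the multi-million-row file never has to fit in memory at once
for chunk in pd.read_csv(csv_path, chunksize=50000):
    chunk.to_sql("t_crimes", engine, if_exists="append", index=False)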

3. Prepare the TARGET. Create the KEYSPACE and the table definition in Cassandra.

I am doing this on a VirtualBox machine, so we’ll keep the replication factor at 1. The most important thing is to define the primary key, because in Cassandra the first component of the primary key is the partition key, which determines how rows are distributed across the cluster.

Another thing worth mentioning: as you can see, the column names are in double quotes. The reason is that CREATE TABLE in Cassandra lowercases unquoted column names by default, and the spark-cassandra-connector is case sensitive. So if the source system has uppercase column names, they either have to be quoted in the table definition, as below, or renamed in the DataFrame.
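If you would rather keep unquoted, lowercase columns in Cassandra, the rename can be done on the Spark side instead; a one-line sketch, assuming df is the DataFrame read from MySQL:

# Lowercase every column name before writing to Cassandra
df = df.toDF(*[c.lower() for c in df.columns])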

CREATE KEYSPACE crimes WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

USE crimes;

CREATE TABLE crimes (
  "ID" int,
  "Case_Number" text,
  "Date" text,
  "Block" text,
  "IUCR" text,
  "Primary_Type" text,
  "Description" text,
  "Location_Description" text,
  "Arrest" text,
  "Domestic" text,
  "Beat" text,
  "District" text,
  "Ward" text,
  "Community_Area" text,
  "FBI_Code" text,
  "X_Coordinate" text,
  "Y_Coordinate" text,
  "Year" int,
  "Updated_On" text,
  "Latitude" text,
  "Longitude" text,
  "Location" text,
  PRIMARY KEY ("ID")
);

4. PySpark 🙂

Because we are using PySpark from Jupyter, it has to be launched with the connectors included.

The MySQL connector can be downloaded from here: https://dev.mysql.com/downloads/connector/j/

pyspark --jars /usr/share/java/mysql-connector-java-8.0.20.jar --packages com.datastax.spark:spark-cassandra-connector_2.12:2.5.0 --conf spark.cassandra.connection.host=127.0.0.1

It is also important to use the right Cassandra connector build: _2.12 if Spark was built with Scala 2.12, or _2.11 if it was built with Scala 2.11.
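If you are not sure which Scala build your Spark uses, it can be checked from a running PySpark session; a sketch that relies on the internal py4j gateway (not a public API):

# Print the Scala version Spark was compiled against
print(sc._jvm.scala.util.Properties.versionString())

The same information also appears in the banner printed by pyspark --version.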

import os
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Must be set before the SparkContext is created, otherwise it has no effect
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/share/java/mysql-connector-java-8.0.20.jar --packages com.datastax.spark:spark-cassandra-connector_2.12:2.5.0 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'



sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
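A side note: SQLContext is deprecated in Spark 3, where SparkSession is the preferred entry point. An equivalent setup would look like this (the appName is arbitrary):

from pyspark.sql import SparkSession

# SparkSession wraps SparkContext and SQLContext in Spark 3
spark = SparkSession.builder \
    .appName("mysql-to-cassandra") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .getOrCreate()

The code below keeps the original SQLContext style; spark.read would work the same way.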

def read_mysql():
    # MySQL DB connection info
    hostname = "localhost"
    dbname = "kosmin"
    jdbcPort = 3306
    username = "kosmin"
    password = "????"

    jdbc_url = "jdbc:mysql://{0}:{1}/{2}?user={3}&password={4}&serverTimezone=UTC".format(hostname, jdbcPort, dbname, username, password)

    # Aliased subquery, so Spark can wrap it as a derived table
    query = "(select * from t_crimes) t_crimes"
    # Read into a DataFrame; Connector/J 8.x ships the driver as com.mysql.cj.jdbc.Driver
    df1 = sqlContext.read.format('jdbc').options(driver='com.mysql.cj.jdbc.Driver', url=jdbc_url, dbtable=query).load()
    df1.show()
    return df1
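The crimes dataset runs to millions of rows, and the read above goes through a single JDBC connection. Spark's JDBC source can parallelize it when given a numeric partition column; a sketch using the ID column, with hypothetical bounds that should really come from SELECT MIN(ID) / MAX(ID):

df1 = sqlContext.read.format('jdbc').options(
    driver='com.mysql.cj.jdbc.Driver',
    url=jdbc_url,
    dbtable=query,
    partitionColumn='ID',   # numeric column to split the read on
    lowerBound='1',         # hypothetical – take from SELECT MIN(ID)
    upperBound='8000000',   # hypothetical – take from SELECT MAX(ID)
    numPartitions='8'       # number of parallel JDBC connections
).load()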

def write_cassandra(df1):
    # Write the DataFrame into the Cassandra table created above
    df1.write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="crimes", keyspace="crimes")\
        .save()
    
if __name__ == "__main__":
    df = read_mysql()
    write_cassandra(df)

And the result: the crimes table in Cassandra is populated with the rows extracted from MySQL.
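To confirm the load, the table can be read back through the same connector; a minimal sketch:

# Read the crimes table back from Cassandra and spot-check it
df2 = sqlContext.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="crimes", keyspace="crimes") \
    .load()

df2.show(5)
print(df2.count(), "rows loaded")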

About the author: Cosmin Chauciuc
