image

This part aims to present a simple yet effective ETL for GDELT data processing in Scala.

The analysis will be made on a Zeppelin Notebook using some built-in tools. If you have some questions regarding this part, please refer to my article on how to launch Zeppelin Notebooks.

Download the data

Once you’re in your Zeppelin Notebook, import some useful functions :

// Imports
import sys.process._
import java.net.URL
import java.io.File
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import java.net.HttpURLConnection 
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import org.apache.spark.input.PortableDataStream
import java.util.zip.ZipInputStream
import java.io.BufferedReader
import java.io.InputStreamReader
import org.apache.spark.sql.SQLContext
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.auth.BasicAWSCredentials
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
import org.apache.spark.sql.types.IntegerType

The GDELT data set is quite special. As explained in a previous article, Zipped CSV files are uploaded every 15 minutes and record key events in the world based essentially on articles. The first step is to grab the list of the CSV files and to put those files in an S3 bucket.

We can define a file downloader function to load the data set later on :

def fileDownloader(urlOfFileToDownload: String, fileName: String) = {
    val url = new URL(urlOfFileToDownload)
    val connection = url.openConnection().asInstanceOf[HttpURLConnection]
    connection.setConnectTimeout(5000)
    connection.setReadTimeout(5000)
    connection.connect()

    if (connection.getResponseCode >= 400)
        println("error")
    else
        url #> new File(fileName) !!
}

There are 2 lists of files to download, one in English, and one international one.

fileDownloader("http://data.gdeltproject.org/gdeltv2/masterfilelist.txt", "/tmp/masterfilelist.txt") // save the list file to the Spark Master
fileDownloader("http://data.gdeltproject.org/gdeltv2/masterfilelist-translation.txt", "/tmp/masterfilelist_translation.txt") //same for Translation file

You might have to configure your AWS client service first. Once this is done, put those 2 lists in your S3 bucket :

awsClient.putObject("mys3bucket", "masterfilelist.txt", new File("/tmp/masterfilelist.txt") )
awsClient.putObject("mys3bucket", "masterfilelist_translation.txt", new File( "/tmp/masterfilelist_translation.txt") )

Then, load the list of all the files from the year 2018 :

// English Data
val list_csv = spark.read.format("csv").option("delimiter", " ").
    csv("s3a://fabien-mael-telecom-gdelt2018/masterfilelist.txt").
    withColumnRenamed("_c0","size").
    withColumnRenamed("_c1","hash").
    withColumnRenamed("_c2","url")
val list_2018_tot = list_csv.where(col("url").like("%/2018%"))

And download them all!

list_2018_tot.select("url").repartition(100).foreach( r=> {
    val URL = r.getAs[String](0)
    val fileName = r.getAs[String](0).split("/").last
    val dir = "/mnt/tmp/"
    val localFileName = dir + fileName
    fileDownloader(URL,  localFileName)
    val localFile = new File(localFileName)
    AwsClient.s3.putObject("mys3bucket", fileName, localFile )
    localFile.delete()
})

We can replicate this for the translated data set :

val list_csv_translation = spark.read.format("csv").option("delimiter", " ").
    csv("s3a://fabien-mael-telecom-gdelt2018/masterfilelist_translation.txt").
    withColumnRenamed("_c0","size").
    withColumnRenamed("_c1","hash").
    withColumnRenamed("_c2","url")
val list_2018_translation_tot = list_csv_translation.where(col("url").like("%/2018%"))

list_2018_translation_tot.select("url").repartition(100).foreach( r=> {
    val URL = r.getAs[String](0)
    val fileName = r.getAs[String](0).split("/").last
    val dir = "/mnt/tmp/"
    val localFileName = dir + fileName
    fileDownloader(URL,  localFileName)
    val localFile = new File(localFileName)
    AwsClient.s3.putObject("fabien-mael-telecom-gdelt2018", fileName, localFile )
    localFile.delete()

})

Create the data frames

The two major tables from the GDELT data set are Mentions and Export. All the files are stored zipped CSV files. Part of this pipeline is dedicated to unzip data.

// Export English
val exportRDD = sc.binaryFiles("s3a://mys3bucket/201801*.export.CSV.zip"). // Use Regex to load some files from 1st month
    flatMap {  // unzip files
        case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            Stream.continually(zis.getNextEntry).
                takeWhile{ case null => zis.close(); false
                    case _ => true }.
                flatMap { _ =>
                    val br = new BufferedReader(new InputStreamReader(zis))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }
val exportDF = exportRDD.map(x => x.split("\t")).map(row => row.mkString(";")).map(x => x.split(";")).toDF()

We have built an RDD for the first month of 2018 (one can use the whole year too). We can replicate this for the Mentions tables too, and also for the translated table.


// Mentions English
val mentionsRDD = sc.binaryFiles("s3a://mys3bucket/201801*.mentions.CSV.zip").
    flatMap {  // unzip files
        case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            Stream.continually(zis.getNextEntry).
                takeWhile{ case null => zis.close(); false
                    case _ => true }.
                flatMap { _ =>
                    val br = new BufferedReader(new InputStreamReader(zis))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }   
    }
val mentionsDF = mentionsRDD.map(x => x.split("\t")).map(row => row.mkString(";")).map(x => x.split(";")).toDF()
// Mentions Translation
val mentionsRDD_trans = sc.binaryFiles("s3a://mys3bucket/201801*translation.mentions.CSV.zip"). 
    flatMap {  
        case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            Stream.continually(zis.getNextEntry).
                takeWhile{ case null => zis.close(); false
                    case _ => true }.
                flatMap { _ =>
                    val br = new BufferedReader(new InputStreamReader(zis))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }
val mentionsDF_trans = mentionsRDD_trans.map(x => x.split("\t")).map(row => row.mkString(";")).map(x => x.split(";")).toDF()

// Export Translation
val exportRDD_trans = sc.binaryFiles("s3a://mys3bucket/201801*translation.export.CSV.zip"). 
    flatMap { 
        case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            Stream.continually(zis.getNextEntry).
                takeWhile{ case null => zis.close(); false
                    case _ => true }.
                flatMap { _ =>
                    val br = new BufferedReader(new InputStreamReader(zis))
                    Stream.continually(br.readLine()).takeWhile(_ != null)
                }
    }
val exportDF_trans = exportRDD_trans.map(x => x.split("\t")).map(row => row.mkString(";")).map(x => x.split(";")).toDF()

Conclusion : Our ETL is now defined. We have downloaded and sorted the data. We have created separated RDDs and unzipped files. In the next part, we’ll cover how to optimize the data sets and how to put the data in Cassandra Tables.