Code Examples

MongoDB

Read from database

import com.mongodb.spark._
import com.mongodb.spark.sql._
import com.mongodb.spark.config._

val dfFA = spark.read.option("database", "foxconn-analytics")
                     .option("collection", "collect_pageview")
                     // the "uri" option can be omitted if it is already set in the Spark interpreter configuration
                     .option("uri", "mongodb://mongo201,mongo202,mongo203/?replicaSet=bigdata&readPreference=secondaryPreferred")
                     .mongo()

Read from database with schema specified by a case class

case class EHAVE_TBL(
    CNAME: String, CCOVEY: String, ISCORE: String,
    IFPASS: String, FACTORY: String, ISEX: String
)

val dsEHAVE = spark.read
                   .option("database", "IEDB")
                   .option("collection", "EHAVE_TBL_2016")
                   .mongo[EHAVE_TBL]()

Write to database

MongoSpark.save(dfStaff2Mongo.write.format("com.mongodb.spark.sql")
                     .options(Map("database" -> "HR", "collection" -> "EMP_TAGS_20170324"))
                     .mode("overwrite")
               )

MySQL/MariaDB

Read from database

val dfLog = spark.read.format("jdbc")
                 .options(
                     Map("url" -> "jdbc:mysql://192.168.2.52/IIS?user=ACCOUNT_NAME&password=PASSWORD",
                         "driver" -> "com.mysql.jdbc.Driver",
                         "dbtable" -> "IIS.TV",
                         "partitionColumn" -> "id",
                         "lowerBound" -> "1",
                         "upperBound" -> "50000000",
                         "numPartitions" -> "200"))
                 .load()
                 .cache()

Write to database

val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.jdbc.Driver")
prop.setProperty("user", "<Username>")
prop.setProperty("password", "<Password>") 

dfDiff.write
      .mode("overwrite")
      .jdbc("jdbc:mysql://192.168.2.52/<Database Name>?useUnicode=yes&characterEncoding=UTF-8", "<Table Name>", prop)

ORC File

Read from file

val df = spark.read.orc("/orc/PromoterLearn.orc")

Write to file

df.select('JobNo, 'Name, 'Sex)
  .write
  .mode("overwrite")
  .orc("/orc/PromoterLearn.orc")

Parquet

Read from file

val df = spark.read.parquet("/parquet/PromoterLearn.parquet")

Write to file

dfTaobao.select('JobNo,'Name,'Sex)
        .write
        .mode("overwrite")
        .parquet("/parquet/PromoterTaobao2017.parquet")

CSV

Read from CSV

val dsProvTest = spark.read
                      .option("header", true)
                      .csv("/csv/ProvPlayTest.csv")

Write to CSV

dsProvPlay.write
          .mode("overwrite")
          .csv("/csv/ProvPlayTest.csv")

Oracle

Read from database

val dfDL = spark.read.format("jdbc")
                .options(
                    Map("url" -> "jdbc:oracle:thin:<AccountName>/<Password>@<DB's IP>:<DB's Port>:<SID Name>",
                        "driver" -> "oracle.jdbc.driver.OracleDriver",
                        "dbtable" -> "DL_TBL"))
                .load()

Write to database
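
Writing back goes through the same thin driver via DataFrameWriter.jdbc. A minimal sketch, assuming dfDL from the read above is saved to a hypothetical table DL_TBL_OUT with the same connection details:

val prop = new java.util.Properties
prop.setProperty("driver", "oracle.jdbc.driver.OracleDriver")
prop.setProperty("user", "<AccountName>")
prop.setProperty("password", "<Password>")

dfDL.write
    .mode("overwrite")
    // "DL_TBL_OUT" is an illustrative table name; reuse the IP/port/SID from the read example
    .jdbc("jdbc:oracle:thin:@<DB's IP>:<DB's Port>:<SID Name>", "DL_TBL_OUT", prop)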

MS SQL Server

Read from database

val dfSQLServer = spark.read.format("jdbc")
                       .option("url","jdbc:sqlserver://<DB's IP>:1433;database=educ2013;user=<Username>;password=<Password>")
                       .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                       .option("dbtable", "user_tbl")
                       .option("partitionColumn", "IID")
                       .option("lowerBound", "1")
                       .option("upperBound", "3370000")
                       .option("numPartitions", "100")
                       .load()

Use isin

val lstReceiverName = dfTaobao.groupBy('ReceiverName)
                              .count()
                              .orderBy('count.desc)
                              .filter('count >= 10)
                              .agg(collect_list($"ReceiverName").alias("RName"))
                              .first()
                              .getSeq(0)

dfTaobao.filter('ReceiverName.isin(lstReceiverName: _*))
        // label LCD-% models as "平板電視" (flat-panel TV) before pivoting; other goods keep their own ID
        .withColumn("SubTotal", when('GoodsID.like("LCD-%"), lit("平板電視")).otherwise('GoodsID))
        .groupBy('ReceiverName)
        .pivot("SubTotal")
        .count()
        .na.fill(0)
        .groupBy().sum()
        .printCSV   // printCSV assumes a notebook helper; use .show() if it is not available

Concatenate all columns

concat(dsLiveEPG.columns.map(c => col(c)):_*)
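
A minimal usage sketch, assuming dsLiveEPG is an existing Dataset; the column name "AllColumns" is illustrative:

import org.apache.spark.sql.functions.{col, concat}

// add one string column holding every column of the row concatenated together
// (note: concat yields null if any input column is null)
val dsWithAll = dsLiveEPG.withColumn("AllColumns", concat(dsLiveEPG.columns.map(col): _*))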

Hash an array of columns

import spark.implicits._
import org.apache.spark.sql.functions.{array, udf}

// bucket a row into one of 10 groups by summing the hash codes of all its values
def hasher(data: AnyRef*) = data.map(_.hashCode).sum % 10
val getBucket = udf(hasher _)

val df = sc.parallelize(('a' to 'z').map(_.toString) zip (1 to 30)).toDF("c1", "c2")
df.withColumn("bucket", getBucket(array(df.columns.map(df.apply): _*))).show()

UDF

User-Defined Functions

A user-defined function (UDF) is a feature of Spark SQL for defining new Column-based functions that extend the vocabulary of Spark SQL’s DSL for transforming Datasets.

import java.util.UUID

// normalise common misspellings of the device name
val replaceWrongDevice = udf { (device: String) =>
  if (device == "ANOIRD" || device == "ANDOIRD") "ANDROID" else device
}
val createUUID3 = udf { x: String => UUID.nameUUIDFromBytes(x.getBytes).toString }  // name-based (deterministic) UUID
val createUUID4 = udf { () => UUID.randomUUID().toString }                          // random UUID
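
For illustration, applying these UDFs to a hypothetical DataFrame dfDevice with device and userid columns:

// dfDevice and its column names are assumptions for this sketch only
val dfClean = dfDevice
  .withColumn("device", replaceWrongDevice('device))   // normalise misspelled device names
  .withColumn("uuid3", createUUID3('userid))           // deterministic UUID derived from userid
  .withColumn("uuid4", createUUID4())                  // random UUID per row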

Convert a Scala list to its Java equivalent

import scala.collection.JavaConverters._
import org.bson.Document

val documents = sc.parallelize(
  Seq(new Document("fruits", List("apples", "oranges", "pears").asJava))
)
MongoSpark.save(documents)

Window Function

import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window

val df = spark.createDataFrame(Seq(
  (1, "philip", 2.0, "montreal"),
  (2, "john", 4.0, "montreal"),
  (3, "charles", 2.0, "texas"))).toDF("Id", "username", "rating", "city")

val w = Window.orderBy($"city")
df.withColumn("id", rank().over(w)).show()

Replace column values with a UDF (PySpark)

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

Rename Dataset Column Name

val newNames = Seq("datetime", "userid", "URL", "drop")
val dfNewOne = dsUserWeb.toDF(newNames: _*)

Regular Expression to extract values

val dfTaobao = df0207
    .withColumn("GoodsID",
        regexp_extract('GoodsTitle, """((?:LCD|SJ|BCD)\-[a-zA-Z0-9/\-]*).*""", 1))
    // second example; the target column name "ReceiverArea" is illustrative
    .withColumn("ReceiverArea", regexp_extract('ReceiverAddress, """([^\s]*)\s+([^\s]*)\s+.*""", 1))

Transpose rows to columns

case class Tag(ID: String, TagName: String, TagValue: String)
val dsTag = Seq(Tag("1","A","1"), Tag("1","B","2"), Tag("2","A","2"), Tag("3","B","2")).toDS()
dsTag.groupBy('ID).pivot("TagName").agg(collect_list($"TagValue").getItem(0)).show
