Code Examples
MongoDB
Read from database
import com.mongodb.spark._
import com.mongodb.spark.sql._
import com.mongodb.spark.config._

val dfFA = spark.read
  .option("database", "foxconn-analytics")
  .option("collection", "collect_pageview")
  // The uri option can be omitted if it is already defined in the Spark interpreter settings
  .option("uri", "mongodb://mongo201,mongo202,mongo203/?replicaSet=bigdata&readPreference=secondaryPreferred")
  .mongo()
Read from database with schema specified by a case class
case class EHAVE_TBL(
  CNAME: String, CCOVEY: String, ISCORE: String,
  IFPASS: String, FACTORY: String, ISEX: String
)

val dsEHAVE = spark.read
  .option("database", "IEDB")
  .option("collection", "EHAVE_TBL_2016")
  .mongo[EHAVE_TBL]()
Write to database
MongoSpark.save(
  dfStaff2Mongo.write.format("com.mongodb.spark.sql")
    .options(Map("database" -> "HR", "collection" -> "EMP_TAGS_20170324"))
    .mode("overwrite")
)
MySQL/MariaDB
Read from database
val dfLog = spark.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:mysql://192.168.2.52/IIS?user=ACCOUNT_NAME&password=PASSWORD",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "IIS.TV",
    // Split the read into 200 parallel partitions over id values 1..50000000
    "partitionColumn" -> "id",
    "lowerBound" -> "1",
    "upperBound" -> "50000000",
    "numPartitions" -> "200"))
  .load()
  .cache()
Write to database
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.jdbc.Driver")
prop.setProperty("user", "<Username>")
prop.setProperty("password", "<Password>")

dfDiff.write
  .mode("overwrite")
  .jdbc("jdbc:mysql://192.168.2.52/<Database Name>?useUnicode=yes&characterEncoding=UTF-8", "<Table Name>", prop)
ORC File
Read from file
val df = spark.read.orc("/orc/PromoterLearn.orc")
Write to file
df.select('JobNo, 'Name, 'Sex)
  .write
  .mode("overwrite")
  .orc("/orc/PromoterLearn.orc")
Parquet
Read from file
val df = spark.read.parquet("/parquet/PromoterLearn.parquet")
Write to file
dfTaobao.select('JobNo, 'Name, 'Sex)
  .write
  .mode("overwrite")
  .parquet("/parquet/PromoterTaobao2017.parquet")
CSV
Read from CSV
val dsProvTest = spark.read
  .option("header", true)
  .csv("/csv/ProvPlayTest.csv")
Write to CSV
dsProvPlay.write
  .mode("overwrite")
  .csv("/csv/ProvPlayTest.csv")
Oracle
Read from database
val dfDL = spark.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:oracle:thin:<AccountName>/<Password>@<DB's IP>:<DB's Port>:<SID Name>",
    "driver" -> "oracle.jdbc.driver.OracleDriver",
    "dbtable" -> "DL_TBL"))
  .load()
Write to database
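A minimal sketch, by analogy with the MySQL write above; dfDL comes from the read example, and the bracketed values are placeholders.
val prop = new java.util.Properties
prop.setProperty("driver", "oracle.jdbc.driver.OracleDriver")
prop.setProperty("user", "<Username>")
prop.setProperty("password", "<Password>")

// dfDL is the DataFrame from the read example; connection details are placeholders
dfDL.write
  .mode("overwrite")
  .jdbc("jdbc:oracle:thin:@<DB's IP>:<DB's Port>:<SID Name>", "<Table Name>", prop)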
MS SQL Server
Read from database
val dfSQLServer = spark.read.format("jdbc")
  .option("url", "jdbc:sqlserver://<DB's IP>:1433;database=educ2013;user=<Username>;password=<Password>")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("dbtable", "user_tbl")
  .option("partitionColumn", "IID")
  .option("lowerBound", "1")
  .option("upperBound", "3370000")
  .option("numPartitions", "100")
  .load()
Use isin
// Collect the names of receivers that appear at least 10 times
val lstReceiverName = dfTaobao.groupBy('ReceiverName)
  .count()
  .orderBy('count.desc)
  .filter('count >= 10)
  .agg(collect_list($"ReceiverName").alias("RName"))
  .first()
  .getSeq[String](0)

dfTaobao.filter('ReceiverName.isin(lstReceiverName: _*))
  // Re-label LCD goods as "平板電視" (flat-panel TV) before pivoting
  .withColumn("GoodsID", when('GoodsID.like("LCD-%"), lit("平板電視")).otherwise('GoodsID))
  .groupBy('ReceiverName)
  .pivot("GoodsID")
  .count()
  .na.fill(0)
  .groupBy().sum()  // grand totals of every pivoted count column
  .show()
Concatenate all columns
dsLiveEPG.select(concat(dsLiveEPG.columns.map(col): _*).alias("concatenated"))
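Note that concat yields null as soon as any input column is null; if that matters, concat_ws with a separator skips null values. A minimal variant:
dsLiveEPG.select(concat_ws("|", dsLiveEPG.columns.map(col): _*).alias("concatenated"))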
Hash an array of columns
import spark.implicits._
import org.apache.spark.sql.functions.{array, col, udf}

// Sum the hash codes of all values in a row and bucket them into 10 groups
def hasher(data: AnyRef*) = data.map(_.hashCode).sum % 10
val getBucket = udf(hasher _)

// zip truncates to the shorter side, so this yields 26 rows
val df = sc.parallelize(('a' to 'z').map(_.toString) zip (1 to 30)).toDF("c1", "c2")
df.withColumn("bucket", getBucket(array(df.columns.map(df.apply): _*))).show()
UDF
User-Defined Functions
A UDF is a Spark SQL feature for defining new Column-based functions that extend the vocabulary of Spark SQL's DSL for transforming Datasets.
val replaceWrongDevice = udf { (device: String) =>
  if (device == "ANOIRD" || device == "ANDOIRD") "ANDROID" else device
}

import java.util.UUID
// Deterministic name-based (version 3) UUID derived from the input string
val createUUID3 = udf { x: String => UUID.nameUUIDFromBytes(x.getBytes).toString }
// Random (version 4) UUID, regenerated on every call
val createUUID4 = udf { () => UUID.randomUUID().toString }
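For illustration, these UDFs might be applied as below; dfDevices and its Device column are hypothetical names, not defined in the examples above.
// dfDevices and Device are hypothetical placeholders
val dfFixed = dfDevices
  .withColumn("Device", replaceWrongDevice('Device))
  .withColumn("UUID", createUUID3('Device))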
Convert a Scala list to its Java equivalent
import scala.collection.JavaConverters._
import org.bson.Document
val documents = sc.parallelize(
  Seq(new Document("fruits", List("apples", "oranges", "pears").asJava))
)
MongoSpark.save(documents)
Window Function
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window
val df = spark.createDataFrame(Seq(
  (1, "philip", 2.0, "montreal"),
  (2, "john", 4.0, "montreal"),
  (3, "charles", 2.0, "texas"))).toDF("Id", "username", "rating", "city")

val w = Window.orderBy($"city")
// Replaces the existing Id column (column name matching is case-insensitive by default)
df.withColumn("id", rank().over(w)).show()
Replace column values with a UDF (PySpark)
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

# Replace every value in target_column with 'new_value'
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
Rename Dataset Columns
val newNames = Seq("datetime", "userid", "URL", "drop")
val dfNewOne = dsUserWeb.toDF(newNames: _*)
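If "drop" above is meant as a throwaway name, that column can then be removed:
val dfClean = dfNewOne.drop("drop")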
Regular Expression to extract values
val dfTaobao = df0207.withColumn("GoodsID",
  regexp_extract('GoodsTitle, """((?:LCD|SJ|BCD)\-[a-zA-Z0-9/\-]*).*""", 1))

// Capture the first whitespace-delimited token (e.g. of an address field)
regexp_extract('ReceiverAddress, """([^\s]*)\s+([^\s]*)\s+.*""", 1)
Transpose rows to columns
case class Tag(ID: String, TagName: String, TagValue: String)

val dsTag = Seq(Tag("1", "A", "1"), Tag("1", "B", "2"), Tag("2", "A", "2"), Tag("3", "B", "2")).toDS()
dsTag.groupBy('ID).pivot("TagName").agg(collect_list($"TagValue").getItem(0)).show()