# Code Examples

### MongoDB

> Read from database

```
import com.mongodb.spark._
import com.mongodb.spark.sql._
import com.mongodb.spark.config._

val dfFA = spark.read.option("database", "foxconn-analytics")
                     .option("collection", "collect_pageview")
                     // the uri option can be omitted if it is already defined in the Spark interpreter
                     .option("uri", "mongodb://mongo201,mongo202,mongo203/?replicaSet=bigdata&readPreference=secondaryPreferred")
                     .mongo()
```

> Read from database with schema specifying by case class

```
case class EHAVE_TBL(
    CNAME: String, CCOVEY: String, ISCORE: String,
    IFPASS: String, FACTORY: String, ISEX: String
)

val dsEHAVE = spark.read
                   .option("database", "IEDB")
                   .option("collection", "EHAVE_TBL_2016")
                   .mongo[EHAVE_TBL]()
```

> Write to database

```
MongoSpark.save(
  dfStaff2Mongo.write
               .format("com.mongodb.spark.sql")
               .options(Map("database" -> "HR", "collection" -> "EMP_TAGS_20170324"))
               .mode("overwrite")
)
```

### MySQL/MariaDB

> Read from database

```
val dfLog = spark.read.format("jdbc")
                 .options(
                     Map("url" -> "jdbc:mysql://192.168.2.52/IIS?user=ACCOUNT_NAME&password=PASSWORD",
                         "driver" -> "com.mysql.jdbc.Driver",
                         "dbtable" -> "IIS.TV",
                         "partitionColumn" -> "id",
                         "lowerBound" -> "1",
                         "upperBound" -> "50000000",
                         "numPartitions" -> "200"))
                 .load()
                 .cache()
```

> Write to database

```
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.jdbc.Driver")
prop.setProperty("user", "<Username>")
prop.setProperty("password", "<Password>") 

dfDiff.write
      .mode("overwrite")
      .jdbc("jdbc:mysql://192.168.2.52/<Database Name>?useUnicode=yes&characterEncoding=UTF-8", "<Table Name>", prop)
```

### ORC File

> Read from file

```
val df = spark.read.orc("/orc/PromoterLearn.orc")
```

> Write to file

```
df.select('JobNo, 'Name, 'Sex)
  .write
  .mode("overwrite")
  .orc("/orc/PromoterLearn.orc")
```

### Parquet

> Read from file

```
val df = spark.read.parquet("/parquet/PromoterLearn.parquet")
```

> Write to file

```
dfTaobao.select('JobNo, 'Name, 'Sex)
        .write
        .mode("overwrite")
        .parquet("/parquet/PromoterTaobao2017.parquet")
```

### CSV

> Read from CSV

```
val dsProvTest = spark.read
                      .option("header", true)
                      .csv("/csv/ProvPlayTest.csv")
```

> Write to CSV

```
dsProvPlay.write
          .mode("overwrite")
          .csv("/csv/ProvPlayTest.csv")
```

### Oracle

> Read from database

```
val dfDL = spark.read.format("jdbc")
                .options(
                    Map("url" -> "jdbc:oracle:thin:<AccountName>/<Password>@<DB's IP>:<DB's Port>:<SID Name>",
                        "driver" -> "oracle.jdbc.driver.OracleDriver",
                        "dbtable" -> "DL_TBL"))
                .load()
```

> Write to database

```
// Sketch only: writing back goes through the generic JDBC sink,
// reusing the connection placeholders from the read example above
val prop = new java.util.Properties
prop.setProperty("driver", "oracle.jdbc.driver.OracleDriver")
prop.setProperty("user", "<AccountName>")
prop.setProperty("password", "<Password>")

dfDL.write
    .mode("overwrite")
    .jdbc("jdbc:oracle:thin:@<DB's IP>:<DB's Port>:<SID Name>", "<Table Name>", prop)
```

### MS SQL Server

> Read from database

```
val dfSQLServer = spark.read.format("jdbc")
                       .option("url","jdbc:sqlserver://<DB's IP>:1433;database=educ2013;user=<Username>;password=<Password>")
                       .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                       .option("dbtable", "user_tbl")
                       .option("partitionColumn", "IID")
                       .option("lowerBound", "1")
                       .option("upperBound", "3370000")
                       .option("numPartitions", "100")
                       .load()
```
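
> Write to database

Writing back goes through the same generic JDBC sink as in the MySQL example; a sketch with placeholder connection values (the table name placeholder is illustrative):

```
val prop = new java.util.Properties
prop.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
prop.setProperty("user", "<Username>")
prop.setProperty("password", "<Password>")

dfSQLServer.write
           .mode("overwrite")
           .jdbc("jdbc:sqlserver://<DB's IP>:1433;database=educ2013", "<Table Name>", prop)
```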

## Use isin

```
val lstReceiverName = dfTaobao.groupBy('ReceiverName)
                              .count()
                              .orderBy('count.desc)
                              .filter('count >= 10)
                              .agg(collect_list($"ReceiverName").alias("RName"))
                              .first()
                              .getSeq[String](0)

// Map the LCD- series to its category label before pivoting,
// because 'GoodsID is no longer a column once pivot() has run
dfTaobao.filter('ReceiverName.isin(lstReceiverName: _*))
        .withColumn("GoodsID", when('GoodsID.like("LCD-%"), lit("平板電視")).otherwise('GoodsID))
        .groupBy('ReceiverName)
        .pivot("GoodsID")
        .count()
        .na.fill(0)
        .show()   // the original used printCSV, a project-local helper
```

## Concatenate all columns

```
concat(dsLiveEPG.columns.map(c => col(c)):_*)
```
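
If a separator is needed between the values, `concat_ws` (also in `org.apache.spark.sql.functions`) is a drop-in variant; a sketch against the same `dsLiveEPG` Dataset, with an illustrative output column name:

```
import org.apache.spark.sql.functions.{col, concat_ws}

// "|"-delimited concatenation of every column
dsLiveEPG.withColumn("all", concat_ws("|", dsLiveEPG.columns.map(col): _*))
```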

## Hash an array of columns

```
import spark.implicits._
import org.apache.spark.sql.functions.{array, col, udf}

// array() casts every column to a common type (here String),
// so the UDF receives a Seq[String]
val getBucket = udf { data: Seq[String] => math.abs(data.map(_.hashCode).sum % 10) }
val df = sc.parallelize(('a' to 'z').map(_.toString) zip (1 to 26)).toDF("c1", "c2")
df.withColumn("bucket", getBucket(array(df.columns.map(col): _*))).show()
```

## UDF

**User-Defined Functions**

***UDF*** is a feature of Spark SQL to define new Column-based functions that extend the vocabulary of Spark SQL’s DSL for transforming Datasets.

```
val replaceWrongDevice = udf {(device: String) => 
  if(device == "ANOIRD" || device == "ANDOIRD") "ANDROID" else device
}
```

```
import java.util.UUID

val createUUID3 = udf { x: String => UUID.nameUUIDFromBytes(x.getBytes).toString }
val createUUID4 = udf { () => UUID.randomUUID().toString }
```
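
A UDF can also be registered by name so it is callable from SQL strings; a minimal sketch reusing the `replaceWrongDevice` logic above (the `logs` view name is an assumption):

```
spark.udf.register("fixDevice", (device: String) =>
  if (device == "ANOIRD" || device == "ANDOIRD") "ANDROID" else device)

// assumes a temp view named logs has been registered
spark.sql("SELECT fixDevice(device) AS device FROM logs")
```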

## Convert a Scala list to its Java equivalent

```
import scala.collection.JavaConverters._
import org.bson.Document

val documents = sc.parallelize(
  Seq(new Document("fruits", List("apples", "oranges", "pears").asJava))
)
MongoSpark.save(documents)
```

## Window Function

```
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window

val df = spark.createDataFrame(Seq(
  (1, "philip", 2.0, "montreal"),
  (2, "john", 4.0, "montreal"),
  (3, "charles", 2.0, "texas"))).toDF("Id", "username", "rating", "city")

val w = Window.orderBy($"city")
df.withColumn("rank", rank().over(w)).show()
```

> PySpark: replace a column's values with a UDF

```
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column
                         for column in old_df.columns])
```

## Rename Dataset Column Name

```
val newNames = Seq("datetime", "userid", "URL", "drop")
val dfNewOne = dsUserWeb.toDF(newNames: _*)
```
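
To rename a single column without listing all the others, `withColumnRenamed` does the same job; a sketch assuming the `dsUserWeb` Dataset above:

```
val dfRenamed = dsUserWeb.withColumnRenamed("userid", "UserID")
```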

## Regular Expression to extract values

```
val dfTaobao = df0207
  .withColumn("GoodsID",
    regexp_extract('GoodsTitle, """((?:LCD|SJ|BCD)\-[a-zA-Z0-9/\-]*).*""", 1))
  .withColumn("Province",  // column name is illustrative
    regexp_extract('ReceiverAddress, """([^\s]*)\s+([^\s]*)\s+.*""", 1))
```

## Transpose rows to column

```
case class Tag(ID: String, TagName: String, TagValue: String)
val dsTag = Seq(Tag("1","A","1"), Tag("1","B","2"), Tag("2","A","2"), Tag("3","B","2")).toDS()
dsTag.groupBy('ID).pivot("TagName").agg(collect_list($"TagValue").getItem(0)).show
```
