scala中通常是通过JDBC组件来连接Mysql。JDBC, 全称为Java DataBase Connectivity standard。
加载依赖
其中包含 JDBC driver
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
1.1 spark组件直接连接(推荐)
通过spark.read直接连接,直接得到dataframe
val database = "test_db"
val table = "test_table"
val user = "hive"
val password = "hive"
val url= "jdbc:mysql://localhost:10101/"+database
val jdbcDF = (spark.read.format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.option("driver", "com.mysql.cj.jdbc.Driver")
.load())
jdbcDF.show()
+---+--------+
| id| value|
+---+--------+
| 0|Record 0|
| 1|Record 1|
| 2|Record 2|
| 3|Record 3|
| 4|Record 4|
+---+--------+
//通过connectionProperties避免多次写入配置
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
//可增加.option("","")添加其他参数
.jdbc(url, table , connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc(url, table , connectionProperties)
// 保存数据
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
//或等价的:
jdbcDF2.write
.jdbc(url, table , connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc(url, table , connectionProperties)
//通过query取数据
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "select c1, c2 from t1")
.load()
//或放到properties里
connectionProperties.put("query", "select c1, c2 from t1")
spark.read
.jdbc(url, table , connectionProperties)
注意:driver的类名根据不同的JDBC版本不同,早一些的版本为com.mysql.jdbc
,而不是com.mysql.cj.jdbc.Driver
。
2.1 jdbc api方法连接
还可通过 jdbc方法获取或保存数据
/**
* Created by Administrator on 2017/12/23.
*/
import java.sql.{ Connection, DriverManager }
object ScalaJdbcConnectSelect extends App {
// 访问本地MySQL服务器,通过3306端口访问mysql数据库
val url = "jdbc:mysql://localhost:3306/cgjr?useUnicode=true&characterEncoding=utf-8&useSSL=false"
//驱动名称
val driver = "com.mysql.cj.jdbc.Driver"
//用户名
val username = "root"
//密码
val password = "12345"
//初始化数据连接
var connection: Connection = _
try {
//注册Driver
Class.forName(driver)
//得到连接
connection = DriverManager.getConnection(url, username, password)
val statement = connection.createStatement
//执行查询语句,并返回结果
val rs = statement.executeQuery("SELECT name, num FROM persons")//返回java.sql的ResultSet
//打印返回结果
while (rs.next) {
val name = rs.getString("name")
val num = rs.getString("num")
// println(name+"\t"+num)
println("name = %s, num = %s".format(name, num))
}
println("查询数据完成!")
// 执行插入操作
val rs2 = statement.executeUpdate("INSERT INTO `persons` (`name`, `num`) VALUES ('徐志摩', '22')")
println("插入数据完成")
// 执行更新操作
val rs3 = statement.executeUpdate("UPDATE persons set num=55 WHERE `name`=\"徐志摩\"")
println("更新数据完成!")
// 执行删除操作
val rs4 = statement.executeUpdate("delete from persons WHERE `name`=\"徐志摩\"")
println("删除数据完成!")
// 执行调用存储过程操作
val rs5 = statement.executeUpdate("call add_student(3)")
println("调用存储过程完成!")
} catch {
case e: Exception => e.printStackTrace
}
//关闭连接,释放资源
connection.close
}
参考
Spark Scala: Load Data from MySQL
【scala 数据库操作】scala操作mysql数据库
【官方】使用jdbc连接其他数据库