spark_idea
- 3、Build the jar and run it
- 2、code
- 1、pom_xml
- 0、Create the data, model, and prediction tables
- 0、Configure the Scala environment on Windows
3、Build the jar and run it
./bin/spark-submit \
--class spark02 \
--master spark://hadoop102:7077 \
--deploy-mode client \
/home/gpb/scala_spark2.jar
Build the jar and copy it to the virtual machine, then write a launch script:
touch run_spark.sh
nano run_spark.sh
#!/bin/bash
/opt/module/spark-standalone/bin/spark-submit \
--class spark02 \
--master spark://hadoop102:7077 \
--deploy-mode client \
/home/gpb/scala_spark2.jar
chmod 777 run_spark.sh
//Open a terminal and go to the current user's home directory:
cd ~
//Run crontab -e to edit the current user's crontab.
crontab -e
//Add a line like the following at the end of the crontab. It runs /opt/module/spark-standalone/run_spark.sh every 10 minutes and redirects the output to /opt/module/spark-standalone/run_spark.log:
*/10 * * * * /bin/bash /opt/module/spark-standalone/run_spark.sh >/opt/module/spark-standalone/run_spark.log 2>&1
//Press Ctrl+X, then y, to save and exit.
//Restart the cron service so the new schedule takes effect (on CentOS the unit is crond, not cron, as the attempts below show):
sudo systemctl restart crond.service
//Check /opt/module/spark-standalone/run_spark.log to confirm the job ran successfully.
[gpb@hadoop102 spark-standalone]$ sudo systemctl restart cron.service
Failed to restart cron.service: Unit not found.
[gpb@hadoop102 spark-standalone]$ sudo /etc/init.d/cron restart
sudo: /etc/init.d/cron: command not found
[gpb@hadoop102 spark-standalone]$ sudo service crond restart
Redirecting to /bin/systemctl restart crond.service
[gpb@hadoop102 spark-standalone]$ sudo /etc/init.d/cron restart
sudo: /etc/init.d/cron: command not found
[gpb@hadoop102 spark-standalone]$ sudo systemctl restart crond.service
[gpb@hadoop102 spark-standalone]$ pwd
To stop the crond service and halt all of the scheduled jobs it manages:
sudo systemctl stop crond.service
This stops crond immediately but does not disable it. To disable crond so that it is not started automatically at boot:
sudo systemctl disable crond.service
To stop and disable it in one step:
sudo systemctl disable --now crond.service
2、code
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.Encoders._
import java.io.ByteArrayOutputStream
import java.io.ObjectOutputStream
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.ml.linalg.{Vector,Vectors}
import org.apache.spark.ml.feature.{IndexToString,StringIndexer,VectorIndexer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline,PipelineModel}
import org.apache.spark.sql.Row
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession
import java.util.Properties
case class Iris(features:org.apache.spark.ml.linalg.Vector,label:String)
object spark02 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Iris Logistic Regression")
.master("local[*]")
.getOrCreate()
// Import the implicit encoders
// Read the data from MySQL and train the model
import spark.implicits._
val url = "jdbc:mysql://hadoop102:3306/mydb"
val user = "root"
val password = "000000"
val table = "flowers8"
val props = new Properties()
props.put("user", user)
props.put("password", password)
val irisDF = spark.read.jdbc(url, table, props)
val data = irisDF.map { row =>
Iris(
// The MySQL columns are FLOAT, so convert via Number; getAs[Double] would fail on Float values
Vectors.dense(
row.getAs[Number]("sepal_length").doubleValue(), row.getAs[Number]("sepal_width").doubleValue(),
row.getAs[Number]("petal_length").doubleValue(), row.getAs[Number]("petal_width").doubleValue()),
row.getAs[String]("species")
)
}
val labelIndexer=new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data)
val featureIndexer=new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(data)
val lr=new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100).setRegParam(0.3).setElasticNetParam(0.8)
val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
val lrPipeline = new Pipeline().setStages(Array(labelIndexer,featureIndexer,lr,labelConverter))
val Array(trainingData,testData)=data.randomSplit(Array(0.7,0.3))
val lrPipelineModel = lrPipeline.fit(trainingData)
val lrPredictions=lrPipelineModel.transform(testData)
lrPredictions.
select("predictedLabel","label","features","probability").collect().
foreach{case Row(predictedLabel:String,label:String,features:Vector,prob:Vector)=>println(s"($label,$features) -->prob=$prob,predicted Label=$predictedLabel")}
val evaluator = new MulticlassClassificationEvaluator().
setLabelCol("indexedLabel").setPredictionCol("prediction")
val lrAccuracy = evaluator.evaluate(lrPredictions)
println(s"Test accuracy = $lrAccuracy")
val lrModel = lrPipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
println("Coefficients: \n "+ lrModel.coefficientMatrix++ "\nIntercept:"+lrModel.interceptVector+"\n numClasses: "+lrModel.numClasses+"\n numFeatures: "+lrModel.numFeatures)
val predictions1 = lrModel.transform(testData.withColumnRenamed("features", "indexedFeatures"))
predictions1.show()
// Save the model to MySQL: serialize it with ObjectOutputStream and write the bytes into the models8 BLOB column
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(lrModel)
oos.flush()
oos.close()
val bytes = bos.toByteArray()
val conn: Connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/mydb", "root", "000000")
val stmt: PreparedStatement = conn.prepareStatement("INSERT INTO models8 (name, content) VALUES (?, ?)")
stmt.setString(1, "my_model")
stmt.setBytes(2, bytes)
stmt.executeUpdate()
stmt.close()
conn.close()
// Save the prediction results to MySQL
val table_data = "predicted_flowers8"
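// Note: this foreach runs on the executors and opens one JDBC connection per row,
// which is fine for a small test set; foreachPartition or DataFrameWriter.jdbc scales better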
predictions1.select("indexedFeatures", "label", "rawPrediction", "probability", "prediction").foreach { row =>
val indexedFeatures = row.getAs[org.apache.spark.ml.linalg.Vector]("indexedFeatures")
val label = row.getAs[String]("label")
val rawPrediction = row.getAs[org.apache.spark.ml.linalg.Vector]("rawPrediction")
val probability = row.getAs[org.apache.spark.ml.linalg.Vector]("probability")
val prediction = row.getAs[Double]("prediction")
val conn = java.sql.DriverManager.getConnection(url, user, password)
try {
val stmt = conn.createStatement()
val sql = s"""INSERT INTO $table_data
|(indexedFeatures, label, rawPrediction, probability, prediction)
|VALUES ('${indexedFeatures.toArray.mkString(",")}', '$label', '${rawPrediction.toArray.mkString(",")}', '${probability.toArray.mkString(",")}', $prediction)
|""".stripMargin
stmt.executeUpdate(sql)
} finally {
conn.close()
}
()
}
spark.stop()
}
}
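The job above serializes lrModel with ObjectOutputStream and stores the bytes in the content BLOB column of models8. Below is a minimal sketch of reading the model back and deserializing it, assuming the same connection settings and table layout (the object name LoadModel is only illustrative):
import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.sql.DriverManager
import org.apache.spark.ml.classification.LogisticRegressionModel
object LoadModel {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/mydb", "root", "000000")
    try {
      // Fetch the most recently saved copy of the model bytes from models8
      val stmt = conn.prepareStatement("SELECT content FROM models8 WHERE name = ? ORDER BY id DESC LIMIT 1")
      stmt.setString(1, "my_model")
      val rs = stmt.executeQuery()
      if (rs.next()) {
        val bytes = rs.getBytes("content")
        // Reverse of the ObjectOutputStream step in spark02: deserialize back into a LogisticRegressionModel
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        val model = ois.readObject().asInstanceOf[LogisticRegressionModel]
        ois.close()
        println(s"Loaded model: numFeatures=${model.numFeatures}, numClasses=${model.numClasses}")
      }
    } finally {
      conn.close()
    }
  }
}
Calling transform on the loaded model still requires an active SparkSession and a DataFrame with an indexedFeatures column, matching the column the model was trained on.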
1、pom_xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hcl.scala</groupId>
<artifactId>scala_spark1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- This plugin compiles the Scala sources into class files -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!-- Bind the compile and testCompile goals to Maven's compile phases -->
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
0、Create the data, model, and prediction tables
SHOW DATABASES;
USE mydb;
SHOW TABLES;
CREATE TABLE flowers8 (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
sepal_length FLOAT,
sepal_width FLOAT,
petal_length FLOAT,
petal_width FLOAT,
species VARCHAR(255),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE models8 (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(50) NOT NULL,
content BLOB NOT NULL,
PRIMARY KEY (id),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE predicted_flowers8 (
id INT NOT NULL AUTO_INCREMENT,
indexedFeatures VARCHAR(255),
label VARCHAR(255),
rawPrediction VARCHAR(255),
probability VARCHAR(255),
prediction DOUBLE,
PRIMARY KEY (id),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
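The training code reads from flowers8, so the table needs some rows before the Spark job runs. Below is a minimal sketch of inserting a few sample iris rows over JDBC, assuming the same connection settings as above (the object name SeedFlowers and the sample values are illustrative):
import java.sql.DriverManager
object SeedFlowers {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/mydb", "root", "000000")
    try {
      val stmt = conn.prepareStatement(
        "INSERT INTO flowers8 (sepal_length, sepal_width, petal_length, petal_width, species) VALUES (?, ?, ?, ?, ?)")
      // A few illustrative iris rows; in practice load the full dataset
      val rows = Seq(
        (5.1, 3.5, 1.4, 0.2, "Iris-setosa"),
        (7.0, 3.2, 4.7, 1.4, "Iris-versicolor"),
        (6.3, 3.3, 6.0, 2.5, "Iris-virginica"))
      for ((sl, sw, pl, pw, sp) <- rows) {
        stmt.setDouble(1, sl); stmt.setDouble(2, sw)
        stmt.setDouble(3, pl); stmt.setDouble(4, pw)
        stmt.setString(5, sp)
        stmt.addBatch()
      }
      stmt.executeBatch()
      stmt.close()
    } finally {
      conn.close()
    }
  }
}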
0、Configure the Scala environment on Windows
Link: Scala installer download
Link: configuring the Scala environment variables
Link: IntelliJ IDEA Scala plugin
Alternatively, create a Maven project, add the dependencies to pom.xml, write the code in a Scala object, package it into a jar, and run it on the big-data platform.
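As a quick sanity check of the Maven/Scala setup, a minimal object like the one below (HelloSpark is an illustrative name) can be compiled, packaged, and submitted in the same way as spark02 before writing the full job:
import org.apache.spark.sql.SparkSession
object HelloSpark {
  def main(args: Array[String]): Unit = {
    // Start a local SparkSession, run a trivial job, and stop
    val spark = SparkSession.builder().appName("HelloSpark").master("local[*]").getOrCreate()
    println(s"count = ${spark.range(100).count()}")
    spark.stop()
  }
}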