之前写过一篇jenkins+mqtt实现本地构建和远程自动发版_jenkins远程调用和本地调用-CSDN博客
由于本地搭建jenkins实在太费机器了,这次改用云效搭建。不过云效并没有直接发送mqtt的方法,需要编写中转接口。
中转接口采用go-gin框架实现,代码如下
main.go
package main
import (
"encoding/json"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gin-gonic/gin"
"os"
"time"
)
func main() {
router := gin.Default()
router.POST("/publish/notify", func(c *gin.Context) {
obj := struct {
App string `json:"app"`
Link string `json:"link"`
}{}
err := c.BindJSON(&obj)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(obj)
}
//把读取到的json透传发送到mqtt服务器
Publish(obj.App, obj.Link, "/publish/notify")
c.JSON(200, gin.H{
"msg": "this is a post msg",
})
})
router.POST("/publish/notifyProd", func(c *gin.Context) {
obj := struct {
App string `json:"app"`
Link string `json:"link"`
}{}
err := c.BindJSON(&obj)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(obj)
}
//把读取到的json透传发送到mqtt服务器
Publish(obj.App, obj.Link, "/publish/notifyProd")
c.JSON(200, gin.H{
"msg": "this is a post msg",
})
})
// 默认端口是8080,也可以指定端口 r.Run(":80")
router.Run(":80")
}
func Publish(app string, link string, topic string) {
broker := "tcp://broker.emqx.io:1883"
clientId := "go-mqtt-client"
opts := mqtt.NewClientOptions().AddBroker(broker).SetClientID(clientId)
opts.SetUsername("") // 设置用户名
opts.SetPassword("") // 设置密码
opts.SetCleanSession(true)
opts.SetKeepAlive(2 * time.Second)
opts.SetDefaultPublishHandler(f)
opts.OnConnect = func(c mqtt.Client) {
fmt.Println("Connected")
}
opts.OnConnectionLost = func(c mqtt.Client, e error) {
fmt.Println("Disconnected")
}
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// 构建JSON消息
message := map[string]string{
"app": app,
"link": link,
}
jsonMessage, err := json.Marshal(message)
if err != nil {
fmt.Println("Error marshaling JSON:", err)
os.Exit(1)
}
// 发布消息
if token := c.Publish(topic, 0, false, jsonMessage); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
// 等待一段时间以确保消息被发送
time.Sleep(2 * time.Second)
c.Disconnect(250)
}
func f(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
云效调用
在构建制品后选择执行命令
命令内容填写
# input your command here
# 构造 JSON 参数
json_data=$(jq -n --arg app "myapp" --arg link "${artifacts}" '{app: $app, link: $link}')
# 打印 JSON 参数(可选)
echo "$json_data"
# 发送 HTTP POST 请求
curl -X POST "http://transfer.example.com/publish/notify" \
-H "Content-Type: application/json" \
-d "$json_data"
将myapp替换成你想改的app名,transfer.example.com改成你部署在公网的中转接口域名或ip
上面用到了${artifacts}参数,需要在云效中添加artifacts=制品名称xxx
之后在部署的服务器上部署发布的客户端
客户端的逻辑跟上一篇文章类似,代码如下
main.go
package main
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"os/signal"
"strings"
"time"
"github.com/bitly/go-simplejson"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// 全局变量,存储程序启动时的当前工作目录
var baseDir string
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
go handleMessage(client, msg)
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
func main() {
// 获取程序启动时的当前工作目录
var err error
baseDir, err = os.Getwd()
if err != nil {
log.Fatalf("获取当前工作目录失败: %v", err)
}
fmt.Println("程序启动时的当前工作目录:", baseDir)
//合建chan
c := make(chan os.Signal)
//监听指定信号 ctrl+c kill
signal.Notify(c, os.Interrupt, os.Kill)
//阻塞直到有信号传入
fmt.Println("启动")
//执行具体方法
initmqtt()
//阻塞直至有信号传入
s := <-c
fmt.Println("退出信号", s)
}
func initmqtt() {
var broker = "broker.emqx.io"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client)
}
func sub(client mqtt.Client) {
topic := "/publish/notify"
token := client.Subscribe(topic, 1, nil)
token.Wait()
fmt.Printf("Subscribed to topic: %s\n", topic)
}
// readFile 使用ioutil.ReadFile 直接从文件读取到 []byte中
func readFile(fileName string) string {
f, err := ioutil.ReadFile(fileName)
if err != nil {
log.Printf("读取文件失败:%#v", err)
return ""
}
return string(f)
}
// 读取消息的发布模块名和链接
func readIssueModule(issuejson string) (string, string) {
buf := bytes.NewBuffer([]byte(issuejson))
js, _ := simplejson.NewFromReader(buf)
var each_map = make(map[string]interface{})
each_map, _ = js.Map()
app := each_map["app"].(string)
link := each_map["link"].(string)
return app, link
}
// 根据模块名和模块匹配本地json模块发布
func issueModuleLocalJson(app string, link string, localjson string) {
//读本地配置 war包路径 存储命令等
fmt.Println(localjson)
buf := bytes.NewBuffer([]byte(localjson))
js, _ := simplejson.NewFromReader(buf)
fmt.Println(js)
//获取json字符串中的 数组
rows, _ := js.Array()
fmt.Println(rows)
//遍历rows数组
for _, row := range rows {
each_map := row.(map[string]interface{})
jsonapp := each_map["app"].(string)
warpath := each_map["warpath"].(string)
warname := each_map["warname"].(string)
backpath := each_map["backpath"].(string)
stop := each_map["stop"].(string)
start := each_map["start"].(string)
transfer := each_map["transfer"].(string)
unzip := each_map["unzip"].(string)
if jsonapp == app {
fmt.Println("找到对应模块" + app)
// 创建备份目录
fmt.Println("创建备份目录" + backpath)
exec_shell("mkdir -p " + backpath)
// 获取当前时间并格式化
timestamp := time.Now().Format("20060102150405")
backupDir := fmt.Sprintf("%s/%s/%s", backpath, app, timestamp)
// 创建备份子目录
fmt.Println("创建备份子目录" + backupDir)
err := os.MkdirAll(backupDir, 0755)
if err != nil {
fmt.Println("创建备份子目录失败:", err)
continue
}
// 改变当前工作目录
err = os.Chdir(backupDir)
if err != nil {
fmt.Println("改变目录失败:", err)
continue
}
pwd, _ := os.Getwd()
fmt.Println("当前目录" + pwd)
if strings.Contains(link, "&") {
link = "'" + link + "'"
}
fmt.Println("下载文件" + link)
exec_shell(transfer + " " + link)
fmt.Println("解压文件")
exec_shell(unzip)
fmt.Println("停止服务")
exec_shell(stop)
fmt.Println("拷贝文件到对应目录")
exec_shell("cp -rf" + " " + warname + " " + warpath)
fmt.Println("启动服务")
exec_shell(start)
fmt.Println("完成本次发布")
break
}
}
}
// 阻塞式的执行外部shell命令的函数,等待执行完毕并返回标准输出
func exec_shell(s string) (string, error) {
cmd := exec.Command("/bin/bash", "-c", s)
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
checkErr(err)
return out.String(), err
}
// 错误处理函数
func checkErr(err error) {
if err != nil {
fmt.Println(err)
panic(err)
}
}
// 处理MQTT消息的函数
func handleMessage(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
//读取发布配置,备份war包,替换war包,重启tomcat或者docker
issuejson := string(msg.Payload())
fmt.Println(issuejson)
//读本地配置 war包路径 存储命令等
localjson := readFile(baseDir + "/" + "jenkinsmqtt.json")
//关于json的配置说明
/*app: 应用的名称。
warpath: 应用的部署路径。
warname: 应用的 WAR 文件名。
backpath: 备份路径。
stop: 停止应用的命令。
start: 启动应用的命令。
restart: 重启应用的命令。
transfer: 下载应用包的命令。
unzip: 解压应用包的命令。*/
fmt.Println(localjson)
//发布模块解析
app, link := readIssueModule(issuejson)
fmt.Println(app)
fmt.Println(link)
//模块发布
issueModuleLocalJson(app, link, localjson)
}
jenkinsmqtt.json示例
[
{
"app": "myapp",
"warpath": "/data/app/myapp/webapps",
"warname": "myapp.war",
"backpath": "/data/app/cibak",
"stop": "docker stop myapp",
"start": "docker start myapp",
"restart": "docker restart myapp",
"transfer": "wget -O myapp.tgz ",
"unzip": "tar -zxvf myapp.tgz"
}
]