我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版,欢迎购买。点击进入详情
本文旨在概述基于微服务的架构的基础知识,并学习如何使用 Python 构建微服务。
什么是基于微服务的架构?
它是一种架构风格,将复杂的软件系统构建为一组松散耦合的服务,并通过预定义的标准(API) 相互通信。这种架构风格具有敏捷性、可扩展性以及适应不断变化的业务需求的能力等优点。
单体架构和微服务架构之间的区别?
微服务
微服务与单体服务之间的区别
入门
我们将通过构建一个允许用户下订单和查看订单的应用程序(DealBazaar)来创建一个简单的 项目。所以我们将有 4 项服务,即:
- Portal Service——仅渲染 html 内容并将 api 路由到其他服务的服务(该服务不连接到任何数据库)
- 产品管理——允许创建和检索产品的服务
- 订单管理——允许创建和检索订单的服务
- 通知网关 — 允许发送通知(电子邮件)的服务
交易集市
产品管理
产品管理 API 和模型
订单管理
订单管理 API 和模型
通知网关
通知网关 API 和模型
门户服务
门户服务 API
GET / home 和 GET /viewOrder API 只是渲染 html 页面,而不调用其他服务。
获取/订单API
发布/订单API
发布/查看订单 API
1. 设置-安装和下载
- 下载couchbase 服务器 社区 版本。下载后,创建一个包含 3 个新存储桶和 3 个用户的新集群。提供您选择的存储桶名称和用户名,并在.env文件中使用相同的名称(存储桶名称、用户名和密码)。就我而言——
Couchbase 设置详细信息
- 登录/注册
网站并生成 API 密钥和 API 密钥。同样将用于发送邮件邮寄订单提交。替换通知网关服务的.env文件中的值 - 安装flask、mailjet_rest、couchbase、load_dotenv、flask-restx
2. 代码
- cb.py - 所有数据库操作的脚本并且对所有服务通用
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.diagnostics import PingState
from couchbase.exceptions import (
CouchbaseException,
DocumentExistsException,
DocumentNotFoundException,
)
from couchbase.options import ClusterOptions
class CouchbaseClient(object):
def __init__(self, host, bucket, scope, collection, username, pw):
self.host = host
self.bucket_name = bucket
self.collection_name = collection
self.scope_name = scope
self.username = username
self.password = pw
def connect(self, **kwargs):
conn_str = f"couchbase://{self.host}"
try:
cluster_opts = ClusterOptions( authenticator=PasswordAuthenticator(self.username, self.password) )
self._cluster = Cluster(conn_str, cluster_opts, **kwargs)
except CouchbaseException as error:
print(f"Could not connect to cluster. Error: {error}")
raise
self._bucket = self._cluster.bucket(self.bucket_name)
self._collection = self._bucket.scope(self.scope_name).collection(self.collection_name)
def get(self, key):
return self._collection.get(key)
def insert(self, key, doc):
return self._collection.insert(key, doc)
def upsert(self, key, doc):
return self._collection.upsert(key, doc)
def remove(self, key):
return self._collection.remove(key)
def query(self, strQuery, *options, **kwargs):
return self._cluster.query(strQuery, *options, **kwargs)
CB.py
- app.py - 产品管理
import os
import uuid
from datetime import datetime
from dotenv import load_dotenv,find_dotenv
from couchbase.exceptions import (
CouchbaseException,
DocumentExistsException,
DocumentNotFoundException,
)
from src.cb import CouchbaseClient
from flask import Flask, request, jsonify
from flask_restx import Api, Resource, fields
app = Flask(__name__)
env_path = "./src/.env"
load_dotenv(env_path)
api = Api(app)
nsProduct = api.namespace("api/v1/products", "CRUD operations for Product")
productInsert = api.model(
"ProductInsert",
{
"productName": fields.String(required=True, description="Product Name"),
"productId": fields.String(required=True, description="Product's Unique ID"),
"price": fields.Float(required=True, description="Product Price"),
"tax": fields.Float(required=True, description="Product tax percentage"),
"description": fields.String(required=False, description="Description of product"),
"status": fields.String(required=True, description="Product Status"),
"url" : fields.String(required=True, description="Image Url of the Product")
},
)
product = api.model(
"Product",
{
"id": fields.String(required=True, description="Product's system generated Id"),
"productName": fields.String(required=True, description="Product Name"),
"productId": fields.String(required=True, description="Product's Unique ID"),
"price": fields.Float(required=True, description="Product Price"),
"tax": fields.Float(required=True, description="Product tax percentage"),
"description": fields.String(required=False, description="Description of product"),
"status": fields.String(required=True, description="Product Status"),
"url" : fields.String(required=True, description="Image Url of the Product"),
"createdAt" : fields.String(required=True, description="Time product is created")
},
)
@nsProduct.route("")
class Products(Resource):
# tag::post[]
@nsProduct.doc(
"Create Product",
reponses={201: "Created", 409: "Key alreay exists", 500: "Unexpected Error"},
)
@nsProduct.expect(productInsert, validate=True)
@nsProduct.marshal_with(product)
def post(self):
try:
data = request.json
id = uuid.uuid4().__str__()
data["id"] = id
data["createdAt"] = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
cb.insert(id, data)
return data, 201
except DocumentExistsException:
return "Key already exists", 409
except CouchbaseException as e:
return f"Unexpected error: {e}", 500
except Exception as e:
return f"Unexpected error: {e}", 500
@nsProduct.doc(
"Find Products",
reponses={200: "found", 500: "Unexpected Error"},
params={
"status": "Product is ACTIVE/INACTIVE"
},
)
def get(self):
try:
status = request.args.get("status","ACTIVE")
query = f"SELECT p.* FROM {db_info['bucket']}.{db_info['scope']}.{db_info['collection']} p WHERE p.status = $status;"
result = cb.query(query, status=status)
products = [x for x in result]
return products, 200
except Exception as e:
return f"Unexpected error: {e}", 500
@nsProduct.route("/<productId>")
class ProductId(Resource):
@nsProduct.doc(
"Get Profile",
reponses={200: "Document Found", 404: "Document Not Found", 500: "Unexpected Error"},
)
def get(self, productId):
try:
query = f"SELECT p.* FROM {db_info['bucket']}.{db_info['scope']}.{db_info['collection']} p WHERE p.productId = $productId limit 1;"
result = cb.query(query,productId=productId)
return list(result)[0] ,200
except DocumentNotFoundException:
return "Key not found", 404
except CouchbaseException as e:
return f"Unexpected error: {e}", 500
db_info = {
"host": os.getenv("DB_HOST"),
"bucket": os.getenv("BUCKET"),
"scope": os.getenv("SCOPE"),
"collection": os.getenv("COLLECTION"),
"username": os.getenv("USERNAME"),
"password": os.getenv("PASSWORD"),
}
cb = CouchbaseClient(*db_info.values())
cb.connect()
if __name__ == "__main__":
app.run(debug=True,port=5002)
app.py-产品管理
- app.py - 通知网关
import os
import uuid
from datetime import datetime
from dotenv import load_dotenv
from couchbase.exceptions import DocumentExistsException
from src.cb import CouchbaseClient
from flask import Flask, request
from flask_restx import Api, Resource, fields
from mailjet_rest import Client
app = Flask(__name__)
env_path = "./src/.env"
load_dotenv(env_path)
senderMail = os.getenv("SENDER_MAIL")
mailjet = Client(auth=(os.getenv("API_KEY"), os.getenv("API_SECRET")), version='v3.1')
api = Api(app)
nsProduct = api.namespace("api/v1/email", "Send Email")
emailInsert = api.model(
"emailInsert",
{
"orderId": fields.String(required=True, description="Order ID"),
"name": fields.String(required=True, description="Name of Customer"),
"mailId": fields.String(required=True, description="Mail Id of customer"),
"totalCost" : fields.Float(required=True, description="Total cost of order"),
},
)
emailResponse = api.model(
"emailResponse",
{
"id": fields.String(required=True, description="Audit for email sent"),
"orderId": fields.String(required=True, description="Order ID"),
"mailId": fields.String(required=True, description="Mail Id of customer"),
"status" : fields.String(required=True, description="Email Status"),
"deliveredAt" : fields.String(required=True, description="Time email is delivered"),
"statusCode" : fields.Integer(required=True, description="Status code of SMTP")
},
)
@nsProduct.route("/sendMail")
class Email(Resource):
# tag::post[]
@nsProduct.doc(
"Send Email",
reponses={200: "Success", 500: "Unexpected Error"},
)
@nsProduct.expect(emailInsert, validate=True)
@nsProduct.marshal_with(emailResponse)
def post(self):
reqData = request.json
print(reqData)
data = {
'Messages': [
{
"From": {
"Email": senderMail,
"Name": senderMail
},
"To": [
{
"Email": reqData["mailId"],
"Name": reqData["name"]
}
],
"Subject": "Greetings from DealBazaar",
"TextPart": f"Your Order {reqData['orderId']} successfully placed !",
"HTMLPart": f"<h3>Dear {reqData['name']} , Your order {reqData['orderId']} is successfully placed and the total amount of the order is {reqData['totalCost']} .Thank you for shopping with DealBazaar!"
}
]
}
response = {}
rep = mailjet.send.create(data=data)
if rep.status_code==200:
response["status"] = "SUCCESS"
response["deliveredAt"] = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
else:
response["status"] = "FAILURE"
response["deliveredAt"] = None
id = uuid.uuid4().__str__()
response["id"] = id
response["mailId"] = reqData["mailId"]
response["orderId"] = reqData["orderId"]
response["statusCode"] = rep.status_code
try :
cb.insert(id, response)
return response, 202
except DocumentExistsException:
return "Key already exists", 409
except Exception as e:
return f"Unexpected error: {e}", 500
db_info = {
"host": os.getenv("DB_HOST"),
"bucket": os.getenv("BUCKET"),
"scope": os.getenv("SCOPE"),
"collection": os.getenv("COLLECTION"),
"username": os.getenv("USERNAME"),
"password": os.getenv("PASSWORD")
}
cb = CouchbaseClient(*db_info.values())
cb.connect()
if __name__ == "__main__":
app.run(debug=True,port=5004)
app.py-通知网关
- app.py - 订单管理
import os
import uuid
import requests
from datetime import datetime
from dotenv import load_dotenv
from couchbase.exceptions import (
CouchbaseException,
DocumentExistsException,
DocumentNotFoundException,
)
from src.cb import CouchbaseClient
from flask import Flask, request
from flask_restx import Api, Resource, fields
app = Flask(__name__)
env_path = "./src/.env"
load_dotenv(env_path)
api = Api(app)
nsOrder = api.namespace("api/v1/orders", "CRUD operations for Orders")
contact = api.model(
"Contact",
{
"name": fields.String(required=True, description="Customer Name"),
"emailId": fields.String(required=True, description="emailId of customer"),
"phone": fields.String(required=True, description="Phone number of customer"),
"address": fields.String(required=True, description="Address of customer")
},
)
product = api.model(
"Product",
{
"productName": fields.String(required=True, description="Product Name"),
"productId": fields.String(required=True, description="Product's Unique ID"),
"price": fields.Float(required=True, description="Product Price"),
"tax": fields.Float(required=True, description="Product tax percentage"),
"quantity" : fields.Integer(required=True,description="Item count")
},
)
orderInsert = api.model(
"orderInsert",
{
"orderItems": fields.List(fields.Nested(product)),
"contact": fields.Nested(contact)
},
)
order = api.model(
"Order",
{
"id": fields.String(required=True, description="Product's system generated Id"),
"orderId": fields.String(required=True, description="Order Id"),
"orderItems": fields.List(fields.Nested(product)),
"contact": fields.Nested(contact),
"totalCost": fields.Float(required=True, description="Total cost of order"),
"submittedAt" : fields.String(required=True, description="Time order is submitted"),
},
)
@nsOrder.route("/submitOrder")
class Orders(Resource):
@nsOrder.doc(
"Create Product",
reponses={200: "Success", 409: "Key alreay exists", 500: "Unexpected Error"},
)
@nsOrder.expect(orderInsert, validate=True)
@nsOrder.marshal_with(order)
def post(self):
try:
data = request.json
print(data)
data["id"] = uuid.uuid4().__str__()
data["submittedAt"] = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
orderId = cb.get(os.getenv("ORDERID_KEY"))
cb.upsert( os.getenv("ORDERID_KEY"), orderId.value+1)
orderId = "ORD-"+str(orderId.value)
data["orderId"] = orderId
totalCost=0
for cost in data["orderItems"]:
totalCost += (cost["price"] * cost["quantity"]) + (cost["price"] * (cost["tax"])/100) * cost["quantity"]
data["totalCost"] = totalCost
cb.insert(orderId, data)
requestData={}
requestData["orderId"] = data["orderId"]
requestData["name"] = data["contact"]["name"]
requestData["mailId"] = data["contact"]["emailId"]
requestData["totalCost"] = data["totalCost"]
url = "http://"+os.getenv("NG_BASEURL")+":"+os.getenv("NG_PORT")+os.getenv("NG_URL")
response = requests.post(url,json=requestData)
return data, 200
except DocumentExistsException:
return "Key already exists", 409
except CouchbaseException as e:
return f"Unexpected error: {e}", 500
except Exception as e:
return f"Unexpected error: {e}", 500
@nsOrder.route("/<orderId>")
class ProductId(Resource):
@nsOrder.doc(
"Get Profile",
reponses={200: "Document Found", 404: "Document Not Found", 500: "Unexpected Error"},
)
def get(self, orderId):
try:
result = cb.get(orderId)
return result.value ,200
except DocumentNotFoundException:
return "Key not found", 404
except CouchbaseException as e:
return f"Unexpected error: {e}", 500
db_info = {
"host": os.getenv("DB_HOST"),
"bucket": os.getenv("BUCKET"),
"scope": os.getenv("SCOPE"),
"collection": os.getenv("COLLECTION"),
"username": os.getenv("USERNAME"),
"password": os.getenv("PASSWORD"),
}
cb = CouchbaseClient(*db_info.values())
cb.connect()
try:
cb.get(os.getenv("ORDERID_KEY"))
except DocumentNotFoundException:
cb.insert(os.getenv("ORDERID_KEY"),int(os.getenv("ORDERID_BASE")))
if __name__ == "__main__":
app.run(debug=True,port=5003)
app.py-订单管理
- app.py - 门户服务
import os
from dotenv import load_dotenv
import requests
from flask import Flask, request, jsonify,render_template
app = Flask(__name__)
env_path = "./src/.env"
load_dotenv(env_path)
@app.route('/')
@app.route('/home')
def home():
return render_template('home.html')
@app.route('/order',methods=['GET','POST'])
def order():
url = "http://" + os.getenv("PM_BASEURL") + ":" + os.getenv("PM_PORT") + os.getenv("PM_GETPRODUCT_URL")
response = requests.get(url).json()
if request.method == 'POST':
data={}
orderItems = []
contact = {}
for item in response:
orderItem ={}
if int(request.form[item['productId']]):
orderItem["quantity"] = int(request.form[item['productId']])
orderItem["productName"] = item["productName"]
orderItem["price"] = item["price"]
orderItem["productId"] = item["productId"]
orderItem["tax"] = item["tax"]
orderItems.append(orderItem)
contact["name"] = request.form["name"]
contact["address"] = request.form["address"]
contact["emailId"] = request.form["email"]
contact["phone"] = request.form["mobile"]
data["orderItems"] = orderItems
data["contact"] = contact
url = "http://" + os.getenv("OM_BASEURL") + ":" + os.getenv("OM_PORT") + os.getenv("OM_SUBMITORDER_URL")
response = requests.post(url, json=data)
return render_template('orderView.html',data=response.json())
return render_template('order.html',data=response)
@app.route('/viewOrder',methods=['GET','POST'])
def viewOrder():
if request.method == 'POST':
orderId = request.form['orderId']
url = "http://" + os.getenv("OM_BASEURL") + ":" + os.getenv("OM_PORT") + os.getenv("OM_GETORDER_URL") + orderId
response = requests.get(url).json()
return render_template('orderView.html',data=response)
return render_template('view.html')
if __name__ == "__main__":
app.run(debug=True,port=5001)
app.py-门户服务
3. 运行
确保您在不同的端口上运行每个服务。就我而言:
服务及其运行的端口
- 使用以下命令运行每个服务
python app.py
- 打开终端并通过执行以下curl创建一些产品
curl -H 'Content-Type: application/json' -d '{ "productName":"Neutron Gen Track","productId":"DV0001", "price": 15000 , "tax" : 10 , "url" : "https://media.istockphoto.com/id/1342734683/photo/an-accessory-for-men.jpg?s=612x612&w=0&k=20&c=KRrVxbNC34_dGQZmB7GmihvTB3OSIz2oKNs5RLrlsBQ=" , "status": "ACTIVE" ,"description" : "With New 5003 Designer Analog Watch"}' -X POST http://localhost:5002/api/v1/products
curl -H 'Content-Type: application/json' -d '{ "productName":"Proton Gen Track","productId":"DV0002", "price": 10000 , "tax" : 10 , "url" : "https://images.unsplash.com/photo-1523170335258-f5ed11844a49?q=80&w=2380&auto=format&fit=crop&ixlib=rb-4.0.3&ixid=M3wxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8fA%3D%3D" , "status": "ACTIVE" , "description" : "With New 5003 Designer Analog Watch"}' -X POST http://localhost:5002/api/v1/products
用于产品创建的 Curl 执行
- 打开任意浏览器并输入http:://127.0.0.1:5001/。将呈现 DealBazaar 的主页。
首页-DealBazaar
3. 健全性测试
- 提交订单- 点击主页中的“立即订购”按钮,选择产品、数量,填写详细信息(姓名、地址、电子邮件电话),然后点击“提交”按钮
提交订单
订单提交结果
- 查看订单-点击首页查看订单按钮,输入订单Id,点击搜索按钮
查看订单
结论
在这个故事中,我们了解了微服务架构的基础知识,并学习了如何通过简单的步骤使用 python 构建微服务。