原子操作
- 原子操作(atomic operation)指的是由多步操作组成的一个操作。如果该操作不能原子地执行,则要么执行完所有步骤,要么一步也不执行,不可能只执行所有步骤的一个子集。
- 不可中断的一个或者一系列操作, 也就是不会被线程调度机制打断的操作, 运行期间不会有任何的上下文切换(context switch).
事务
- 事务(Transaction)是访问并可能更新数据库中各项数据项的一个程序执行单元(unit)。 事务由事务开始(begin transaction)和事务结束(end transaction)之间执行的全体操作组成。
- 事务是一个不可分割的数据库操作序列,也是数据库并发控制的基本单位,其执行的结果必须使数据库从一种一致性状态变到另一种一致性状态。
- 事务结束有两种,事务中的步骤全部成功执行时,提交事务。如果其中一个失败,那么将会发生回滚操作,并且撤销之前的所有操作。也就是说,事务内的语句,要么全部执行成功,要么全部执行失败。
- 事务是恢复和并发控制的基本单位。
- 事务具有四个特征:原子性、一致性、隔离性和持久性。这四个特征通常称为ACID。
官方文档:
事务(GO)
支持版本
- MongoDB从 3.0版本引入WiredTiger存储引擎之后开始支持事务。
- MongoDB 3.6之前的版本只能支持单文档的事务。
- MongoDB 4.0版本开始支持复制集部署模式下的事务。
- MongoDB 4.2版本开始支持分片集群中的事务。
要获取事务支持则需要安装对应的或高版本的mongodb
和驱动(pymongo(python)
,mongo-driver(go)
)
内置的一些原子操作
事务支持
go
func (s *sessionImpl) WithTransaction(ctx context.Context, fn func(sessCtx SessionContext) (interface{}, error),
opts ...*options.TransactionOptions) (interface{}, error) {
// 超时时间为:120 * time.Second
timeout := time.NewTimer(withTransactionTimeout)
defer timeout.Stop()
var err error
for {
// 开启事务
err = s.StartTransaction(opts...)
if err != nil {
return nil, err
}
// 回调函数(会话上下文)
res, err := fn(NewSessionContext(ctx, s))
// 执行失败终止事务
if err != nil {
if s.clientSession.TransactionRunning() {
// 终止事务
_ = s.AbortTransaction(internal.NewBackgroundContext(ctx))
}
select {
case <-timeout.C:
return nil, err
default:
}
if errorHasLabel(err, driver.TransientTransactionError) {
continue
}
return res, err
}
// 判断在回调函数里面是否直接通过会话上下文终止事务了
err = s.clientSession.CheckAbortTransaction()
if err != nil {
return res, nil
}
if ctx.Err() != nil {
_ = s.AbortTransaction(internal.NewBackgroundContext(ctx))
return nil, ctx.Err()
}
// CommitLoop提交事务循环,还在上面那个for里面
CommitLoop:
for {
// 提交
err = s.CommitTransaction(ctx)
if err == nil {
// 返回成功结果 res, err := fn(NewSessionContext(ctx, s))
return res, nil
}
// 超时判断
select {
case <-timeout.C:
return res, err
default:
}
if cerr, ok := err.(CommandError); ok {
// UnknownTransactionCommitResult = "UnknownTransactionCommitResult"
if cerr.HasErrorLabel(driver.UnknownTransactionCommitResult) && !cerr.IsMaxTimeMSExpiredError() {
continue
}
// errorHasLabel:包含规定的错误信息,返回true
// TransientTransactionError = "TransientTransactionError"
if cerr.HasErrorLabel(driver.TransientTransactionError) {
break CommitLoop
}
}
return res, err
}
}
}
python
def with_transaction(
self,
callback: Callable[["ClientSession"], _T],
read_concern: Optional[ReadConcern] = None,
write_concern: Optional[WriteConcern] = None,
read_preference: Optional[_ServerMode] = None,
max_commit_time_ms: Optional[int] = None,
) -> _T:
start_time = time.monotonic()
while True:
self.start_transaction(read_concern, write_concern, read_preference, max_commit_time_ms)
try:
ret = callback(self)
except Exception as exc:
if self.in_transaction:
self.abort_transaction()
if (
isinstance(exc, PyMongoError)
and exc.has_error_label("TransientTransactionError")
and _within_time_limit(start_time)
):
# Retry the entire transaction.
continue
raise
if not self.in_transaction:
# Assume callback intentionally ended the transaction.
return ret
while True:
try:
self.commit_transaction()
except PyMongoError as exc:
if (
exc.has_error_label("UnknownTransactionCommitResult")
and _within_time_limit(start_time)
and not _max_time_expired_error(exc)
):
# Retry the commit.
continue
if exc.has_error_label("TransientTransactionError") and _within_time_limit(
start_time
):
# Retry the entire transaction.
break
raise
# Commit succeeded.
return ret
跑通代码
使用更加简便的with方法
go
func main() {
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")
client, err := mongo.Connect(context.TODO(), clientOptions)
collection:=client.Database().Collection()
if err != nil {
log.Fatal(err)
}
wc := writeconcern.New(writeconcern.WMajority())
txnOptions := options.Transaction().SetWriteConcern(wc)
session,_:=client.StartSession()
result,_:=session.WithTransaction(context.TODO(), func(sessCtx mongo.SessionContext) (interface{}, error) {
result,err:=collection.Find(context.TODO(),bson.M{})
return result,err
},txnOptions)
cur,ok:=result.(*mongo.Cursor)
if ok{
fmt.Println("断言成功")
for cur.Next(context.TODO()){
m:=make(map[string]interface{})
cur.Decode(m)
fmt.Println(m)
}
}else{
fmt.Println("断言失败")
}
}
python
import pymongo
client = pymongo.MongoClient()
collection = client.get_database().get_collection()
def call_back(ctx: pymongo.ContextManager):
print(collection.find_one())
with client.start_session() as session:
session.with_transaction(call_back)
其余补充
高版本的mongoApi废弃了count()方法,如果需要统计数量可以使用:
- python的
collection.estimated_document_count()
,collection.count_documents(filter)
- go的
collection.CountDocuments(ctx,filter)
,collection.EstimatedDocumentCount(ctx)