| func (ins *Insert) executeInsertQueries(ctx context.Context,
 vcursor VCursor,
 rss []*srvtopo.ResolvedShard,
 queries []*querypb.BoundQuery,
 insertID int64,
 ) (*sqltypes.Result, error) {
 autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
 err := allowOnlyPrimary(rss...)
 if err != nil {
 return nil, err
 }
 --主要函数入库result, errs := vcursor.ExecuteMultiShard(ctx, ins, rss, queries, true /* rollbackOnError */, autocommit)
 if errs != nil {
 return nil, vterrors.Aggregate(errs)
 }
 
 if insertID != 0 {
 result.InsertID = uint64(insertID)
 }
 return result, nil
 }
 
 // ExecuteMultiShard is part of the engine.VCursor interface.func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) {
 noOfShards := len(rss)
 atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards))
 err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{})
 if err != nil {
 return nil, []error{err}
 }
 --主要函数入口qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows)
 vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
 
 return qr, errs
 }
 
 
 // ExecuteMultiShard implements the IExecutor interfacefunc (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool) (qr *sqltypes.Result, errs []error) {
 --主要函数入口
 return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows)}
 
 // ExecuteMultiShard is like Execute,// but each shard gets its own Sql Queries and BindVariables.
 //
 // It always returns a non-nil query result and an array of
 // shard errors which may be nil so that callers can optionally
 // process a partially-successful operation.
 func (stc *ScatterConn) ExecuteMultiShard(
 ctx context.Context,
 primitive engine.Primitive,
 rss []*srvtopo.ResolvedShard,
 queries []*querypb.BoundQuery,
 session *SafeSession,
 autocommit bool,
 ignoreMaxMemoryRows bool,
 ) (qr *sqltypes.Result, errs []error) {
 
 if len(rss) != len(queries) {
 return nil, []error{vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] got mismatched number of queries and shards")}
 }
 
 // mu protects qr
 var mu sync.Mutex
 qr = new(sqltypes.Result)
 
 if session.InLockSession() && session.TriggerLockHeartBeat() {
 go stc.runLockQuery(ctx, session)
 }
 --多条数据通过goroutine并发执行
 allErrors := stc.multiGoTransaction(
 ctx,
 "Execute",
 rss,
 session,
 autocommit,
 func(rs *srvtopo.ResolvedShard, i int, info *shardActionInfo) (*shardActionInfo, error) { --函数指针
 var (
 innerqr *sqltypes.Result
 err error
 opts *querypb.ExecuteOptions
 alias *topodatapb.TabletAlias
 qs queryservice.QueryService
 )
 transactionID := info.transactionID
 reservedID := info.reservedID
 
 if session != nil && session.Session != nil {
 opts = session.Session.Options
 }
 
 if autocommit {
 // As this is auto-commit, the transactionID is supposed to be zero.
 if transactionID != int64(0) {
 return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", transactionID)
 }
 }
 
 qs, err = getQueryService(rs, info, session, false)
 if err != nil {
 return nil, err
 }
 
 retryRequest := func(exec func()) {
 retry := checkAndResetShardSession(info, err, session, rs.Target)
 switch retry {
 case newQS:
 // Current tablet is not available, try querying new tablet using gateway.
 qs = rs.Gateway
 fallthrough
 case shard:
 // if we need to reset a reserved connection, here is our chance to try executing again,
 // against a new connection
 exec()
 }
 }
 --具体执行的地方
 switch info.actionNeeded {
 case nothing:
 innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts)
 if err != nil {
 retryRequest(func() {
 // we seem to have lost our connection. it was a reserved connection, let's try to recreate it
 info.actionNeeded = reserve
 var state queryservice.ReservedState
 state, innerqr, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts)
 reservedID = state.ReservedID
 alias = state.TabletAlias
 })
 }
 case begin:
 var state queryservice.TransactionState
 state, innerqr, err = qs.BeginExecute(ctx, rs.Target, session.SavePoints(), queries[i].Sql, queries[i].BindVariables, reservedID, opts)
 transactionID = state.TransactionID
 alias = state.TabletAlias
 if err != nil {
 retryRequest(func() {
 // we seem to have lost our connection. it was a reserved connection, let's try to recreate it
 info.actionNeeded = reserveBegin
 var state queryservice.ReservedTransactionState
 state, innerqr, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts)
 transactionID = state.TransactionID
 reservedID = state.ReservedID
 alias = state.TabletAlias
 })
 }
 case reserve:
 var state queryservice.ReservedState
 state, innerqr, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts)
 reservedID = state.ReservedID
 alias = state.TabletAlias
 case reserveBegin:
 var state queryservice.ReservedTransactionState
 state, innerqr, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts)
 transactionID = state.TransactionID
 reservedID = state.ReservedID
 alias = state.TabletAlias
 default:
 return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded)
 }
 session.logging.log(primitive, rs.Target, rs.Gateway, queries[i].Sql, info.actionNeeded == begin || info.actionNeeded == reserveBegin, queries[i].BindVariables)
 
 // We need to new shard info irrespective of the error.
 newInfo := info.updateTransactionAndReservedID(transactionID, reservedID, alias)
 if err != nil {
 return newInfo, err
 }
 mu.Lock()
 defer mu.Unlock()
 
 // Don't append more rows if row count is exceeded.
 if ignoreMaxMemoryRows || len(qr.Rows) <= maxMemoryRows {
 qr.AppendResult(innerqr)
 }
 return newInfo, nil
 },
 )
 
 if !ignoreMaxMemoryRows && len(qr.Rows) > maxMemoryRows {
 return nil, []error{vterrors.NewErrorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, vterrors.NetPacketTooLarge, "in-memory row count exceeded allowed limit of %d", maxMemoryRows)}
 }
 
 return qr, allErrors.GetErrors()
 }
 
 
 // multiGoTransaction performs the requested 'action' on the specified// ResolvedShards in parallel. For each shard, if the requested
 // session is in a transaction, it opens a new transactions on the connection,
 // and updates the Session with the transaction id. If the session already
 // contains a transaction id for the shard, it reuses it.
 // The action function must match the shardActionTransactionFunc signature.
 //
 // It returns an error recorder in which each shard error is recorded positionally,
 // i.e. if rss[2] had an error, then the error recorder will store that error
 // in the second position.
 --multiGoTransaction具体实现func (stc *ScatterConn) multiGoTransaction(
 ctx context.Context,
 name string,
 rss []*srvtopo.ResolvedShard,
 session *SafeSession,
 autocommit bool,
 action shardActionTransactionFunc,
 ) (allErrors *concurrency.AllErrorRecorder) {
 
 numShards := len(rss)
 allErrors = new(concurrency.AllErrorRecorder)
 
 if numShards == 0 {
 return allErrors
 }
 --函数指针oneShard := func(rs *srvtopo.ResolvedShard, i int) {
 var err error
 startTime, statsKey := stc.startAction(name, rs.Target)
 defer stc.endAction(startTime, allErrors, statsKey, &err, session)
 
 shardActionInfo, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.mode)
 if err != nil {
 return
 }
 updated, err := action(rs, i, shardActionInfo)
 if updated == nil {
 return
 }
 if updated.actionNeeded != nothing && (updated.transactionID != 0 || updated.reservedID != 0) {
 appendErr := session.AppendOrUpdate(&vtgatepb.Session_ShardSession{
 Target: rs.Target,
 TransactionId: updated.transactionID,
 ReservedId: updated.reservedID,
 TabletAlias: updated.alias,
 }, stc.txConn.mode)
 if appendErr != nil {
 err = appendErr
 }
 }
 }
 
 if numShards == 1 {
 // only one shard, do it synchronously.
 for i, rs := range rss {
 oneShard(rs, i)
 }
 } else {
 var wg sync.WaitGroup
 --多shard并发执行for i, rs := range rss {
 wg.Add(1)
 go func(rs *srvtopo.ResolvedShard, i int) {
 defer wg.Done()
 oneShard(rs, i)
 }(rs, i)
 }
 wg.Wait()
 }
 
 if session.MustRollback() {
 _ = stc.txConn.Rollback(ctx, session)
 }
 return allErrors
 }
 
 
 |