func (ins *Insert) executeInsertQueries( ctx context.Context, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, insertID int64, ) (*sqltypes.Result, error) { autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval() err := allowOnlyPrimary(rss...) if err != nil { return nil, err } --主要函数入库 result, errs := vcursor.ExecuteMultiShard(ctx, ins, rss, queries, true /* rollbackOnError */, autocommit) if errs != nil { return nil, vterrors.Aggregate(errs) }
if insertID != 0 { result.InsertID = uint64(insertID) } return result, nil } // ExecuteMultiShard is part of the engine.VCursor interface. func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) { noOfShards := len(rss) atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards)) err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{}) if err != nil { return nil, []error{err} } --主要函数入口 qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows) vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
return qr, errs } // ExecuteMultiShard implements the IExecutor interface func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool) (qr *sqltypes.Result, errs []error) { --主要函数入口 return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows) } // ExecuteMultiShard is like Execute, // but each shard gets its own Sql Queries and BindVariables. // // It always returns a non-nil query result and an array of // shard errors which may be nil so that callers can optionally // process a partially-successful operation. func (stc *ScatterConn) ExecuteMultiShard( ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, ) (qr *sqltypes.Result, errs []error) {
if len(rss) != len(queries) { return nil, []error{vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] got mismatched number of queries and shards")} }
// mu protects qr var mu sync.Mutex qr = new(sqltypes.Result)
if session.InLockSession() && session.TriggerLockHeartBeat() { go stc.runLockQuery(ctx, session) } --多条数据通过goroutine并发执行 allErrors := stc.multiGoTransaction( ctx, "Execute", rss, session, autocommit, func(rs *srvtopo.ResolvedShard, i int, info *shardActionInfo) (*shardActionInfo, error) { --函数指针 var ( innerqr *sqltypes.Result err error opts *querypb.ExecuteOptions alias *topodatapb.TabletAlias qs queryservice.QueryService ) transactionID := info.transactionID reservedID := info.reservedID
if session != nil && session.Session != nil { opts = session.Session.Options }
if autocommit { // As this is auto-commit, the transactionID is supposed to be zero. if transactionID != int64(0) { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", transactionID) } }
qs, err = getQueryService(rs, info, session, false) if err != nil { return nil, err }
retryRequest := func(exec func()) { retry := checkAndResetShardSession(info, err, session, rs.Target) switch retry { case newQS: // Current tablet is not available, try querying new tablet using gateway. qs = rs.Gateway fallthrough case shard: // if we need to reset a reserved connection, here is our chance to try executing again, // against a new connection exec() } } --具体执行的地方 switch info.actionNeeded { case nothing: innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts) if err != nil { retryRequest(func() { // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserve var state queryservice.ReservedState state, innerqr, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts) reservedID = state.ReservedID alias = state.TabletAlias }) } case begin: var state queryservice.TransactionState state, innerqr, err = qs.BeginExecute(ctx, rs.Target, session.SavePoints(), queries[i].Sql, queries[i].BindVariables, reservedID, opts) transactionID = state.TransactionID alias = state.TabletAlias if err != nil { retryRequest(func() { // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserveBegin var state queryservice.ReservedTransactionState state, innerqr, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts) transactionID = state.TransactionID reservedID = state.ReservedID alias = state.TabletAlias }) } case reserve: var state queryservice.ReservedState state, innerqr, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts) reservedID = state.ReservedID alias = state.TabletAlias case reserveBegin: var state queryservice.ReservedTransactionState state, innerqr, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts) transactionID = state.TransactionID reservedID = state.ReservedID alias = state.TabletAlias default: return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded) } session.logging.log(primitive, rs.Target, rs.Gateway, queries[i].Sql, info.actionNeeded == begin || info.actionNeeded == reserveBegin, queries[i].BindVariables)
// We need to new shard info irrespective of the error. newInfo := info.updateTransactionAndReservedID(transactionID, reservedID, alias) if err != nil { return newInfo, err } mu.Lock() defer mu.Unlock()
// Don't append more rows if row count is exceeded. if ignoreMaxMemoryRows || len(qr.Rows) <= maxMemoryRows { qr.AppendResult(innerqr) } return newInfo, nil }, )
if !ignoreMaxMemoryRows && len(qr.Rows) > maxMemoryRows { return nil, []error{vterrors.NewErrorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, vterrors.NetPacketTooLarge, "in-memory row count exceeded allowed limit of %d", maxMemoryRows)} }
return qr, allErrors.GetErrors() } // multiGoTransaction performs the requested 'action' on the specified // ResolvedShards in parallel. For each shard, if the requested // session is in a transaction, it opens a new transactions on the connection, // and updates the Session with the transaction id. If the session already // contains a transaction id for the shard, it reuses it. // The action function must match the shardActionTransactionFunc signature. // // It returns an error recorder in which each shard error is recorded positionally, // i.e. if rss[2] had an error, then the error recorder will store that error // in the second position. --multiGoTransaction具体实现 func (stc *ScatterConn) multiGoTransaction( ctx context.Context, name string, rss []*srvtopo.ResolvedShard, session *SafeSession, autocommit bool, action shardActionTransactionFunc, ) (allErrors *concurrency.AllErrorRecorder) {
numShards := len(rss) allErrors = new(concurrency.AllErrorRecorder)
if numShards == 0 { return allErrors } --函数指针 oneShard := func(rs *srvtopo.ResolvedShard, i int) { var err error startTime, statsKey := stc.startAction(name, rs.Target) defer stc.endAction(startTime, allErrors, statsKey, &err, session)
shardActionInfo, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.mode) if err != nil { return } updated, err := action(rs, i, shardActionInfo) if updated == nil { return } if updated.actionNeeded != nothing && (updated.transactionID != 0 || updated.reservedID != 0) { appendErr := session.AppendOrUpdate(&vtgatepb.Session_ShardSession{ Target: rs.Target, TransactionId: updated.transactionID, ReservedId: updated.reservedID, TabletAlias: updated.alias, }, stc.txConn.mode) if appendErr != nil { err = appendErr } } }
if numShards == 1 { // only one shard, do it synchronously. for i, rs := range rss { oneShard(rs, i) } } else { var wg sync.WaitGroup --多shard并发执行 for i, rs := range rss { wg.Add(1) go func(rs *srvtopo.ResolvedShard, i int) { defer wg.Done() oneShard(rs, i) }(rs, i) } wg.Wait() }
if session.MustRollback() { _ = stc.txConn.Rollback(ctx, session) } return allErrors } |