vitess insert 代码分析

news2024/11/30 2:41:16

一、总统流程


 

二、源码分析

1.计划器

构建计划入口,每个操作,都有一个单独函数进行计划构建

func createInstructionFor(query string, stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool)

{

switch stmt := stmt.(type) {
 

case *sqlparser.Select: --查询
configuredPlanner, err := getConfiguredPlanner(vschema, buildSelectPlan, stmt, query)
if err != nil {
return nil, err
}
return buildRoutePlan(stmt, reservedVars, vschema, configuredPlanner)

case *sqlparser.Insert: --插入
return buildRoutePlan(stmt, reservedVars, vschema, buildInsertPlan)

case *sqlparser.Update: --更新
configuredPlanner, err := getConfiguredPlanner(vschema, buildUpdatePlan, stmt, query)
if err != nil {
return nil, err
}
return buildRoutePlan(stmt, reservedVars, vschema, configuredPlanner)
case *sqlparser.Delete: --删除
configuredPlanner, err := getConfiguredPlanner(vschema, buildDeletePlan, stmt, query)
if err != nil {
return nil, err
}
return buildRoutePlan(stmt, reservedVars, vschema, configuredPlanner)

...
}


 


 

以insert操作为例,insert执行计划构建函数如下:

buildInsertPlan(stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error)

{

...

--判断是否要进行分片处理

--判断依据不是依据具体sql,是否需要进行分片,而是依据当前Keyspace的配置中的sharded配置

if !rb.eroute.Keyspace.Sharded {
return buildInsertUnshardedPlan(ins, vschemaTable, reservedVars, vschema)
}
if ins.Action == sqlparser.ReplaceAct {
return nil, vterrors.VT12001("REPLACE INTO with sharded keyspace")
}
return buildInsertShardedPlan(ins, vschemaTable, reservedVars, vschema)

}

详情如下:

Keyspace: *vitess.io/vitess/go/vt/vtgate/vindexes.Keyspace

{
Name: "customer",
Sharded: true,},


 

以分片方式为例,构建分片方式insert执行计划函数如下:

func buildInsertShardedPlan(ins *sqlparser.Insert, table *vindexes.Table, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) {
--构建执行类,后续的具体执行的子类,是由这里确定的

eins := &engine.Insert{
Table: table,
Keyspace: table.Keyspace,
}
tc := &tableCollector{}
tc.addVindexTable(table)
eins.Ignore = bool(ins.Ignore)
if ins.OnDup != nil {
if isVindexChanging(sqlparser.UpdateExprs(ins.OnDup), eins.Table.ColumnVindexes) {
return nil, vterrors.VT12001("DML cannot update vindex column")
}
eins.Ignore = true
}
if ins.Columns == nil && table.ColumnListAuthoritative {

--分析出要插入的列
populateInsertColumnlist(ins, table)
}



applyCommentDirectives(ins, eins)


 

--获取分片方式,如hash等,确定后面计算子类调用,类似如下结果:

[]*vitess.io/vitess/go/vt/vtgate/vindexes.ColumnVindex len: 1, cap: 1, [
*{
Columns: []vitess.io/vitess/go/vt/sqlparser.IdentifierCI len: 1, cap: 1, [
(*"vitess.io/vitess/go/vt/sqlparser.IdentifierCI")(0xc00012c3c0),
],
Type: "hash",
Name: "hash",
Owned: false,
Vindex: vitess.io/vitess/go/vt/vtgate/vindexes.Vindex(*vitess.io/vitess/go/vt/vtgate/vindexes.Hash) ...,
isUnique: true,
cost: 1,
partial: false,},

...

eins.ColVindexes = getColVindexes(eins.Table.ColumnVindexes)

// Till here common plan building done for insert by providing values or select query.


 

--如果sql语句后面跟的不是变量值,则选择构建insert -select执行计划
rows, isRowValues := ins.Rows.(sqlparser.Values)
if !isRowValues {
return buildInsertSelectPlan(ins, table, reservedVars, vschema, eins)
}

--给后面执行器做判断依据
eins.Opcode = engine.InsertSharded

for _, value := range rows {
if len(ins.Columns) != len(value) {
return nil, vterrors.VT13001("column list does not match values")
}
}

if err := modifyForAutoinc(ins, eins); err != nil {
return nil, err
}

// Fill out the 3-d Values structure. Please see documentation of Insert.Values for details.
colVindexes := eins.ColVindexes
routeValues := make([][][]evalengine.Expr, len(colVindexes))
for vIdx, colVindex := range colVindexes {
routeValues[vIdx] = make([][]evalengine.Expr, len(colVindex.Columns))
for colIdx, col := range colVindex.Columns {
routeValues[vIdx][colIdx] = make([]evalengine.Expr, len(rows))
colNum := findOrAddColumn(ins, col)
for rowNum, row := range rows {
innerpv, err := evalengine.Translate(row[colNum], semantics.EmptySemTable())
if err != nil {
return nil, err
}
routeValues[vIdx][colIdx][rowNum] = innerpv
}
}
}
for _, colVindex := range colVindexes {
for _, col := range colVindex.Columns {
colNum := findOrAddColumn(ins, col)
for rowNum, row := range rows {
name := engine.InsertVarName(col, rowNum)
row[colNum] = sqlparser.NewArgument(name)
}
}
}
eins.VindexValues = routeValues

--生成执行语句,转换为以下执行语句,一是补充sql语句,二是用变量替换具体值,生成类似以下语句:

"insert into customer(customer_id, email) values (:_customer_id_0, :vtg2)"

eins.Query = generateQuery(ins)
generateInsertShardedQuery(ins, eins, rows)

--决定了具体执行的子类,更改后执行语句,变量值,以及涉及的表等
return newPlanResult(eins, tc.getTables()...), nil
}


 

2.执行器

执行器函数入口如下,通过多态方式进行接口调用

func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
for try := 0; try < MaxBufferingRetries; try++ {

--engine.Primitive是基类,具体调用的子类实在执行计划阶段已经确定了
res, err := primitive.TryExecute(ctx, vc, bindVars, wantfields)
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
continue
}
return res, err
}
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available")
}

以insert执行为例,执行函数如下:

func (ins *Insert) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, ins.QueryTimeout)
defer cancelFunc()

--3种执行方式:unsharded, sharded, insert-select,具体选择哪种方式,是通过ins.Opcode在构建执行计划阶段确定的值来进行判断的

switch ins.Opcode {
case InsertUnsharded:
return ins.execInsertUnsharded(ctx, vcursor, bindVars)
case InsertSharded:
return ins.execInsertSharded(ctx, vcursor, bindVars)
case InsertSelect:
return ins.execInsertFromSelect(ctx, vcursor, bindVars)
default:
// Unreachable.
return nil, fmt.Errorf("unsupported query route: %v", ins)
}
}

以分片insert执行为例,函数入口如下:

func (ins *Insert) execInsertSharded(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {

--生成insertID,看函数说明,必要的情况下才会生成,默认返回0,作用待分析
insertID, err := ins.processGenerateFromValues(ctx, vcursor, bindVars)
if err != nil {
return nil, err
}

--生成分片路由路径
rss, queries, err := ins.getInsertShardedRoute(ctx, vcursor, bindVars)
if err != nil {
return nil, err
}


--根据路由路径和对应语句,进行分发执行
return ins.executeInsertQueries(ctx, vcursor, rss, queries, insertID)
}

分析下分片的选择,函数入口为:

func (ins *Insert) getInsertShardedRoute(
ctx context.Context,
vcursor VCursor,
bindVars map[string]*querypb.BindVariable,
) ([]*srvtopo.ResolvedShard, []*querypb.BoundQuery, error) {
// vindexRowsValues builds the values of all vindex columns.
// the 3-d structure indexes are colVindex, row, col. Note that
// ins.Values indexes are colVindex, col, row. So, the conversion
// involves a transpose.
// The reason we need to transpose is that all the Vindex APIs
// require inputs in that format.
vindexRowsValues := make([][]sqltypes.Row, len(ins.VindexValues))
rowCount := 0
env := evalengine.EnvWithBindVars(bindVars, vcursor.ConnCollation())
colVindexes := ins.ColVindexes
if colVindexes == nil {
colVindexes = ins.Table.ColumnVindexes
}

-- 得到每行插入的index的值
for vIdx, vColValues := range ins.VindexValues {
if len(vColValues) != len(colVindexes[vIdx].Columns) {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] supplied vindex column values don't match vschema: %v %v", vColValues, colVindexes[vIdx].Columns)
}
for colIdx, colValues := range vColValues {
rowsResolvedValues := make(sqltypes.Row, 0, len(colValues))
for _, colValue := range colValues {
result, err := env.Evaluate(colValue)
if err != nil {
return nil, nil, err
}
rowsResolvedValues = append(rowsResolvedValues, result.Value())
}
// This is the first iteration: allocate for transpose.
if colIdx == 0 {
if len(rowsResolvedValues) == 0 {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] rowcount is zero for inserts: %v", rowsResolvedValues)
}
if rowCount == 0 {
rowCount = len(rowsResolvedValues)
}
if rowCount != len(rowsResolvedValues) {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] uneven row values for inserts: %d %d", rowCount, len(rowsResolvedValues))
}
vindexRowsValues[vIdx] = make([]sqltypes.Row, rowCount)
}
// Perform the transpose.
for rowNum, colVal := range rowsResolvedValues {
vindexRowsValues[vIdx][rowNum] = append(vindexRowsValues[vIdx][rowNum], colVal)
}
}
}



// The output from the following 'process' functions is a list of
// keyspace ids. For regular inserts, a failure to find a route
// results in an error. For 'ignore' type inserts, the keyspace
// id is returned as nil, which is used later to drop the corresponding rows.
if len(vindexRowsValues) == 0 || len(colVindexes) == 0 {
return nil, nil, vterrors.NewErrorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.RequiresPrimaryKey, vterrors.PrimaryVindexNotSet, ins.Table.Name)
}

--通过对值进行分片算法操作,得到相应的keyspaceIDs,后面会依据keyspaceIDs来进行分片划分,具体processPrimary分析在下面
keyspaceIDs, err := ins.processPrimary(ctx, vcursor, vindexRowsValues[0], colVindexes[0])
if err != nil {
return nil, nil, err
}

for vIdx := 1; vIdx < len(colVindexes); vIdx++ {
colVindex := colVindexes[vIdx]
var err error
if colVindex.Owned {
err = ins.processOwned(ctx, vcursor, vindexRowsValues[vIdx], colVindex, keyspaceIDs)
} else {
err = ins.processUnowned(ctx, vcursor, vindexRowsValues[vIdx], colVindex, keyspaceIDs)
}
if err != nil {
return nil, nil, err
}
}

// Build 3-d bindvars. Skip rows with nil keyspace ids in case
// we're executing an insert ignore.
for vIdx, colVindex := range colVindexes {
for rowNum, rowColumnKeys := range vindexRowsValues[vIdx] {
if keyspaceIDs[rowNum] == nil {
// InsertIgnore: skip the row.
continue
}
for colIdx, vindexKey := range rowColumnKeys {
col := colVindex.Columns[colIdx]
name := InsertVarName(col, rowNum)
bindVars[name] = sqltypes.ValueBindVariable(vindexKey)
}
}
}

// We need to know the keyspace ids and the Mids associated with
// each RSS. So we pass the ksid indexes in as ids, and get them back
// as values. We also skip nil KeyspaceIds, no need to resolve them.
var indexes []*querypb.Value
var destinations []key.Destination
for i, ksid := range keyspaceIDs {
if ksid != nil {
indexes = append(indexes, &querypb.Value{
Value: strconv.AppendInt(nil, int64(i), 10),
})
destinations = append(destinations, key.DestinationKeyspaceID(ksid))
}
}
if len(destinations) == 0 {
// In this case, all we have is nil KeyspaceIds, we don't do
// anything at all.
return nil, nil, nil
}
--通过destinations中的值,判断哪行属于哪个shard
rss, indexesPerRss, err := vcursor.ResolveDestinations(ctx, ins.Keyspace.Name, indexes, destinations)
if err != nil {
return nil, nil, err
}
--组合执行语句
queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
var mids []string
for _, indexValue := range indexesPerRss[i] {
index, _ := strconv.ParseInt(string(indexValue.Value), 0, 64)
if keyspaceIDs[index] != nil {
mids = append(mids, ins.Mid[index])
}
}
rewritten := ins.Prefix + strings.Join(mids, ",") + ins.Suffix
queries[i] = &querypb.BoundQuery{
Sql: rewritten,
BindVariables: bindVars,
}
}
--返回路由表和对应语句,让下一步进行执行
return rss, queries, nil
}

分析下如何更加分片算法计算keyspaceIDs的,入口函数如下:

// processPrimary maps the primary vindex values to the keyspace ids.

--vitess的Primary vindex,类似于MySQL的primary key,Vtindex中配置的分片依据项
func (ins *Insert) processPrimary(ctx context.Context, vcursor VCursor, vindexColumnsKeys []sqltypes.Row, colVindex *vindexes.ColumnVindex) ([]ksID, error) {

--利用多态方式,决定哪种算法执行Map,vindexes具体类型在构建执行计划部分已经构建成功
destinations, err := vindexes.Map(ctx, colVindex.Vindex, vcursor, vindexColumnsKeys)
if err != nil {
return nil, err
}


keyspaceIDs := make([]ksID, len(destinations))
for i, destination := range destinations {
switch d := destination.(type) {
case key.DestinationKeyspaceID:
// This is a single keyspace id, we're good.
keyspaceIDs[i] = d
case key.DestinationNone:
// No valid keyspace id, we may return an error.
if !ins.Ignore {
return nil, fmt.Errorf("could not map %v to a keyspace id", vindexColumnsKeys[i])
}
default:
return nil, fmt.Errorf("could not map %v to a unique keyspace id: %v", vindexColumnsKeys[i], destination)
}
}

return keyspaceIDs, nil
}

以hash算法进行分片为例,函数入口如下:

// Map can map ids to key.Destination objects.

--到hash子类中执行Map
func (vind *Hash) Map(ctx context.Context, vcursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) {
out := make([]key.Destination, len(ids))
for i, id := range ids {
ksid, err := vind.Hash(id)
if err != nil {
out[i] = key.DestinationNone{}
continue
}
out[i] = key.DestinationKeyspaceID(ksid)
}
return out, nil
}

通过destinations中的值,判断哪行属于哪个shard,函数入口如下:

func (vc *vcursorImpl) ResolveDestinations(ctx context.Context, keyspace string, ids []*querypb.Value, destinations []key.Destination) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) {
rss, values, err := vc.resolver.ResolveDestinations(ctx, keyspace, vc.tabletType, ids, destinations)
if err != nil {
return nil, nil, err
}
if enableShardRouting {
rss, err = vc.fixupPartiallyMovedShards(rss)
if err != nil {
return nil, nil, err
}
}
return rss, values, err
}


 


 

// Sample input / output:
// - destinations: dst1, dst2, dst3
// - ids: id1, id2, id3
// If dst1 is in shard1, and dst2 and dst3 are in shard2, the output will be:
// - []*ResolvedShard: shard1, shard2
// - [][]*querypb.Value: [id1], [id2, id3]

func (r *Resolver) ResolveDestinations(ctx context.Context, keyspace string, tabletType topodatapb.TabletType, ids []*querypb.Value, destinations []key.Destination) ([]*ResolvedShard, [][]*querypb.Value, error) {
--从topo.server中获取所有分片信息

keyspace, _, allShards, err := r.GetKeyspaceShards(ctx, keyspace, tabletType)
if err != nil {
return nil, nil, err
}


var result []*ResolvedShard
var values [][]*querypb.Value
resolved := make(map[string]int)
for i, destination := range destinations {

--利用KeyspaceID获得对应分片,并组成result(分片信息)与values(行数值)对应结果
if err := destination.Resolve(allShards, func(shard string) error {
s, ok := resolved[shard]
if !ok {
target := &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tabletType,
Cell: r.localCell,
}
// Right now we always set the Cell to ""
// Later we can fallback to another cell if needed.
// We would then need to read the SrvKeyspace there too.
target.Cell = ""
s = len(result)
result = append(result, &ResolvedShard{
Target: target,
Gateway: r.gateway,
})
if ids != nil {
values = append(values, nil)
}
resolved[shard] = s
}
if ids != nil {
values[s] = append(values[s], ids[i])
}
return nil
}); err != nil {
return nil, nil, err
}
}
return result, values, nil
}


 

// Resolve is part of the Destination interface.
func (d DestinationKeyspaceID) Resolve(allShards []*topodatapb.ShardReference, addShard func(shard string) error) error {

--利用KeyspaceID获得对应分片
shard, err := GetShardForKeyspaceID(allShards, d)
if err != nil {
return err
}
return addShard(shard)
}


 

// KeyRangeContains returns true if the provided id is in the keyrange.
func KeyRangeContains(kr *topodatapb.KeyRange, id []byte) bool {
if kr == nil {
return true
}
return bytes.Compare(kr.Start, id) <= 0 &&
(len(kr.End) == 0 || bytes.Compare(id, kr.End) < 0)
}


 

执行操作函数入口:

func (ins *Insert) executeInsertQueries(
ctx context.Context,
vcursor VCursor,
rss []*srvtopo.ResolvedShard,
queries []*querypb.BoundQuery,
insertID int64,
) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
err := allowOnlyPrimary(rss...)
if err != nil {
return nil, err
}

--主要函数入库
result, errs := vcursor.ExecuteMultiShard(ctx, ins, rss, queries, true /* rollbackOnError */, autocommit)
if errs != nil {
return nil, vterrors.Aggregate(errs)
}



if insertID != 0 {
result.InsertID = uint64(insertID)
}
return result, nil
}


 

// ExecuteMultiShard is part of the engine.VCursor interface.
func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, canAutocommit bool) (*sqltypes.Result, []error) {
noOfShards := len(rss)
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards))
err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{})
if err != nil {
return nil, []error{err}
}

--主要函数入口
qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)

return qr, errs
}
 


 

// ExecuteMultiShard implements the IExecutor interface
func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool) (qr *sqltypes.Result, errs []error) {
--主要函数入口

return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows)
}


 

// ExecuteMultiShard is like Execute,
// but each shard gets its own Sql Queries and BindVariables.
//
// It always returns a non-nil query result and an array of
// shard errors which may be nil so that callers can optionally
// process a partially-successful operation.
func (stc *ScatterConn) ExecuteMultiShard(
ctx context.Context,
primitive engine.Primitive,
rss []*srvtopo.ResolvedShard,
queries []*querypb.BoundQuery,
session *SafeSession,
autocommit bool,
ignoreMaxMemoryRows bool,
) (qr *sqltypes.Result, errs []error) {



if len(rss) != len(queries) {
return nil, []error{vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] got mismatched number of queries and shards")}
}

// mu protects qr
var mu sync.Mutex
qr = new(sqltypes.Result)

if session.InLockSession() && session.TriggerLockHeartBeat() {
go stc.runLockQuery(ctx, session)
}
--多条数据通过goroutine并发执行
allErrors := stc.multiGoTransaction(
ctx,
"Execute",
rss,
session,
autocommit,
func(rs *srvtopo.ResolvedShard, i int, info *shardActionInfo) (*shardActionInfo, error) { --函数指针
var (
innerqr *sqltypes.Result
err error
opts *querypb.ExecuteOptions
alias *topodatapb.TabletAlias
qs queryservice.QueryService
)
transactionID := info.transactionID
reservedID := info.reservedID

if session != nil && session.Session != nil {
opts = session.Session.Options
}

if autocommit {
// As this is auto-commit, the transactionID is supposed to be zero.
if transactionID != int64(0) {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "in autocommit mode, transactionID should be zero but was: %d", transactionID)
}
}

qs, err = getQueryService(rs, info, session, false)
if err != nil {
return nil, err
}

retryRequest := func(exec func()) {
retry := checkAndResetShardSession(info, err, session, rs.Target)
switch retry {
case newQS:
// Current tablet is not available, try querying new tablet using gateway.
qs = rs.Gateway
fallthrough
case shard:
// if we need to reset a reserved connection, here is our chance to try executing again,
// against a new connection
exec()
}
}
--具体执行的地方
switch info.actionNeeded {
case nothing:
innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, info.transactionID, info.reservedID, opts)
if err != nil {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserve
var state queryservice.ReservedState
state, innerqr, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, 0 /*transactionId*/, opts)
reservedID = state.ReservedID
alias = state.TabletAlias
})
}
case begin:
var state queryservice.TransactionState
state, innerqr, err = qs.BeginExecute(ctx, rs.Target, session.SavePoints(), queries[i].Sql, queries[i].BindVariables, reservedID, opts)
transactionID = state.TransactionID
alias = state.TabletAlias
if err != nil {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserveBegin
var state queryservice.ReservedTransactionState
state, innerqr, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts)
transactionID = state.TransactionID
reservedID = state.ReservedID
alias = state.TabletAlias
})
}
case reserve:
var state queryservice.ReservedState
state, innerqr, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts)
reservedID = state.ReservedID
alias = state.TabletAlias
case reserveBegin:
var state queryservice.ReservedTransactionState
state, innerqr, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), queries[i].Sql, queries[i].BindVariables, opts)
transactionID = state.TransactionID
reservedID = state.ReservedID
alias = state.TabletAlias
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded)
}
session.logging.log(primitive, rs.Target, rs.Gateway, queries[i].Sql, info.actionNeeded == begin || info.actionNeeded == reserveBegin, queries[i].BindVariables)

// We need to new shard info irrespective of the error.
newInfo := info.updateTransactionAndReservedID(transactionID, reservedID, alias)
if err != nil {
return newInfo, err
}
mu.Lock()
defer mu.Unlock()

// Don't append more rows if row count is exceeded.
if ignoreMaxMemoryRows || len(qr.Rows) <= maxMemoryRows {
qr.AppendResult(innerqr)
}
return newInfo, nil
},
)

if !ignoreMaxMemoryRows && len(qr.Rows) > maxMemoryRows {
return nil, []error{vterrors.NewErrorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, vterrors.NetPacketTooLarge, "in-memory row count exceeded allowed limit of %d", maxMemoryRows)}
}

return qr, allErrors.GetErrors()
}


 


 

// multiGoTransaction performs the requested 'action' on the specified
// ResolvedShards in parallel. For each shard, if the requested
// session is in a transaction, it opens a new transactions on the connection,
// and updates the Session with the transaction id. If the session already
// contains a transaction id for the shard, it reuses it.
// The action function must match the shardActionTransactionFunc signature.
//
// It returns an error recorder in which each shard error is recorded positionally,
// i.e. if rss[2] had an error, then the error recorder will store that error
// in the second position.

--multiGoTransaction具体实现
func (stc *ScatterConn) multiGoTransaction(
ctx context.Context,
name string,
rss []*srvtopo.ResolvedShard,
session *SafeSession,
autocommit bool,
action shardActionTransactionFunc,
) (allErrors *concurrency.AllErrorRecorder) {



numShards := len(rss)
allErrors = new(concurrency.AllErrorRecorder)

if numShards == 0 {
return allErrors
}

--函数指针
oneShard := func(rs *srvtopo.ResolvedShard, i int) {
var err error
startTime, statsKey := stc.startAction(name, rs.Target)
defer stc.endAction(startTime, allErrors, statsKey, &err, session)

shardActionInfo, err := actionInfo(ctx, rs.Target, session, autocommit, stc.txConn.mode)
if err != nil {
return
}
updated, err := action(rs, i, shardActionInfo)
if updated == nil {
return
}
if updated.actionNeeded != nothing && (updated.transactionID != 0 || updated.reservedID != 0) {
appendErr := session.AppendOrUpdate(&vtgatepb.Session_ShardSession{
Target: rs.Target,
TransactionId: updated.transactionID,
ReservedId: updated.reservedID,
TabletAlias: updated.alias,
}, stc.txConn.mode)
if appendErr != nil {
err = appendErr
}
}
}

if numShards == 1 {
// only one shard, do it synchronously.
for i, rs := range rss {
oneShard(rs, i)
}
} else {
var wg sync.WaitGroup

--多shard并发执行
for i, rs := range rss {
wg.Add(1)
go func(rs *srvtopo.ResolvedShard, i int) {
defer wg.Done()
oneShard(rs, i)
}(rs, i)
}
wg.Wait()
}

if session.MustRollback() {
_ = stc.txConn.Rollback(ctx, session)
}
return allErrors
}
 


 


 


 


 


 

笔记:

SQL = eins.Prefix+eins.Mid +eins.suffix

(dlv) p eins.Mid
[]string len: 1, cap: 1, [
"(:_customer_id_0, :vtg2)",
]
(dlv) p eins.Prefix
"insert into customer(customer_id, email) values "

(dlv) p ins.Suffix
""


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1555948.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android15功能和 API 概览

Android 15 面向开发者引入了一些出色的新功能和 API。以下部分总结了这些功能&#xff0c;以帮助您开始使用相关 API。 如需查看新增、修改和移除的 API 的详细列表&#xff0c;请参阅 API 差异报告。如需详细了解新的 API&#xff0c;请访问 Android API 参考文档&#xff0…

FANUC机器人故障诊断—报警代码(一)

一、SRVO-050碰撞检测报警 [原因]检测出碰撞 [对策] 1.确认机器人是否碰撞。 2.确认是否正确进行了负载设定。 3.确认是否有过载、过度的加速度附加指令。 4.在长期停用后启动&#xff0c;或者外部气温较低时发生该报警。启动后&#xff0c;先短时间内低速运转设备&#…

Vue3:快速上手路由器

本人在B站上关于vue3的尚硅谷的课程&#xff0c;以下是整理一些笔记。 一.路由器和路由的概念 在 Vue 3 中&#xff0c;路由&#xff08;Router&#xff09;和路由器&#xff08;Router&#xff09;是两个相关但不同的概念。 1. 路由&#xff08;Router&#xff09;&#xff…

基于SpringBoot的“游戏分享网站”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“游戏分享网站”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 网站首页界面图 用户注册界面图 …

基于SpringBoot和Vue的房产销售系统的设计与实现

今天要和大家聊的是一款基于SpringBoot和Vue的房产销售系统的设计与实现 &#xff01;&#xff01;&#xff01; 有需要的小伙伴可以通过文章末尾名片咨询我哦&#xff01;&#xff01;&#xff01; &#x1f495;&#x1f495;作者&#xff1a;李同学 &#x1f495;&#x1f…

提取gdip-yolo与ia-seg中的图像自适应模块进行图像去雾与亮度增强

gdip-yolo与ia-seg都是一种将图像自适应模块插入模型前面,从而提升模型在特定数据下检测能力的网络结构。gdip-yolo提出了gdip模块,可以应用到大雾数据与低亮度数据(夜晚环境),然后用于目标检测训练;ia-seg将ia-yolo中的代码修改了一下修车了ipam模块,应用到低亮度数据(…

最优算法100例之13-输出第n个丑数

专栏主页:计算机专业基础知识总结(适用于期末复习考研刷题求职面试)系列文章https://blog.csdn.net/seeker1994/category_12585732.html 题目描述 把只包含因子2、3和5的数称作丑数(Ugly Number)。例如6、8都是丑数,但14不是,因为它包含因子7。 习惯上我们把1当…

如何使用命令行对RK开发板进行OpenHarmony版本烧录?

问题 在 OpenHarmony 自动化测试环境中&#xff0c;需要对流水线上的 RK 设备进行烧录&#xff0c;图形工具只能人工操作&#xff0c;那么有什么方法可以纯命令行进行自动化烧录呢&#xff1f; 思路 我们发现 RK 开发板实际是使用 upgrade_tool 的执行文件进行烧录的&#x…

Jenkins详细安装配置部署

目录 简介一、安装jdk二、安装jenkins这里如果熟悉 Jenkins &#xff0c;可以【选择插件来安装】&#xff0c;如果不熟悉&#xff0c;还是按照推荐来吧。注意&#xff1a; 三、插件安装如果上面插件安装&#xff0c;选择的不是【安装推荐的插件】&#xff0c;而是【选择插件来安…

JavaSE——面向对象高级三(4/5)-认识泛型、定义泛型类、定义泛型接口

目录 认识泛型 定义泛型类 定义泛型接口 认识泛型 泛型 定义类、接口、方法时&#xff0c;同时声明了一个或者多个类型变量&#xff08;如&#xff1a;<E>&#xff09;&#xff0c;称为泛型类、泛型接口、泛型方法、它们统称为泛型。 public class ArrayList<E>…

接雨水(C语言)

题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 首先我们要明白要形成容器&#xff0c;接雨水 需要右边的柱子高于本身&#xff0c;并且需保证左方有高于本身的柱子 也就是这样&#xff0c;才会形成容器 这道题的解法之一是单调栈&#xff0c;并且需保证是递…

MySQL的InnoDB引擎的事务原理以及MVCC

目录 一、事务原理 二、redo log 三、undo log 四、MVCC 1.基础概念 2.隐藏字段 3.undolog 4.readview 5.原理分析 一、事务原理 1). 事务 事务 是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;事务会把所有的操作作为一个整体一起向系统提交或撤销操作…

k8s下搭建redis集群

记录一下近期实现的在k8s上搭建redis集群的过程 1、新建存储类 主要是为了和其它服务的存储类区分一下 redis-beta-storage 2、编写configMap redis启动时从configMap中读取配置 bind&#xff1a;默认的127.0.0.1可能会导致其它ip地址无法远程访问&#xff0c;因此修改为0.0…

【软件工程导论】——Visio与StarUML的安装

目录 &#x1f552; 1. Visio&#x1f552; 2. StarUML &#x1f552; 1. Visio 1、下载Office Tool Plus并安装&#xff1a;&#x1f50e; Office Tool Plus官网 2、打开软件 → 部署 → 添加产品 3、这里我选择Visio 2021 专业版 LTSC&#xff0c;确定&#xff0c;随后点击“…

【RedHat】使用cron安排周期性任务——周期性创建用户实例

cron用来管理周期性重复执行的任务调度&#xff0c;非常适合日常系统维护工作。计划任务分为系统的计划任务和用户自定义的计划任务。 cron服务每分钟都检查/etc/crontab文件、/etc/cron.d目录和/var/spool/cron目录中的变化。/var/spool/cron目录下的任务需要通过crontab -e 命…

鸿蒙原生应用开发-网络管理HTTP数据请求

一、场景介绍 应用通过HTTP发起一个数据请求&#xff0c;支持常见的GET、POST、OPTIONS、HEAD、PUT、DELETE、TRACE、CONNECT方法。 二、接口说明 HTTP数据请求功能主要由http模块提供。 使用该功能需要申请ohos.permission.INTERNET权限。 涉及的接口如下表&#xff0c;具体的…

MSTP环路避免实验(思科)

华为设备参考&#xff1a;MSTP环路避免实验&#xff08;华为&#xff09; 一&#xff0c;技术简介 MSTP&#xff08;多生成树协议&#xff09;&#xff0c;MSTP解决了STP和RSTP没有考虑vlan的问题&#xff0c;STP和RSTP将所有的vlan共享为一个生成树实例&#xff0c;无法实现…

如何备考2024年AMC10:吃透2000-2023年1250道真题(限时免费送)

有家长朋友问&#xff0c;有没有适合初中学生参加的奥数类比赛&#xff1f;我推荐AMC10美国数学竞赛&#xff0c;在国内可以方便地参加&#xff0c;而且每年全国各省市参加的初中生越来越多。关于AMC10详细的介绍和常见问题解答&#xff0c;可以联系我获得。 那么如何在AMC10竞…

基恩士数码显微镜数据采集项目经验分享

在最近的项目中&#xff0c;我有幸参与了基恩士数码显微镜的开发与数据采集工作。这次的经历让我对科学研究中的数据收集和分析有了更深入的了解&#xff0c;我分享我在项目中的角色以及我们所取得的成果。 作为一个科研工作者&#xff0c;我始终被数字化技术在科学研究中的作用…

消息队列经典应用场景

笔者心中,消息队列,缓存,分库分表是高并发解决方案三剑客。 在职业生涯中,笔者曾经使用过 ActiveMQ 、RabbitMQ 、Kafka 、RocketMQ 这些知名的消息队列 。 这篇文章,笔者结合自己的真实经历,和大家分享消息队列的七种经典应用场景。 1 异步&解耦 笔者曾经负责某电…