Fabric 对区块内交易的校验
记录区块内交易的校验结果,保存在 Block.Metadata.Metadata
中,是一个 []uint8
切片,切片索引是区块中的交易索引,保存的值是校验结果。
func (v *TxValidator) Validate(block *common.Block) error {
txsfltr := txflags.New(len(block.Data.Data)) // []uint8,校验结果状态码最大为 255,校验结果初始化成 TxValidationCode_NOT_VALIDATED
txidArray := make([]string, len(block.Data.Data))
results := make(chan *blockValidationResult)
go func() {
for tIdx, d := range block.Data.Data { // tIdx 是交易在区块中的索引
v.Semaphore.Acquire(context.Background()) // 控制并发数
go func(index int, data []byte) {
defer v.Semaphore.Release()
v.validateTx(&blockValidationRequest{ // 具体校验逻辑
d: data,
block: block,
tIdx: index,
}, results)
}(tIdx, d)
}
}()
for i := 0; i < len(block.Data.Data); i++ {
res := <-results
if res.err != nil {
// if there is an error, we buffer its value, wait for all workers to complete validation
if err == nil || res.tIdx < errPos { // 标记第一个出现 err 的 tx 的位置,记录它的 err
err = res.err
errPos = res.tIdx
}
} else {
txsfltr.SetFlag(res.tIdx, res.validationCode) // 设置校验结果
if res.validationCode == peer.TxValidationCode_VALID {
txidArray[res.tIdx] = res.txid // txidArray 记录合法交易的 tx id
}
}
}
// 返回第一个出现的 err
if err != nil {
return err
}
// 将 txidArray 中重复实现的 txid 在 txsfltr 中打上 TxValidationCode_DUPLICATE_TXID 标记
markTXIdDuplicates(txidArray, txsfltr)
// 确保所有交易都得到了校验
err = v.allValidated(txsfltr, block)
if err != nil {
return err
}
// Initialize metadata structure
protoutil.InitBlockMetadata(block)
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsfltr
}
type TxValidationCode int32
const (
TxValidationCode_VALID TxValidationCode = 0
TxValidationCode_NIL_ENVELOPE TxValidationCode = 1
TxValidationCode_BAD_PAYLOAD TxValidationCode = 2
TxValidationCode_BAD_COMMON_HEADER TxValidationCode = 3
TxValidationCode_BAD_CREATOR_SIGNATURE TxValidationCode = 4
TxValidationCode_INVALID_ENDORSER_TRANSACTION TxValidationCode = 5
TxValidationCode_INVALID_CONFIG_TRANSACTION TxValidationCode = 6
TxValidationCode_UNSUPPORTED_TX_PAYLOAD TxValidationCode = 7
TxValidationCode_BAD_PROPOSAL_TXID TxValidationCode = 8
TxValidationCode_DUPLICATE_TXID TxValidationCode = 9
TxValidationCode_ENDORSEMENT_POLICY_FAILURE TxValidationCode = 10
TxValidationCode_MVCC_READ_CONFLICT TxValidationCode = 11
TxValidationCode_PHANTOM_READ_CONFLICT TxValidationCode = 12
TxValidationCode_UNKNOWN_TX_TYPE TxValidationCode = 13
TxValidationCode_TARGET_CHAIN_NOT_FOUND TxValidationCode = 14
TxValidationCode_MARSHAL_TX_ERROR TxValidationCode = 15
TxValidationCode_NIL_TXACTION TxValidationCode = 16
TxValidationCode_EXPIRED_CHAINCODE TxValidationCode = 17
TxValidationCode_CHAINCODE_VERSION_CONFLICT TxValidationCode = 18
TxValidationCode_BAD_HEADER_EXTENSION TxValidationCode = 19
TxValidationCode_BAD_CHANNEL_HEADER TxValidationCode = 20
TxValidationCode_BAD_RESPONSE_PAYLOAD TxValidationCode = 21
TxValidationCode_BAD_RWSET TxValidationCode = 22
TxValidationCode_ILLEGAL_WRITESET TxValidationCode = 23
TxValidationCode_INVALID_WRITESET TxValidationCode = 24
TxValidationCode_INVALID_CHAINCODE TxValidationCode = 25
TxValidationCode_NOT_VALIDATED TxValidationCode = 254
TxValidationCode_INVALID_OTHER_REASON TxValidationCode = 255
)
提交区块、应用交易中包含的更新到状态数据库。区块中可能包含不合法的交易,但只有校验通过的交易才会被提交到状态数据库。
func (l *kvLedger) CommitLegacy(pvtdataAndBlock *ledger.BlockAndPvtData, commitOpts *ledger.CommitOptions) error {
txstatsInfo, updateBatchBytes, err := l.txmgr.ValidateAndPrepare(pvtdataAndBlock, true)
// 提交区块
if err = l.commitToPvtAndBlockStore(pvtdataAndBlock); err != nil {
return err
}
// 提交到状态库(l.txmgr.current)
if err = l.txmgr.Commit(); err != nil {
panic(errors.WithMessage(err, "error during commit to txmgr"))
}
if l.historyDB != nil {
logger.Debugf("[%s] Committing block [%d] transactions to history database", l.ledgerID, blockNo)
if err := l.historyDB.Commit(block); err != nil {
panic(errors.WithMessage(err, "Error during commit to history db"))
}
}
l.updateBlockStats(
elapsedBlockProcessing,
elapsedBlockstorageAndPvtdataCommit,
elapsedCommitState,
txstatsInfo,
)
}
func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) ([]*validation.TxStatInfo, []byte, error) {
batch, txstatsInfo, err := txmgr.commitBatchPreparer.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)
if err != nil {
txmgr.reset()
return nil, nil, err
}
txmgr.current = ¤t{block: block, batch: batch}
if err := txmgr.invokeNamespaceListeners(); err != nil {
txmgr.reset()
return nil, nil, err
}
updateBytes, err := deterministicBytesForPubAndHashUpdates(batch)
return txstatsInfo, updateBytes, err
}
// * ValidateAndPrepareBatch performs validation of transactions in the block and prepares the batch of final writes
func (p *CommitBatchPreparer) ValidateAndPrepareBatch(blockAndPvtdata *ledger.BlockAndPvtData,
doMVCCValidation bool) (*privacyenabledstate.UpdateBatch, []*TxStatInfo, error) {
// 包含 MVCC 校验,校验通过的 tx 才会被加入 batch,最终被应用到状态数据库
if pubAndHashUpdates, err = p.validator.validateAndPrepareBatch(internalBlock, doMVCCValidation); err != nil {
return nil, nil, err
}
if pvtUpdates, err = validateAndPreparePvtBatch(
internalBlock,
p.db,
pubAndHashUpdates,
blockAndPvtdata.PvtData,
p.customTxProcessors,
); err != nil {
return nil, nil, err
}
txsFilter := txflags.ValidationFlags(blk.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) // 类型转换
for i := range txsFilter {
txsStatInfo[i].ValidationCode = txsFilter.Flag(i)
}
return &privacyenabledstate.UpdateBatch{
PubUpdates: pubAndHashUpdates.publicUpdates,
HashUpdates: pubAndHashUpdates.hashUpdates,
PvtUpdates: pvtUpdates,
}, txsStatInfo, nil
}