Fabric 创建应用通道过程的 sequence 和 version
Contents
TL;DR
对于系统通道而言,创建通道交易只是普通类型的交易,不是系统通道的更新通道配置交易。
`bundle.ConfigtxValidator().Sequence()` 返回 `sequence` 值;`configtx.NewValidatorImpl()` 赋值 `sequence`。
系统通道初始 `sequence` 是 0;应用通道初始 `sequence` 是系统通道(创建应用通道由系统通道处理)当下的 `sequence` 加 1。如果是应用通道更新配置的交易,由应用通道处理,新 `sequence` 是应用通道的旧 `sequence` 加 1。
应用通道的 `/Channel/Application` 的初始 `version` 是 1。
流程分析
orderer 启动流程
lf, _, err := createLedgerFactory(conf, metricsProvider)
func NewProvider(conf *Conf, indexConfig *IndexConfig, metricsProvider metrics.Provider) (*BlockStoreProvider, error) {
// 得到 blockledger.Factory lf,字段 blkstorageProvider 是 BlockStoreProvider(底层是 levelDB)
// 初次启动时 lf.ChannelIDs() 为空(数子目录的个数得到通道的个数),初始化账本;非初次启动仅打印日志
initializeBootstrapChannel(bootstrapBlock, lf)
gl, err := lf.GetOrCreate(channelID) // 走 create,返回 blockledger.ReadWriter 类型
func (p *BlockStoreProvider) Open(ledgerid string) (*BlockStore, error) { // 创建 block store
if err := gl.Append(genesisBlock); err != nil { // 写数据库
// 选择启动集群的区块(选系统通道的最新配置块和创世块中高度较高的)
index, err := protoutil.GetLastConfigIndexFromBlock(lastBlock) // 创世块 LastConfig.Index 为 0
// 初始化
manager := initializeMultichannelRegistrar(
func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
configTx := configTx(rl) // 拿通道最新配置块的第 0 个交易(配置块只有一个交易,Envelope 类型)
ledgerResources, err := r.newLedgerResources(configTx)
payload, err := protoutil.UnmarshalPayload(configTx.Payload)
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
bundle, err := channelconfig.NewBundle(chdr.ChannelId, configEnvelope.Config, r.bccsp)
configtxManager, err := configtx.NewValidatorImpl(channelID, config, RootGroupKey, policyManager)
return &ValidatorImpl{
namespace: namespace,
pm: pm,
sequence: config.Sequence, // !!! 更新 sequence
configMap: configMap,
channelID: channelID,
configProto: config,
}, nil
return &Bundle{
policyManager: policyManager,
channelConfig: channelConfig,
configtxManager: configtxManager,
}, nil
// ConfigtxValidator returns the configtx.Validator stored in the bundle's
// configtxManager field.
func (b *Bundle) ConfigtxValidator() configtx.Validator {
return b.configtxManager
}
mutableResources: channelconfig.NewBundleSource(bundle, callbacksCopy...) // *BundleSource 类型,包了 bundle(atomic.Value)
if _, ok := ledgerResources.ConsortiumsConfig(); ok { // 是系统通道时 ok 才为 true
chain, err := newChainSupport(
r.templator = msgprocessor.NewDefaultTemplator(chain, r.bccsp)
chain.Processor = msgprocessor.NewSystemChannel(
logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s, sequence %d",
channelID, protoutil.BlockHeaderHash(genesisBlock.Header), chain.SharedConfig().ConsensusType(), chain.ConfigtxValidator().Sequence()) // sequence 是 0
r.chains[channelID] = chain
r.systemChannelID = channelID
r.systemChannel = chain
defer chain.start()
} else {
chain, err := newChainSupport(
r.chains[channelID] = chain
chain.start()
}
// gRPC
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
if err := grpcServer.Start(); err != nil {
// AtomicBroadcastServer is the server API for AtomicBroadcast service.
type AtomicBroadcastServer interface {
// broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure
Broadcast(AtomicBroadcast_BroadcastServer) error
// deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a marshaled SeekInfo message, then a stream of block replies is received.
Deliver(AtomicBroadcast_DeliverServer) error
}
// ConfigEnvelope is designed to contain *_all_* configuration for a chain with no dependency
// on previous configuration transactions.
//
// It is generated with the following scheme:
// 1. Retrieve the existing configuration
// 2. Note the config properties (ConfigValue, ConfigPolicy, ConfigGroup) to be modified
// 3. Add any intermediate ConfigGroups to the ConfigUpdate.read_set (sparsely)
// 4. Add any additional desired dependencies to ConfigUpdate.read_set (sparsely)
// 5. Modify the config properties, incrementing each version by 1, set them in the ConfigUpdate.write_set
// Note: any element not modified but specified should already be in the read_set, so may be specified sparsely
// 6. Create ConfigUpdate message and marshal it into ConfigUpdateEnvelope.update and encode the required signatures
// a) Each signature is of type ConfigSignature
// b) The ConfigSignature signature is over the concatenation of signature_header and the ConfigUpdate bytes (which includes a ChainHeader)
// 7. Submit new Config for ordering in Envelope signed by submitter
// a) The Envelope Payload has data set to the marshaled ConfigEnvelope
// b) The Envelope Payload has a header of type Header.Type.CONFIG_UPDATE
//
// The configuration manager will verify:
// 1. All items in the read_set exist at the read versions
// 2. All items in the write_set at a different version than, or not in, the read_set have been appropriately signed according to their mod_policy
// 3. The new configuration satisfies the ConfigSchema
type ConfigEnvelope struct {
// Config is the full configuration carried by this envelope.
Config *Config `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"`
// LastUpdate is the Envelope of the update that produced Config.
LastUpdate *Envelope `protobuf:"bytes,2,opt,name=last_update,json=lastUpdate,proto3" json:"last_update,omitempty"`
}
// Config holds a channel configuration together with its sequence number.
type Config struct {
// Sequence is the configuration sequence number. It is left at its zero
// value when a Config literal does not set it.
Sequence uint64 `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`
// ChannelGroup is the top-level ConfigGroup of the configuration; it nests
// further groups such as "Application".
ChannelGroup *ConfigGroup `protobuf:"bytes,2,opt,name=channel_group,json=channelGroup,proto3" json:"channel_group,omitempty"`
}
orderer 处理创建应用通道交易
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
return s.bh.Handle(&broadcastMsgTracer{
AtomicBroadcast_BroadcastServer: srv,
msgTracer: msgTracer{
debug: s.debug,
function: "Broadcast",
},
})
// Handle runs the broadcast loop for one client stream: it receives envelopes
// until the stream ends or fails, processes each one, and sends the resulting
// response back.
//
// It returns nil when the client closes the stream (io.EOF), the receive or
// send error when one occurs, and the result of the send as soon as a message
// is answered with a non-SUCCESS status.
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)

	for {
		msg, recvErr := srv.Recv()
		switch {
		case recvErr == io.EOF:
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		case recvErr != nil:
			logger.Warningf("Error reading from %s: %s", addr, recvErr)
			return recvErr
		}

		resp := bh.ProcessMessage(msg, addr)
		sendErr := srv.Send(resp)

		// A rejected message ends the loop; whatever Send returned is passed
		// through without a warning.
		if resp.Status != cb.Status_SUCCESS {
			return sendErr
		}
		if sendErr != nil {
			logger.Warningf("Error sending to %s: %s", addr, sendErr)
			return sendErr
		}
	}
}
// ProcessMessage validates a single broadcast envelope and enqueues it through
// the processor returned by BroadcastChannelSupport, returning the response to
// send back to the client at addr.
//
// Normal messages go through ProcessNormalMsg and Order; config updates go
// through ProcessConfigUpdateMsg and Configure. A lookup failure yields
// BAD_REQUEST, a processing failure yields the status chosen by ClassifyError,
// and a WaitReady/Order/Configure failure yields SERVICE_UNAVAILABLE.
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
// Channel ID and tx type stay "unknown" until the channel header is available.
tracker := &MetricsTracker{
ChannelID: "unknown",
TxType: "unknown",
Metrics: bh.Metrics,
}
defer func() {
// This looks a little unnecessary, but if done directly as
// a defer, resp gets the (always nil) current state of resp
// and not the return value
tracker.Record(resp)
}()
tracker.BeginValidate()
chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
// chdr is checked before err so the tracker is labelled whenever a header was returned.
if chdr != nil {
tracker.ChannelID = chdr.ChannelId
tracker.TxType = cb.HeaderType(chdr.Type).String()
}
if err != nil {
logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}
}
if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])
configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
}
tracker.EndValidate()
tracker.BeginEnqueue()
if err = processor.WaitReady(); err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
err = processor.Order(msg, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)
// [x] configSeq: when two channels with different names are created, the SystemChannel's configSeq is 0 both times.
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
}
tracker.EndValidate()
tracker.BeginEnqueue()
if err = processor.WaitReady(); err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
err = processor.Configure(config, configSeq) // hand the config message over to consensus
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
}
}
logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)
return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}
// ProcessConfigUpdateMsg handles messages of type CONFIG_UPDATE either for the system channel itself
// or, for channel creation. In the channel creation case, the CONFIG_UPDATE is wrapped into a resulting
// ORDERER_TRANSACTION, and in the standard CONFIG_UPDATE case, a resulting CONFIG message
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
if channelID == s.support.ChannelID() {
// 处理系统通道自身的配置更新;明确指定 StandardChannel 的 ProcessConfigUpdateMsg 方法
return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
type SystemChannel struct {
*StandardChannel
templator ChannelConfigTemplator
}
}
logger.Debugf("Processing channel create tx for channel %s on system channel %s", channelID, s.support.ChannelID())
// If the channel ID does not match the system channel, then this must be a channel creation transaction
bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
// 包装两次
newChannelEnvConfig, err := protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
wrappedOrdererTransaction, err := protoutil.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChannelID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
return wrappedOrdererTransaction, s.support.Sequence(), nil
}
func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
configUpdatePayload, err := protoutil.UnmarshalPayload(envConfigUpdate.Payload)
configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)
configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
if uv := configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Version; uv != 1 {
return nil, fmt.Errorf("Config update for channel creation does not set application group version to 1, was %d", uv)
}
bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{
ChannelGroup: channelGroup,
}, dt.bccsp)
}
// Validation logic.

// ProposeConfigUpdate is the exported entry point; it delegates directly to
// proposeConfigUpdate and returns its result unchanged.
func (vi *ValidatorImpl) ProposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
return vi.proposeConfigUpdate(configtx)
}
// proposeConfigUpdate converts the envelope into a config update, authorizes
// it against the validator's current state, and returns the resulting
// ConfigEnvelope. The returned Config carries sequence vi.sequence + 1 and the
// original envelope is recorded as LastUpdate.
func (vi *ValidatorImpl) proposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
configUpdateEnv, err := protoutil.EnvelopeToConfigUpdate(configtx)
if err != nil {
return nil, errors.Errorf("error converting envelope to config update: %s", err)
}
configMap, err := vi.authorizeUpdate(configUpdateEnv)
// NOTE(review): readSet is not defined anywhere in this excerpt; the next line
// looks like a step spliced in from authorizeUpdate for illustration — confirm.
err = vi.verifyReadSet(readSet)
if err != nil {
return nil, errors.Errorf("error authorizing update: %s", err)
}
channelGroup, err := configMapToConfig(configMap, vi.namespace)
if err != nil {
return nil, errors.Errorf("could not turn configMap back to channelGroup: %s", err)
}
return &cb.ConfigEnvelope{
Config: &cb.Config{
Sequence: vi.sequence + 1, // the sequence is incremented by 1 here
ChannelGroup: channelGroup,
},
LastUpdate: configtx,
}, nil
}
// verifyReadSet checks every entry of readSet against the validator's current
// configMap. It returns an error if an entry is missing from the current
// config or is present at a different version; matching entries are logged.
// It returns nil when all entries match.
func (vi *ValidatorImpl) verifyReadSet(readSet map[string]comparable) error {
	for name, wanted := range readSet {
		current, found := vi.configMap[name]
		if !found {
			return errors.Errorf("existing config does not contain element for %s but was in the read set", name)
		}

		if current.version() == wanted.version() {
			log.Printf("proposed update requires that key %s be at version %d, currently at version %d", name, wanted.version(), current.version())
			continue
		}

		return errors.Errorf("proposed update requires that key %s be at version %d, but it is currently at version %d", name, wanted.version(), current.version())
	}

	return nil
}
// BroadcastChannelSupport resolves which chain should process msg.
//
// It returns the message's channel header, whether the message was classified
// as a config update, and the ChainSupport to process it with. A message for
// a channel that is not registered is routed to the system channel (the
// channel-creation path). An error is returned when the channel header cannot
// be read, when no system channel is defined for a creation request, or when
// the message is a type that may not be submitted directly (CONFIG and
// ORDERER_TRANSACTION messages).
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
	chdr, err := protoutil.ChannelHeader(msg)
	if err != nil {
		return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
	}

	support := r.GetChain(chdr.ChannelId)
	if support == nil {
		// Unknown channel: this is a channel creation request, which is
		// processed by the system channel's ChainSupport.
		support = r.SystemChannel()
		if support == nil {
			return nil, false, nil, errors.New("channel creation request not allowed because the orderer system channel is not defined")
		}
	}

	switch support.ClassifyMsg(chdr) {
	case msgprocessor.ConfigMsg:
		return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
	case msgprocessor.ConfigUpdateMsg:
		// Taken by channel creation and by anchor-peer updates alike:
		// ClassifyMsg maps HeaderType_CONFIG_UPDATE to ConfigUpdateMsg.
		return chdr, true, support, nil
	default:
		return chdr, false, support, nil
	}
}
// ClassifyMsg maps the header type of a message to its Classification:
// CONFIG_UPDATE becomes ConfigUpdateMsg, ORDERER_TRANSACTION and CONFIG become
// ConfigMsg, and every other type is a NormalMsg.
func (s *StandardChannel) ClassifyMsg(chdr *cb.ChannelHeader) Classification {
	switch cb.HeaderType(chdr.Type) {
	case cb.HeaderType_CONFIG_UPDATE:
		return ConfigUpdateMsg
	case cb.HeaderType_ORDERER_TRANSACTION, cb.HeaderType_CONFIG:
		// In order to maintain backwards compatibility, we must classify these messages
		return ConfigMsg
	}
	return NormalMsg
}
// Classification represents the possible message types for the system.
type Classification int
// The values are assigned with iota in declaration order, starting at
// NormalMsg = 0.
const (
// NormalMsg is the class of standard (endorser or otherwise non-config) messages.
// Messages of this type should be processed by ProcessNormalMsg.
NormalMsg Classification = iota
// ConfigUpdateMsg indicates messages of type CONFIG_UPDATE.
// Messages of this type should be processed by ProcessConfigUpdateMsg.
ConfigUpdateMsg
// ConfigMsg indicates messages of type ORDERER_TRANSACTION or CONFIG.
// Messages of this type should be processed by ProcessConfigMsg.
ConfigMsg
)
// inside func (bw *BlockWriter) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) {
switch chdr.Type {
case int32(cb.HeaderType_ORDERER_TRANSACTION):
bw.registrar.newChain(newChannelConfig)
func (r *Registrar) newChain(configtx *cb.Envelope) {
ledgerResources, err := r.newLedgerResources(configtx)
// If we have no blocks, we need to create the genesis block ourselves.
if ledgerResources.Height() == 0 {
block := blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})
ledgerResources.Append(block)
ledgerResources.WriteConfigBlockToSpecFile(block)
}
cs, err := newChainSupport(r, ledgerResources, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
chainID := ledgerResources.ConfigtxValidator().ChannelID()
r.chains[chainID] = cs
logger.Infof("Created and starting new channel %s, sequence %d, /CHANNEL/APPLICATION version: %d\n", chainID, ledgerResources.ConfigtxValidator().Sequence(), ledgerResources.ConfigtxValidator().ConfigProto().ChannelGroup.Groups["Application"].Version) // sequence 是 1,version 是 1
cs.start()
}
// 系统通道初始 sequence 是 0;应用通道初始 sequence 是系统通道的 sequence + 1
// [x] 如果是应用通道更新配置交易,由应用通道处理;新 sequence 是应用通道的旧 sequence + 1
case int32(cb.HeaderType_CONFIG):
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
err = bw.support.Validate(configEnvelope)
bundle, err := bw.support.CreateBundle(chdr.ChannelId, configEnvelope.Config) // HeaderType_CONFIG 类型
// CreateBundle builds a new channelconfig.Bundle for channelID from config,
// using the receiver's bccsp. It is a thin wrapper around channelconfig.NewBundle.
func (cr *configResources) CreateBundle(channelID string, config *cb.Config) (*channelconfig.Bundle, error) {
return channelconfig.NewBundle(channelID, config, cr.bccsp)
}
func NewBundle(channelID string, config *cb.Config, bccsp bccsp.BCCSP) (*Bundle, error) {
configtxManager, err := configtx.NewValidatorImpl(channelID, config, RootGroupKey, policyManager) // 这里用 config 里已经更新的 sequence 创建新的 configtxManager
// 等待 WriteBlock 里的协程(有读取 Sequence 操作)结束后再更新 Sequence
// Avoid Bundle update before the go-routine in WriteBlock() finished writing the previous block.
// We do this (in particular) to prevent bw.support.Sequence() from advancing before the go-routine reads it.
// In general, this prevents the StableBundle from changing before the go-routine in WriteBlock() finishes.
bw.committingBlock.Lock()
bw.committingBlock.Unlock()
bw.support.Update(bundle)
// Update runs checkResourcesOrPanic on bndl and then passes it to
// mutableResources.Update.
func (cr *configResources) Update(bndl *channelconfig.Bundle) {
// The check runs first, so mutableResources is only updated with a bundle that passed it.
checkResourcesOrPanic(bndl)
cr.mutableResources.Update(bndl)
}
// // [x] version 何时被更新
// from 1: r.newLedgerResources(configTx), channelconfig.NewBundleSource(bundle, callbacksCopy...):用新 bundle 构造出 ChainSupport,加入 chains 的 map
// from 2: func (bw *BlockWriter) WriteConfigBlock, bw.support.Update(bundle)
// Update stores newBundle as the current bundle and then invokes every
// registered callback with it.
//
// Around the swap it logs the version of the "Application" group of the
// previous and of the new configuration (debug instrumentation). Both lookups
// are guarded: a configuration whose channel group has no "Application" entry
// yields a nil *ConfigGroup from the map, and reading Version through it
// would panic.
func (bs *BundleSource) Update(newBundle *Bundle) {
	b, ok := bs.bundle.Load().(*Bundle)
	if ok && b != nil {
		cp := b.ConfigtxValidator().ConfigProto()
		// Only log the version when the group exists, matching the guarded
		// "after update" log below.
		if v, found := cp.ChannelGroup.Groups["Application"]; found {
			log.Printf("before update: channel name %s\n%d\n", b.ConfigtxValidator().ChannelID(), v.Version)
		}
	} else {
		// No previous bundle stored (or an unexpected type): log what was found.
		log.Println(ok, b)
	}

	bs.bundle.Store(newBundle)
	for _, callback := range bs.callbacks {
		callback(newBundle)
	}

	nb := bs.bundle.Load().(*Bundle)
	cp := nb.ConfigtxValidator().ConfigProto()
	if v, ok := cp.ChannelGroup.Groups["Application"]; ok {
		log.Printf("after update: channel name %s, version %d\n", nb.ConfigtxValidator().ChannelID(), v.Version)
	}
}
bw.WriteBlock(block, encodedMetadataValue)
// WriteBlock should be invoked for blocks which contain normal transactions.
// It sets the target block as the pending next block, and returns before it is committed.
// Before returning, it acquires the committing lock, and spawns a go routine which will
// annotate the block with metadata and signatures, and write the block to the ledger
// then release the lock. This allows the calling thread to begin assembling the next block
// before the commit phase is complete.
func (bw *BlockWriter) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {
// Locked here, unlocked by the goroutine below once commitBlock returns.
bw.committingBlock.Lock()
bw.lastBlock = block
go func() {
defer bw.committingBlock.Unlock()
bw.commitBlock(encodedMetadataValue)
}()
}
// commitBlock finalizes bw.lastBlock: it records the last-config metadata,
// adds the block signature, and appends the block to the ledger, panicking
// through the logger if the append fails.
//
// It should only ever be invoked with bw.committingBlock held; this ensures
// that the encoded config sequence numbers stay in sync.
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
	bw.addLastConfig(bw.lastBlock)
	bw.addBlockSignature(bw.lastBlock, encodedMetadataValue)

	if appendErr := bw.support.Append(bw.lastBlock); appendErr != nil {
		logger.Panicf("[channel: %s] Could not append block: %s", bw.support.ChannelID(), appendErr)
	}

	logger.Debugf("[channel: %s] Wrote block [%d]", bw.support.ChannelID(), bw.lastBlock.GetHeader().Number)
}
// addLastConfig writes the LAST_CONFIG metadata entry of block.
//
// If the channel's current config sequence is higher than the last one seen,
// block itself becomes the last config block and its number is remembered.
// The remembered block number is then encoded into the block's metadata.
func (bw *BlockWriter) addLastConfig(block *cb.Block) {
	if seq := bw.support.Sequence(); seq > bw.lastConfigSeq {
		logger.Debugf("[channel: %s] Detected lastConfigSeq transitioning from %d to %d, setting lastConfigBlockNum from %d to %d", bw.support.ChannelID(), bw.lastConfigSeq, seq, bw.lastConfigBlockNum, block.Header.Number)
		bw.lastConfigBlockNum = block.Header.Number
		bw.lastConfigSeq = seq
	}

	encodedLastConfig := protoutil.MarshalOrPanic(&cb.LastConfig{
		Index: bw.lastConfigBlockNum,
	})

	logger.Debugf("[channel: %s] About to write block, setting its LAST_CONFIG to %d", bw.support.ChannelID(), bw.lastConfigBlockNum)

	metadata := &cb.Metadata{Value: encodedLastConfig}
	block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(metadata)
}
创世块创建流程
// configtxgen -profile TwoOrgsOrdererGenesis -channelID system-channel -outputBlock ./system-genesis-block/genesis.block --bccsp=SW
// 通用
var profileConfig *genesisconfig.Profile
profileConfig = genesisconfig.Load(profile)
func InitViper(v *viper.Viper, configName string) error {
var altPath = os.Getenv("FABRIC_CFG_PATH")
// 类别 1
if outputBlock != "" {
if err := doOutputBlock(profileConfig, channelID, outputBlock); err != nil {
if config.Orderer == nil {
return errors.Errorf("refusing to generate block which is missing orderer section")
}
if config.Consortiums == nil {
logger.Warning("Genesis block does not contain a consortiums group definition. This block cannot be used for orderer bootstrap.")
}
genesisBlock := pgen.GenesisBlockForChannel(channelID)
// Block constructs and returns a genesis block for a given channel ID.
//
// The block has number 0 and contains a single unsigned CONFIG envelope that
// wraps the factory's channel group. Both the LAST_CONFIG and the SIGNATURES
// metadata entries point at block 0 as the last config block.
func (f *factory) Block(channelID string) *cb.Block {
	chdr := protoutil.MakeChannelHeader(cb.HeaderType_CONFIG, msgVersion, channelID, epoch)
	shdr := protoutil.MakeSignatureHeader(nil, protoutil.CreateNonceOrPanic())
	protoutil.SetTxID(chdr, shdr)
	header := protoutil.MakePayloadHeader(chdr, shdr)

	// cb.Config.Sequence is left unset, so it takes its default value of 0.
	configEnvelope := &cb.ConfigEnvelope{
		Config: &cb.Config{ChannelGroup: f.channelGroup},
	}
	payload := &cb.Payload{
		Header: header,
		Data:   protoutil.MarshalOrPanic(configEnvelope),
	}
	envelope := &cb.Envelope{
		Payload:   protoutil.MarshalOrPanic(payload),
		Signature: nil,
	}

	genesis := protoutil.NewBlock(0, nil)
	genesis.Data = &cb.BlockData{
		Data: [][]byte{protoutil.MarshalOrPanic(envelope)},
	}
	genesis.Header.DataHash = protoutil.BlockDataHash(genesis.Data)

	lastConfigMetadata := &cb.Metadata{
		Value: protoutil.MarshalOrPanic(&cb.LastConfig{Index: 0}),
	}
	genesis.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(lastConfigMetadata)

	signaturesMetadata := &cb.Metadata{
		Value: protoutil.MarshalOrPanic(&cb.OrdererBlockMetadata{
			LastConfig: &cb.LastConfig{Index: 0},
		}),
	}
	genesis.Metadata.Metadata[cb.BlockMetadataIndex_SIGNATURES] = protoutil.MarshalOrPanic(signaturesMetadata)

	return genesis
}
// OrdererBlockMetadata defines metadata that is set by the ordering service.
type OrdererBlockMetadata struct {
// LastConfig identifies the last config block via its Index.
LastConfig *LastConfig `protobuf:"bytes,1,opt,name=last_config,json=lastConfig,proto3" json:"last_config,omitempty"`
// ConsenterMetadata is an opaque byte slice.
// NOTE(review): presumably consensus-specific data — confirm against the proto definition.
ConsenterMetadata []byte `protobuf:"bytes,2,opt,name=consenter_metadata,json=consenterMetadata,proto3" json:"consenter_metadata,omitempty"`
}
// LastConfig records which block holds the most recent configuration.
type LastConfig struct {
// Index is the number of the last config block.
Index uint64 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
}
应用通道 configTx 创建流程
// configtxgen -profile TwoOrgsChannel -outputCreateChannelTx ./channel-artifacts/mychannel.tx -channelID mychannel --bccsp=SW
// 类别 2
var baseProfile *genesisconfig.Profile // nil
if outputChannelCreateTx != "" {
if err := doOutputChannelCreateTx(profileConfig, baseProfile, channelID, outputChannelCreateTx); err != nil {
configtx, err = encoder.MakeChannelCreationTransaction(channelID, nil, conf) // conf 是 profileConfig
template, err := DefaultConfigTemplate(conf)
return MakeChannelCreationTransactionFromTemplate(channelID, signer, conf, template)
return protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG_UPDATE, channelID, signer, newConfigUpdateEnv, msgVersion, epoch) // 初始为 HeaderType_CONFIG_UPDATE,func (s *SystemChannel) ProcessConfigUpdateMsg 里会重新包装消息类型
payloadChannelHeader := MakeChannelHeader(txType, msgVersion, channelID, epoch)
err = writeFile(outputChannelCreateTx, protoutil.MarshalOrPanic(configtx), 0640)
// NewChannelCreateConfigUpdate builds the ConfigUpdate used to create channel
// channelID from the profile conf.
//
// The update is the difference between templateConfig and the channel group
// generated from conf, with the consortium name added to the write set (and a
// matching version-0 entry in the read set). It returns an error if conf has
// no Application section or no Consortium name, or if the channel group or
// the update cannot be computed.
func NewChannelCreateConfigUpdate(channelID string, conf *genesisconfig.Profile, templateConfig *cb.ConfigGroup) (*cb.ConfigUpdate, error) {
	switch {
	case conf.Application == nil:
		return nil, errors.New("cannot define a new channel with no Application section")
	case conf.Consortium == "":
		return nil, errors.New("cannot define a new channel with no Consortium value")
	}

	targetGroup, err := NewChannelGroup(conf)
	if err != nil {
		return nil, errors.Wrapf(err, "could not turn parse profile into channel group")
	}

	templateCfg := &cb.Config{ChannelGroup: templateConfig}
	targetCfg := &cb.Config{ChannelGroup: targetGroup}
	configUpdate, err := update.Compute(templateCfg, targetCfg)
	if err != nil {
		return nil, errors.Wrapf(err, "could not compute update")
	}

	// The write set must name the consortium the channel is created for.
	configUpdate.ChannelId = channelID
	configUpdate.ReadSet.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{Version: 0}
	configUpdate.WriteSet.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{
		Version: 0,
		Value:   protoutil.MarshalOrPanic(&cb.Consortium{Name: conf.Consortium}),
	}

	return configUpdate, nil
}
// ConfigValue is a single versioned configuration value.
type ConfigValue struct {
// Version is the version number of this value.
Version uint64 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
// Value holds the marshaled content of the value.
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
// ModPolicy names a policy.
// NOTE(review): presumably the policy that governs modification of this value — confirm.
ModPolicy string `protobuf:"bytes,3,opt,name=mod_policy,json=modPolicy,proto3" json:"mod_policy,omitempty"`
}
func Compute(original, updated *cb.Config) (*cb.ConfigUpdate, error) {
readSet, writeSet, groupUpdated := computeGroupUpdate(original.ChannelGroup, updated.ChannelGroup)
return &cb.ConfigUpdate{
ReadSet: readSet,
WriteSet: writeSet,
}, nil
}
func computeGroupUpdate(original, updated *cb.ConfigGroup) (readSet, writeSet *cb.ConfigGroup, updatedGroup bool) {
return &cb.ConfigGroup{
Version: original.Version,
Policies: readSetPolicies,
Values: readSetValues,
Groups: readSetGroups,
}, &cb.ConfigGroup{
Version: original.Version + 1, // 这里 +1
Policies: writeSetPolicies,
Values: writeSetValues,
Groups: writeSetGroups,
ModPolicy: updated.ModPolicy,
}, true
}
更新 anchor peer
// 类别 3
if outputAnchorPeersUpdate != "" {
if err := doOutputAnchorPeersUpdate(profileConfig, channelID, outputAnchorPeersUpdate, asOrg); err != nil {
应用通道创建成功后得到的配置块结构
{
-"data": {
-"data": [
-{
-"payload": {
-"data": {
-"config": {
-"channel_group": {
-"groups": {
-"Application": {
+"groups": { … },
"mod_policy": "Admins",
+"policies": { … },
+"values": { … },
"version": "1"
},
+"Orderer": { … }
},
"mod_policy": "Admins",
+"policies": { … },
+"values": { … },
"version": "0"
},
"sequence": "1"
},
+"last_update": { … }
},
-"header": {
-"channel_header": {
"channel_id": "mychannel1",
"epoch": "0",
"extension": null,
"timestamp": "2022-07-20T02:35:18Z",
"tls_cert_hash": null,
"tx_id": "",
"type": 1,
"version": 0
},
+"signature_header": { … }
}
},
"signature": "MEUCIQC+YYLNGa1pAFHCm1njE6jVcMdSNyjRvfYuGvV8PPfY+QIgETpMBTVaUDjvdYqdkxf982C0/Q1YmTjVpf6Fk4chV9c="
}
]
},
+"header": { … },
+"metadata": { … }
}
其它
阻塞是因为 `select` 循环在运行 `case sn := <-c.snapC:`,所以 `case s := <-submitC` 阻塞。
// WaitReady blocks when the chain:
// - is catching up with other nodes using snapshot
//
// In any other case, it returns right away.
//
// It returns the error from isRunning if the chain is not running, and an
// error if the chain is stopped (doneC fires) before a nil submission can be
// handed to submitC.
func (c *Chain) WaitReady() error {
	if runErr := c.isRunning(); runErr != nil {
		return runErr
	}

	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	case c.submitC <- nil:
		// The submit loop accepted the nil probe, so the chain is ready.
		return nil
	}
}
chain 的 Sequence 已更新,但还有属于旧 Sequence 的交易进来,此时会执行重新校验。
// It takes care of config messages as well as the revalidation of messages if the config sequence has advanced.