TL;DR

对于系统通道而言,创建通道交易只是普通类型的交易,不是系统通道的更新通道配置交易。

bundle.ConfigtxValidator().Sequence() 返回 sequence 值;configtx.NewValidatorImpl() 赋值 sequence

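系统通道初始 sequence 是 0;应用通道初始 sequence 是 1:创建应用通道虽由系统通道处理,但新通道的 sequence 来自 templator 生成的模板配置(cb.Config 未设置 Sequence,默认为 0)经 ProposeConfigUpdate 加 1 得到;只有在系统通道 sequence 为 0 时,才恰好等于“系统通道当下的 sequence 加 1”(系统通道的 sequence 只作为 configSeq 返回,用于后续重新校验)。如果是应用通道更新配置的交易,由应用通道处理,新 sequence 是应用通道的旧 sequence 加 1。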

应用通道的 /Channel/Application 的初始 version 是 1。

流程分析

orderer 启动流程

lf, _, err := createLedgerFactory(conf, metricsProvider)
    func NewProvider(conf *Conf, indexConfig *IndexConfig, metricsProvider metrics.Provider) (*BlockStoreProvider, error) {
// 得到 blockledger.Factory lf,字段 blkstorageProvider 是 BlockStoreProvider(底层是 levelDB)

// 初次启动时 lf.ChannelIDs() 为空(数子目录的个数得到通道的个数),初始化账本;非初次启动仅打印日志
initializeBootstrapChannel(bootstrapBlock, lf)
    gl, err := lf.GetOrCreate(channelID) // 走 create,返回 blockledger.ReadWriter 类型
        func (p *BlockStoreProvider) Open(ledgerid string) (*BlockStore, error) { // 创建 block store
    if err := gl.Append(genesisBlock); err != nil { // 写数据库

// 选择启动集群的区块(选系统通道的最新配置块和创世块中高度较高的)
    index, err := protoutil.GetLastConfigIndexFromBlock(lastBlock) // 创世块 LastConfig.Index 为 0

// 初始化
manager := initializeMultichannelRegistrar(
    func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
        configTx := configTx(rl) // 拿通道最新配置块的第 0 个交易(配置块只有一个交易,Envelope 类型)
        ledgerResources, err := r.newLedgerResources(configTx)
            payload, err := protoutil.UnmarshalPayload(configTx.Payload)   
            configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data) 
            bundle, err := channelconfig.NewBundle(chdr.ChannelId, configEnvelope.Config, r.bccsp)
                configtxManager, err := configtx.NewValidatorImpl(channelID, config, RootGroupKey, policyManager)
                    return &ValidatorImpl{
                        namespace:   namespace,
                        pm:          pm,
                        sequence:    config.Sequence, // !!! 更新 sequence
                        configMap:   configMap,
                        channelID:   channelID,
                        configProto: config,
                    }, nil
                return &Bundle{
                    policyManager:   policyManager,
                    channelConfig:   channelConfig,
                    configtxManager: configtxManager,
                }, nil

                // ConfigtxValidator returns the configtx.Validator held in the bundle's
                // configtxManager field.
                func (b *Bundle) ConfigtxValidator() configtx.Validator {
                    return b.configtxManager
                }
            mutableResources: channelconfig.NewBundleSource(bundle, callbacksCopy...) // *BundleSource 类型,包了 bundle(atomic.Value)

        if _, ok := ledgerResources.ConsortiumsConfig(); ok { // 是系统通道时 ok 才为 true
            chain, err := newChainSupport(
            r.templator = msgprocessor.NewDefaultTemplator(chain, r.bccsp)
			chain.Processor = msgprocessor.NewSystemChannel(

			logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s, sequence %d",
			channelID, protoutil.BlockHeaderHash(genesisBlock.Header), chain.SharedConfig().ConsensusType(), chain.ConfigtxValidator().Sequence()) // sequence 是 0
            r.chains[channelID] = chain
			r.systemChannelID = channelID
			r.systemChannel = chain
            defer chain.start()
        } else {
            chain, err := newChainSupport(
            r.chains[channelID] = chain
			chain.start()
        }

// gRPC
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
if err := grpcServer.Start(); err != nil {
    // AtomicBroadcastServer is the server API for AtomicBroadcast service.
    type AtomicBroadcastServer interface {
        // Broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure.
        Broadcast(AtomicBroadcast_BroadcastServer) error
        // Deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a marshaled SeekInfo message, then a stream of block replies is received.
        Deliver(AtomicBroadcast_DeliverServer) error
    }

// ConfigEnvelope is designed to contain *_all_* configuration for a chain with no dependency
// on previous configuration transactions.
//
// It is generated with the following scheme:
//   1. Retrieve the existing configuration
//   2. Note the config properties (ConfigValue, ConfigPolicy, ConfigGroup) to be modified
//   3. Add any intermediate ConfigGroups to the ConfigUpdate.read_set (sparsely)
//   4. Add any additional desired dependencies to ConfigUpdate.read_set (sparsely)
//   5. Modify the config properties, incrementing each version by 1, set them in the ConfigUpdate.write_set
//      Note: any element not modified but specified should already be in the read_set, so may be specified sparsely
//   6. Create ConfigUpdate message and marshal it into ConfigUpdateEnvelope.update and encode the required signatures
//     a) Each signature is of type ConfigSignature
//     b) The ConfigSignature signature is over the concatenation of signature_header and the ConfigUpdate bytes (which includes a ChainHeader)
//   7. Submit new Config for ordering in Envelope signed by submitter
//     a) The Envelope Payload has data set to the marshaled ConfigEnvelope
//     b) The Envelope Payload has a header of type Header.Type.CONFIG_UPDATE
//
// The configuration manager will verify:
//   1. All items in the read_set exist at the read versions
//   2. All items in the write_set at a different version than, or not in, the read_set have been appropriately signed according to their mod_policy
//   3. The new configuration satisfies the ConfigSchema
type ConfigEnvelope struct {
	// Config is the full configuration carried by this envelope.
	Config               *Config   `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"`
	// LastUpdate is the envelope of the update that produced Config
	// (proposeConfigUpdate stores the incoming config tx here).
	LastUpdate           *Envelope `protobuf:"bytes,2,opt,name=last_update,json=lastUpdate,proto3" json:"last_update,omitempty"`
}
    // Config pairs a configuration sequence number with the root config group of a channel.
    type Config struct {
        // Sequence is the config sequence number; NewValidatorImpl copies it into the
        // validator, and proposeConfigUpdate emits the validator's sequence plus one.
        Sequence             uint64       `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`
        // ChannelGroup is the root ConfigGroup of the channel configuration tree.
        ChannelGroup         *ConfigGroup `protobuf:"bytes,2,opt,name=channel_group,json=channelGroup,proto3" json:"channel_group,omitempty"`
    }

orderer 处理创建应用通道交易

func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
    return s.bh.Handle(&broadcastMsgTracer{
		AtomicBroadcast_BroadcastServer: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Broadcast",
		},
	})
// Handle runs the broadcast loop for one client stream: it receives envelopes
// until EOF or an error, processes each through ProcessMessage, and sends the
// resulting response back. The loop ends as soon as a response is not
// Status_SUCCESS (returning whatever Send returned) or when Send itself fails.
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)
	for {
		msg, recvErr := srv.Recv()
		switch {
		case recvErr == io.EOF:
			// Client closed its side of the stream; this is a normal hangup.
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		case recvErr != nil:
			logger.Warningf("Error reading from %s: %s", addr, recvErr)
			return recvErr
		}

		resp := bh.ProcessMessage(msg, addr)
		sendErr := srv.Send(resp)
		switch {
		case resp.Status != cb.Status_SUCCESS:
			// A rejected message terminates the stream; the send error (if any)
			// is returned without being logged.
			return sendErr
		case sendErr != nil:
			logger.Warningf("Error sending to %s: %s", addr, sendErr)
			return sendErr
		}
	}
}
// ProcessMessage validates a single envelope and hands it to the channel's
// processor for ordering.
//
// It resolves the processor via BroadcastChannelSupport, then either runs
// ProcessNormalMsg + Order (normal messages) or ProcessConfigUpdateMsg +
// Configure (config updates), calling WaitReady in between. The response status is:
//   - Status_BAD_REQUEST when no processor could be resolved,
//   - ClassifyError(err) when message processing fails,
//   - Status_SERVICE_UNAVAILABLE when WaitReady, Order or Configure fails,
//   - Status_SUCCESS otherwise.
//
// The named result resp is recorded by the metrics tracker on return.
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
	tracker := &MetricsTracker{
		ChannelID: "unknown",
		TxType:    "unknown",
		Metrics:   bh.Metrics,
	}
	defer func() {
		// This looks a little unnecessary, but if done directly as
		// a defer, resp gets the (always nil) current state of resp
		// and not the return value
		tracker.Record(resp)
	}()
	tracker.BeginValidate()
	chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
	// chdr may be non-nil even when err is set, so fill in the tracker labels first.
	if chdr != nil {
		tracker.ChannelID = chdr.ChannelId
		tracker.TxType = cb.HeaderType(chdr.Type).String()
	}
	if err != nil {
		logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err)
		return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}
	}
	if !isConfig {
		logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])
		configSeq, err := processor.ProcessNormalMsg(msg)
		if err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
		}
		tracker.EndValidate()
		tracker.BeginEnqueue()
		if err = processor.WaitReady(); err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
		}
		err = processor.Order(msg, configSeq)
		if err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
		}
	} else { // isConfig
		logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)
		// [x] configSeq: when creating two channels with different names, the
		// configSeq returned by the SystemChannel was observed to be 0 for both.
		config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
		if err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
		}
		tracker.EndValidate()
		tracker.BeginEnqueue()
		if err = processor.WaitReady(); err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
		}
		err = processor.Configure(config, configSeq) // hand the config message over to consensus
		if err != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)
			return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
		}
	}
	logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)
	return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}

// ProcessConfigUpdateMsg handles messages of type CONFIG_UPDATE either for the system channel itself
// or, for channel creation.  In the channel creation case, the CONFIG_UPDATE is wrapped into a resulting
// ORDERER_TRANSACTION, and in the standard CONFIG_UPDATE case, a resulting CONFIG message
//
// NOTE(review): this is an abridged excerpt; channelID, msgVersion and epoch are
// not defined in the visible lines and error checks are elided.
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    if channelID == s.support.ChannelID() {
		// Config update for the system channel itself; explicitly call the
		// embedded StandardChannel's ProcessConfigUpdateMsg.
		return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate) 

            // Reference: SystemChannel embeds *StandardChannel, which is why the
            // call above has to name StandardChannel explicitly.
            type SystemChannel struct {
                *StandardChannel
                templator ChannelConfigTemplator
            }
	}

	logger.Debugf("Processing channel create tx for channel %s on system channel %s", channelID, s.support.ChannelID())
	// If the channel ID does not match the system channel, then this must be a channel creation transaction
	bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
    // The proposal is evaluated by the validator of the bundle returned by the templator.
    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
	// Wrap twice: first as a CONFIG envelope addressed to the new channel, then as
	// an ORDERER_TRANSACTION envelope addressed to the system channel.
    newChannelEnvConfig, err := protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
    wrappedOrdererTransaction, err := protoutil.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChannelID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
    // The returned configSeq is s.support.Sequence(), not the new channel's sequence.
    return wrappedOrdererTransaction, s.support.Sequence(), nil
}

// NewChannelConfig unpacks a channel-creation CONFIG_UPDATE envelope and builds
// a template bundle for the new channel.
//
// NOTE(review): abridged excerpt; channelHeader and channelGroup are not defined
// in the visible lines, error checks are elided, and the final return is missing.
func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
	// Envelope -> Payload -> ConfigUpdateEnvelope -> ConfigUpdate.
	configUpdatePayload, err := protoutil.UnmarshalPayload(envConfigUpdate.Payload)
	configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)
	configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
	// A channel-creation update must write the Application group at version 1.
	if uv := configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Version; uv != 1 {
		return nil, fmt.Errorf("Config update for channel creation does not set application group version to 1, was %d", uv)
	}
	// Only ChannelGroup is set on the template config, so its Sequence keeps the zero value.
	bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{
		ChannelGroup: channelGroup,
	}, dt.bccsp)
}

// 校验逻辑
// ProposeConfigUpdate takes an Envelope carrying a config update and returns the
// ConfigEnvelope that would result from applying it. It delegates to
// proposeConfigUpdate.
func (vi *ValidatorImpl) ProposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
	return vi.proposeConfigUpdate(configtx)
}

// proposeConfigUpdate converts configtx into a config update, authorizes it
// against the validator's current state, and returns a ConfigEnvelope whose
// Config.Sequence is the validator's sequence plus one and whose LastUpdate is
// the incoming envelope. An error is returned if conversion, authorization, or
// rebuilding the channel group fails.
func (vi *ValidatorImpl) proposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {
	configUpdateEnv, err := protoutil.EnvelopeToConfigUpdate(configtx)
	if err != nil {
		return nil, errors.Errorf("error converting envelope to config update: %s", err)
	}

	// The authorization error must be checked right here. A stray
	// `err = vi.verifyReadSet(readSet)` used to sit between the call and the
	// check: it referenced a readSet that does not exist in this scope and
	// overwrote (and so silently dropped) any authorizeUpdate failure.
	// NOTE(review): read-set verification (vi.verifyReadSet) is presumably
	// performed inside authorizeUpdate, as in upstream Fabric; confirm.
	configMap, err := vi.authorizeUpdate(configUpdateEnv)
	if err != nil {
		return nil, errors.Errorf("error authorizing update: %s", err)
	}

	channelGroup, err := configMapToConfig(configMap, vi.namespace)
	if err != nil {
		return nil, errors.Errorf("could not turn configMap back to channelGroup: %s", err)
	}

	return &cb.ConfigEnvelope{
		Config: &cb.Config{
			Sequence:     vi.sequence + 1, // the sequence is bumped by one here
			ChannelGroup: channelGroup,
		},
		LastUpdate: configtx,
	}, nil
}
// verifyReadSet checks every read-set entry against the validator's current
// config map: the key must exist and its version must equal the version the
// update was computed against. The first mismatch is returned as an error;
// matching entries are logged.
func (vi *ValidatorImpl) verifyReadSet(readSet map[string]comparable) error {
	for key, proposed := range readSet {
		current, found := vi.configMap[key]
		if !found {
			return errors.Errorf("existing config does not contain element for %s but was in the read set", key)
		}

		have, want := current.version(), proposed.version()
		if have != want {
			return errors.Errorf("proposed update requires that key %s be at version %d, but it is currently at version %d", key, want, have)
		}
		// Debug trace for entries that passed the version check.
		log.Printf("proposed update requires that key %s be at version %d, currently at version %d", key, want, have)
	}
	return nil
}

// BroadcastChannelSupport resolves which chain should process msg.
//
// It returns the message's channel header, whether the message is a config
// update, and the chain support to use. Messages for an unknown channel are
// routed to the system channel (channel creation); if no system channel is
// defined an error is returned. Messages classified as ConfigMsg (CONFIG and
// ORDERER_TRANSACTION) cannot be submitted directly and also yield an error,
// in which case the channel header is still returned.
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
	chdr, err := protoutil.ChannelHeader(msg)
	if err != nil {
		return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
	}

	target := r.GetChain(chdr.ChannelId)
	if target == nil {
		// Unknown channel: treat as a channel creation request, which is
		// handled by the system channel's chain support.
		sysChan := r.SystemChannel()
		if sysChan == nil {
			return nil, false, nil, errors.New("channel creation request not allowed because the orderer system channel is not defined")
		}
		target = sysChan
	}

	// CONFIG_UPDATE envelopes (channel creation, anchor peer updates, ...) are
	// classified as ConfigUpdateMsg; everything except ConfigMsg is accepted.
	class := target.ClassifyMsg(chdr)
	if class == msgprocessor.ConfigMsg {
		return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
	}
	return chdr, class == msgprocessor.ConfigUpdateMsg, target, nil
}
	// ClassifyMsg maps a channel header type onto a Classification:
	// CONFIG_UPDATE is a ConfigUpdateMsg, ORDERER_TRANSACTION and CONFIG are
	// ConfigMsg, and every other type is a NormalMsg.
	func (s *StandardChannel) ClassifyMsg(chdr *cb.ChannelHeader) Classification {
		switch cb.HeaderType(chdr.Type) {
		case cb.HeaderType_CONFIG_UPDATE:
			return ConfigUpdateMsg
		case cb.HeaderType_ORDERER_TRANSACTION, cb.HeaderType_CONFIG:
			// In order to maintain backwards compatibility, we must classify these messages
			return ConfigMsg
		}
		return NormalMsg
	}
    // Classification represents the possible message types for the system.
    // ClassifyMsg returns one of the constants below for a given channel header.
    type Classification int
    const (
        // NormalMsg is the class of standard (endorser or otherwise non-config) messages.
        // Messages of this type should be processed by ProcessNormalMsg.
        // It is the zero value of Classification.
        NormalMsg Classification = iota
        // ConfigUpdateMsg indicates messages of type CONFIG_UPDATE.
        // Messages of this type should be processed by ProcessConfigUpdateMsg.
        ConfigUpdateMsg
        // ConfigMsg indicates message of type ORDERER_TRANSACTION or CONFIG.
        // Messages of this type should be processed by ProcessConfigMsg
        ConfigMsg
    )

// inside func (bw *BlockWriter) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) {
switch chdr.Type {
case int32(cb.HeaderType_ORDERER_TRANSACTION):
	bw.registrar.newChain(newChannelConfig) 
	// newChain creates the ledger resources for the channel described by configtx,
	// writes the channel's first block if its ledger is empty, then builds the
	// chain support, registers it in r.chains and starts it.
	// NOTE(review): abridged excerpt; error handling is elided.
	func (r *Registrar) newChain(configtx *cb.Envelope) {
		ledgerResources, err := r.newLedgerResources(configtx)
		// If we have no blocks, we need to create the genesis block ourselves.
		if ledgerResources.Height() == 0 {
			// The new channel's first block contains only the config transaction.
			block := blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})
			ledgerResources.Append(block)
			ledgerResources.WriteConfigBlockToSpecFile(block)
		}
		cs, err := newChainSupport(r, ledgerResources, r.consenters, r.signer, r.blockcutterMetrics, r.bccsp)
		chainID := ledgerResources.ConfigtxValidator().ChannelID()
		r.chains[chainID] = cs
		logger.Infof("Created and starting new channel %s, sequence %d, /CHANNEL/APPLICATION version: %d\n", chainID, ledgerResources.ConfigtxValidator().Sequence(), ledgerResources.ConfigtxValidator().ConfigProto().ChannelGroup.Groups["Application"].Version) // observed: sequence is 1, version is 1
		cs.start()
	}

// 系统通道初始 sequence 是 0;应用通道初始 sequence 是 1(templator 模板配置的 sequence 0 经 ProposeConfigUpdate 加 1),仅当系统通道 sequence 为 0 时才等于“系统通道的 sequence + 1”
// [x] 如果是应用通道更新配置交易,由应用通道处理;新 sequence 是应用通道的旧 sequence + 1

case int32(cb.HeaderType_CONFIG):
	configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
	err = bw.support.Validate(configEnvelope)
	bundle, err := bw.support.CreateBundle(chdr.ChannelId, configEnvelope.Config) // HeaderType_CONFIG 类型
		// CreateBundle builds a new channelconfig.Bundle for channelID from config,
		// using the receiver's bccsp; it is a thin wrapper over channelconfig.NewBundle.
		func (cr *configResources) CreateBundle(channelID string, config *cb.Config) (*channelconfig.Bundle, error) {
			return channelconfig.NewBundle(channelID, config, cr.bccsp)
		}
			func NewBundle(channelID string, config *cb.Config, bccsp bccsp.BCCSP) (*Bundle, error) {
				configtxManager, err := configtx.NewValidatorImpl(channelID, config, RootGroupKey, policyManager) // 这里用 config 里已经更新的 sequence 创建新的 configtxManager
	// 等待 WriteBlock 里的协程(有读取 Sequence 操作)结束后再更新 Sequence
	// Avoid Bundle update before the go-routine in WriteBlock() finished writing the previous block.
	// We do this (in particular) to prevent bw.support.Sequence() from advancing before the go-routine reads it.
	// In general, this prevents the StableBundle from changing before the go-routine in WriteBlock() finishes.
	bw.committingBlock.Lock()
	bw.committingBlock.Unlock()
	bw.support.Update(bundle)
		// Update passes bndl to checkResourcesOrPanic and then installs it as the
		// current bundle via cr.mutableResources.Update.
		func (cr *configResources) Update(bndl *channelconfig.Bundle) {
			checkResourcesOrPanic(bndl)
			cr.mutableResources.Update(bndl)
		}
			// // [x] version 何时被更新
			// from 1: r.newLedgerResources(configTx), channelconfig.NewBundleSource(bundle, callbacksCopy...):用新 bundle 构造出 ChainSupport,加入 chains 的 map
			// from 2: func (bw *BlockWriter) WriteConfigBlock, bw.support.Update(bundle) 
			// Update stores newBundle as the current bundle and then invokes every
			// registered callback with it. Around the swap it emits debug traces of
			// the /Channel/Application group version before and after the update.
			//
			// The traces are skipped when the config has no "Application" group
			// (presumably the case for the system channel - confirm): a missing map
			// key yields a nil *ConfigGroup, and reading .Version from it would panic.
			func (bs *BundleSource) Update(newBundle *Bundle) {
				if old, ok := bs.bundle.Load().(*Bundle); ok && old != nil {
					cp := old.ConfigtxValidator().ConfigProto()
					if app := cp.ChannelGroup.Groups["Application"]; app != nil {
						log.Printf("before update: channel name %s\n%d\n", old.ConfigtxValidator().ChannelID(), app.Version)
					}
				} else {
					// Nothing usable was stored yet; log what the load returned.
					log.Println(ok, old)
				}

				bs.bundle.Store(newBundle)
				for _, callback := range bs.callbacks {
					callback(newBundle)
				}

				// Re-load rather than reuse newBundle so the trace reflects what is stored.
				current := bs.bundle.Load().(*Bundle)
				cp := current.ConfigtxValidator().ConfigProto()
				if app := cp.ChannelGroup.Groups["Application"]; app != nil {
					log.Printf("after update: channel name %s, version %d\n", current.ConfigtxValidator().ChannelID(), app.Version)
				}
			}
bw.WriteBlock(block, encodedMetadataValue)
	// WriteBlock should be invoked for blocks which contain normal transactions.
	// It sets the target block as the pending next block, and returns before it is committed.
	// Before returning, it acquires the committing lock, and spawns a go routine which will
	// annotate the block with metadata and signatures, and write the block to the ledger
	// then release the lock.  This allows the calling thread to begin assembling the next block
	// before the commit phase is complete.
	func (bw *BlockWriter) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {
		// Taken here, released by the goroutine below once the commit finishes.
		bw.committingBlock.Lock()
		bw.lastBlock = block
		go func() {
			// Deferred so the lock is released even if commitBlock panics.
			defer bw.committingBlock.Unlock()
			bw.commitBlock(encodedMetadataValue)
		}()
	}

// commitBlock finalizes bw.lastBlock: it stamps the last-config metadata, adds
// the block signature, and appends the block to the ledger, panicking via the
// logger if the append fails.
//
// It must only be called while bw.committingBlock is held, so that the config
// sequence numbers encoded into the block stay in sync.
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
	bw.addLastConfig(bw.lastBlock)
	bw.addBlockSignature(bw.lastBlock, encodedMetadataValue)

	if appendErr := bw.support.Append(bw.lastBlock); appendErr != nil {
		logger.Panicf("[channel: %s] Could not append block: %s", bw.support.ChannelID(), appendErr)
	}

	logger.Debugf("[channel: %s] Wrote block [%d]", bw.support.ChannelID(), bw.lastBlock.GetHeader().Number)
}
// addLastConfig writes the LAST_CONFIG metadata entry of block.
//
// If the support's config sequence is higher than the last one recorded, this
// block is taken as the new last config block and the recorded sequence is
// advanced; the metadata then carries bw.lastConfigBlockNum.
func (bw *BlockWriter) addLastConfig(block *cb.Block) {
	if seq := bw.support.Sequence(); seq > bw.lastConfigSeq {
		// Log with the old values before they are overwritten.
		logger.Debugf("[channel: %s] Detected lastConfigSeq transitioning from %d to %d, setting lastConfigBlockNum from %d to %d", bw.support.ChannelID(), bw.lastConfigSeq, seq, bw.lastConfigBlockNum, block.Header.Number)
		bw.lastConfigBlockNum = block.Header.Number
		bw.lastConfigSeq = seq
	}

	encodedLastConfig := protoutil.MarshalOrPanic(&cb.LastConfig{Index: bw.lastConfigBlockNum})
	logger.Debugf("[channel: %s] About to write block, setting its LAST_CONFIG to %d", bw.support.ChannelID(), bw.lastConfigBlockNum)

	wrapped := &cb.Metadata{Value: encodedLastConfig}
	block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(wrapped)
}

创世块创建流程

// configtxgen -profile TwoOrgsOrdererGenesis -channelID system-channel -outputBlock ./system-genesis-block/genesis.block --bccsp=SW
// 通用
var profileConfig *genesisconfig.Profile
profileConfig = genesisconfig.Load(profile)
    func InitViper(v *viper.Viper, configName string) error {
	    var altPath = os.Getenv("FABRIC_CFG_PATH")

// 类别 1
if outputBlock != "" {
    if err := doOutputBlock(profileConfig, channelID, outputBlock); err != nil {

if config.Orderer == nil {
    return errors.Errorf("refusing to generate block which is missing orderer section")
}
if config.Consortiums == nil {
    logger.Warning("Genesis block does not contain a consortiums group definition.  This block cannot be used for orderer bootstrap.")
}
genesisBlock := pgen.GenesisBlockForChannel(channelID)
    // Block builds the genesis block for channelID: block number 0 containing a
    // single CONFIG envelope (nil signature) that wraps f.channelGroup, with both
    // the LAST_CONFIG and SIGNATURES metadata pointing at last-config index 0.
    func (f *factory) Block(channelID string) *cb.Block {
        chdr := protoutil.MakeChannelHeader(cb.HeaderType_CONFIG, msgVersion, channelID, epoch)
        shdr := protoutil.MakeSignatureHeader(nil, protoutil.CreateNonceOrPanic())
        protoutil.SetTxID(chdr, shdr)
        hdr := protoutil.MakePayloadHeader(chdr, shdr)

        // cb.Config.Sequence is not set, so it keeps its zero value (0).
        configEnv := &cb.ConfigEnvelope{Config: &cb.Config{ChannelGroup: f.channelGroup}}
        payload := &cb.Payload{Header: hdr, Data: protoutil.MarshalOrPanic(configEnv)}
        envelope := &cb.Envelope{Payload: protoutil.MarshalOrPanic(payload), Signature: nil}

        genesis := protoutil.NewBlock(0, nil)
        genesis.Data = &cb.BlockData{Data: [][]byte{protoutil.MarshalOrPanic(envelope)}}
        genesis.Header.DataHash = protoutil.BlockDataHash(genesis.Data)

        lastConfig := protoutil.MarshalOrPanic(&cb.LastConfig{Index: 0})
        genesis.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&cb.Metadata{Value: lastConfig})

        ordererMetadata := protoutil.MarshalOrPanic(&cb.OrdererBlockMetadata{LastConfig: &cb.LastConfig{Index: 0}})
        genesis.Metadata.Metadata[cb.BlockMetadataIndex_SIGNATURES] = protoutil.MarshalOrPanic(&cb.Metadata{Value: ordererMetadata})

        return genesis
    }
        // OrdererBlockMetadata defines metadata that is set by the ordering service.
        type OrdererBlockMetadata struct {
            // LastConfig points at the last config block; the genesis block sets its Index to 0.
            LastConfig           *LastConfig `protobuf:"bytes,1,opt,name=last_config,json=lastConfig,proto3" json:"last_config,omitempty"`
            // ConsenterMetadata is an opaque byte slice; the genesis block leaves it unset.
            ConsenterMetadata    []byte      `protobuf:"bytes,2,opt,name=consenter_metadata,json=consenterMetadata,proto3" json:"consenter_metadata,omitempty"`
        }
            // LastConfig records which block holds the most recent configuration.
            type LastConfig struct {
                // Index is the block number of the last config block
                // (addLastConfig fills it from bw.lastConfigBlockNum).
                Index                uint64   `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
            }

应用通道 configTx 创建流程

// configtxgen -profile TwoOrgsChannel -outputCreateChannelTx ./channel-artifacts/mychannel.tx -channelID mychannel --bccsp=SW
// 类别 2
var baseProfile *genesisconfig.Profile // nil
if outputChannelCreateTx != "" {
    if err := doOutputChannelCreateTx(profileConfig, baseProfile, channelID, outputChannelCreateTx); err != nil {
        configtx, err = encoder.MakeChannelCreationTransaction(channelID, nil, conf) // conf 是 profileConfig
            template, err := DefaultConfigTemplate(conf)
            return MakeChannelCreationTransactionFromTemplate(channelID, signer, conf, template)
                return protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG_UPDATE, channelID, signer, newConfigUpdateEnv, msgVersion, epoch) // 初始为 HeaderType_CONFIG_UPDATE,func (s *SystemChannel) ProcessConfigUpdateMsg 里会重新包装消息类型
					payloadChannelHeader := MakeChannelHeader(txType, msgVersion, channelID, epoch)
		err = writeFile(outputChannelCreateTx, protoutil.MarshalOrPanic(configtx), 0640)

// NewChannelCreateConfigUpdate builds the ConfigUpdate used to create channel
// channelID: the difference between templateConfig and the channel group
// derived from conf, plus the consortium name.
//
// It fails if conf has no Application section or no Consortium value, if the
// profile cannot be turned into a channel group, or if the update cannot be
// computed.
func NewChannelCreateConfigUpdate(channelID string, conf *genesisconfig.Profile, templateConfig *cb.ConfigGroup) (*cb.ConfigUpdate, error) {
	switch {
	case conf.Application == nil:
		return nil, errors.New("cannot define a new channel with no Application section")
	case conf.Consortium == "":
		return nil, errors.New("cannot define a new channel with no Consortium value")
	}

	desiredGroup, err := NewChannelGroup(conf)
	if err != nil {
		return nil, errors.Wrapf(err, "could not turn parse profile into channel group")
	}

	original := &cb.Config{ChannelGroup: templateConfig}
	updated := &cb.Config{ChannelGroup: desiredGroup}
	configUpdate, err := update.Compute(original, updated)
	if err != nil {
		return nil, errors.Wrapf(err, "could not compute update")
	}

	// The consortium value goes into both the read set and the write set at
	// version 0; only the write set entry carries the consortium name.
	configUpdate.ChannelId = channelID
	configUpdate.ReadSet.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{Version: 0}
	consortium := &cb.Consortium{Name: conf.Consortium}
	configUpdate.WriteSet.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{
		Version: 0,
		Value:   protoutil.MarshalOrPanic(consortium),
	}
	return configUpdate, nil
}
    // ConfigValue is a single versioned configuration value.
    type ConfigValue struct {
        // Version is the version number of this value.
        Version              uint64   `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
        // Value is the marshaled content (e.g. a marshaled cb.Consortium for the consortium key).
        Value                []byte   `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
        // ModPolicy names the policy governing modification of this value.
        ModPolicy            string   `protobuf:"bytes,3,opt,name=mod_policy,json=modPolicy,proto3" json:"mod_policy,omitempty"`
    }

// Compute returns a ConfigUpdate whose read and write sets are produced by
// computeGroupUpdate over the ChannelGroups of original and updated.
// NOTE(review): abridged excerpt; groupUpdated is not used in the visible
// lines, so any handling of the "nothing changed" case is elided here.
func Compute(original, updated *cb.Config) (*cb.ConfigUpdate, error) {
	readSet, writeSet, groupUpdated := computeGroupUpdate(original.ChannelGroup, updated.ChannelGroup)
	return &cb.ConfigUpdate{
		ReadSet:  readSet,
		WriteSet: writeSet,
	}, nil
}
// computeGroupUpdate returns the read-set group, the write-set group and a flag
// reporting whether the group was updated.
// NOTE(review): abridged excerpt; only the "group changed" return is shown and
// the readSet*/writeSet* collections are computed in elided code.
func computeGroupUpdate(original, updated *cb.ConfigGroup) (readSet, writeSet *cb.ConfigGroup, updatedGroup bool) {
	return &cb.ConfigGroup{
		// The read set pins the group at the version it currently has.
		Version:  original.Version,
		Policies: readSetPolicies,
		Values:   readSetValues,
		Groups:   readSetGroups,
	}, &cb.ConfigGroup{
		Version:   original.Version + 1, // the group version is bumped by one here
		Policies:  writeSetPolicies,
		Values:    writeSetValues,
		Groups:    writeSetGroups,
		ModPolicy: updated.ModPolicy,
	}, true
}

更新 anchor peer

// 类别 3
if outputAnchorPeersUpdate != "" {
    if err := doOutputAnchorPeersUpdate(profileConfig, channelID, outputAnchorPeersUpdate, asOrg); err != nil {

应用通道创建成功后得到的配置块结构

{
	-"data": {
		-"data": [
			-{
				-"payload": {
					-"data": {
						-"config": {
							-"channel_group": {
								-"groups": {
									-"Application": {
										+"groups": { … },
										"mod_policy": "Admins",
										+"policies": { … },
										+"values": { … },
										"version": "1"
									},
									+"Orderer": { … }
								},
								"mod_policy": "Admins",
								+"policies": { … },
								+"values": { … },
								"version": "0"
							},
							"sequence": "1"
						},
						+"last_update": { … }
					},
					-"header": {
							-"channel_header": {
							"channel_id": "mychannel1",
							"epoch": "0",
							"extension": null,
							"timestamp": "2022-07-20T02:35:18Z",
							"tls_cert_hash": null,
							"tx_id": "",
							"type": 1,
							"version": 0
						},
						+"signature_header": { … }
					}
				},
				"signature": "MEUCIQC+YYLNGa1pAFHCm1njE6jVcMdSNyjRvfYuGvV8PPfY+QIgETpMBTVaUDjvdYqdkxf982C0/Q1YmTjVpf6Fk4chV9c="
			}
		]
	},
	+"header": { … },
	+"metadata": { … }
}

其它

阻塞是因为 select 循环在运行 case sn := <-c.snapC:,所以 case s := <-submitC 阻塞。

// WaitReady returns an error if the chain is not running or has been stopped;
// otherwise it returns nil once a nil probe has been accepted on submitC.
//
// It blocks when the chain:
// - is catching up with other nodes using snapshot
//
// In any other case, it returns right away.
func (c *Chain) WaitReady() error {
	err := c.isRunning()
	if err != nil {
		return err
	}

	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	case c.submitC <- nil:
		// The probe was received, so the chain is accepting submissions.
		return nil
	}
}

chain 的 Sequence 已更新,但还有属于旧 Sequence 的交易进来,此时会执行重新校验。

// It takes care of config messages as well as the revalidation of messages if the config sequence has advanced.