区块链教程Fabric1.0源代码分析Orderer BroadcastServer
兄弟连区块链教程Fabric1.0源代码分析Orderer BroadcastServer,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁、回归理性,表面上看相关人才需求与身价似乎正在回落。但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上。
Fabric 1.0源代码笔记 之 Orderer #BroadcastServer(Broadcast服务端)
1、BroadcastServer概述
BroadcastServer相关代码在protos/orderer、orderer目录下。
protos/orderer/ab.pb.go,AtomicBroadcastServer接口定义。
orderer/server.go,go,AtomicBroadcastServer接口实现。
有个图
2、AtomicBroadcastServer接口定义
2.1、AtomicBroadcastServer接口定义
type AtomicBroadcastServer interface { Broadcast(AtomicBroadcast_BroadcastServer) error Deliver(AtomicBroadcast_DeliverServer) error } //代码在protos/orderer/ab.pb.go ··· ### 2.2、gRPC相关实现
var _AtomicBroadcast_serviceDesc = grpc.ServiceDesc{
ServiceName: "orderer.AtomicBroadcast", HandlerType: (*AtomicBroadcastServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "Broadcast", Handler: _AtomicBroadcast_Broadcast_Handler, ServerStreams: true, ClientStreams: true, }, { StreamName: "Deliver", Handler: _AtomicBroadcast_Deliver_Handler, ServerStreams: true, ClientStreams: true, }, }, Metadata: "orderer/ab.proto",
}
func RegisterAtomicBroadcastServer(s *grpc.Server, srv AtomicBroadcastServer) {
s.RegisterService(&_AtomicBroadcast_serviceDesc, srv)
}
func _AtomicBroadcast_Broadcast_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(AtomicBroadcastServer).Broadcast(&atomicBroadcastBroadcastServer{stream})
}
func _AtomicBroadcast_Deliver_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(AtomicBroadcastServer).Deliver(&atomicBroadcastDeliverServer{stream})
}
//代码在protos/orderer/ab.pb.go
## 3、AtomicBroadcastServer接口实现 ### 3.1、server结构体 server结构体:
type server struct {
bh broadcast.Handler dh deliver.Handler
}
type broadcastSupport struct {
multichain.Manager broadcast.ConfigUpdateProcessor
}
//代码在orderer/server.go
broadcast.Handler:
type Handler interface {
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}
type handlerImpl struct {
sm SupportManager
}
func NewHandlerImpl(sm SupportManager) Handler {
return &handlerImpl{ sm: sm, }
}
type SupportManager interface {
ConfigUpdateProcessor GetChain(chainID string) (Support, bool)
}
type ConfigUpdateProcessor interface { //处理通道配置更新
Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error)
}
//代码在orderer/common/broadcast/broadcast.go
deliver.Handler:
type Handler interface {
Handle(srv ab.AtomicBroadcast_DeliverServer) error
}
type deliverServer struct {
sm SupportManager
}
type SupportManager interface {
GetChain(chainID string) (Support, bool)
}
//代码在orderer/common/deliver/deliver.go
### 3.2、server结构体相关方法
//构建server结构体
func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer
//s.bh.Handle(srv)
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error
//s.dh.Handle(srv)
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error
//代码在orderer/server.go
func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer代码如下:
func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer {
s := &server{ dh: deliver.NewHandlerImpl(deliverSupport{Manager: ml}), bh: broadcast.NewHandlerImpl(broadcastSupport{ Manager: ml, ConfigUpdateProcessor: configupdate.New(ml.SystemChannelID(), configUpdateSupport{Manager: ml}, signer), }), } return s
}
//代码在orderer/server.go
### 3.3、Broadcast服务端Broadcast处理流程 Broadcast服务端Broadcast处理流程,即broadcast.handlerImpl.Handle方法。 #### 3.3.1、接收Envelope消息,并获取Payload和ChannelHeader
msg, err := srv.Recv() //接收Envelope消息
payload, err := utils.UnmarshalPayload(msg.Payload) //反序列化获取Payload
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) //反序列化获取ChannelHeader
//代码在orderer/common/broadcast/broadcast.go
#### 3.3.2、如果消息类型为channel配置或更新,则使用multichain.Manager处理消息
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) { //如果是channel配置或更新
msg, err = bh.sm.Process(msg) //configupdate.Processor.Process方法
}
//代码在orderer/common/broadcast/broadcast.go
msg, err = bh.sm.Process(msg)代码如下:
func (p Processor) Process(envConfigUpdate cb.Envelope) (*cb.Envelope, error) {
channelID, err := channelID(envConfigUpdate) //获取ChannelHeader.ChannelId //multichain.Manager.GetChain方法,获取chainSupport,以及chain是否存在 support, ok := p.manager.GetChain(channelID) if ok { //已存在的channel配置,调取multichain.Manager.ProposeConfigUpdate方法 return p.existingChannelConfig(envConfigUpdate, channelID, support) } //新channel配置,调取multichain.Manager.NewChannelConfig方法 return p.newChannelConfig(channelID, envConfigUpdate)
}
//代码在orderer/configupdate/configupdate.go
#### 3.3.3、其他消息类型或channel消息处理后,接受消息并加入排序
support, ok := bh.sm.GetChain(chdr.ChannelId) //获取chainSupport
_, filterErr := support.Filters().Apply(msg) //filter.RuleSet.Apply方法
//调取Chain.Enqueue方法,接受消息,加入排序
support.Enqueue(msg)
//代码在orderer/common/broadcast/broadcast.go
#### 3.3.4、向客户端发送响应信息
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
//代码在orderer/common/broadcast/broadcast.go
### 3.4、Broadcast服务端Deliver处理流程 Broadcast服务端Deliver处理流程,即deliver.deliverServer.Handle方法。
func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
for { //接收客户端查询请求 envelope, err := srv.Recv() payload, err := utils.UnmarshalPayload(envelope.Payload) chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) chain, ok := ds.sm.GetChain(chdr.ChannelId) erroredChan := chain.Errored() select { case <-erroredChan: return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE) default: } lastConfigSequence := chain.Sequence() sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) result, _ := sf.Apply(envelope) seekInfo := &ab.SeekInfo{} err = proto.Unmarshal(payload.Data, seekInfo) cursor, number := chain.Reader().Iterator(seekInfo.Start) var stopNum uint64 switch stop := seekInfo.Stop.Type.(type) { case *ab.SeekPosition_Oldest: stopNum = number case *ab.SeekPosition_Newest: stopNum = chain.Reader().Height() - 1 case *ab.SeekPosition_Specified: stopNum = stop.Specified.Number if stopNum < number { return sendStatusReply(srv, cb.Status_BAD_REQUEST) } } for { if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY { select { case <-erroredChan: return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE) case <-cursor.ReadyChan(): } } else { select { case <-cursor.ReadyChan(): default: return sendStatusReply(srv, cb.Status_NOT_FOUND) } } currentConfigSequence := chain.Sequence() if currentConfigSequence > lastConfigSequence { lastConfigSequence = currentConfigSequence sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) result, _ := sf.Apply(envelope) } block, status := cursor.Next() err := sendBlockReply(srv, block) if stopNum == block.Header.Number { break } } err := sendStatusReply(srv, cb.Status_SUCCESS) }
}
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
C# Lambda表达式详解,及Lambda表达式树的创建
转载自https://www.cnblogs.com/yunfeifei/p/3844814.html雲霏霏博客 Lambda表达式 "Lambda表达式"是一个匿名函数,是一种高效的类似于函数式编程的表达式,Lambda简化了开发中需要编写的代码量。它可以包含表达式和语句,并且可用于创建委托或表达式目录树类型,支持带有可绑定到委托或表达式树的输入参数的内联表达式。所有Lambda表达式都使用Lambda运算符=>,该运算符读作"goes to"。Lambda运算符的左边是输入参数(如果有),右边是表达式或语句块。Lambda表达式x => x * x读作"x goes to x times x"。可以将此表达式分配给委托类型,如下所示: delegate int del(int i);static void Main(string[] args){ del myDelegate = x => x * x; int j = myDelegate(5); //j = 25 }若要创建表达式目录树类型(后面会详细说明): 复制代码using System.Linq.Exp...
- 下一篇
Java容器深入浅出之HashSet、TreeSet和EnumSet
Java集合中的Set接口,定义的是一类无顺序的、不可重复的对象集合。如果尝试添加相同的元素,add()方法会返回false,同时添加失败。Set接口包括3个主要的实现类:HashSet、TreeSet和EnumSet。 通过查看Java源码,事实上Java是先实现了Map,然后通过包装一个所有value都为null的集合,形成Set。 HashSet HashSet基于Hash算法实现,因此存取和查找的性能较好。HashSet的主要特点如下: 1. 无顺序的。与添加顺序不同,并且可变。 2. 线程不安全。 3. 集合元素可以是null 4. HashSet是通过元素的HashCode返回值,来确定元素存储位置。 5. 不可重复。HashSet判断元素是否重复的标准是:该元素对象的HashCode()返回值相等,并且equals()方法相等。换句话说,如果两个元素的equals方法相同,但HashCode返回值不相同,HashSet依然可以添加成功。因此,需要注意: 5.1 用Set类保存的元素,尽量保证其equals相等的同时,HashCode返回的值也相等。 5.2 当保存引用类型...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS6,CentOS7官方镜像安装Oracle11G
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Red5直播服务器,属于Java语言的直播服务器
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7