以太坊源码分析 启动流程
以太坊入口代码位于 cmd/geth/main.go,先看一下 main 函数:
func main() { if err := app.Run(os.Args); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } }
这里有个疑问,为什么没有看到 app 的 flag 和 command 配置呢?我们了解一下Go语言的执行流程:
可以看到,main() 并不是真正意义上的入口,在初始化完常量和变量以后,会先调用模块的 init() 函数,然后才是 main() 函数,所以初始化的工作是在 init() 函数里完成的:
func init() { // Initialize the CLI app and start Geth app.Action = geth app.HideVersion = true // we have a command to print the version app.Copyright = "Copyright 2013-2018 The go-ethereum Authors" app.Commands = []cli.Command{ // See chaincmd.go: initCommand, …… } sort.Sort(cli.CommandsByName(app.Commands)) app.Flags = append(app.Flags, nodeFlags...) …… app.Before = func(ctx *cli.Context) error { runtime.GOMAXPROCS(runtime.NumCPU()) if err := debug.Setup(ctx); err != nil { return err } // Start system runtime metrics collection go metrics.CollectProcessMetrics(3 * time.Second) utils.SetupNetwork(ctx) return nil } app.After = func(ctx *cli.Context) error { debug.Exit() console.Stdin.Close() // Resets terminal mode. return nil } }
可以看到:app.Action=geth,如果没有添加任何command参数的话,主入口是geth()函数。
flag的配置代码位于cmd/utils/flags.go,command的配置代码跟main.go在同一个包中,分散在不同的文件里。
在进入主入口之前,app.Before 做了 3 件事情:
- I. runtime.GOMAXPROCS():设置最大可用处理器数
- II. metrics.CollectProcessMetrics():创建一个goroutine,每3秒监测一次系统的 ram 和 disk 状态
- III. utils.SetupNetwork():配置gas limit值
下面开始看geth()函数:
func geth(ctx *cli.Context) error { node := makeFullNode(ctx) startNode(ctx, node) node.Wait() return nil }
可以看到,主要做了3件事情:
- I. 创建结点
- II. 启动结点
- III. 结点进入等待状态
1. 创建结点
func makeFullNode(ctx *cli.Context) *node.Node { stack, cfg := makeConfigNode(ctx) utils.RegisterEthService(stack, &cfg.Eth) …… return stack }
中间一堆代码默认情况下不会走进去,先忽略~
所以这个函数就干了2件事:创建结点、注册EthService。
1.1 创建结点
先看makeConfigNode()函数:
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) { // Load defaults. cfg := gethConfig{ Eth: eth.DefaultConfig, Shh: whisper.DefaultConfig, Node: defaultNodeConfig(), Dashboard: dashboard.DefaultConfig, } // Load config file. if file := ctx.GlobalString(configFileFlag.Name); file != "" { if err := loadConfig(file, &cfg); err != nil { utils.Fatalf("%v", err) } } // Apply flags. utils.SetNodeConfig(ctx, &cfg.Node) stack, err := node.New(&cfg.Node) if err != nil { utils.Fatalf("Failed to create the protocol stack: %v", err) } utils.SetEthConfig(ctx, stack, &cfg.Eth) if ctx.GlobalIsSet(utils.EthStatsURLFlag.Name) { cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name) } utils.SetShhConfig(ctx, stack, &cfg.Shh) utils.SetDashboardConfig(ctx, &cfg.Dashboard) return stack, cfg }
主要是初始化和加载一些配置,其中最重要的一行是通过node.New()创建结点,变量名叫stack,代表协议栈的含义。
先看一下结点的默认配置,代码位于node/defaults.go:
var DefaultConfig = Config{ DataDir: DefaultDataDir(), HTTPPort: DefaultHTTPPort, HTTPModules: []string{"net", "web3"}, HTTPVirtualHosts: []string{"localhost"}, WSPort: DefaultWSPort, WSModules: []string{"net", "web3"}, P2P: p2p.Config{ ListenAddr: ":30303", MaxPeers: 25, NAT: nat.Any(), }, }
mac默认的datadir位于$HOME/Library/Ethereum,HTTP默认端口号8545,WebSocket默认端口号8546,P2P默认端口号30303,最大支持25个对等结点。
utils.SetNodeConfig()代码位于cmd/utils/flags.go,主要是检查有没有一些global的配置,如果有的话覆盖掉刚刚的默认配置。代码比较简单就不分析了。
接下来就是调用node.New()创建结点了,代码位于node/node.go:
func New(conf *Config) (*Node, error) { // Copy config and resolve the datadir so future changes to the current // working directory don't affect the node. confCopy := *conf conf = &confCopy if conf.DataDir != "" { absdatadir, err := filepath.Abs(conf.DataDir) if err != nil { return nil, err } conf.DataDir = absdatadir } …… // Ensure that the AccountManager method works before the node has started. // We rely on this in cmd/geth. am, ephemeralKeystore, err := makeAccountManager(conf) …… // Note: any interaction with Config that would create/touch files // in the data directory or instance directory is delayed until Start. return &Node{ accman: am, ephemeralKeystore: ephemeralKeystore, config: conf, serviceFuncs: []ServiceConstructor{}, ipcEndpoint: conf.IPCEndpoint(), httpEndpoint: conf.HTTPEndpoint(), wsEndpoint: conf.WSEndpoint(), eventmux: new(event.TypeMux), log: conf.Logger, }, nil }
可以看出,主要做了3件事:
I. 把datadir转成绝对路径
II. 调用makeAccountManager()初始化账号管理器
III. 创建一个Node实例并返回
关于账号管理系统后面会专门写一篇文章分析,这里就先略过了。
至此,我们获得了一个Node实例,makeConfigNode()函数就分析完了。
看一下Node结构中的一些重要成员:
I. accman:刚刚创建的账号管理器实例
II. config:创建该结点使用的配置
III. serviceFuncs:一个函数指针数组,保存所有注册Service的构造函数
那么Service是一个什么样的概念呢?先看一下构造函数的原型:
type ServiceConstructor func(ctx *ServiceContext) (Service, error)
出现了两个新类型:ServiceContext和Service。先看一下ServiceContext的定义:
// ServiceContext is a collection of service independent options inherited from // the protocol stack, that is passed to all constructors to be optionally used; // as well as utility methods to operate on the service environment. type ServiceContext struct { config *Config services map[reflect.Type]Service // Index of the already constructed services EventMux *event.TypeMux // Event multiplexer used for decoupled notifications AccountManager *accounts.Manager // Account manager created by the node. }
从注释可以看出,ServiceContext主要是存储了一些从结点(或者叫协议栈)那里继承过来的、和具体Service无关的一些信息,比如结点config、account manager等。其中有一个services字段保存了当前正在运行的所有Service,是一个map类型,key是Service的类型,value是Service实例。
接下来看一下Service的定义:
type Service interface { // Protocols retrieves the P2P protocols the service wishes to start. Protocols() []p2p.Protocol // APIs retrieves the list of RPC descriptors the service provides APIs() []rpc.API // Start is called after all services have been constructed and the networking // layer was also initialized to spawn any goroutines required by the service. Start(server *p2p.Server) error // Stop terminates all goroutines belonging to the service, blocking until they // are all terminated. Stop() error }
Service是一个接口,定义了4个需要实现的函数。换句话说,任何实现了这4个方法的类型,都可以称之为一个Service。
1.2 注册eth Service
接下来分析utils.RegisterEthService()方法:(cmd/utils/flags.go)
func RegisterEthService(stack *node.Node, cfg *eth.Config) { var err error if cfg.SyncMode == downloader.LightSync { err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { return les.New(ctx, cfg) }) } else { err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { fullNode, err := eth.New(ctx, cfg) if fullNode != nil && cfg.LightServ > 0 { ls, _ := les.NewLesServer(fullNode, cfg) fullNode.AddLesServer(ls) } return fullNode, err }) } if err != nil { Fatalf("Failed to register the Ethereum service: %v", err) } }
这里有2个分支,如果是配置成轻量级结点会进上面那个分支,如果是全结点会进else分支。然后调用Node的Register()方法注册Service:(node/node.go)
func (n *Node) Register(constructor ServiceConstructor) error { n.lock.Lock() defer n.lock.Unlock() if n.server != nil { return ErrNodeRunning } n.serviceFuncs = append(n.serviceFuncs, constructor) return nil }
看到了吧?所谓的注册,其实就是把Service的构造函数放进结点的serviceFuncs数组。
这里只是注册,具体要等到启动结点的时候才真正调用构造函数创建Service。
2. 启动结点
我们回到cmd/geth/main.go分析下一个函数startNode():
func startNode(ctx *cli.Context, stack *node.Node) { // Start up the node itself utils.StartNode(stack) // Subscribe and process account related events …… }
后面一部分订阅和处理账号event相关的代码先忽略,看一下utils.StartNode()函数,代码位于cmd/utils/cmd.go:
func StartNode(stack *node.Node) { if err := stack.Start(); err != nil { Fatalf("Error starting protocol stack: %v", err) } go func() { sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(sigc) <-sigc log.Info("Got interrupt, shutting down...") go stack.Stop() for i := 10; i > 0; i-- { <-sigc if i > 1 { log.Warn("Already shutting down, interrupt more to panic.", "times", i-1) } } debug.Exit() // ensure trace and CPU profile data is flushed. debug.LoudPanic("boom") }() }
后半段的goroutine主要是为了捕获中断信号以停止结点运行的,所以这里实际就是调用了Node的Start()函数。这个函数比较长,我们分成几段来看:
I. 创建P2P server
II. 创建Service
III. 启动P2P server
IV. 启动Service
V. 启动RPC server
2.1 创建P2P server
以太坊是一个去中心化的平台,所以首要任务是创建P2P server:
func (n *Node) Start() error { …… n.lock.Lock() defer n.lock.Unlock() // Short circuit if the node's already running if n.server != nil { return ErrNodeRunning } if err := n.openDataDir(); err != nil { return err } // Initialize the p2p server. This creates the node key and // discovery databases. n.serverConfig = n.config.P2P n.serverConfig.PrivateKey = n.config.NodeKey() n.serverConfig.Name = n.config.NodeName() n.serverConfig.Logger = n.log if n.serverConfig.StaticNodes == nil { n.serverConfig.StaticNodes = n.config.StaticNodes() } if n.serverConfig.TrustedNodes == nil { n.serverConfig.TrustedNodes = n.config.TrustedNodes() } if n.serverConfig.NodeDatabase == "" { n.serverConfig.NodeDatabase = n.config.NodeDB() } running := &p2p.Server{Config: n.serverConfig} n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name) …… }
代码首先做了一些检查工作:加锁、判断结点是否已经运行、检查datadir是否可以打开,然后初始化P2P server配置,最后用该配置创建了一个p2p.Server实例。
2.2 创建Service
// Otherwise copy and specialize the P2P configuration services := make(map[reflect.Type]Service) for _, constructor := range n.serviceFuncs { // Create a new context for the particular service ctx := &ServiceContext{ config: n.config, services: make(map[reflect.Type]Service), EventMux: n.eventmux, AccountManager: n.accman, } for kind, s := range services { // copy needed for threaded access ctx.services[kind] = s } // Construct and save the service service, err := constructor(ctx) if err != nil { return err } kind := reflect.TypeOf(service) if _, exists := services[kind]; exists { return &DuplicateServiceError{Kind: kind} } services[kind] = service }
首先初始化Node中的services字段,然后遍历serviceFuncs,也就是之前注册的所有Service的构造函数列表。在创建Service实例之前,先为每个Service创建一个ServiceContext,之前提到过,ServiceContext里存储的是从Node继承过来的一些信息。接着通过构造函数创建Service实例,然后加入到service这个map中。
2.3 启动P2P server
// Gather the protocols and start the freshly assembled P2P server for _, service := range services { running.Protocols = append(running.Protocols, service.Protocols()...) } if err := running.Start(); err != nil { return convertFileLockError(err) }
首先把所有Service支持的协议集合到一起,然后调用p2p.Server的Start()方法启动P2P server(代码位于p2p/server.go)。P2P server会绑定一个UDP端口和一个TCP端口,端口号是相同的(默认30303)。UDP端口主要用于结点发现,TCP端口主要用于业务数据传输,基于RLPx加密传输协议。所以具体来说,Start()方法做了以下几件事情:
I. 侦听UDP端口:用于结点发现
II. 发起UDP请求获取结点表:内部会启动goroutine来完成
III. 侦听TCP端口:用于业务数据传输,基于RLPx协议
IV. 发起TCP请求连接到其他结点:也是启动goroutine完成
相对应的代码如下所示:
func (srv *Server) Start() (err error) { …… srv.running = true …… // 侦听UDP端口(用于结点发现) if !srv.NoDiscovery || srv.DiscoveryV5 { addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr) if err != nil { return err } conn, err = net.ListenUDP("udp", addr) if err != nil { return err } realaddr = conn.LocalAddr().(*net.UDPAddr) if srv.NAT != nil { if !realaddr.IP.IsLoopback() { go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery") } // TODO: react to external IP changes over time. if ext, err := srv.NAT.ExternalIP(); err == nil { realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port} } } } …… // 发起UDP请求获取结点表(内部会启动goroutine) if !srv.NoDiscovery { cfg := discover.Config{ PrivateKey: srv.PrivateKey, AnnounceAddr: realaddr, NodeDBPath: srv.NodeDatabase, NetRestrict: srv.NetRestrict, Bootnodes: srv.BootstrapNodes, Unhandled: unhandled, } ntab, err := discover.ListenUDP(conn, cfg) if err != nil { return err } srv.ntab = ntab } …… dynPeers := srv.maxDialedConns() dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) …… // 侦听TCP端口(用于业务数据传输,基于RLPx协议) if srv.ListenAddr != "" { if err := srv.startListening(); err != nil { return err } } …… // 启动新线程发起TCP连接请求 go srv.run(dialer) …… }
2.4 启动Service
// Start each of the services started := []reflect.Type{} for kind, service := range services { // Start the next service, stopping all previous upon failure if err := service.Start(running); err != nil { for _, kind := range started { services[kind].Stop() } running.Stop() return err } // Mark the service started for potential cleanup started = append(started, kind) }
主要就是依次调用每个Service的Start()方法,然后把启动的Service的类型存储到started表中。另外如果启动过程中发现某个Service之前已经启动过了,则返回错误。
2.5 启动RPC server
// Lastly start the configured RPC interfaces if err := n.startRPC(services); err != nil { for _, service := range services { service.Stop() } running.Stop() return err }
RPC即远程调用接口,也就是Service对外暴露出来的API。具体调用方式可以分为以下几种:
I. InProc:进程内调用,严格来说这种不能算是RPC,不过出于架构上的统一,以太坊也为这种调用方式配置了一个handler
II. IPC:进程间调用,通过Unix Domain Socket(datadir/geth.ipc)
III. HTTP:通过HTTP协议调用
IV. WS:通过WebSocket调用
接下来看下startRPC()函数就比较清楚了,主要就是启动这几项RPC服务。每种RPC服务都需要提供一个handler,另外除了InProc之外,其他3种服务还需要启动一个server来监听外部连接请求。RPC具体实现细节留待后面的文章分析。代码如下:
func (n *Node) startRPC(services map[reflect.Type]Service) error { // Gather all the possible APIs to surface apis := n.apis() for _, service := range services { apis = append(apis, service.APIs()...) } // Start the various API endpoints, terminating all in case of errors if err := n.startInProc(apis); err != nil { return err } if err := n.startIPC(apis); err != nil { n.stopInProc() return err } if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil { n.stopIPC() n.stopInProc() return err } if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil { n.stopHTTP() n.stopIPC() n.stopInProc() return err } // All API endpoints started successfully n.rpcAPIs = apis return nil }
到这里结点启动的流程就分析完了。
3. 结点进入等待状态
其实就是让主线程进入阻塞状态,保持进程不退出,直到从channel中收到stop消息。
具体就是调用Node的Wait()函数:
func (n *Node) Wait() { n.lock.RLock() if n.server == nil { n.lock.RUnlock() return } stop := n.stop n.lock.RUnlock() <-stop }
至此,以太坊的启动代码就走读完了,总结一下:
- I. 以太坊启动主要做了3件事:创建结点、启动结点、结点进入等待状态
- II. 创建结点过程主要做了2件事:根据配置创建Node实例、注册eth Service
- III. 启动结点过程主要做了5件事:创建P2P server、创建Service、启动P2P server、启动Service、启动RPC server
- IV. 最后结点进入等待状态,等待退出
使用 Go 语言编写命令行程序经常会使用了urfave/cli这个库,比如以太坊软件 geth。在 C 语言中,我们需要根据 argc/argv 解析命令行参数,调用不同的函数,最后还要写一个 usag ...