最新消息:欢迎访问小松个人博客,博客已经启用lua+redis防火墙功能,请不用用阿里云ISP机子访问博客

nsq nsqlookupd代码解析(二)

golang 27浏览 0评论

nsqlookupd.go Main里主要是侦听两个服务tcp 服务和http 服务

代码片段 github.com\nsqio\nsq\nsqlookupd\nsqlookupd.go

func (l *NSQLookupd) Main() {
    ctx := &Context{l}

    //监听tcp
    tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
    if err != nil {
        l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
        os.Exit(1)
    }
    l.Lock()
    l.tcpListener = tcpListener
    l.Unlock()
    
    tcpServer := &tcpServer{ctx: ctx}
    //创建一个TCP的服务端,接受客户端连接(里面是一个死循环,也就是不断的去监听),    启动一个goroutine去执行tcpServer的Handle
    l.waitGroup.Wrap(func() {
        protocol.TCPServer(tcpListener, tcpServer, l.logf)
    })

     //监听tcp
    httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
    if err != nil {
        l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
        os.Exit(1)
    }
    l.Lock()
    l.httpListener = httpListener
    l.Unlock()
    //设置路由
    httpServer := newHTTPServer(ctx)
    l.waitGroup.Wrap(func() {
        //创建一个http的服务端
        http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
    })
}

文件里主要是启动了两个线程,tcp服务线程和http服务线程

TCP里执行的Handle
代码片段 github.com\nsqio\nsq\nsqlookupd\tcp.go

func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())

    //客户端在初始化自己的时候,需要发送4字节的数据用来标识它自己所有使用协议版本。将来升级协议的时候可以避免使用旧协议的客户端不能使用。
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
        return
    }
    //获取协议内容
    protocolMagic := string(buf)

    p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {

    case "  V1":
        //当前只支持"  V1"协议(前两有两个空格,所以总共是4字节),协议在nsqlookupd\lookup_protocol_v1.go文件中定义,这里创建了LookupProtocolV1的实例
        //LookupProtocolV1实现了Protocol接口
        prot = &LookupProtocolV1{ctx: p.ctx}
    default:
        //如果不是"  V1"协议,则协议出错,断开链接,返回。
        protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }
    //如果是"  V1"协议,这里就进入了LookupProtocolV1的IOLoop方法。此方法里有for死循环运行,直到出现error时,才会执行下面的代码。   
    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}

开始执行IOLoop
代码片段 D:\go\src\github.com\nsqio\nsq\nsqlookupd\lookup_protocol_v1.go

func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
    var err error
    var line string

    //这个方法文件github.com\nsqio\nsq\nsqlookupd\client_v1.go,这里使用到了go的方法的继承,go继承列子:http://studygolang.com/articles/5351 ,继承了net.Conn,这里有 Read(b []byte) (n int, err error)的接口,所以在下面bufio.NewReader使用的时候不会出错,NewReader 参数为io.Reader,其实也是Read(p []byte) (n int, err error)的一个接口
    client := NewClientV1(conn)
    reader := bufio.NewReader(client)

    for {
        //ReadString读取直到第一次遇到delim字节,这里的delim为\n(换行符)那就是,每次读取一行
        line, err = reader.ReadString('\n')
        if err != nil {
            break
        }
        //去掉前后两端的空格
        line = strings.TrimSpace(line)
        //以空格进行分割,返回的是切片类似于php的explode函数
        params := strings.Split(line, " ")

        var response []byte
        //执行LookupProtocolV1的Exec的方法,这个方法主要是根据每行数据的第一个参数,调用不同的方法
        response, err = p.Exec(client, reader, params)
        //执行Exec有错误
        if err != nil {
            ctx := ""
            if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
                ctx = " - " + parentErr.Error()
            }
            p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

            _, sendErr := protocol.SendResponse(client, []byte(err.Error()))
            if sendErr != nil {
                p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
                break
            }

            // errors of type FatalClientErr should forceably close the connection
            if _, ok := err.(*protocol.FatalClientErr); ok {
                break
            }
            continue
        }
        //如果Exec返回有数据就执行SendResponse发送到客户端
        if response != nil {
            _, err = protocol.SendResponse(client, response)
            if err != nil {
                break
            }
        }
    }

    conn.Close()
    p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client)
    if client.peerInfo != nil {
        registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id)
        for _, r := range registrations {
            if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
                p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
                    client, r.Category, r.Key, r.SubKey)
            }
        }
    }
    return err
}
QQ交流群:136351212(满) 455721967

如无特别说明,本站文章皆为原创,若要转载,务必请注明以下原文信息:
转载保留版权:小松博客» nsq nsqlookupd代码解析(二)
本文链接地址:https://www.phpsong.com/3306.html

发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
木有头像就木有JJ!点这里按步骤申请Gravatar头像吧!