NSQ官方简介
NSQ是一个实时分布式消息传递平台,旨在大规模运行,每天处理数十亿条消息。
它促进了无单点故障的分布式和分散式拓扑,实现了容错和高可用性以及可靠的消息传递保证。请参阅功能和保证。
在操作上,NSQ易于配置和部署(所有参数都在命令行中指定,编译后的二进制文件没有运行时依赖项)。为了获得最大的灵活性,它与数据格式无关(消息可以是 JSON、MsgPack、Protocol Buffers 或其他任何形式)。官方 Go 和 Python 库是开箱即用的(以及许多其他客户端库),如果您有兴趣构建自己的库,可以使用 协议规范。
鉴权
其实官方文档中是给过一个鉴权的demo的,但是坑爹的是用py写的,原文档截图如下
其实nsq的鉴权逻辑很简单,就是在每个启动的nsqd节点配置一个认证服务,然后有访问了,它就会去这个认证服务请求一把,所以我们只要提供这样一个认证的API就好了
为了方便后期部署和扩展,我自己写了一个并且开源了 github.com/zhimiaox/nsq-auth
启用鉴权
nsq分为了nsqd、nsqlookupd、nsqadmin这三个服务,nsqd是最核心的服务,干活用的,nsqlookup是管理好多个nsqd的用于集群部署,nsqadmin就是个ui管理界面,因此在此处,我们直接只起一个nsqd就好了
我们直接使用官方的docker镜像(其实golang的二进制包也很方便,不过升级的话还得拷贝,还是docker镜像香)
注: It is expected when using authorization that only the nsqd TCP protocol is exposed to external clients, not the HTTP(S) endpoints. See the note below about exposing stats and lookup to clients with auth.
译:只有tcp端口支持鉴权,其它玩意不要暴露出来
创建一个docker-compose.yml
文件
version: "3"
services:
nsqd:
image: nsqio/nsq
depends_on:
- nsq-auth
ports:
- "4150:4150"
# volumes:
# - "./nsq/data:/data"
restart: always
command: /nsqd --auth-http-address="nsq-auth:1325"
nsq-auth:
image: zhimiao/nsq-auth:latest
restart: always
command: --secret="jV22WdmaXxHWAiAh"
--secret="jV22WdmaXxHWAiAh"
表示启用root权限secret,这个用这个密钥,任何topic和channel都能订阅和推送,nsq-auth还支持用xxxx.csv文件定义权限,大致是这样的
version: "3"
services:
nsq-auth:
image: zhimiao/nsq-auth:latest
restart: always
volumes:
- "./example.csv:/example.csv"
command: --csv "/example.csv"
执行docker-compose up -d
愉快的把服务启动起来
接下来我们写一段测试代码
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"testing"
"time"
"github.com/nsqio/go-nsq"
)
const (
topicName = "t1"
channel1 = "c1"
channel2 = "c2"
address = "localhost:4150"
secret = "jV22WdmaXxHWAiAh"
)
type myMessageHandler struct{}
// HandleMessage implements the Handler interface.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
fmt.Printf("%s\n", m.Body)
return nil
}
func TestPush(t *testing.T) {
config := nsq.NewConfig()
config.AuthSecret = secret
producer, err := nsq.NewProducer(address, config)
if err != nil {
t.Fatal(err)
}
now := time.Now()
for i := 0; i < 2000000; i++ {
messageBody := []byte(fmt.Sprintf("hello %d", i))
err = producer.Publish(topicName, messageBody)
if err != nil {
t.Fatal(err)
}
// time.Sleep(3 * time.Second)
}
t.Log(time.Now().Sub(now))
producer.Stop()
}
func TestSub1(t *testing.T) {
config := nsq.NewConfig()
config.AuthSecret = secret
consumer, err := nsq.NewConsumer(topicName, channel1, config)
if err != nil {
t.Fatal(err)
}
consumer.AddHandler(&myMessageHandler{})
err = consumer.ConnectToNSQD(address)
if err != nil {
t.Fatal(err)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
consumer.Stop()
}
func TestSub2(t *testing.T) {
config := nsq.NewConfig()
config.AuthSecret = secret
consumer, err := nsq.NewConsumer(topicName, channel2, config)
if err != nil {
t.Fatal(err)
}
consumer.AddHandler(&myMessageHandler{})
err = consumer.ConnectToNSQD(address)
if err != nil {
t.Fatal(err)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
consumer.Stop()
}
可以看到,调用是成功的,接下来我们去认证服务里面看一下认证接口调用
可以看到,三个服务启动,认证接口调用了三次,以上是鉴权成功的测试用例,接下来我们修改一下secret,看看是否能够正常拦截
可以看到,也是拦截成功了
Last modified on 2021-12-28