diff --git a/cmd/api/biz/middleware/es/init.go b/cmd/api/biz/middleware/es/init.go new file mode 100644 index 00000000..2724bfc6 --- /dev/null +++ b/cmd/api/biz/middleware/es/init.go @@ -0,0 +1,39 @@ +package es + +import ( + "fmt" + "log" + + "github.com/ozline/tiktok/config" + "github.com/ozline/tiktok/pkg/constants" + "github.com/ozline/tiktok/pkg/eslogrus" + "github.com/sirupsen/logrus" + + elasticsearch "github.com/elastic/go-elasticsearch" +) + +var ( + EsClient *elasticsearch.Client +) + +func EsHookLog() *eslogrus.ElasticHook { + hook, err := eslogrus.NewElasticHook(EsClient, config.Elasticsearch.Host, logrus.DebugLevel, constants.APIServiceName) + if err != nil { + panic(err) + } + + return hook +} + +// InitEs 初始化es +func Init() { + esConn := fmt.Sprintf("http://%s", config.Elasticsearch.Addr) + cfg := elasticsearch.Config{ + Addresses: []string{esConn}, + } + client, err := elasticsearch.NewClient(cfg) + if err != nil { + log.Panic(err) + } + EsClient = client +} diff --git a/cmd/api/main.go b/cmd/api/main.go index 526139c6..e2c6d587 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -14,6 +14,7 @@ import ( hertzUtils "github.com/cloudwego/hertz/pkg/common/utils" "github.com/cloudwego/hertz/pkg/protocol/consts" "github.com/cloudwego/kitex/pkg/klog" + "github.com/ozline/tiktok/cmd/api/biz/middleware/es" "github.com/ozline/tiktok/cmd/api/biz/rpc" "github.com/ozline/tiktok/config" "github.com/ozline/tiktok/pkg/constants" @@ -22,6 +23,8 @@ import ( "github.com/ozline/tiktok/pkg/utils" hertzSentinel "github.com/hertz-contrib/opensergo/sentinel/adapter" + + kitexlogrus "github.com/kitex-contrib/obs-opentelemetry/logging/logrus" ) var ( @@ -37,6 +40,12 @@ func Init() { rpc.Init() tracer.InitJaeger(constants.APIServiceName) + + es.Init() + + // set log + klog.SetLevel(klog.LevelDebug) + klog.SetLogger(kitexlogrus.NewLogger(kitexlogrus.WithHook(es.EsHookLog()))) } func main() { diff --git a/cmd/chat/dal/cache/init.go b/cmd/chat/dal/cache/init.go index 5b15f01c..d3693ce5 100644 --- a/cmd/chat/dal/cache/init.go +++ b/cmd/chat/dal/cache/init.go @@ -5,14 +5,12 @@ import ( "time" "github.com/ozline/tiktok/config" - "github.com/panjf2000/ants/v2" redis "github.com/redis/go-redis/v9" "gorm.io/gorm" ) var ( - RedisDB *redis.Client - AntsPool *ants.PoolWithFunc + RedisDB *redis.Client ) type Message struct { diff --git a/cmd/chat/dal/init.go b/cmd/chat/dal/init.go index 2e686d8f..49a01062 100644 --- a/cmd/chat/dal/init.go +++ b/cmd/chat/dal/init.go @@ -4,7 +4,6 @@ import ( "github.com/ozline/tiktok/cmd/chat/dal/cache" "github.com/ozline/tiktok/cmd/chat/dal/db" "github.com/ozline/tiktok/cmd/chat/dal/mq" - "github.com/ozline/tiktok/pkg/ants" ) func Init() { @@ -13,5 +12,4 @@ func Init() { mq.InitRabbitMQ() mq.InitMessageMQ() mq.InitChatMQ() - ants.Init() } diff --git a/cmd/chat/main.go b/cmd/chat/main.go index adbaa2d9..4a9127ad 100644 --- a/cmd/chat/main.go +++ b/cmd/chat/main.go @@ -2,27 +2,33 @@ package main import ( "flag" + "fmt" "net" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/limit" "github.com/cloudwego/kitex/pkg/rpcinfo" "github.com/cloudwego/kitex/server" + "github.com/elastic/go-elasticsearch" + kitexlogrus "github.com/kitex-contrib/obs-opentelemetry/logging/logrus" etcd "github.com/kitex-contrib/registry-etcd" + trace "github.com/kitex-contrib/tracer-opentracing" + "github.com/ozline/tiktok/cmd/api/biz/middleware/es" + "github.com/ozline/tiktok/cmd/chat/dal" "github.com/ozline/tiktok/config" chat "github.com/ozline/tiktok/kitex_gen/chat/messageservice" - - "github.com/ozline/tiktok/cmd/chat/dal" "github.com/ozline/tiktok/pkg/constants" + "github.com/ozline/tiktok/pkg/eslogrus" "github.com/ozline/tiktok/pkg/tracer" "github.com/ozline/tiktok/pkg/utils" - - trace "github.com/kitex-contrib/tracer-opentracing" + "github.com/sirupsen/logrus" ) var ( path *string listenAddr string // listen port + + EsClient *elasticsearch.Client ) func Init() { @@ -33,6 +39,32 @@ func Init() { dal.Init() tracer.InitJaeger(constants.ChatServiceName) + + EsInit() + klog.SetLevel(klog.LevelDebug) + klog.SetLogger(kitexlogrus.NewLogger(kitexlogrus.WithHook(es.EsHookLog()))) +} + +func EsHookLog() *eslogrus.ElasticHook { + hook, err := eslogrus.NewElasticHook(EsClient, config.Elasticsearch.Host, logrus.DebugLevel, constants.ChatServiceName) + if err != nil { + panic(err) + } + + return hook +} + +// InitEs 初始化es +func EsInit() { + esConn := fmt.Sprintf("http://%s", config.Elasticsearch.Addr) + cfg := elasticsearch.Config{ + Addresses: []string{esConn}, + } + client, err := elasticsearch.NewClient(cfg) + if err != nil { + panic(err) + } + EsClient = client } func main() { diff --git a/cmd/follow/main.go b/cmd/follow/main.go index c58dc043..bbdb92bd 100644 --- a/cmd/follow/main.go +++ b/cmd/follow/main.go @@ -2,26 +2,34 @@ package main import ( "flag" + "fmt" "net" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/limit" "github.com/cloudwego/kitex/pkg/rpcinfo" "github.com/cloudwego/kitex/server" + "github.com/elastic/go-elasticsearch" + kitexlogrus "github.com/kitex-contrib/obs-opentelemetry/logging/logrus" etcd "github.com/kitex-contrib/registry-etcd" trace "github.com/kitex-contrib/tracer-opentracing" + "github.com/ozline/tiktok/cmd/api/biz/middleware/es" "github.com/ozline/tiktok/cmd/follow/dal" "github.com/ozline/tiktok/cmd/follow/rpc" "github.com/ozline/tiktok/config" follow "github.com/ozline/tiktok/kitex_gen/follow/followservice" "github.com/ozline/tiktok/pkg/constants" + "github.com/ozline/tiktok/pkg/eslogrus" "github.com/ozline/tiktok/pkg/tracer" "github.com/ozline/tiktok/pkg/utils" + "github.com/sirupsen/logrus" ) var ( path *string listenAddr string // listen port + + EsClient *elasticsearch.Client ) func Init() { @@ -33,6 +41,32 @@ func Init() { dal.Init() rpc.Init() tracer.InitJaeger(constants.FollowServiceName) + + EsInit() + klog.SetLevel(klog.LevelDebug) + klog.SetLogger(kitexlogrus.NewLogger(kitexlogrus.WithHook(es.EsHookLog()))) +} + +func EsHookLog() *eslogrus.ElasticHook { + hook, err := eslogrus.NewElasticHook(EsClient, config.Elasticsearch.Host, logrus.DebugLevel, constants.FollowServiceName) + if err != nil { + panic(err) + } + + return hook +} + +// InitEs 初始化es +func EsInit() { + esConn := fmt.Sprintf("http://%s", config.Elasticsearch.Addr) + cfg := elasticsearch.Config{ + Addresses: []string{esConn}, + } + client, err := elasticsearch.NewClient(cfg) + if err != nil { + panic(err) + } + EsClient = client } func main() { diff --git a/cmd/interaction/main.go b/cmd/interaction/main.go index d73aefbd..16e76c11 100644 --- a/cmd/interaction/main.go +++ b/cmd/interaction/main.go @@ -2,26 +2,34 @@ package main import ( "flag" + "fmt" "net" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/limit" "github.com/cloudwego/kitex/pkg/rpcinfo" "github.com/cloudwego/kitex/server" + "github.com/elastic/go-elasticsearch" + kitexlogrus "github.com/kitex-contrib/obs-opentelemetry/logging/logrus" etcd "github.com/kitex-contrib/registry-etcd" trace "github.com/kitex-contrib/tracer-opentracing" + "github.com/ozline/tiktok/cmd/api/biz/middleware/es" "github.com/ozline/tiktok/cmd/interaction/dal" "github.com/ozline/tiktok/cmd/interaction/rpc" "github.com/ozline/tiktok/config" interaction "github.com/ozline/tiktok/kitex_gen/interaction/interactionservice" "github.com/ozline/tiktok/pkg/constants" + "github.com/ozline/tiktok/pkg/eslogrus" "github.com/ozline/tiktok/pkg/tracer" "github.com/ozline/tiktok/pkg/utils" + "github.com/sirupsen/logrus" ) var ( path *string listenAddr string // listen port + + EsClient *elasticsearch.Client ) func Init() { @@ -33,6 +41,32 @@ func Init() { rpc.Init() dal.Init() tracer.InitJaeger(constants.InteractionServiceName) + + EsInit() + klog.SetLevel(klog.LevelDebug) + klog.SetLogger(kitexlogrus.NewLogger(kitexlogrus.WithHook(es.EsHookLog()))) +} + +func EsHookLog() *eslogrus.ElasticHook { + hook, err := eslogrus.NewElasticHook(EsClient, config.Elasticsearch.Host, logrus.DebugLevel, constants.InteractionServiceName) + if err != nil { + panic(err) + } + + return hook +} + +// InitEs 初始化es +func EsInit() { + esConn := fmt.Sprintf("http://%s", config.Elasticsearch.Addr) + cfg := elasticsearch.Config{ + Addresses: []string{esConn}, + } + client, err := elasticsearch.NewClient(cfg) + if err != nil { + panic(err) + } + EsClient = client } func main() { diff --git a/cmd/user/main.go b/cmd/user/main.go index 3adff2c5..b25c9fdb 100644 --- a/cmd/user/main.go +++ b/cmd/user/main.go @@ -2,26 +2,33 @@ package main import ( "flag" + "fmt" "net" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/limit" "github.com/cloudwego/kitex/pkg/rpcinfo" "github.com/cloudwego/kitex/server" + "github.com/elastic/go-elasticsearch" + kitexlogrus "github.com/kitex-contrib/obs-opentelemetry/logging/logrus" etcd "github.com/kitex-contrib/registry-etcd" + trace "github.com/kitex-contrib/tracer-opentracing" + "github.com/ozline/tiktok/cmd/api/biz/middleware/es" "github.com/ozline/tiktok/cmd/user/dal" "github.com/ozline/tiktok/config" user "github.com/ozline/tiktok/kitex_gen/user/userservice" "github.com/ozline/tiktok/pkg/constants" + "github.com/ozline/tiktok/pkg/eslogrus" "github.com/ozline/tiktok/pkg/tracer" "github.com/ozline/tiktok/pkg/utils" - - trace "github.com/kitex-contrib/tracer-opentracing" + "github.com/sirupsen/logrus" ) var ( path *string listenAddr string // listen port + + EsClient *elasticsearch.Client ) func Init() { @@ -33,6 +40,32 @@ func Init() { // others dal.Init() tracer.InitJaeger(constants.UserServiceName) + + EsInit() + klog.SetLevel(klog.LevelDebug) + klog.SetLogger(kitexlogrus.NewLogger(kitexlogrus.WithHook(es.EsHookLog()))) +} + +func EsHookLog() *eslogrus.ElasticHook { + hook, err := eslogrus.NewElasticHook(EsClient, config.Elasticsearch.Host, logrus.DebugLevel, constants.FollowServiceName) + if err != nil { + panic(err) + } + + return hook +} + +// InitEs 初始化es +func EsInit() { + esConn := fmt.Sprintf("http://%s", config.Elasticsearch.Addr) + cfg := elasticsearch.Config{ + Addresses: []string{esConn}, + } + client, err := elasticsearch.NewClient(cfg) + if err != nil { + panic(err) + } + EsClient = client } func main() { diff --git a/cmd/video/main.go b/cmd/video/main.go index 8c9bfe6c..b1c9e156 100644 --- a/cmd/video/main.go +++ b/cmd/video/main.go @@ -2,25 +2,34 @@ package main import ( "flag" + "fmt" "net" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/limit" "github.com/cloudwego/kitex/pkg/rpcinfo" "github.com/cloudwego/kitex/server" + "github.com/elastic/go-elasticsearch" + kitexlogrus "github.com/kitex-contrib/obs-opentelemetry/logging/logrus" etcd "github.com/kitex-contrib/registry-etcd" + trace "github.com/kitex-contrib/tracer-opentracing" + "github.com/ozline/tiktok/cmd/api/biz/middleware/es" "github.com/ozline/tiktok/cmd/video/dal" "github.com/ozline/tiktok/cmd/video/rpc" "github.com/ozline/tiktok/config" video "github.com/ozline/tiktok/kitex_gen/video/videoservice" "github.com/ozline/tiktok/pkg/constants" + "github.com/ozline/tiktok/pkg/eslogrus" "github.com/ozline/tiktok/pkg/tracer" "github.com/ozline/tiktok/pkg/utils" + "github.com/sirupsen/logrus" ) var ( path *string listenAddr string // listen port + + EsClient *elasticsearch.Client ) func Init() { @@ -31,7 +40,34 @@ func Init() { dal.Init() tracer.InitJaeger(constants.VideoServiceName) rpc.Init() + + EsInit() + klog.SetLevel(klog.LevelDebug) + klog.SetLogger(kitexlogrus.NewLogger(kitexlogrus.WithHook(es.EsHookLog()))) +} + +func EsHookLog() *eslogrus.ElasticHook { + hook, err := eslogrus.NewElasticHook(EsClient, config.Elasticsearch.Host, logrus.DebugLevel, constants.FollowServiceName) + if err != nil { + panic(err) + } + + return hook } + +// InitEs 初始化es +func EsInit() { + esConn := fmt.Sprintf("http://%s", config.Elasticsearch.Addr) + cfg := elasticsearch.Config{ + Addresses: []string{esConn}, + } + client, err := elasticsearch.NewClient(cfg) + if err != nil { + panic(err) + } + EsClient = client +} + func main() { Init() r, err := etcd.NewEtcdRegistry([]string{config.Etcd.Addr}) @@ -66,6 +102,7 @@ func main() { server.WithMuxTransport(), server.WithServiceAddr(addr), server.WithRegistry(r), + server.WithSuite(trace.NewDefaultServerSuite()), server.WithLimit(&limit.Option{ MaxConnections: constants.MaxConnections, MaxQPS: constants.MaxQPS, diff --git a/config/config.go b/config/config.go index 16fce8a3..9e31b9bd 100644 --- a/config/config.go +++ b/config/config.go @@ -8,14 +8,15 @@ import ( ) var ( - Server *server - Mysql *mySQL - Snowflake *snowflake - Service *service - Etcd *etcd - RabbitMQ *rabbitMQ - Redis *redis - OSS *oss + Server *server + Mysql *mySQL + Snowflake *snowflake + Service *service + Etcd *etcd + RabbitMQ *rabbitMQ + Redis *redis + OSS *oss + Elasticsearch *elasticsearch ) func Init(path string, service string) { @@ -54,6 +55,7 @@ func configMapping(srv string) { RabbitMQ = &c.RabbitMQ Redis = &c.Redis OSS = &c.OSS + Elasticsearch = &c.Elasticsearch Service = GetService(srv) } diff --git a/config/config_exmple.yaml b/config/config_exmple.yaml index b56f2208..b9aead5a 100644 --- a/config/config_exmple.yaml +++ b/config/config_exmple.yaml @@ -25,6 +25,10 @@ oss: bucketname: main-directory: tiktok +elasticsearch: + addr: 127.0.0.1:9200 + host: localhost + etcd: addr: 127.0.0.1:2379 diff --git a/config/types.go b/config/types.go index 36d7186e..6961a9be 100644 --- a/config/types.go +++ b/config/types.go @@ -29,16 +29,6 @@ type etcd struct { Addr string } -type config struct { - Server server - Snowflake snowflake - MySQL mySQL - Etcd etcd - RabbitMQ rabbitMQ - Redis redis - OSS oss -} - type rabbitMQ struct { Addr string Username string @@ -57,3 +47,19 @@ type oss struct { BucketName string MainDirectory string `mapstructure:"main-directory"` } + +type elasticsearch struct { + Addr string + Host string +} + +type config struct { + Server server + Snowflake snowflake + MySQL mySQL + Etcd etcd + RabbitMQ rabbitMQ + Redis redis + OSS oss + Elasticsearch elasticsearch +} diff --git a/deploy/config/config.yaml b/deploy/config/config.yaml index 57900d76..8066c857 100644 --- a/deploy/config/config.yaml +++ b/deploy/config/config.yaml @@ -21,6 +21,10 @@ oss: bucketname: main-directory: tiktok +elasticsearch: + addr: 127.0.0.1:9200 + host: localhost + etcd: addr: 127.0.0.1:2379 diff --git a/docker-compose.yml b/docker-compose.yml index f8cb874c..8e749c9b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,6 +73,51 @@ services: networks: - tiktok + elasticsearch: + image: elasticsearch:8.4.2 + container_name: elasticsearch + environment: + bootstrap.memory_lock: "true" + ES_JAVA_OPTS: "-Xms512m -Xmx512m" + discovery.type: single-node + ingest.geoip.downloader.enabled: "false" + TZ: Asia/Shanghai + xpack.security.enabled: "false" + healthcheck: + test: ["CMD-SHELL", "curl -sf http://elasticsearch:9200/_cluster/health || exit 1"] # ⼼跳检测,成功之后不再执⾏后⾯的退出 + interval: 60s # ⼼跳检测间隔周期 + timeout: 10s + retries: 3 + start_period: 60s # ⾸次检测延迟时间 + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - /usr/local/elasticsearch/data:/usr/local/elasticsearch/data + - /usr/local/elasticsearch/config/es/config:/usr/local/elasticsearch/config + ports: + - "9200:9200" + restart: always + networks: + - tiktok + + kibana: + image: kibana:8.4.2 + container_name: kibana + environment: + - I18N_LOCALE=zh-CN + - XPACK_GRAPH_ENABLED=true + - TIMELION_ENABLED=true + - XPACK_MONITORING_COLLECTION_ENABLED="true" + - ELASTICSEARCH_HOSTS=http://elasticsearch:9200 + depends_on: + - elasticsearch + ports: + - "5601:5601" + networks: + - tiktok + jaeger: container_name: jaeger image: "jaegertracing/all-in-one:latest" diff --git a/go.mod b/go.mod index 1ad47076..5b8e6999 100644 --- a/go.mod +++ b/go.mod @@ -11,15 +11,16 @@ require ( github.com/cloudwego/fastpb v0.0.4 github.com/cloudwego/hertz v0.6.7 github.com/cloudwego/kitex v0.6.2 + github.com/elastic/go-elasticsearch v0.0.0 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/hertz-contrib/opensergo v0.0.1 + github.com/kitex-contrib/obs-opentelemetry/logging/logrus v0.0.0-20230819133448-76093321aa8e github.com/kitex-contrib/registry-etcd v0.1.0 github.com/kitex-contrib/tracer-opentracing v0.0.3 github.com/opentracing/opentracing-go v1.2.0 - github.com/panjf2000/ants v1.3.0 - github.com/panjf2000/ants/v2 v2.8.1 github.com/rabbitmq/amqp091-go v1.8.1 github.com/redis/go-redis/v9 v9.0.5 + github.com/sirupsen/logrus v1.9.2 github.com/spf13/viper v1.16.0 github.com/tinylib/msgp v1.1.8 github.com/uber/jaeger-client-go v2.30.0+incompatible @@ -29,6 +30,11 @@ require ( gorm.io/hints v1.1.2 ) +require ( + go.opentelemetry.io/otel v1.16.0 // indirect + go.opentelemetry.io/otel/trace v1.16.0 // indirect +) + require ( bou.ke/monkey v1.0.2 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect diff --git a/go.sum b/go.sum index c50cce1d..efc96b7f 100644 --- a/go.sum +++ b/go.sum @@ -215,6 +215,8 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= +github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -256,6 +258,8 @@ github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpx github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= @@ -442,6 +446,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kitex-contrib/obs-opentelemetry/logging/logrus v0.0.0-20230819133448-76093321aa8e h1:OHusXtJsfrKodGSC/w33PRntQBiltCKuAtAvr/+SNg4= +github.com/kitex-contrib/obs-opentelemetry/logging/logrus v0.0.0-20230819133448-76093321aa8e/go.mod h1:Kf0zvMUYs1/xlqrqthhJCw4RVyWZoAsfeAWUDN6en6U= github.com/kitex-contrib/registry-etcd v0.1.0 h1:D+bg70S9+syTx2+m3I71vO8fhbKT/MO9GeKLzqXSkuc= github.com/kitex-contrib/registry-etcd v0.1.0/go.mod h1:+/RTa3y+zSX4+HIlrowJYt2w4FQiQbQsrKTVsQ4ueIs= github.com/kitex-contrib/tracer-opentracing v0.0.2/go.mod h1:mprt5pxqywFQxlHb7ugfiMdKbABTLI9YrBYs9WmlK5Q= @@ -532,10 +538,6 @@ github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJ github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= -github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M= -github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY= -github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ= -github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -615,8 +617,9 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= +github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= @@ -745,16 +748,20 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0 h1:Wx7nFnvCaissIUZxPkBqDz2963Z+Cl+PkYbDKzTxDqQ= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.25.0/go.mod h1:E5NNboN0UqSAki0Atn9kVwaN7I+l25gGxDqBueo/74E= -go.opentelemetry.io/otel v1.0.1 h1:4XKyXmfqJLOQ7feyV5DB6gsBFZ0ltB8vLtp6pj4JIcc= go.opentelemetry.io/otel v1.0.1/go.mod h1:OPEOD4jIT2SlZPMmwT6FqZz2C0ZNdQqiWcoK6M0SNFU= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.1 h1:ofMbch7i29qIUf7VtF+r0HRF6ac0SBaPSziSsKp7wkk= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.1/go.mod h1:Kv8liBeVNFkkkbilbgWRpV+wWuu+H5xdOT6HAgd30iw= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.1 h1:CFMFNoz+CGprjFAFy+RJFrfEe4GBia3RRm2a4fREvCA= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.0.1/go.mod h1:xOvWoTOrQjxjW61xtOmD/WKGRYb/P4NzRo3bs65U6Rk= -go.opentelemetry.io/otel/sdk v1.0.1 h1:wXxFEWGo7XfXupPwVJvTBOaPBC9FEg0wB8hMNrKk+cA= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= go.opentelemetry.io/otel/sdk v1.0.1/go.mod h1:HrdXne+BiwsOHYYkBE5ysIcv2bvdZstxzmCQhxTcZkI= -go.opentelemetry.io/otel/trace v1.0.1 h1:StTeIH6Q3G4r0Fiw34LTokUFESZgIDUr0qIJ7mKmAfw= +go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= go.opentelemetry.io/otel/trace v1.0.1/go.mod h1:5g4i4fKLaX2BQpSBsxw8YYcgKpMMSW3x7ZTuYBr3sUk= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.9.0 h1:C0g6TWmQYvjKRnljRULLWUVJGy8Uvu0NEL/5frY2/t4= go.opentelemetry.io/proto/otlp v0.9.0/go.mod h1:1vKfU9rv61e9EVGthD1zNvUbiwPcimSsOPU9brfSHJg= @@ -996,6 +1003,7 @@ golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220817070843-5a390386f1f2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/ants/ants.go b/pkg/ants/ants.go deleted file mode 100644 index 9964e262..00000000 --- a/pkg/ants/ants.go +++ /dev/null @@ -1,23 +0,0 @@ -package ants - -import ( - "errors" - "sync" - - "github.com/panjf2000/ants" -) - -var ( - AntsPool *ants.PoolWithFunc - Wg sync.WaitGroup -) - -func Init() { - ants_Pool, err := ants.NewPoolWithFunc(500, func(payload interface{}) { - defer Wg.Done() - }) - if err != nil { - panic(errors.New("[ants goroutine init error]")) - } - AntsPool = ants_Pool -} diff --git a/pkg/eslogrus/eslogrus.go b/pkg/eslogrus/eslogrus.go new file mode 100644 index 00000000..5dc039db --- /dev/null +++ b/pkg/eslogrus/eslogrus.go @@ -0,0 +1,136 @@ +package eslogrus + +import ( + "bytes" + "context" + "encoding/json" + "log" + "strings" + "time" + + elastic "github.com/elastic/go-elasticsearch" + "github.com/elastic/go-elasticsearch/esapi" + "github.com/sirupsen/logrus" +) + +// ElasticHook is a logrus +// hook for ElasticSearch +type ElasticHook struct { + client *elastic.Client // es的客户端 + host string // es 的 host + index IndexNameFunc // 获取索引的名字 + levels []logrus.Level // 日志的级别 info,error + ctx context.Context // 上下文 + ctxCancel context.CancelFunc // 上下文cancel的函数, + fireFunc fireFunc // 需要实现这个 +} + +// 发送到es的信息结构 +type message struct { + Host string + Timestamp string `json:"@timestamp"` + Message string + Data logrus.Fields + Level string +} + +// IndexNameFunc get index name +type IndexNameFunc func() string + +type fireFunc func(entry *logrus.Entry, hook *ElasticHook) error + +// NewElasticHook 新建一个es hook对象 +func NewElasticHook(client *elastic.Client, host string, level logrus.Level, index string) (*ElasticHook, error) { + return NewElasticHookWithFunc(client, host, level, func() string { return index }) +} + +func NewElasticHookWithFunc(client *elastic.Client, host string, level logrus.Level, indexFunc IndexNameFunc) (*ElasticHook, error) { + return newHookFuncAndFireFunc(client, host, level, indexFunc, syncFireFunc) +} + +// 新建一个hook +func newHookFuncAndFireFunc(client *elastic.Client, host string, level logrus.Level, indexFunc IndexNameFunc, fireFunc fireFunc) (*ElasticHook, error) { + var levels []logrus.Level + for _, l := range []logrus.Level{ + logrus.PanicLevel, + logrus.FatalLevel, + logrus.ErrorLevel, + logrus.WarnLevel, + logrus.InfoLevel, + logrus.DebugLevel, + } { + if l <= level { + levels = append(levels, l) + } + } + + ctx, cancel := context.WithCancel(context.TODO()) + + return &ElasticHook{ + client: client, + host: host, + index: indexFunc, + levels: levels, + ctx: ctx, + ctxCancel: cancel, + fireFunc: fireFunc, + }, nil +} + +// createMessage 创建信息 +func createMessage(entry *logrus.Entry, hook *ElasticHook) *message { + level := entry.Level.String() + + if e, ok := entry.Data[logrus.ErrorKey]; ok && e != nil { + if err, ok := e.(error); ok { + entry.Data[logrus.ErrorKey] = err.Error() + } + } + + return &message{ + hook.host, + entry.Time.UTC().Format(time.RFC3339Nano), + entry.Message, + entry.Data, + strings.ToUpper(level), + } +} + +// syncFireFunc 异步发送 +func syncFireFunc(entry *logrus.Entry, hook *ElasticHook) error { + data, err := json.Marshal(createMessage(entry, hook)) + + if err != nil { + return err + } + + req := esapi.IndexRequest{ + Index: hook.index(), + Body: bytes.NewReader(data), + Refresh: "true", + } + + res, err := req.Do(hook.ctx, hook.client) + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + + var r map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + log.Printf("Error parsing the response body: %s", err) + } else { + // Print the response status and indexed document version. + log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64))) + } + return err +} + +// Fire 实现 logrus hook 必须要的接口函数 +func (hook *ElasticHook) Fire(entry *logrus.Entry) error { + return hook.fireFunc(entry, hook) +} + +// Levels 实现 logrus hook 必须要的接口函数 +func (hook *ElasticHook) Levels() []logrus.Level { + return hook.levels +}