Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugins): Datadog for metrics collection #5372

Merged
merged 17 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 277 additions & 0 deletions apisix/plugins/datadog.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Datadog plugin: ships request metrics to a local DogStatsD agent over UDP.
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local batch_processor = require("apisix.utils.batch-processor")
local fetch_log = require("apisix.utils.log-util").get_full_log
-- localize globals/library functions used on the hot log path
local ngx = ngx
local udp = ngx.socket.udp
local format = string.format
local concat = table.concat
-- one batch processor per plugin conf table; keyed by the conf table itself
local buffers = {}
local ipairs = ipairs
local tostring = tostring
-- guards against scheduling more than one stale-object cleanup timer
local stale_timer_running = false
local timer_at = ngx.timer.at

local plugin_name = "datadog"
-- fallback agent settings used when no plugin metadata has been configured
local defaults = {
host = "127.0.0.1",
port = 8125,
namespace = "apisix",
constant_tags = {"source:apisix"}
}

-- Per-route plugin configuration: only batch-processor tuning knobs.
-- Agent connection details (host/port/namespace/tags) live in the plugin
-- metadata instead, so they can be changed in one place for all routes.
local schema = {
type = "object",
properties = {
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 5000},
max_retry_count = {type = "integer", minimum = 1, default = 1},
}
}

-- Plugin metadata: where the DogStatsD agent lives and which constant tags
-- to attach to every metric. Defaults mirror the `defaults` table above.
local metadata_schema = {
type = "object",
properties = {
host = {type = "string", default= defaults.host},
port = {type = "integer", minimum = 0, default = defaults.port},
namespace = {type = "string", default = defaults.namespace},
constant_tags = {
type = "array",
items = {type = "string"},
default = defaults.constant_tags
}
},
}

-- Plugin descriptor table consumed by the APISIX plugin loader.
local _M = {
version = 0.1,
priority = 495,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
}

-- Validate either the per-route conf or the plugin metadata, depending on
-- which schema_type the admin API is submitting.
function _M.check_schema(conf, schema_type)
    local target = schema
    if schema_type == core.schema.TYPE_METADATA then
        target = metadata_schema
    end
    return core.schema.check(target, conf)
end

-- Build the "|#tag1,tag2,..." suffix appended to each dogstatsd datagram.
-- Starts from the user-configured constant tags and appends per-request
-- attributes (route, service, consumer, balancer ip, status, scheme) for
-- every value that is present and non-empty. Returns "" when no tag applies.
local function generate_tag(entry, const_tags)
    local tags = {}
    if const_tags and #const_tags > 0 then
        tags = core.table.clone(const_tags)
    end

    local function add(name, value)
        core.table.insert(tags, name .. ":" .. value)
    end

    -- prefer the route name for readability; fall back to the route id but
    -- keep the same tag key so dashboards stay consistent across routes
    if entry.route_name ~= "" then
        add("route_name", entry.route_name)
    elseif entry.route_id and entry.route_id ~= "" then
        add("route_name", entry.route_id)
    end

    if entry.service_id and entry.service_id ~= "" then
        add("service_id", entry.service_id)
    end
    if entry.consumer and entry.consumer ~= "" then
        add("consumer", entry.consumer)
    end
    if entry.balancer_ip ~= "" then
        add("balancer_ip", entry.balancer_ip)
    end
    if entry.response.status then
        add("response_status", entry.response.status)
    end
    if entry.scheme ~= "" then
        add("scheme", entry.scheme)
    end

    if #tags == 0 then
        return ""
    end
    return "|#" .. concat(tags, ',')
end

-- Timer callback: drop batch processors whose buffers have fully drained,
-- so `buffers` does not grow without bound as route confs are replaced.
-- `premature` is true when the nginx worker is shutting down; do nothing then.
local function remove_stale_objects(premature)
    if premature then
        return
    end

    -- `buffers` is keyed by conf tables (hash keys), so it must be walked
    -- with pairs(); the original ipairs() only visits integer keys 1..n and
    -- therefore never iterated anything, leaking stale processors forever.
    -- (Assigning nil to an existing key during pairs() traversal is legal.)
    for key, batch in pairs(buffers) do
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            core.log.warn("removing batch processor stale object, conf: ",
                          core.json.delay_encode(key))
            buffers[key] = nil
        end
    end

    -- allow _M.log to schedule the next cleanup round
    stale_timer_running = false
end

-- `log` phase handler: convert the request's access-log entry into a set of
-- dogstatsd metrics and hand it to a per-conf batch processor. The processor
-- is created lazily the first time a given `conf` table is seen; its flush
-- function sends the accumulated entries to the agent configured in the
-- plugin metadata (falling back to `defaults`) over a UDP socket.
function _M.log(conf, ctx)
    if not stale_timer_running then
        -- run the cleanup timer every 30 mins if any log is present
        timer_at(1800, remove_stale_objects)
        stale_timer_running = true
    end

    local entry = fetch_log(ngx, {})
    -- $upstream_response_time is nil when the request never reached an
    -- upstream (e.g. rejected by an earlier plugin); guard the arithmetic
    -- so the log phase does not throw on such requests.
    if ctx.var.upstream_response_time then
        entry.upstream_latency = ctx.var.upstream_response_time * 1000
    end
    entry.balancer_ip = ctx.balancer_ip or ""
    entry.route_name = ctx.route_name or ""
    entry.scheme = ctx.upstream_scheme or ""

    local log_buffer = buffers[conf]
    if log_buffer then
        log_buffer:push(entry)
        return
    end

    -- Flush function executed by the batch processor. Returns true on
    -- success, or false plus an error string so the processor can retry.
    local func = function(entries, batch_max_size)
        -- Fall back to the compiled-in defaults when no metadata was set.
        local metadata = plugin.plugin_metadata(plugin_name)
        if not metadata then
            core.log.info("received nil metadata: using metadata defaults: ",
                          core.json.delay_encode(defaults, true))
            metadata = {value = defaults}
        end

        -- Creating a udp socket
        local sock = udp()
        local host, port = metadata.value.host, metadata.value.port
        core.log.info("sending batch metrics to dogstatsd: ", host, ":", port)

        local ok, err = sock:setpeername(host, port)
        if not ok then
            return false, "failed to connect to UDP server: host[" .. host
                   .. "] port[" .. tostring(port) .. "] err: " .. err
        end

        -- Metric names are "<namespace>.<metric>" per the dogstatsd datagram
        -- format; an empty namespace means no prefix at all.
        local prefix = metadata.value.namespace
        if prefix ~= "" then
            prefix = prefix .. "."
        end

        -- Send one "<name>:<value>|<type><tags>" datagram. A failed send is
        -- logged but does not abort the batch (best-effort, like the
        -- original per-metric error handling).
        local function send_metric(name, value, mtype, suffix, what)
            local sok, serr = sock:send(format("%s:%s|%s%s", prefix .. name,
                                               value, mtype, suffix))
            if not sok then
                core.log.error("failed to report ", what,
                               " to dogstatsd server: host[", host,
                               "] port[", tostring(port), "] err: ", serr)
            end
        end

        core.log.info("datadog batch_entry: ",
                      core.json.delay_encode(entries, true))
        for _, entry in ipairs(entries) do
            local suffix = generate_tag(entry, metadata.value.constant_tags)

            -- request counter
            send_metric("request.counter", 1, "c", suffix, "request count")
            -- request latency histogram
            send_metric("request.latency", entry.latency, "h", suffix,
                        "request latency")

            local apisix_latency = entry.latency
            if entry.upstream_latency then
                send_metric("upstream.latency", entry.upstream_latency, "h",
                            suffix, "upstream latency")
                -- the total latency includes the upstream time; subtract it
                -- to approximate time spent inside APISIX itself
                apisix_latency = apisix_latency - entry.upstream_latency
                if apisix_latency < 0 then
                    apisix_latency = 0
                end
            end

            send_metric("apisix.latency", apisix_latency, "h", suffix,
                        "apisix latency")
            -- request/response body size timers
            send_metric("ingress.size", entry.request.size, "ms", suffix,
                        "req body size")
            send_metric("egress.size", entry.response.size, "ms", suffix,
                        "response body size")
        end

        -- Releasing the UDP socket descriptor
        local cok, cerr = sock:close()
        if not cok then
            core.log.error("failed to close the UDP connection, host[",
                           host, "] port[", port, "] ", cerr)
        end

        -- Returning at the end and ensuring the resource has been released.
        return true
    end

    local config = {
        name = plugin_name,
        retry_delay = conf.retry_delay,
        batch_max_size = conf.batch_max_size,
        max_retry_count = conf.max_retry_count,
        buffer_duration = conf.buffer_duration,
        inactive_timeout = conf.inactive_timeout,
        route_id = ctx.var.route_id,
        server_addr = ctx.var.server_addr,
    }

    local err
    log_buffer, err = batch_processor:new(func, config)
    if not log_buffer then
        core.log.error("error when creating the batch processor: ", err)
        return
    end

    buffers[conf] = log_buffer
    log_buffer:push(entry)
end

return _M
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ plugins: # plugin list (sorted by priority)
#- dubbo-proxy # priority: 507
- grpc-transcode # priority: 506
- prometheus # priority: 500
- datadog # priority: 495
- echo # priority: 412
- http-logger # priority: 410
- sls-logger # priority: 406
Expand Down
3 changes: 2 additions & 1 deletion docs/en/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@
"plugins/prometheus",
"plugins/zipkin",
"plugins/skywalking",
"plugins/node-status"
"plugins/node-status",
"plugins/datadog"
]
},
{
Expand Down
Loading