Skip to content

Commit

Permalink
Fix empty field error in the iis/application pool metricset (elastic#…
Browse files Browse the repository at this point in the history
…19537)

* fix

* update test

* changelog

* change

* refactor

* close
  • Loading branch information
narph authored Jul 14, 2020
1 parent 8c5c5d4 commit 134324e
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 102 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix incorrect usage of hints builder when exposed port is a substring of the hint {pull}19052[19052]
- Remove dedot for tag values in aws module. {issue}19112[19112] {pull}19221[19221]
- Stop counterCache only when already started {pull}19103[19103]
- Fix empty field name errors in the application pool metricset. {pull}19537[19537]
- Set tags correctly if the dimension value is ARN {issue}19111[19111] {pull}19433[19433]
- Fix bug incorrect parsing of float numbers as integers in Couchbase module {issue}18949[18949] {pull}19055[19055]
- Fix mapping of service start type in the service metricset, windows module. {pull}19551[19551]
Expand Down
22 changes: 13 additions & 9 deletions x-pack/metricbeat/module/iis/application_pool/application_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, err
}
// instantiate reader object
reader, err := newReader()
reader, err := newReader(config)
if err != nil {
return nil, err
}
Expand All @@ -55,22 +55,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
log: logp.NewLogger("application pool"),
reader: reader,
}
if err := ms.reader.initCounters(config.Names); err != nil {
return ms, err
}

return ms, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
var config Config
if err := m.Module().UnpackConfig(&config); err != nil {
return nil
// refresh performance counter list
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
// A flag is set if the second call has been executed else refresh will fail (reader.executed)
if m.reader.executed {
err := m.reader.initAppPools()
if err != nil {
return errors.Wrap(err, "failed retrieving counters")
}
}

events, err := m.reader.fetch(config.Names)
events, err := m.reader.read()
if err != nil {
return errors.Wrap(err, "failed reading counters")
}
Expand All @@ -81,6 +84,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
break
}
}

return nil
}

Expand Down
169 changes: 90 additions & 79 deletions x-pack/metricbeat/module/iis/application_pool/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,41 @@ package application_pool
import (
"strings"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/helper/windows/pdh"
"github.com/elastic/go-sysinfo"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper/windows/pdh"
"github.com/elastic/beats/v7/metricbeat/mb"
)

// Reader strucr will contain the pdh query and config options
const ecsProcessId = "process.pid"

// Reader will contain the config options
type Reader struct {
Query pdh.Query // PDH Query
ApplicationPools []ApplicationPool // Mapping of counter path to key used for the label (e.g. processor.name)
log *logp.Logger // logger
hasRun bool // will check if the reader has run a first time
WorkerProcesses map[string]string
applicationPools []ApplicationPool
workerProcesses map[string]string
query pdh.Query // PDH Query
executed bool // Indicates if the query has been executed.
log *logp.Logger //
config Config // Metricset configuration
}

// ApplicationPool struct contains the list of applications and their worker processes
type ApplicationPool struct {
Name string
WorkerProcessIds []int
name string
workerProcessIds []int
counters map[string]string
}

// WorkerProcess struct contains the worker process details
type WorkerProcess struct {
ProcessId int
InstanceName string
processId int
instanceName string
}

const ecsProcessId = "process.pid"

var appPoolCounters = map[string]string{
"process.pid": "\\Process(w3wp*)\\ID Process",
"process.cpu_usage_perc": "\\Process(w3wp*)\\% Processor Time",
Expand All @@ -62,87 +63,107 @@ var appPoolCounters = map[string]string{
}

// newReader creates a new instance of Reader.
func newReader() (*Reader, error) {
func newReader(config Config) (*Reader, error) {
var query pdh.Query
if err := query.Open(); err != nil {
return nil, err
}
reader := &Reader{
Query: query,
log: logp.NewLogger("website"),
r := &Reader{
query: query,
log: logp.NewLogger("application_pool"),
config: config,
workerProcesses: make(map[string]string),
}

return reader, nil
err := r.initAppPools()
if err != nil {
return nil, errors.Wrap(err, "error loading counters for existing app pools")
}
return r, nil
}

// initCounters func retrieves the running application worker processes and adds the counters to the pdh query
func (re *Reader) initCounters(filtered []string) error {
apps, err := getApplicationPools(filtered)
// initAppPools will check for any new instances and add them to the counter list
func (r *Reader) initAppPools() error {
apps, err := getApplicationPools(r.config.Names)
if err != nil {
return errors.Wrap(err, "failed retrieving running worker processes")
}
r.applicationPools = apps
if len(apps) == 0 {
re.log.Info("no running application pools found")
r.log.Info("no running application pools found")
return nil
}
re.ApplicationPools = apps
re.WorkerProcesses = make(map[string]string)
var newQueries []string
r.workerProcesses = make(map[string]string)
for key, value := range appPoolCounters {
counters, err := re.Query.ExpandWildCardPath(value)
childQueries, err := r.query.GetCounterPaths(value)
if err != nil {
re.log.Error(err, `failed to expand counter path (query="%v")`, value)
if err == pdh.PDH_CSTATUS_NO_COUNTER || err == pdh.PDH_CSTATUS_NO_COUNTERNAME || err == pdh.PDH_CSTATUS_NO_INSTANCE || err == pdh.PDH_CSTATUS_NO_OBJECT {
r.log.Infow("Ignoring non existent counter", "error", err,
logp.Namespace("application pool"), "query", value)
continue
} else {
return errors.Wrapf(err, `failed to expand counter (query="%v")`, value)
}
}
newQueries = append(newQueries, childQueries...)
// check if the pdhexpandcounterpath/pdhexpandwildcardpath functions have expanded the counter successfully.
if len(childQueries) == 0 || (len(childQueries) == 1 && strings.Contains(childQueries[0], "*")) {
// covering cases when PdhExpandWildCardPathW returns no counter paths or is unable to expand and the ignore_non_existent_counters flag is set
r.log.Debugw("No counter paths returned but PdhExpandWildCardPathW returned no errors", "initial query", value,
logp.Namespace("perfmon"), "expanded query", childQueries)
continue
}
for _, count := range counters {
if err = re.Query.AddCounter(count, "", "float", true); err != nil {
return errors.Wrapf(err, `failed to add counter (query="%v")`, count)
for _, v := range childQueries {
if err := r.query.AddCounter(v, "", "float", len(childQueries) > 1); err != nil {
return errors.Wrapf(err, `failed to add counter (query="%v")`, v)
}
newQueries = append(newQueries, count)
re.WorkerProcesses[count] = key
r.workerProcesses[v] = key
}
}
err = re.Query.RemoveUnusedCounters(newQueries)
err = r.query.RemoveUnusedCounters(newQueries)
if err != nil {
return errors.Wrap(err, "failed removing unused counter values")
}
return nil
}

// fetch executes collects the query data and maps the counter values to events.
func (re *Reader) fetch(names []string) ([]mb.Event, error) {
// refresh performance counter list
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
// A flag is set if the second call has been executed else refresh will fail (reader.executed)
if re.hasRun || len(re.Query.Counters) == 0 {
err := re.initCounters(names)
if err != nil {
return nil, errors.Wrap(err, "failed retrieving counters")
}
}
// if the ignore_non_existent_counters flag is set and no valid counter paths are found the Read func will still execute, a check is done before
if len(re.Query.Counters) == 0 {
// read executes a query and returns those values in an event.
func (r *Reader) read() ([]mb.Event, error) {
if len(r.applicationPools) == 0 {
r.executed = true
return nil, nil
}

// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
if err := re.Query.CollectData(); err != nil {
if err := r.query.CollectData(); err != nil {
return nil, errors.Wrap(err, "failed querying counter values")
}

// Get the values.
values, err := re.Query.GetFormattedCounterValues()
values, err := r.query.GetFormattedCounterValues()
if err != nil {
r.query.Close()
return nil, errors.Wrap(err, "failed formatting counter values")
}
var events []mb.Event
eventGroup := r.mapEvents(values)
r.executed = true
results := make([]mb.Event, 0, len(events))
for _, val := range eventGroup {
results = append(results, val)
}
return results, nil
}

func (r *Reader) mapEvents(values map[string][]pdh.CounterValue) map[string]mb.Event {
workers := getProcessIds(values)
events := make(map[string]mb.Event)
for _, appPool := range re.ApplicationPools {
events[appPool.Name] = mb.Event{
for _, appPool := range r.applicationPools {
events[appPool.name] = mb.Event{
MetricSetFields: common.MapStr{
"name": appPool.Name,
"name": appPool.name,
},
RootFields: common.MapStr{},
}
Expand All @@ -151,45 +172,35 @@ func (re *Reader) fetch(names []string) ([]mb.Event, error) {
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
if val.Err.Error != nil {
if !re.hasRun {
re.log.Debugw("Ignoring the first measurement because the data isn't ready",
"error", val.Err, logp.Namespace("application_pool"), "query", counterPath)
if !r.executed {
continue
}
// The counter has a negative value or the counter was successfully found, but the data returned is not valid.
// This error can occur if the counter value is less than the previous value. (Because counter values always increment, the counter value rolls over to zero when it reaches its maximum value.)
// This is not an error that stops the application from running successfully and a positive counter value should be retrieved in the later calls.
if val.Err.Error == pdh.PDH_CALC_NEGATIVE_VALUE || val.Err.Error == pdh.PDH_INVALID_DATA {
re.log.Debugw("Counter value retrieval returned",
r.log.Debugw("Counter value retrieval returned",
"error", val.Err.Error, "cstatus", pdh.PdhErrno(val.Err.CStatus), logp.Namespace("application_pool"), "query", counterPath)
continue
}
}
if val.Instance == appPool.Name {
events[appPool.Name].MetricSetFields.Put(appPool.counters[counterPath], val.Measurement)
} else if hasWorkerProcess(val.Instance, workers, appPool.WorkerProcessIds) {
if re.WorkerProcesses[counterPath] == ecsProcessId {
events[appPool.Name].RootFields.Put(re.WorkerProcesses[counterPath], val.Measurement)
} else {
events[appPool.Name].MetricSetFields.Put(re.WorkerProcesses[counterPath], val.Measurement)
if hasWorkerProcess(val.Instance, workers, appPool.workerProcessIds) {
if r.workerProcesses[counterPath] == ecsProcessId {
events[appPool.name].RootFields.Put(r.workerProcesses[counterPath], val.Measurement)
} else if len(r.workerProcesses[counterPath]) != 0 {
events[appPool.name].MetricSetFields.Put(r.workerProcesses[counterPath], val.Measurement)
}
}
}

}
}

re.hasRun = true
results := make([]mb.Event, 0, len(events))
for _, val := range events {
results = append(results, val)
}
return results, nil
return events
}

// Close will close the PDH query for now.
func (re *Reader) close() error {
return re.Query.Close()
// close will close the PDH query for now.
func (r *Reader) close() error {
return r.query.Close()
}

// getApplicationPools method retrieves the w3wp.exe processes and the application pool name, also filters on the application pool names configured by users
Expand All @@ -204,15 +215,15 @@ func getApplicationPools(names []string) ([]ApplicationPool, error) {
}
var applicationPools []ApplicationPool
for key, value := range appPools {
applicationPools = append(applicationPools, ApplicationPool{Name: key, WorkerProcessIds: value})
applicationPools = append(applicationPools, ApplicationPool{name: key, workerProcessIds: value})
}
if len(names) == 0 {
return applicationPools, nil
}
var filtered []ApplicationPool
for _, n := range names {
for _, w3 := range applicationPools {
if n == w3.Name {
if n == w3.name {
filtered = append(filtered, w3)
}
}
Expand Down Expand Up @@ -253,18 +264,18 @@ func getProcessIds(counterValues map[string][]pdh.CounterValue) []WorkerProcess
var workers []WorkerProcess
for key, values := range counterValues {
if strings.Contains(key, "\\ID Process") {
workers = append(workers, WorkerProcess{InstanceName: values[0].Instance, ProcessId: int(values[0].Measurement.(float64))})
workers = append(workers, WorkerProcess{instanceName: values[0].Instance, processId: int(values[0].Measurement.(float64))})
}
}
return workers
}

// hasWorkerProcess func checks if workerprocess list contains the process id
// hasWorkerProcess func checks if worker process list contains the process id
func hasWorkerProcess(instance string, workers []WorkerProcess, pids []int) bool {
for _, worker := range workers {
if worker.InstanceName == instance {
if worker.instanceName == instance {
for _, pid := range pids {
if pid == worker.ProcessId {
if pid == worker.processId {
return true
}
}
Expand Down
Loading

0 comments on commit 134324e

Please sign in to comment.