Skip to content

Commit

Permalink
chore: Fix linter findings for revive:exported in `plugins/inputs/c…
Browse files Browse the repository at this point in the history
  • Loading branch information
zak-pawel authored and asaharn committed Oct 16, 2024
1 parent ef2784e commit a7f8625
Show file tree
Hide file tree
Showing 26 changed files with 585 additions and 608 deletions.
68 changes: 34 additions & 34 deletions plugins/inputs/ceph/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,6 @@ func (c *Ceph) gatherClusterStats(acc telegraf.Accumulator) error {
return nil
}

func init() {
inputs.Add(measurement, func() telegraf.Input {
return &Ceph{
CephBinary: "/usr/bin/ceph",
OsdPrefix: osdPrefix,
MonPrefix: monPrefix,
MdsPrefix: mdsPrefix,
RgwPrefix: rgwPrefix,
SocketDir: "/var/run/ceph",
SocketSuffix: sockSuffix,
CephUser: "client.admin",
CephConfig: "/etc/ceph/ceph.conf",
GatherAdminSocketStats: true,
GatherClusterStats: false,
}
})
}

// Run ceph perf schema on the passed socket. The output is a JSON string
// mapping collection names to a map of counter names to information.
//
Expand Down Expand Up @@ -428,8 +410,8 @@ func (c *Ceph) execute(command string) (string, error) {
return output, nil
}

// CephStatus is used to unmarshal "ceph -s" output
type CephStatus struct {
// status is used to unmarshal "ceph -s" output
type status struct {
FSMap struct {
NumIn float64 `json:"in"`
NumMax float64 `json:"max"`
Expand Down Expand Up @@ -492,12 +474,12 @@ type CephStatus struct {

// decodeStatus decodes the output of 'ceph -s'
func decodeStatus(acc telegraf.Accumulator, input string) error {
data := &CephStatus{}
data := &status{}
if err := json.Unmarshal([]byte(input), data); err != nil {
return fmt.Errorf("failed to parse json: %q: %w", input, err)
}

decoders := []func(telegraf.Accumulator, *CephStatus) error{
decoders := []func(telegraf.Accumulator, *status) error{
decodeStatusFsmap,
decodeStatusHealth,
decodeStatusMonmap,
Expand All @@ -516,7 +498,7 @@ func decodeStatus(acc telegraf.Accumulator, input string) error {
}

// decodeStatusFsmap decodes the FS map portion of the output of 'ceph -s'
func decodeStatusFsmap(acc telegraf.Accumulator, data *CephStatus) error {
func decodeStatusFsmap(acc telegraf.Accumulator, data *status) error {
fields := map[string]interface{}{
"in": data.FSMap.NumIn,
"max": data.FSMap.NumMax,
Expand All @@ -528,7 +510,7 @@ func decodeStatusFsmap(acc telegraf.Accumulator, data *CephStatus) error {
}

// decodeStatusHealth decodes the health portion of the output of 'ceph status'
func decodeStatusHealth(acc telegraf.Accumulator, data *CephStatus) error {
func decodeStatusHealth(acc telegraf.Accumulator, data *status) error {
statusCodes := map[string]float64{
"HEALTH_ERR": 0,
"HEALTH_WARN": 1,
Expand All @@ -544,7 +526,7 @@ func decodeStatusHealth(acc telegraf.Accumulator, data *CephStatus) error {
}

// decodeStatusMonmap decodes the Mon map portion of the output of 'ceph -s'
func decodeStatusMonmap(acc telegraf.Accumulator, data *CephStatus) error {
func decodeStatusMonmap(acc telegraf.Accumulator, data *status) error {
fields := map[string]interface{}{
"num_mons": data.MonMap.NumMons,
}
Expand All @@ -553,7 +535,7 @@ func decodeStatusMonmap(acc telegraf.Accumulator, data *CephStatus) error {
}

// decodeStatusOsdmap decodes the OSD map portion of the output of 'ceph -s'
func decodeStatusOsdmap(acc telegraf.Accumulator, data *CephStatus) error {
func decodeStatusOsdmap(acc telegraf.Accumulator, data *status) error {
fields := map[string]interface{}{
"epoch": data.OSDMap.Epoch,
"num_in_osds": data.OSDMap.NumInOSDs,
Expand All @@ -578,7 +560,7 @@ func decodeStatusOsdmap(acc telegraf.Accumulator, data *CephStatus) error {
}

// decodeStatusPgmap decodes the PG map portion of the output of 'ceph -s'
func decodeStatusPgmap(acc telegraf.Accumulator, data *CephStatus) error {
func decodeStatusPgmap(acc telegraf.Accumulator, data *status) error {
fields := map[string]interface{}{
"bytes_avail": data.PGMap.BytesAvail,
"bytes_total": data.PGMap.BytesTotal,
Expand Down Expand Up @@ -609,7 +591,7 @@ func decodeStatusPgmap(acc telegraf.Accumulator, data *CephStatus) error {
}

// decodeStatusPgmapState decodes the PG map state portion of the output of 'ceph -s'
func decodeStatusPgmapState(acc telegraf.Accumulator, data *CephStatus) error {
func decodeStatusPgmapState(acc telegraf.Accumulator, data *status) error {
for _, pgState := range data.PGMap.PGsByState {
tags := map[string]string{
"state": pgState.StateName,
Expand All @@ -622,8 +604,8 @@ func decodeStatusPgmapState(acc telegraf.Accumulator, data *CephStatus) error {
return nil
}

// CephDF is used to unmarshal 'ceph df' output
type CephDf struct {
// df is used to unmarshal 'ceph df' output
type df struct {
Stats struct {
NumOSDs float64 `json:"num_osds"`
NumPerPoolOmapOSDs float64 `json:"num_per_pool_omap_osds"`
Expand Down Expand Up @@ -653,7 +635,7 @@ type CephDf struct {

// decodeDf decodes the output of 'ceph df'
func decodeDf(acc telegraf.Accumulator, input string) error {
data := &CephDf{}
data := &df{}
if err := json.Unmarshal([]byte(input), data); err != nil {
return fmt.Errorf("failed to parse json: %q: %w", input, err)
}
Expand Down Expand Up @@ -705,8 +687,8 @@ func decodeDf(acc telegraf.Accumulator, input string) error {
return nil
}

// CephOSDPoolStats is used to unmarshal 'ceph osd pool stats' output
type CephOSDPoolStats []struct {
// osdPoolStats is used to unmarshal 'ceph osd pool stats' output
type osdPoolStats []struct {
PoolName string `json:"pool_name"`
ClientIORate struct {
OpPerSec float64 `json:"op_per_sec"` // This field is no longer reported in ceph 10 and later
Expand All @@ -732,7 +714,7 @@ type CephOSDPoolStats []struct {

// decodeOsdPoolStats decodes the output of 'ceph osd pool stats'
func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error {
data := CephOSDPoolStats{}
data := osdPoolStats{}
if err := json.Unmarshal([]byte(input), &data); err != nil {
return fmt.Errorf("failed to parse json: %q: %w", input, err)
}
Expand Down Expand Up @@ -763,3 +745,21 @@ func decodeOsdPoolStats(acc telegraf.Accumulator, input string) error {

return nil
}

func init() {
inputs.Add(measurement, func() telegraf.Input {
return &Ceph{
CephBinary: "/usr/bin/ceph",
OsdPrefix: osdPrefix,
MonPrefix: monPrefix,
MdsPrefix: mdsPrefix,
RgwPrefix: rgwPrefix,
SocketDir: "/var/run/ceph",
SocketSuffix: sockSuffix,
CephUser: "client.admin",
CephConfig: "/etc/ceph/ceph.conf",
GatherAdminSocketStats: true,
GatherClusterStats: false,
}
})
}
100 changes: 50 additions & 50 deletions plugins/inputs/chrony/chrony.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,43 +46,6 @@ func (*Chrony) SampleConfig() string {
return sampleConfig
}

// dialUnix opens an unixgram connection with chrony
func (c *Chrony) dialUnix(address string) (*net.UnixConn, error) {
dir := path.Dir(address)
c.local = path.Join(dir, fmt.Sprintf("chrony-telegraf-%s.sock", uuid.New().String()))
conn, err := net.DialUnix("unixgram",
&net.UnixAddr{Name: c.local, Net: "unixgram"},
&net.UnixAddr{Name: address, Net: "unixgram"},
)

if err != nil {
return nil, err
}

filemode, err := strconv.ParseUint(c.SocketPerms, 8, 32)
if err != nil {
return nil, fmt.Errorf("parsing file mode %q failed: %w", c.SocketPerms, err)
}

if err := os.Chmod(c.local, os.FileMode(filemode)); err != nil {
return nil, fmt.Errorf("changing file mode of %q failed: %w", c.local, err)
}

group, err := user.LookupGroup(c.SocketGroup)
if err != nil {
return nil, fmt.Errorf("looking up group %q failed: %w", c.SocketGroup, err)
}
gid, err := strconv.Atoi(group.Gid)
if err != nil {
return nil, fmt.Errorf("parsing group ID %q failed: %w", group.Gid, err)
}
if err := os.Chown(c.local, os.Getuid(), gid); err != nil {
return nil, fmt.Errorf("changing group of %q failed: %w", c.local, err)
}

return conn, nil
}

func (c *Chrony) Init() error {
// Use the configured server, if none set, we try to guess it in Start()
if c.Server != "" {
Expand Down Expand Up @@ -182,19 +145,6 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error {
return nil
}

func (c *Chrony) Stop() {
if c.conn != nil {
if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) {
c.Log.Errorf("Closing connection to %q failed: %v", c.Server, err)
}
}
if c.local != "" {
if err := os.Remove(c.local); err != nil {
c.Log.Errorf("Removing temporary socket %q failed: %v", c.local, err)
}
}
}

func (c *Chrony) Gather(acc telegraf.Accumulator) error {
for _, m := range c.Metrics {
switch m {
Expand All @@ -216,6 +166,56 @@ func (c *Chrony) Gather(acc telegraf.Accumulator) error {
return nil
}

func (c *Chrony) Stop() {
if c.conn != nil {
if err := c.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) {
c.Log.Errorf("Closing connection to %q failed: %v", c.Server, err)
}
}
if c.local != "" {
if err := os.Remove(c.local); err != nil {
c.Log.Errorf("Removing temporary socket %q failed: %v", c.local, err)
}
}
}

// dialUnix opens an unixgram connection with chrony
func (c *Chrony) dialUnix(address string) (*net.UnixConn, error) {
dir := path.Dir(address)
c.local = path.Join(dir, fmt.Sprintf("chrony-telegraf-%s.sock", uuid.New().String()))
conn, err := net.DialUnix("unixgram",
&net.UnixAddr{Name: c.local, Net: "unixgram"},
&net.UnixAddr{Name: address, Net: "unixgram"},
)

if err != nil {
return nil, err
}

filemode, err := strconv.ParseUint(c.SocketPerms, 8, 32)
if err != nil {
return nil, fmt.Errorf("parsing file mode %q failed: %w", c.SocketPerms, err)
}

if err := os.Chmod(c.local, os.FileMode(filemode)); err != nil {
return nil, fmt.Errorf("changing file mode of %q failed: %w", c.local, err)
}

group, err := user.LookupGroup(c.SocketGroup)
if err != nil {
return nil, fmt.Errorf("looking up group %q failed: %w", c.SocketGroup, err)
}
gid, err := strconv.Atoi(group.Gid)
if err != nil {
return nil, fmt.Errorf("parsing group ID %q failed: %w", group.Gid, err)
}
if err := os.Chown(c.local, os.Getuid(), gid); err != nil {
return nil, fmt.Errorf("changing group of %q failed: %w", c.local, err)
}

return conn, nil
}

func (c *Chrony) gatherActivity(acc telegraf.Accumulator) error {
req := fbchrony.NewActivityPacket()
r, err := c.client.Communicate(req)
Expand Down
Loading

0 comments on commit a7f8625

Please sign in to comment.