Skip to content

Commit

Permalink
Add parse_ints config in json parser to support parsing int or floa…
Browse files Browse the repository at this point in the history
…t properly (#33699)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
expose json iterator config in json parser

**Link to tracking Issue:** <Issue number if applicable>

Fixes #33696

**Testing:** <Describe what testing was performed and which tests were
added.>

added.

**Documentation:** <Describe the documentation added.>

updated
  • Loading branch information
newly12 authored Jul 3, 2024
1 parent eabe829 commit 62ea24b
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 13 deletions.
27 changes: 27 additions & 0 deletions .chloggen/json_parser_number_data_type.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `parse_ints` config in json parser to support parsing int or float properly

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33696]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
21 changes: 11 additions & 10 deletions pkg/stanza/docs/operators/json_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ The `json_parser` operator parses the string-type field selected by `parse_from`

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `json_parser` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. |
| `parse_to` | `attributes` | The [field](../types/field.md) to which the value will be parsed. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
| `if` | | An [expression](../types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |
| `timestamp` | `nil` | An optional [timestamp](../types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. |
| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `json_parser` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. |
| `parse_to` | `attributes` | The [field](../types/field.md) to which the value will be parsed. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
| `if` | | An [expression](../types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |
| `timestamp` | `nil` | An optional [timestamp](../types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. |
| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. |
| `parse_ints` | `false` | Numbers like `int` and `float` are parsed as `float64` by default. When `parse_ints` is enabled, numbers are parsed as `json.Number` and then converted to `int64` or `float64` based on the value. However, this also introduces additional overhead. |

### Embedded Operations

Expand Down
3 changes: 3 additions & 0 deletions pkg/stanza/operator/parser/json/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func NewConfigWithID(operatorID string) *Config {
// Config is the configuration of a JSON parser operator.
type Config struct {
helper.ParserConfig `mapstructure:",squash"`

ParseInts bool `mapstructure:"parse_ints"`
}

// Build will build a JSON parser operator.
Expand All @@ -42,5 +44,6 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error

return &Parser{
ParserOperator: parserOperator,
parseInts: c.ParseInts,
}, nil
}
8 changes: 8 additions & 0 deletions pkg/stanza/operator/parser/json/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func TestConfig(t *testing.T) {
return p
}(),
},
{
Name: "parse_ints",
Expect: func() *Config {
p := NewConfig()
p.ParseInts = true
return p
}(),
},
},
}.Run(t)
}
61 changes: 58 additions & 3 deletions pkg/stanza/operator/parser/json/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package json // import "github.com/open-telemetry/opentelemetry-collector-contri
import (
"context"
"fmt"
"strings"

"github.com/goccy/go-json"

Expand All @@ -16,6 +17,8 @@ import (
// Parser is an operator that parses JSON.
type Parser struct {
helper.ParserOperator

parseInts bool
}

// Process will parse an entry for JSON.
Expand All @@ -28,12 +31,64 @@ func (p *Parser) parse(value any) (any, error) {
var parsedValue map[string]any
switch m := value.(type) {
case string:
err := json.Unmarshal([]byte(m), &parsedValue)
if err != nil {
return nil, err
// when parseInts is disabled, `int` and `float` will be parsed as `float64`.
// when it is enabled, they will be parsed as `json.Number`, later the parser
// will convert them to `int` or `float64` according to the field type.
if p.parseInts {
d := json.NewDecoder(strings.NewReader(m))
d.UseNumber()
err := d.Decode(&parsedValue)
if err != nil {
return nil, err
}
convertNumbers(parsedValue)
} else {
err := json.Unmarshal([]byte(m), &parsedValue)
if err != nil {
return nil, err
}
}
default:
return nil, fmt.Errorf("type %T cannot be parsed as JSON", value)
}

return parsedValue, nil
}

func convertNumbers(parsedValue map[string]any) {
for k, v := range parsedValue {
switch t := v.(type) {
case json.Number:
parsedValue[k] = convertNumber(t)
case map[string]any:
convertNumbers(t)
case []any:
convertNumbersArray(t)
}
}
}

func convertNumbersArray(arr []any) {
for i, v := range arr {
switch t := v.(type) {
case json.Number:
arr[i] = convertNumber(t)
case map[string]any:
convertNumbers(t)
case []any:
convertNumbersArray(t)
}
}
}

func convertNumber(value json.Number) any {
i64, err := value.Int64()
if err == nil {
return i64
}
f64, err := value.Float64()
if err == nil {
return f64
}
return value.String()
}
127 changes: 127 additions & 0 deletions pkg/stanza/operator/parser/json/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,116 @@ func TestParser(t *testing.T) {
ScopeName: "logger",
},
},
{
"parse_ints_disabled",
func(_ *Config) {},
&entry.Entry{
Body: `{"int":1,"float":1.0}`,
},
&entry.Entry{
Attributes: map[string]any{
"int": float64(1),
"float": float64(1),
},
Body: `{"int":1,"float":1.0}`,
},
},
{
"parse_ints_simple",
func(p *Config) {
p.ParseInts = true
},
&entry.Entry{
Body: `{"int":1,"float":1.0}`,
},
&entry.Entry{
Attributes: map[string]any{
"int": int64(1),
"float": float64(1),
},
Body: `{"int":1,"float":1.0}`,
},
},
{
"parse_ints_nested",
func(p *Config) {
p.ParseInts = true
},
&entry.Entry{
Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0}}`,
},
&entry.Entry{
Attributes: map[string]any{
"int": int64(1),
"float": float64(1),
"nested": map[string]any{
"int": int64(2),
"float": float64(2),
},
},
Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0}}`,
},
},
{
"parse_ints_arrays",
func(p *Config) {
p.ParseInts = true
},
&entry.Entry{
Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0},"array":[1,2]}`,
},
&entry.Entry{
Attributes: map[string]any{
"int": int64(1),
"float": float64(1),
"nested": map[string]any{
"int": int64(2),
"float": float64(2),
},
"array": []any{int64(1), int64(2)},
},
Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0},"array":[1,2]}`,
},
},
{
"parse_ints_mixed_arrays",
func(p *Config) {
p.ParseInts = true
},
&entry.Entry{
Body: `{"int":1,"float":1.0,"mixed_array":[1,1.5,2]}`,
},
&entry.Entry{
Attributes: map[string]any{
"int": int64(1),
"float": float64(1),
"mixed_array": []any{int64(1), float64(1.5), int64(2)},
},
Body: `{"int":1,"float":1.0,"mixed_array":[1,1.5,2]}`,
},
},
{
"parse_ints_nested_arrays",
func(p *Config) {
p.ParseInts = true
},
&entry.Entry{
Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0,"array":[1,2]},"array":[3,4]}`,
},
&entry.Entry{
Attributes: map[string]any{
"int": int64(1),
"float": float64(1),
"nested": map[string]any{
"int": int64(2),
"float": float64(2),
"array": []any{int64(1), int64(2)},
},
"array": []any{int64(3), int64(4)},
},
Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0,"array":[1,2]},"array":[3,4]}`,
},
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -176,6 +286,23 @@ func BenchmarkProcess(b *testing.B) {
parser, err := cfg.Build(componenttest.NewNopTelemetrySettings())
require.NoError(b, err)

benchmarkOperator(b, parser)
}

func BenchmarkProcessParseInts(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

cfg := NewConfig()
cfg.ParseInts = true

parser, err := cfg.Build(componenttest.NewNopTelemetrySettings())
require.NoError(b, err)

benchmarkOperator(b, parser)
}

func benchmarkOperator(b *testing.B, parser operator.Operator) {
body, err := os.ReadFile(filepath.Join("testdata", "testdata.json"))
require.NoError(b, err)

Expand Down
3 changes: 3 additions & 0 deletions pkg/stanza/operator/parser/json/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ timestamp:
parse_from: body.timestamp_field
layout_type: strptime
layout: '%Y-%m-%d'
parse_ints:
type: json_parser
parse_ints: true

0 comments on commit 62ea24b

Please sign in to comment.