Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: client cause memory leak #5265

Closed
Arc-Jung opened this issue Jan 3, 2022 · 3 comments
Closed

pubsub: client cause memory leak #5265

Arc-Jung opened this issue Jan 3, 2022 · 3 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. triage me I really want to be triaged.

Comments

@Arc-Jung
Copy link

Arc-Jung commented Jan 3, 2022

Thanks for stopping by to let us know something could be better!

PLEASE READ: If you have a support contract with Google, please create an issue in the support console instead of filing on GitHub. This will ensure a timely response.

Is this a client library issue or a product issue? We will only be able to assist with issues that pertain to the behaviors of this library. If the issue you're experiencing is due to the behavior of the product itself, please visit the Support page to reach the most relevant engineers.

If the support paths suggested above still do not result in a resolution, please provide the following details.

Environment details

Steps to reproduce

1. I try coding to this example.

// https://cloud.google.com/pubsub/docs/samples/pubsub-subscriber-concurrency-control

func pullMsgsConcurrenyControl(w io.Writer, projectID, subID string) error {
        // projectID := "my-project-id"
        // subID := "my-sub"
        ctx := context.Background()
        client, err := pubsub.NewClient(ctx, projectID)
        if err != nil {
                return fmt.Errorf("pubsub.NewClient: %v", err)
        }
        defer client.Close()

        sub := client.Subscription(subID)
        // Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
        // concurrency settings. Otherwise, NumGoroutines will be set to 1.
        sub.ReceiveSettings.Synchronous = false
        // NumGoroutines determines the number of goroutines sub.Receive will spawn to pull
        // messages.
        sub.ReceiveSettings.NumGoroutines = 16
        // MaxOutstandingMessages limits the number of concurrent handlers of messages.
        // In this case, up to 8 unacked messages can be handled concurrently.
        sub.ReceiveSettings.MaxOutstandingMessages = 8

        // Receive messages for 30 seconds.
        ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
        defer cancel()

        var counter int32

        // Receive blocks until the context is cancelled or an error occurs.
        err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
                // The message handler passed to Receive may be called concurrently
                // so it's okay to process the messages concurrently but make sure
                // to synchronize access to shared memory.
                atomic.AddInt32(&counter, 1)
                msg.Ack()
        })
        if err != nil {
                return fmt.Errorf("pubsub: Receive returned error: %v", err)
        }
        fmt.Fprintf(w, "Received %d messages\n", counter)

        return nil
}

2. I founded at this example that memory leak accident.

스크린샷 2021-12-30 오후 2 36 11

heap redbox

3. Therefore i inserted code that use garbage collection when GCP PubSub Client pull message.

스크린샷 2021-12-30 오후 12 07 53

4. Even after doing this solution, memory leak occur. I don't know that cause memory leak when subscription client receive GCP PubSub message.

스크린샷 2022-01-03 오전 11 28 02

Making sure to follow these steps will guarantee the quickest resolution possible.

Thanks!

@ghost ghost assigned codyoss Jan 3, 2022
@codyoss
Copy link
Member

codyoss commented Jan 4, 2022

Transferring this issue to the correct repo.

@codyoss codyoss transferred this issue from googleapis/google-api-go-client Jan 4, 2022
@codyoss codyoss changed the title GCP PubSub Client cause memory leak. pubsub: client cause memory leak Jan 4, 2022
@codyoss codyoss assigned hongalex and unassigned codyoss Jan 4, 2022
@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label Jan 4, 2022
@hongalex
Copy link
Member

hongalex commented Jan 4, 2022

Thanks for reporting. What is your publish rate? Also, is there anything underneath the last line, msg.Ack, in your last screenshot?

@codyoss codyoss added the api: pubsub Issues related to the Pub/Sub API. label Jan 4, 2022
@Arc-Jung
Copy link
Author

Arc-Jung commented Jan 5, 2022

I am so sorry. I checked this situation. Solution is here.

func PullMessage(w io.Writer, projectID string, subID string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID, option.WithCredentialsFile("./auth_env.json"))

	if err != nil {
		log.Errorf("PubSub Credentials Error : ", err)
		return err
	}
	defer client.Close()

	sub := client.Subscription(subID)
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.NumGoroutines = 32
	sub.ReceiveSettings.MaxOutstandingMessages = 32

	ctx, cancel := context.WithCancel(ctx)
	ctx, _ = context.WithTimeout(ctx, 60*time.Second) // Receive messages for 60 seconds. If timeout then Deadline Except occur.
	defer cancel()

As my code write, difference occur with example. difference is timeout function.
I change my code that modify "ctx, _" to "ctx, cancel"

BEFORE

	sub := client.Subscription(subID)
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.NumGoroutines = 32
	sub.ReceiveSettings.MaxOutstandingMessages = 32

	ctx, cancel := context.WithCancel(ctx)
	ctx, _ = context.WithTimeout(ctx, 60*time.Second) // Receive messages for 60 seconds. If timeout then Deadline Except occur.
	defer cancel()

AFTER

	sub := client.Subscription(subID)
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.NumGoroutines = 32
	sub.ReceiveSettings.MaxOutstandingMessages = 32

	ctx, cancel = context.WithTimeout(ctx, 60*time.Second) // Receive messages for 60 seconds. If timeout then Deadline Except occur.
	defer cancel()

However, if a similar error occurs again, I will write an issue again.
Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. triage me I really want to be triaged.
Projects
None yet
Development

No branches or pull requests

4 participants