
PubSub Subscriber client memory leak #273

Closed
capt2101akash opened this issue Jan 12, 2021 · 54 comments
Labels

api: pubsub (Issues related to the googleapis/python-pubsub API.)
priority: p3 (Desirable enhancement or fix. May not be included in next release.)
status: blocked (Resolving the issue is dependent on other work.)
type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)

Comments

@capt2101akash commented Jan 12, 2021

Environment details

  1. API: Pub/Sub
  2. OS type and version: Linux (containerized microservice)
  3. Python version and virtual environment information: python --version -> Python 3.7

Steps to reproduce

Hi guys, as part of a project I have created a simple subscriber client which receives IoT data in real time. My subscriber client runs indefinitely, and in the callback function I do some transformation on the messages that come in. I have deployed this client as a microservice in GKE; to utilise GKE more efficiently I was looking into using the HPA (horizontal pod autoscaler). I observed that the subscriber client's memory always increases - it never goes down, even when the queue is empty. I have tried cancelling the thread and closing the subscriber client by putting it in a while loop to release socket resources, but in the next iteration I see that the memory has not been freed and it stacks up. I have used memory profiler to detect the memory leak. I know this issue has been discussed in the past (#5001), but I am using the latest libraries and still facing this. Any help would be really appreciated.
Below is a sub-sample of the code where the majority of the work happens. I am putting all the messages in a global list so that I can send them for further transformation downstream (if I add the transformation of messages in the callback function, the memory consumption increases even more rapidly). I am also adding GKE dashboard screenshots for memory consumption and CPU usage. My expectation, and the general workflow, should ideally be: whenever there are spikes in CPU, i.e. whenever messages are received from the queue, memory consumption should increase, and afterwards the memory should be released.

from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1
from memory_profiler import profile

# subscription_name is defined elsewhere in the application.


def message_ack(message):
    global message_list
    message_list.append(message)
    message.ack()


@profile
def pull_messages():
    future = subscriber.subscribe(subscription_name, message_ack, flow_control=flow_control)
    try:
        future.result(timeout=300)
    except TimeoutError:
        # logger.error(traceback.print_exc())
        future.cancel()


if __name__ == '__main__':
    flow_control = pubsub_v1.types.FlowControl(max_messages=30)
    subscriber = pubsub_v1.SubscriberClient()
    while True:
        message_list = []
        pull_messages()

[GKE dashboards: memory consumption and CPU usage screenshots]

Thanks!

@plamut plamut transferred this issue from googleapis/google-cloud-python Jan 12, 2021
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Jan 12, 2021
@plamut plamut added the type: question Request for information or clarification. Not an issue. label Jan 12, 2021
@plamut (Contributor) commented Jan 12, 2021

@capt2101akash Thanks for the report.

As a start, could you please share the library versions that your application uses, i.e. the output of pip freeze? Could you also share sample code that reproduces the issue, if possible? How many messages are we generally talking about here? Over what period of time was the memory consumption measured?

The subscriber client has an internal queue of all received messages and only dispatches a subset of them to the user-provided callback at a time (i.e. flow control), while automatically extending acknowledge deadlines for these messages. If the Python instances of these messages are kept alive for some reason, that could explain the ever-growing memory usage as new messages are streamed from the server - but this is just an educated guess for now.

Any additional info would be appreciated, looking forward to it!

@capt2101akash (Author) commented Jan 12, 2021

> @capt2101akash Thanks for the report.
>
> As a start, could you please share the library versions that your application uses, i.e. the output of pip freeze? Could you also share sample code that reproduces the issue, if possible? How many messages are we generally talking about here? Over what period of time was the memory consumption measured?
>
> The subscriber client has an internal queue of all received messages and only dispatches a subset of them to the user-provided callback at a time (i.e. flow control), while automatically extending acknowledge deadlines for these messages. If the Python instances of these messages are kept alive for some reason, that could explain the ever-growing memory usage as new messages are streamed from the server - but this is just an educated guess for now.
>
> Any additional info would be appreciated, looking forward to it!

Hey plamut, I was in the process of doing that :). Thanks for starting this thread. I have updated my comment with the sample code and the charts for memory consumption and CPU usage. I will add the pip freeze output here. In addition, I have tried changing the flow control settings to see if that was the cause, but it was a dead end. Anyway, the current setting is 30. The number of messages in question is around 200-300K per day, but these are real-time messages, so they arrive throughout the day.

apache-beam==2.25.0
avro-python3==1.9.2.1
cachetools==4.1.1
certifi==2020.11.8
cffi==1.14.4
chardet==3.0.4
crcmod==1.7
cx-Oracle==8.0.1
dicttoxml==1.7.4
dill==0.3.1.1
docopt==0.6.2
elasticsearch==7.10.0
elasticsearch-dsl==7.3.0
fastavro==1.2.0
fasteners==0.15
future==0.18.2
google-api-core==1.22.1
google-api-python-client==1.8.4
google-apitools==0.5.31
google-auth==1.23.0
google-auth-httplib2==0.0.4
google-auth-oauthlib==0.4.2
google-cloud-bigquery==2.4.0
google-cloud-bigtable==1.6.0
google-cloud-build==2.0.0
google-cloud-core==1.4.3
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-language==1.3.0
google-cloud-pubsub==1.7.0
google-cloud-spanner==1.19.1
google-cloud-storage==1.33.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-crc32c==1.0.0
google-resumable-media==1.1.0
googleapis-common-protos==1.52.0
grpc-google-iam-v1==0.12.3
grpcio==1.33.2
grpcio-gcp==0.2.2
hdfs==2.5.8
httplib2==0.18.1
idna==2.10
kafka-python==2.0.2
libcst==0.3.14
memory-profiler==0.58.0
mock==2.0.0
monotonic==1.5
mypy-extensions==0.4.3
mysql-connector-python==8.0.22
mysqlclient==2.0.1
numpy==1.19.4
oauth2client==4.1.3
oauthlib==3.1.0
pandas==1.1.4
pandas-gbq==0.14.1
pbr==5.5.1
proto-plus==1.11.0
protobuf==3.14.0
psutil==5.8.0
pyarrow==0.17.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydata-google-auth==1.1.0
pydot==1.4.1
pymongo==3.11.1
pyparsing==2.4.7
python-dateutil==2.8.1
pytz==2020.4
PyYAML==5.3.1
redis==3.5.3
requests==2.25.0
requests-oauthlib==1.3.0
rsa==4.6
six==1.15.0
SQLAlchemy==1.3.20
typing-extensions==3.7.4.3
typing-inspect==0.6.0
uritemplate==3.0.1
urllib3==1.26.2
xmltodict==0.12.0

Bear with the large list of packages. Hopefully this info helps you get to know my setup. If you need anything else from my side, do let me know.

@capt2101akash (Author) commented:

Hey @plamut, I hope you've had some time to look at the details I provided. Any input would be really helpful. Looking forward to it.

@plamut (Contributor) commented Jan 13, 2021

@capt2101akash I did do a quick check of the sample code and it seems like a straightforward streaming pull usage.

The messages being accumulated in the global message_list should be discarded on each iteration when the reference is changed to the new empty list, but there can always be some internal structures in the client that are kept alive and causing the memory growth. It is also possible, however, that all this is normal behavior, since Python does not necessarily release all the allocated memory back to the OS (e.g. due to keeping some of the already allocated memory for future use).

I hope I can take a closer look some time next week, but until then, if time allows, you could experiment with the following:

  • Run an explicit import gc; gc.collect() after re-assigning message_list to a new list to clean up some unused objects.
  • Instead of using a shared subscriber instance, create a new subscriber in pull_messages() and call its close() method when done with it (or use it as a context manager) - a sketch follows below this list.
    If the cleanup triggered by future.cancel() does not correctly release all underlying resources, this might help.
  • Run pull_messages() in a subprocess on each iteration. When a child process terminates, the OS will reclaim all of its memory.
    This might require changes to any logic in the main process that collects the results (if it's not process-and-forget), but it's an often-used technique when performing memory-intensive tasks.
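
For illustration, a minimal (untested) sketch of the second suggestion, reusing subscription_name, message_ack, and flow_control from your original sample:

import gc
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1


def pull_messages():
    # Fresh client per iteration; the context manager calls close() on exit,
    # which shuts down the underlying gRPC channel and its threads.
    with pubsub_v1.SubscriberClient() as subscriber:
        future = subscriber.subscribe(
            subscription_name, message_ack, flow_control=flow_control
        )
        try:
            future.result(timeout=300)
        except TimeoutError:
            future.cancel()
    gc.collect()  # combine with the first suggestion: force a collection pass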

@capt2101akash (Author) commented:

@plamut thanks for your suggestions. I have tried all of the above, but to no avail. I will still experiment a bit more with the multiprocessing part, though, and will let you know my findings. In the meantime I look forward to more insights from you next week.

@plamut (Contributor) commented Jan 13, 2021

The multiprocessing trick should almost certainly work, but of course if there's a leak, we should find and fix it. Could be tricky, though. :)

Edit: Oh, you tried that already? That's somewhat surprising. Could there be anything else outside of the provided sample that is consuming memory, or is the sample fully self-contained?

@plamut plamut added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. and removed type: question Request for information or clarification. Not an issue. labels Jan 13, 2021
@plamut plamut self-assigned this Jan 13, 2021
@capt2101akash (Author) commented:

> The multiprocessing trick should almost certainly work, but of course if there's a leak, we should find and fix it. Could be tricky, though. :)
>
> Edit: Oh, you tried that already? That's somewhat surprising. Could there be anything else outside of the provided sample that is consuming memory, or is the sample fully self-contained?

The sample is self-contained, and the memory increases even with just a plain subscriber client. The multiprocessing route is somewhat tricky, though, as we need to maintain a global list; I am looking into a workaround for that. In any case, we should really find what's causing this gradual increase so we can fix it. Can you try reproducing it and see whether it behaves the same way for you?

@capt2101akash (Author) commented Jan 13, 2021

@plamut Update: as expected, the multiprocessing pathway is also a dead end, as grpc doesn't work with multiprocessing. Referenced here. I hope that after you do some research we will know more about this issue.

@plamut (Contributor) commented Jan 14, 2021

@capt2101akash Thanks for the research, although the linked issue seems a bit different - the user-provided callbacks are, by default, invoked using an internal thread pool executor, while the linked issue is about using a process pool executor for dispatching the messages (which doesn't seem to work due to the underlying grpc dependency).

What I meant was running the entire streaming pull and message processing in a child process (including the client instantiation), so that any memory used (or leaked) would be automatically reclaimed by the OS upon the child process termination. Something like the following (note: untested):

from multiprocessing import Process

...

def pull_messages():
    flow_control = pubsub_v1.types.FlowControl(max_messages=30)
    subscriber = pubsub_v1.SubscriberClient()
    future = subscriber.subscribe(subscription_name, message_ack, flow_control=flow_control)
    try:
        future.result(timeout=300)
    except TimeoutError: 
        future.cancel()


if __name__ == '__main__':
    while True:
        p = Process(target=pull_messages)
        p.start()
        print(f"Started a new child process {p.pid}")
        p.join()

It would require some adjustments to get any message processing results back to the main process, if needed, but I hope you get the idea.


BTW, a random observation - I see that FlowControl.max_messages is set to 30, but the default thread pool executor uses 10 worker threads. If the message callback does any non-trivial CPU work, at most 10 messages will be dispatched at a time, because the rest have to wait for worker threads to free up. A sketch of passing a larger executor via a custom scheduler follows below.
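
For example (untested sketch, reusing the names from your sample), a larger thread pool can be supplied through the scheduler argument of subscribe():

from concurrent import futures

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

# Match the number of callback workers to FlowControl.max_messages.
executor = futures.ThreadPoolExecutor(max_workers=30)
scheduler = ThreadScheduler(executor=executor)

future = subscriber.subscribe(
    subscription_name,
    message_ack,
    flow_control=pubsub_v1.types.FlowControl(max_messages=30),
    scheduler=scheduler,
)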

@capt2101akash (Author) commented Jan 14, 2021

@plamut Yeah, I got what you are saying. I had already tried the same piece of code you sent yesterday. The only concern, as you said, is getting the message processing results back to the main process, and this is where I felt the referenced issue came into play. Sorry if I didn't specify it clearly earlier, but here is what I meant:

  • Once we run pull_messages in a separate process, we need a global variable to store our messages in a list. To do that I used multiprocessing.Manager().list().
  • Once we call pull_messages, there is a callback function, as you have written: message_ack. This callback should ideally append the messages to the global list created in the first step.
  • The problem comes here: once we call list.append(message), it fails with TypeError: can't pickle _thread.lock objects - the same error @theacodes was getting here. As per my research, this is related to multiprocessing-managed variables not being usable from threads. Correct me if I am wrong; this is what I was referring to with the previous issue. My bad that I didn't state it properly.
import multiprocessing
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1


def message_ack(message):
    # global message_list
    message_list.append(message)  # TypeError: can't pickle _thread.lock objects occurs here
    message.ack()


def pull_messages(message_list):
    future = subscriber.subscribe(subscription_name, message_ack, flow_control=flow_control)
    try:
        future.result(timeout=30)
    except TimeoutError:
        # logger.error(traceback.print_exc())
        future.cancel()


if __name__ == '__main__':
    ...
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    manager = multiprocessing.Manager()
    while True:
        message_list = manager.list()  # proxied list shared with the child process
        subscriber = pubsub_v1.SubscriberClient()
        # message_list = list()
        p1 = multiprocessing.Process(target=pull_messages, args=(message_list,))
        p1.start()
        p1.join()
        subscriber.close()
NOTE: as you can see, I have changed the flow control to fetch only 5 messages, and this still does not result in lower memory consumption. The memory gradually grows.

I hope this clears up the misunderstanding. If you have any input on how to mitigate the "can't pickle _thread.lock objects" error, it would be helpful; I have marked the line where the error occurs.
Hoping to hear back from you soon.

@plamut (Contributor) commented Jan 14, 2021

@capt2101akash Thanks for the clarification, it indeed seems we're on the same page.

From the top of my head - the messages dispatched to the callback are Message instances that wrap the actual messages received from the server.

These instances contain, among other things, a reference to a queue used for asynchronously dispatching requests such as ACK and NACK, and the queue itself cannot be pickled:

>>> import queue
>>> import pickle
>>> pickle.dumps(queue.Queue())
Traceback (most recent call last):
  ...
TypeError: cannot pickle '_thread.lock' object

Since all the message_ack() callback does before acknowledging a message is appending it to a list, there's probably no need to append the entire Message instance, but instead just the message contents? Something like the following:

def message_ack(message):
    message_list.append(message.data)
    message.ack()

Would that be sufficient for the post-processing the received message data accumulated in the list?

@capt2101akash (Author) commented Jan 17, 2021

@plamut I missed that part - using message.data does indeed work with multiprocessing. However, when I try to do my processing inside the callback (anything other than just message.ack()), it fails with a segmentation fault. It also behaves very sporadically and fails with a segmentation fault quite frequently even if the callback only does message.ack(). I wonder if it is related to the issue I mentioned, but I again feel I'm at a dead end, as nothing seems to be working out until we find out specifically why the subscriber thread is not releasing the memory. If you have encountered segmentation faults with multiprocessing, do let me know how you mitigated them. Here is what the grpc folks have said about using multiprocessing with grpc (the backend of the subscriber client). Hence, I am wondering how we can make this work using the API only.

To address your point about whether that would be sufficient for post-processing: yes. But if something goes wrong during post-processing, like the issue mentioned above, the messages are lost and we won't be able to get them back. So ideally we should do all the processing first and then acknowledge the messages, if that makes sense.

Looking forward to your reply.

@plamut (Contributor) commented Jan 18, 2021

@capt2101akash

> But if something goes wrong during post-processing, like the issue mentioned above, the messages are lost and we won't be able to get them back. So ideally we should do all the processing first and then acknowledge the messages, if that makes sense.

That makes perfect sense. The backend will not try to re-send a message after it has received an ACK (within the message's acknowledge deadline). Applications should thus send an ACK only after they have processed a message. If processing means more than merely appending a message to a list (and then perhaps storing it somewhere), ack() should be postponed until then.

In principle, one could append a message and its ACK ID to the list and then manually send acknowledge requests for the processed messages, but that kind of defeats the purpose of the client, which relieves the programmer from tasks such as retries, automatic lease management, etc., so I wouldn't go down that route.
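
To make the "process first, then ack" idea concrete, a callback would look roughly like this (sketch only; process() is a placeholder for whatever transformation or storage your application does):

def callback(message):
    try:
        process(message.data)  # your transformation / database write, etc.
    except Exception:
        # Processing failed - nack so Pub/Sub redelivers the message later.
        message.nack()
    else:
        # Only acknowledge after the message has been fully processed.
        message.ack()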

I can't comment on the segfaults in the callback without the code, but if they frequently happen even when just calling message.ack(), then it indeed appears multiprocessing is a dead end, unfortunately. I'll try to reproduce segfaults, too, and report back.


Random thought - is there a specific reason for running the streaming pull for a fixed amount of time, cancelling it, and then starting it again? Was that just for the purpose of profiling?

Would your use case allow for moving this loop out of the subscriber script itself? By that I mean running it for a limited amount of time, processing the message stream, and then letting it terminate. After that (or even in parallel) another subscriber is fired up to continue the processing. Kind of the same idea as before, but without a child process.

(I do realize this is not ideal, but mostly trying to come up with a reasonable workaround for the time being)

@capt2101akash (Author) commented:

@plamut, regarding your point about running the streaming pull for a fixed amount of time and cancelling it, there were two reasons:

  • As you mentioned, I did it for profiling.
  • Secondly, releasing resources held by a continuously running thread was the first thing I wanted to rule out, since the thread would otherwise run forever and the behavior we are seeing would have been present there too. I chose this approach to make sure that once my future gets cancelled, all resources are released back to the OS.

To your suggestion: are you proposing a cronjob-like setup? (I am also considering that.) If so, then yes, it will be our last resort (not ideal for real-time cases).
To help you reproduce the segfault issue, try doing some complex computation in the callback while processing the messages, then send an ack. That should get you to the issue (in my case I was querying a SQL database using a key from the JSON payload as a parameter).

@plamut (Contributor) commented Jan 19, 2021

@capt2101akash

> To your suggestion: are you proposing a cronjob-like setup? (I am also considering that.) If so, then yes, it will be our last resort (not ideal for real-time cases).

If nothing else works, it might indeed be the most reasonable workaround for the time being, yes. The restart period can also be "long" (hours or more), depending on how quickly the memory leaks under real load. Furthermore, there doesn't have to be any downtime if another independent subscriber is launched, say, a minute or so before the currently running one is about to terminate - a rough sketch of that idea follows at the end of this comment.

I'll post more if I get a successful segfault reproduction, although that might not even be necessary, considering that multiprocessing appears to be a rabbit hole...
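
Roughly what I mean by the overlapping restart (untested sketch; run_subscriber(duration) is a placeholder for a function that runs a streaming pull for duration seconds and then exits):

import time
from multiprocessing import Process

RUN_SECONDS = 4 * 3600    # each subscriber runs for a few hours
OVERLAP_SECONDS = 60      # start the next one a minute before the old one stops

current = Process(target=run_subscriber, args=(RUN_SECONDS,))
current.start()
started_at = time.monotonic()

while True:
    # Sleep until shortly before the running subscriber is due to finish.
    wake_at = started_at + RUN_SECONDS - OVERLAP_SECONDS
    time.sleep(max(0, wake_at - time.monotonic()))

    nxt = Process(target=run_subscriber, args=(RUN_SECONDS,))
    nxt.start()                      # brief overlap, so no downtime
    started_at = time.monotonic()

    current.join()                   # old process exits after its run time
    current = nxt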

@plamut (Contributor) commented Jan 20, 2021

After testing a good number of combinations - Python version, grpcio version, test script flavour - it appears that there indeed exists a memory leak, but to make it more apparent, the number of messages posted to a topic should be high.

I used the initial code sample as a base, ran the streaming pull in pull_messages() for 10 iterations lasting ~30 seconds each (the timeout passed to the streaming pull future), and in parallel posted 50,000 messages to the topic in quick succession. Each message was a random ASCII string of ~4100 characters.

[Graph: memory usage, base example, Python 3.8, grpcio 1.34.1]

Creating a subscriber inside pull_messages() each time and closing it at the end (I used it as a context manager) did not really help; the memory consumption pattern looked similar:

[Graph: memory usage, client created in function, Python 3.8, grpcio 1.34.1]

However, when running pull_messages() in a child process, the memory consumption was stable:

[Graph: memory usage, client in subprocess, Python 3.8, grpcio 1.34.1]

I was also not able to reproduce any segfaults despite doing non-trivial work in the message callback (I queried a simple database), although absence of evidence is not evidence of absence, of course. But if I understand correctly, that should be easily reproducible, @capt2101akash?

It might be worth noting that I created, ran, and closed a subscriber instance inside the pull_messages() function, i.e. after the fork.

Update

Managed to reproduce a segfault! It happens if the subscriber instance is created before running pull_messages() in a subprocess, but not if the subscriber is created inside pull_messages().

@capt2101akash As a sanity check, can you confirm these findings? I noticed that your earlier multiprocessing example created the subscriber in the main process, but did you also try instantiating it in the subprocess?

It would be worth giving the multiprocessing workaround another shot, if it works for you, too.

@capt2101akash (Author) commented:

@plamut I can try creating the subscriber in the subprocess and get back to you with my findings. However, I think we can 100% agree that there is a memory leak that has to be addressed; running the streaming pull in a subprocess is just a workaround to overcome this issue. I would be really glad if we could pinpoint the module/functionality causing this leak and fix it.
In the meantime I will get my results back to you (I don't recall whether I tried that or not), but I will try it again and let you know.
Thanks for your time man! Really appreciate the help!

@plamut (Contributor) commented Jan 20, 2021

@capt2101akash The leak should definitely be fixed, indeed, but it's always good to have a reasonable workaround at hand - fingers crossed that it will work for you! 🤞

@capt2101akash (Author) commented:

Just to give you some good news: it did work 😊. Somehow I had never tried this approach 😅 my bad.
I will be monitoring my code for a couple of days to see how it performs and whether any issues come up down the line. However, let's see how we can fix the memory leak. 👍 Thanks again

@plamut (Contributor) commented Feb 1, 2021

I would like to give an update on the most recent findings.

I tried running the streaming pull in a single process and for longer periods of time (15+ minutes as opposed to only a few minutes). I tried both with plain callback logic, i.e. only calling message.ack(), and with the version that appends the message to a global list (the list is re-created on each streaming pull iteration; each iteration took ~60 seconds).

In all runs I made sure that the subscription always had enough messages at hand to keep the subscriber as busy as possible (so that flow control had to kick in).

15 x 60 seconds, no append

[Graphs: memory usage over 15 x 60 s runs, no append]

Initially, the memory consumption slowly increases, but eventually flattens out. This hints that any internal streaming pull operations probably do not leak memory.

15 x 60 seconds, appending to a list

[Graphs: memory usage over 15 x 60 s runs, appending to a list]

Since received messages are temporarily stored in a global list, the memory consumption increases faster. However, the growth is not linear - there are points where some of the memory is released back to the OS. It appears that after the initial increase, the memory consumption actually remains bounded.

If we only looked at the first few minutes, it might indeed seem that there is a memory leak, but monitoring the process over a longer time span does not support that claim.

30 x 60 seconds, appending to a list

[Graph: memory usage over a 30 x 60 s run, appending to a list]

Since the second 15-minute graph was a bit inconclusive and somewhat differed from the other two, I did yet another run for a full 30 minutes. The graph confirmed that the memory consumption is, in fact, bounded.

The most likely reason for seemingly "leaking" memory at the beginning is CPython's own memory management. When an object such as a list of messages goes out of scope, the memory is not immediately released back to the OS. Instead, the interpreter keeps it for future use, as it assumes that more objects will be created, and re-using the memory from an already allocated memory pool is faster than constantly freeing the memory to the OS and then allocating it back.


@capt2101akash
The big factor in memory consumption is whether the messages are stored in a list or not, but even if they are, the graphs suggest that the memory does not seem to be leaking, at least not by the subscriber client itself. Could it be that the memory consumption increases due to the application storing the results of the processing somewhere without regularly releasing these results?

I tried to find any zombie Message (or other) instances lurking in the dark and forgotten corners of memory, but couldn't really find any, they seem to be garbage collected just fine. I also checked the streaming pull code for any queues, lists, etc. that could grow indefinitely.

Initially, one possible candidate was the histogram that tracks how long it takes to acknowledge messages on average, but the histogram size is bounded. The add() method maps all ACK times onto the interval [10, 600] and the tracked ACK times are rounded to integer values, meaning that the histogram stats are bounded - they can contain a maximum of 591 distinct values. A toy illustration follows below.
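
To illustrate why the histogram stays bounded, here is a toy version of the idea (not the library's actual code):

def record_ack_time(histogram: dict, ack_time_seconds: float) -> None:
    # ACK times are rounded to whole seconds and clamped onto [10, 600],
    # so the dict can never hold more than 600 - 10 + 1 = 591 distinct keys.
    value = min(600, max(10, int(ack_time_seconds)))
    histogram[value] = histogram.get(value, 0) + 1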

@capt2101akash (Author) commented:
@plamut I have also observed this when the frequency of messages arriving on the topic is not real-time, or to be precise, not that frequent. However, as per my previous comments where I posted the snippet, I also tried just acknowledging the messages without any append, with a continuous stream of messages flowing in at a rate of 100 messages/sec, and the graph shows a gradual increase. As you mentioned, it does release some memory in between, but overall it increases over a period of a day or two.
I remember that in a previous comment you also mentioned seeing a leak just using the initial base file.
In any case, my point is that even if I don't append anything to the global list, there is a leak.
I would suggest trying it with a large number of messages arriving every minute or so, processing them with simple code that just acknowledges the message, as you have done before. I am pretty sure you will see the leak happening as you did before.

Just a quick update on where my application currently stands, so that others facing similar issues can relate: I run my subscription client in a multiprocessing setup, which releases its memory back to the OS after each run, and I get what I need. One thing to keep in mind is that you might have to adjust your processing logic to use this solution effectively.

@plamut (Contributor) commented Feb 2, 2021

@capt2101akash Thanks for the reply, and I'm glad that the multiprocessing workaround is working well.

> However, as per my previous comments where I posted the snippet, I also tried just acknowledging the messages without any append, with a continuous stream of messages flowing in at a rate of 100 messages/sec, and the graph shows a gradual increase. As you mentioned, it does release some memory in between, but overall it increases over a period of a day or two.

FWIW, in the test that I conducted, the subscriber running on my laptop received and processed (i.e. appended to a list) around 6500 messages/minute, which is comparable to the load in your test (the message stream was constantly high). I did not run it for an entire day, though.

> I remember that in a previous comment you also mentioned seeing a leak just using the initial base file.

Yeah, that's what it looked like, although that test only ran for about 5 minutes. Running the same test for longer later revealed that the memory consumption actually stabilizes eventually.

Based on your reports, it would probably still make sense to set up a small cloud app with a busy publisher and a simple ACK-only subscriber, and let it run for at least 24 hours. I'll try to do that when I get back to this.

@lidizheng commented:

Hi @plamut, it might help to generate a memory profile using memory_profiler. It describes all living objects in Python space. If we observe a large number of instances of some class, it indicates the leak happens in Python space. Otherwise, it might be caused by C/C++ objects leaking. Here is one memory leak fix in gRPC: grpc/grpc#22123 (comment).
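
For example, a quick way to count living objects per type from within the process (a generic sketch, not specific to this library):

import gc
from collections import Counter


def top_object_types(n=20):
    """Return the n most common object types currently tracked by the GC."""
    counts = Counter(type(obj).__name__ for obj in gc.get_objects())
    return counts.most_common(n)

# Call this periodically and compare successive outputs: a steadily growing
# count for some class points to a Python-level leak, while flat counts
# suggest the growth happens in C/C++ allocations instead.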

@plamut (Contributor) commented Jul 22, 2021

FWIW, the reason why this was marked as blocked is that the feedback time can be long. Even running and profiling a sample app locally for half an hour might not always be enough to conclude definitively whether a leak is present or whether it is just short-term noise (some of the memory usage graphs produced by the profiler turned out to be misleading).

(The resource consumption graphs from GCP are more telling; it might make sense to profile there and then inspect the object graph inside the container.)

@meredithslota meredithslota added priority: p3 Desirable enhancement or fix. May not be included in next release. and removed priority: p2 Moderately-important priority. Fix may not be included in next release. 🚨 This issue needs some love. labels Aug 28, 2021
@synackSA commented Oct 6, 2021

Just wondering what the current status of this is. We're also experiencing a memory leak issue; it persists even if I fire up the box via docker-compose and use the Pub/Sub emulator on my localhost. If I monitor the box resources with top, the overall memory usage goes up, but the individual Python threads seem to stay the same.

Some Examples:

After processing 10k+ messages, the resources for the box and processes looked like the following:
[Screenshot: top output after 10k+ messages]

We then processed 5k+ more messages and the memory for each process stayed the same, but the overall box memory consumption had increased (from 19.4% to 19.7%):
[Screenshot: top output after a further 5k+ messages]

We then processed another 7.5k messages. In this case, we see a small increase in memory for PID 23 (105136 -> 105532), but nothing that would account for an increase from 19.7% to 20.1%:
[Screenshot: top output after another 7.5k messages]

I'm currently in the process of trying to use mprof to profile and graph too, so we'll see what results we get from that.

@synackSA commented:

Just want to ping this again. I'm struggling to hunt down the cause of this issue, and we're literally having to restart our Pub/Sub microservices multiple times a week as the memory consumption just climbs and climbs.

@plamut (Contributor) commented Oct 16, 2021

@synackSA Sorry for not responding earlier, I was away. As you have figured out by yourself in the meantime, this one is tricky.

When testing locally, it was hard to distinguish whether the memory consumption graphs actually displayed a leak (or a fix for it), or just a short-term noise. Here, "short-term" could be 30 minutes or even more, meaning that the feedback is very slow, and the graphs from multiple runs were often inconsistent.

Running the streaming pull from a busy subscription for several hours did show a probable leak, but again the memory increase was relatively small, although noticeable.

I also tried hunting it down with tracemalloc, but nothing much popped out. It is thus believed that the leak does not occur at the Python level, but somewhere in C - the grpc dependency is a suspect here. An additional data point is that a similar long-standing issue is known for Firestore streaming, which supports the theory that the leak is probably not in the library itself, but in one of its dependencies.
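
For anyone who wants to repeat that check, a minimal tracemalloc sketch:

import tracemalloc

tracemalloc.start(25)  # keep up to 25 stack frames per allocation

# ... let the streaming pull run for a while, then:
snapshot = tracemalloc.take_snapshot()
for stat in snapshot.statistics("lineno")[:10]:
    print(stat)  # the biggest Python-level allocation sites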

Anyhow, may I ask whether you have already tried the workaround that reportedly works? The trick is to run the streaming pull in a subprocess for a fixed amount of time. 30 minutes could be a good choice, as the backend already terminates the stream automatically every half an hour anyway, in which case the client automatically re-connects. A minimal sketch follows at the end of this comment.

By running the stream in a subprocess, any memory leaked will be automatically returned to the OS when the subprocess terminates.
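
A minimal sketch of that workaround (assuming subscription_name and a callback are defined elsewhere):

from concurrent.futures import TimeoutError
from multiprocessing import Process

from google.cloud import pubsub_v1


def run_streaming_pull():
    # Create the client inside the child process; instantiating it in the
    # parent before forking reportedly leads to segfaults (see above).
    with pubsub_v1.SubscriberClient() as subscriber:
        future = subscriber.subscribe(subscription_name, callback)
        try:
            future.result(timeout=1800)  # ~30 minutes per child process
        except TimeoutError:
            future.cancel()


if __name__ == "__main__":
    while True:
        p = Process(target=run_streaming_pull)
        p.start()
        p.join()  # leaked memory is returned to the OS when the child exits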

@synackSA commented:

Just want to follow up and see if there's been any movement on this. We've implemented the design where we spawn another process, which dies after a certain amount of time, gets cleaned up, and then we spawn a new one. However, I'd like to keep tabs on this issue to see what's happening.

@plamut plamut removed their assignment Feb 15, 2022
@gberth commented Feb 17, 2022

Don't know if this helps. We experience the same problems with a long-running application with quite heavy Pub/Sub and Bigtable usage. This example just nacks, resulting in the messages being redelivered indefinitely. htop output at start (as quickly as possible) and after 1, 2, 3, and 7 million messages.

from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()

subscription_path = subscriber.subscription_path(
    "test",
    "test-sub",
)
flow_control = pubsub_v1.types.FlowControl(max_messages=3000)

ct = 0


def message_nack(message):
    global ct
    ct += 1
    if ct % 1000 == 0:
        print(ct)
    message.nack()


def pull_messages():
    future = subscriber.subscribe(subscription_path, message_nack, flow_control=flow_control)
    try:
        future.result(timeout=3000000)
    except TimeoutError:
        # logger.error(traceback.print_exc())
        future.cancel()


if __name__ == '__main__':
    while True:
        pull_messages()

[htop screenshots: at start, and after 1, 2, 3, and 7 million messages]

Ubuntu 20.04.3 LTS / python 3.8
google-api-core==2.0.1
google-auth==2.0.2
google-cloud-bigquery==2.26.0
google-cloud-bigtable==2.4.0
google-cloud-core==2.0.0
google-cloud-datastore==2.1.6
google-cloud-pubsub==2.9.0
google-cloud-storage==1.42.0
google-cloud-spanner==3.11.1
google-crc32c==1.1.2
google-resumable-media==2.0.2
googleapis-common-protos==1.53.0
grpc-google-iam-v1==0.12.3
grpcio==1.43.0

rgds
Gunnar Berthelsen

@acocuzzo (Contributor) commented:
Thanks very much @gberth

@blacar commented Mar 10, 2022

This problem is hitting me as well.

I have a long-running (10 secs) callback potentially using some amount of memory, but that's not the problem; the service is capable of processing several messages (I configured the consumer to go one by one because I don't need performance and want to maximize stability), yet I still see the memory leak pattern ending in a service crash.

I tried the multiprocessing approach and it didn't work for me. I'm in a containerized context and I have been unable to force the OS to free that memory.

Any other suggestions, or a timeline for a fix?

BTW, I have tried Python 3.8 and 3.10 with the same results, using google-cloud-pubsub==2.9.0.

@blacar commented Mar 10, 2022

BTW, I don't get it... this is a memory leak in an official Google library that has been open for more than a year (?!)
Is Python a pariah on GCP?

@gberth commented Mar 14, 2022

Without any hard evidence, it is my opinion/experience that it also occurs with the use of Bigtable from Python. So I suspect it has something to do with lower-level code rather than pubsub itself. I agree that the absence of priority and development is frustrating. It consumes a lot of time that could/should have been spent on business development.

@manuelfedele commented Mar 15, 2022

@blacar

Sadly, this is still in the backlog.
It is unsustainable (a crash every 5-6 messages in our case), and the multiprocessing trick was slowing down our pipeline.
It is reproducible, but it has been in the backlog for more than a year now.

@gberth commented Mar 15, 2022

That is strange. We are processing approximately 1500-2000 messages/sec from three subscriptions. This is a cached-state app, and the temporary solution was to do a regular controlled shutdown of the app three times per 24 hours. But we do have a good amount of memory :-)

@manuelfedele commented:

Well, that's not strange. What we noticed is that the memory taken by the process is never released. In our case, we had 1024 MB of RAM and every request took about 200 MB. Even when messages were sent sequentially (waiting 10 seconds between them), after 5-6 messages the container was crashing with an OOM error.

Maybe your operation is not memory-intensive.

Sadly, restarting every 5 or 6 messages was not acceptable for us.

@gberth commented Apr 6, 2022

Anything at all going on with this?

@gberth commented Aug 7, 2022

A bit frustrating that this never gets priority. And why is it blocked?

One observation: memory usage seems to increase faster in proportion to how often (or for how long) the max_messages limit is reached.

rgds
Gunnar

@manuelfedele commented:
Could this be related to grpc/grpc#28747 ?

@acocuzzo (Contributor) commented Nov 10, 2022

Following up with the grpc folks to see when this fix will be in a grpc release (currently the change is checked in, but not included in the latest release). As soon as it is in a release, I will update this issue with the fixed grpc version.

@acocuzzo (Contributor) commented:
According to grpc folks, this change will be included in grpc release: v1.51.0, which will be out in approx 1-2 weeks.

@acocuzzo (Contributor) commented:
The pre-release v1.51.0-pre1 contains the fix:
https://github.com/grpc/grpc/releases/tag/v1.51.0-pre1

@acocuzzo acocuzzo assigned acocuzzo and unassigned pradn and acocuzzo Nov 21, 2022
@acocuzzo (Contributor) commented:
The release containing the fix is out: https://github.com/grpc/grpc/releases/tag/v1.51.0
