largecolumn_test.py
import pytest
import re
import logging

from dtest import Tester

since = pytest.mark.since
logger = logging.getLogger(__name__)


@since('2.2')
class TestLargeColumn(Tester):
"""
Check that inserting and reading large columns to the database doesn't cause off heap memory usage
that is proportional to the size of the memory read/written.
"""

    def stress_with_col_size(self, cluster, node, size):
        size = str(size)
        node.stress(['write', 'n=5', "no-warmup", "cl=ALL", "-pop", "seq=1...5", "-schema", "replication(factor=2)", "-col", "n=fixed(1)", "size=fixed(" + size + ")", "-rate", "threads=1"])
        node.stress(['read', 'n=5', "no-warmup", "cl=ALL", "-pop", "seq=1...5", "-schema", "replication(factor=2)", "-col", "n=fixed(1)", "size=fixed(" + size + ")", "-rate", "threads=1"])
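
    # For reference, each node.stress() call above maps onto a direct cassandra-stress
    # invocation; a sketch of the write command, with <size> standing in for the size
    # argument (illustrative only, not executed by this test):
    #
    #   cassandra-stress write n=5 no-warmup cl=ALL -pop seq=1...5 \
    #       -schema "replication(factor=2)" -col n=fixed(1) "size=fixed(<size>)" \
    #       -rate threads=1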

    def directbytes(self, node):
        def is_number(s):
            try:
                float(s)
                return True
            except ValueError:
                return False

        output, err, _ = node.nodetool("gcstats")
        logger.debug(output)
        output = output.split("\n")
        assert re.search('Interval', output[0].strip())
        fields = output[1].split()
        # fields[6] is the direct memory bytes column, so the data row must have at least seven fields
        assert len(fields) >= 7, "Expected output from nodetool gcstats to have at least seven fields. However, fields is: {}".format(fields)
        for field in fields:
            assert is_number(field.strip()) or field == 'NaN', "Expected numeric fields from nodetool gcstats. However, field.strip() is: {}".format(field.strip())
        return fields[6]
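
    # For context, a sketch of the `nodetool gcstats` layout parsed above (column set
    # assumed from recent Cassandra versions; exact header text may vary):
    #
    #   Interval (ms)  Max GC Elapsed (ms)  Total GC Elapsed (ms)  Stdev GC Elapsed (ms)  GC Reclaimed (MB)  Collections  Direct Memory Bytes
    #
    # followed by a single row of numbers. fields[6] is then the "Direct Memory Bytes"
    # value, hence the requirement for at least seven whitespace-separated fields.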

    def test_cleanup(self):
        """
        @jira_ticket CASSANDRA-8670
        """
        cluster = self.cluster

        # The commit log segment size needs to increase for the database to be willing to accept columns that large.
        # Internode compression is disabled because the regression being tested occurs in NIO buffer pooling without compression.
        configuration = {'commitlog_segment_size_in_mb': 128, 'internode_compression': 'none'}
        if cluster.version() >= '4.0':
            configuration['internode_max_message_size_in_bytes'] = 128 * 1024 * 1024
        if cluster.version() >= '4.1':
            configuration['native_transport_max_request_data_in_flight'] = '64MiB'
            configuration['native_transport_max_request_data_in_flight_per_ip'] = '64MiB'
        cluster.set_configuration_options(configuration)

        # Have Netty allocate memory on heap so it is clear whether memory used for large columns is related to intra-cluster messaging
        cluster.populate(2).start(jvm_args=[" -Dcassandra.netty_use_heap_allocator=true "])
        node1, node2 = cluster.nodelist()
        session = self.patient_cql_connection(node1)

        logger.debug("Before stress {0}".format(self.directbytes(node1)))
        logger.debug("Running stress")

        # Run the full stack to see how much memory is utilized for "small" columns
        self.stress_with_col_size(cluster, node1, 1)
        beforeStress = self.directbytes(node1)
        logger.debug("Ran stress once {0}".format(beforeStress))

        # Now run the full stack to see how much memory is utilized for "large" columns
        LARGE_COLUMN_SIZE = 1024 * 1024 * 63
        self.stress_with_col_size(cluster, node1, LARGE_COLUMN_SIZE)

        output, err, _ = node1.nodetool("gcstats")
        afterStress = self.directbytes(node1)
        logger.debug("After stress {0}".format(afterStress))

        # Any growth in memory usage should not be proportional to the column size. Really, almost no memory should
        # be used, since Netty was instructed to use a heap allocator.
        diff = int(afterStress) - int(beforeStress)
        assert diff < LARGE_COLUMN_SIZE
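

# A typical invocation, assuming the standard cassandra-dtest pytest setup
# (the --cassandra-dir path below is an example, not part of this test):
#
#   pytest --cassandra-dir=/path/to/cassandra largecolumn_test.py::TestLargeColumn::test_cleanup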