Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tests for the itertools docs rough equivalents #120509

Merged
merged 3 commits into from
Jun 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 315 additions & 18 deletions Lib/test/test_itertools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1587,27 +1587,169 @@ def batched_recipe(iterable, n):
self.assertEqual(r1, r2)
self.assertEqual(e1, e2)


def test_groupby_recipe(self):

# Begin groupby() recipe #######################################

def groupby(iterable, key=None):
# [k for k, g in groupby('AAAABBBCCDAABBB')] → A B C D A B
# [list(g) for k, g in groupby('AAAABBBCCD')] → AAAA BBB CC D

keyfunc = (lambda x: x) if key is None else key
iterator = iter(iterable)
exhausted = False

def _grouper(target_key):
nonlocal curr_value, curr_key, exhausted
yield curr_value
for curr_value in iterator:
curr_key = keyfunc(curr_value)
if curr_key != target_key:
return
yield curr_value
exhausted = True

try:
curr_value = next(iterator)
except StopIteration:
return
curr_key = keyfunc(curr_value)

while not exhausted:
target_key = curr_key
curr_group = _grouper(target_key)
yield curr_key, curr_group
if curr_key == target_key:
for _ in curr_group:
pass

# End groupby() recipe #########################################

# Check whether it accepts arguments correctly
self.assertEqual([], list(groupby([])))
self.assertEqual([], list(groupby([], key=id)))
self.assertRaises(TypeError, list, groupby('abc', []))
if False:
# Test not applicable to the recipe
self.assertRaises(TypeError, list, groupby('abc', None))
self.assertRaises(TypeError, groupby, 'abc', lambda x:x, 10)

# Check normal input
s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22),
(2,15,22), (3,16,23), (3,17,23)]
dup = []
for k, g in groupby(s, lambda r:r[0]):
for elem in g:
self.assertEqual(k, elem[0])
dup.append(elem)
self.assertEqual(s, dup)

# Check nested case
dup = []
for k, g in groupby(s, testR):
for ik, ig in groupby(g, testR2):
for elem in ig:
self.assertEqual(k, elem[0])
self.assertEqual(ik, elem[2])
dup.append(elem)
self.assertEqual(s, dup)

# Check case where inner iterator is not used
keys = [k for k, g in groupby(s, testR)]
expectedkeys = set([r[0] for r in s])
self.assertEqual(set(keys), expectedkeys)
self.assertEqual(len(keys), len(expectedkeys))

# Check case where inner iterator is used after advancing the groupby
# iterator
s = list(zip('AABBBAAAA', range(9)))
it = groupby(s, testR)
_, g1 = next(it)
_, g2 = next(it)
_, g3 = next(it)
self.assertEqual(list(g1), [])
self.assertEqual(list(g2), [])
self.assertEqual(next(g3), ('A', 5))
list(it) # exhaust the groupby iterator
self.assertEqual(list(g3), [])

# Exercise pipes and filters style
s = 'abracadabra'
# sort s | uniq
r = [k for k, g in groupby(sorted(s))]
self.assertEqual(r, ['a', 'b', 'c', 'd', 'r'])
# sort s | uniq -d
r = [k for k, g in groupby(sorted(s)) if list(islice(g,1,2))]
self.assertEqual(r, ['a', 'b', 'r'])
# sort s | uniq -c
r = [(len(list(g)), k) for k, g in groupby(sorted(s))]
self.assertEqual(r, [(5, 'a'), (2, 'b'), (1, 'c'), (1, 'd'), (2, 'r')])
# sort s | uniq -c | sort -rn | head -3
r = sorted([(len(list(g)) , k) for k, g in groupby(sorted(s))], reverse=True)[:3]
self.assertEqual(r, [(5, 'a'), (2, 'r'), (2, 'b')])

# iter.__next__ failure
class ExpectedError(Exception):
pass
def delayed_raise(n=0):
for i in range(n):
yield 'yo'
raise ExpectedError
def gulp(iterable, keyp=None, func=list):
return [func(g) for k, g in groupby(iterable, keyp)]

# iter.__next__ failure on outer object
self.assertRaises(ExpectedError, gulp, delayed_raise(0))
# iter.__next__ failure on inner object
self.assertRaises(ExpectedError, gulp, delayed_raise(1))

# __eq__ failure
class DummyCmp:
def __eq__(self, dst):
raise ExpectedError
s = [DummyCmp(), DummyCmp(), None]

# __eq__ failure on outer object
self.assertRaises(ExpectedError, gulp, s, func=id)
# __eq__ failure on inner object
self.assertRaises(ExpectedError, gulp, s)

# keyfunc failure
def keyfunc(obj):
if keyfunc.skip > 0:
keyfunc.skip -= 1
return obj
else:
raise ExpectedError

# keyfunc failure on outer object
keyfunc.skip = 0
self.assertRaises(ExpectedError, gulp, [None], keyfunc)
keyfunc.skip = 1
self.assertRaises(ExpectedError, gulp, [None, None], keyfunc)


@staticmethod
def islice(iterable, *args):
# islice('ABCDEFG', 2) → A B
# islice('ABCDEFG', 2, 4) → C D
# islice('ABCDEFG', 2, None) → C D E F G
# islice('ABCDEFG', 0, None, 2) → A C E G

s = slice(*args)
start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1
it = iter(range(start, stop, step))
try:
nexti = next(it)
except StopIteration:
# Consume *iterable* up to the *start* position.
for i, element in zip(range(start), iterable):
pass
return
try:
for i, element in enumerate(iterable):
if i == nexti:
yield element
nexti = next(it)
except StopIteration:
# Consume to *stop*.
for i, element in zip(range(i + 1, stop), iterable):
pass
start = 0 if s.start is None else s.start
stop = s.stop
step = 1 if s.step is None else s.step
if start < 0 or (stop is not None and stop < 0) or step <= 0:
raise ValueError

indices = count() if stop is None else range(max(start, stop))
next_i = start
for i, element in zip(indices, iterable):
if i == next_i:
yield element
next_i += step

def test_islice_recipe(self):
self.assertEqual(list(self.islice('ABCDEFG', 2)), list('AB'))
Expand All @@ -1627,6 +1769,161 @@ def test_islice_recipe(self):
self.assertEqual(next(c), 3)


def test_tee_recipe(self):

# Begin tee() recipe ###########################################

def tee(iterable, n=2):
iterator = iter(iterable)
shared_link = [None, None]
return tuple(_tee(iterator, shared_link) for _ in range(n))

def _tee(iterator, link):
try:
while True:
if link[1] is None:
link[0] = next(iterator)
link[1] = [None, None]
value, link = link
yield value
except StopIteration:
return

# End tee() recipe #############################################

n = 200

a, b = tee([]) # test empty iterator
self.assertEqual(list(a), [])
self.assertEqual(list(b), [])

a, b = tee(irange(n)) # test 100% interleaved
self.assertEqual(lzip(a,b), lzip(range(n), range(n)))

a, b = tee(irange(n)) # test 0% interleaved
self.assertEqual(list(a), list(range(n)))
self.assertEqual(list(b), list(range(n)))

a, b = tee(irange(n)) # test dealloc of leading iterator
for i in range(100):
self.assertEqual(next(a), i)
del a
self.assertEqual(list(b), list(range(n)))

a, b = tee(irange(n)) # test dealloc of trailing iterator
for i in range(100):
self.assertEqual(next(a), i)
del b
self.assertEqual(list(a), list(range(100, n)))

for j in range(5): # test randomly interleaved
order = [0]*n + [1]*n
random.shuffle(order)
lists = ([], [])
its = tee(irange(n))
for i in order:
value = next(its[i])
lists[i].append(value)
self.assertEqual(lists[0], list(range(n)))
self.assertEqual(lists[1], list(range(n)))

# test argument format checking
self.assertRaises(TypeError, tee)
self.assertRaises(TypeError, tee, 3)
self.assertRaises(TypeError, tee, [1,2], 'x')
self.assertRaises(TypeError, tee, [1,2], 3, 'x')

# Tests not applicable to the tee() recipe
if False:
# tee object should be instantiable
a, b = tee('abc')
c = type(a)('def')
self.assertEqual(list(c), list('def'))

# test long-lagged and multi-way split
a, b, c = tee(range(2000), 3)
for i in range(100):
self.assertEqual(next(a), i)
self.assertEqual(list(b), list(range(2000)))
self.assertEqual([next(c), next(c)], list(range(2)))
self.assertEqual(list(a), list(range(100,2000)))
self.assertEqual(list(c), list(range(2,2000)))

# Tests not applicable to the tee() recipe
if False:
# test invalid values of n
self.assertRaises(TypeError, tee, 'abc', 'invalid')
self.assertRaises(ValueError, tee, [], -1)

for n in range(5):
result = tee('abc', n)
self.assertEqual(type(result), tuple)
self.assertEqual(len(result), n)
self.assertEqual([list(x) for x in result], [list('abc')]*n)


# Tests not applicable to the tee() recipe
if False:
# tee pass-through to copyable iterator
a, b = tee('abc')
c, d = tee(a)
self.assertTrue(a is c)

# test tee_new
t1, t2 = tee('abc')
tnew = type(t1)
self.assertRaises(TypeError, tnew)
self.assertRaises(TypeError, tnew, 10)
t3 = tnew(t1)
self.assertTrue(list(t1) == list(t2) == list(t3) == list('abc'))

# test that tee objects are weak referencable
a, b = tee(range(10))
p = weakref.proxy(a)
self.assertEqual(getattr(p, '__class__'), type(b))
del a
gc.collect() # For PyPy or other GCs.
self.assertRaises(ReferenceError, getattr, p, '__class__')

ans = list('abc')
long_ans = list(range(10000))

# Tests not applicable to the tee() recipe
if False:
# check copy
a, b = tee('abc')
self.assertEqual(list(copy.copy(a)), ans)
self.assertEqual(list(copy.copy(b)), ans)
a, b = tee(list(range(10000)))
self.assertEqual(list(copy.copy(a)), long_ans)
self.assertEqual(list(copy.copy(b)), long_ans)

# check partially consumed copy
a, b = tee('abc')
take(2, a)
take(1, b)
self.assertEqual(list(copy.copy(a)), ans[2:])
self.assertEqual(list(copy.copy(b)), ans[1:])
self.assertEqual(list(a), ans[2:])
self.assertEqual(list(b), ans[1:])
a, b = tee(range(10000))
take(100, a)
take(60, b)
self.assertEqual(list(copy.copy(a)), long_ans[100:])
self.assertEqual(list(copy.copy(b)), long_ans[60:])
self.assertEqual(list(a), long_ans[100:])
self.assertEqual(list(b), long_ans[60:])

# Issue 13454: Crash when deleting backward iterator from tee()
forward, backward = tee(repeat(None, 2000)) # 20000000
try:
any(forward) # exhaust the iterator
del backward
except:
del forward, backward
raise


class TestGC(unittest.TestCase):

def makecycle(self, iterator, container):
Expand Down
Loading