Skip to content

Commit

Permalink
support year field (#80)
Browse files Browse the repository at this point in the history
Co-authored-by: zhouyizhen <zhouyizhen@metrodata.tech>
  • Loading branch information
JabberWocky-22 and zhouyizhen authored Jul 23, 2024
1 parent afbd23b commit a5bf116
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 13 deletions.
13 changes: 13 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ You can also use seconds as first field::
>>> itr = croniter('15,25 * * * * *', base, second_at_beginning=True)


About year
===========
Croniter also support year field.
Year presents at the seventh field, which is after second repetition.
The range of year field is from 1970 to 2099.
To ignore second repetition, simply set second to ``0`` or any other const::

>>> base = datetime(2012, 4, 6, 2, 6, 59)
>>> itr = croniter('0 0 1 1 * 0 2020/2', base)
>>> itr.get_next(datetime) # 2020 1/1 0:0:0
>>> itr.get_next(datetime) # 2022 1/1 0:0:0
>>> itr.get_next(datetime) # 2024 1/1 0:0:0

Support for start_time shifts
==============================
See https://github.com/kiorky/croniter/pull/76,
Expand Down
80 changes: 67 additions & 13 deletions src/croniter/croniter.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ class croniter(object):
(1, 31),
(1, 12),
(0, 7),
(0, 59)
(0, 59),
(1970, 2099)
)
DAYS = (
31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31
Expand All @@ -152,7 +153,9 @@ class croniter(object):
copy.deepcopy(M_ALPHAS),
# 4: dow
copy.deepcopy(DOW_ALPHAS),
# command/user
# 5: second
{},
# 6: year
{}
)

Expand All @@ -163,6 +166,7 @@ class croniter(object):
{0: 1},
{7: 0},
{},
{}
)

LEN_MEANS_ALL = (
Expand All @@ -171,10 +175,11 @@ class croniter(object):
31,
12,
7,
60
60,
130
)

bad_length = 'Exactly 5 or 6 columns has to be specified for iterator ' \
bad_length = 'Exactly 5, 6 or 7 columns has to be specified for iterator ' \
'expression.'

def __init__(self, expr_format, start_time=None, ret_type=float,
Expand Down Expand Up @@ -417,6 +422,25 @@ def _calc(self, now, expanded, nth_weekday_of_month, is_prev):
current_year = now.year
DAYS = self.DAYS

def proc_year(d):
if len(expanded) == YEAR_CRON_LEN:
try:
expanded[YEAR_FIELD].index("*")
except ValueError:
# use None as range_val to indicate no loop
diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
if diff_year is None:
return None, d
elif diff_year != 0:
if is_prev:
d += relativedelta(years=diff_year, month=12, day=31,
hour=23, minute=59, second=59)
else:
d += relativedelta(years=diff_year, month=1, day=1,
hour=0, minute=0, second=0)
return True, d
return False, d

def proc_month(d):
try:
expanded[MONTH_FIELD].index('*')
Expand Down Expand Up @@ -577,7 +601,8 @@ def proc_second(d):
d += relativedelta(second=0)
return False, d

procs = [proc_month,
procs = [proc_year,
proc_month,
proc_day_of_month,
(proc_day_of_week_nth if nth_weekday_of_month
else proc_day_of_week),
Expand All @@ -587,12 +612,20 @@ def proc_second(d):

while abs(year - current_year) <= self._max_years_between_matches:
next = False
stop = False
for proc in procs:
(changed, dst) = proc(dst)
# `None` can be set mostly for year processing
# so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
if changed is None:
stop = True
break
if changed:
month, year = dst.month, dst.year
next = True
break
if stop:
break
if next:
continue
return self._datetime_to_timestamp(dst.replace(microsecond=0))
Expand All @@ -616,23 +649,43 @@ def _get_prev_nearest(self, x, to_check):
return small[0]

def _get_next_nearest_diff(self, x, to_check, range_val):
"""
`range_val` is the range of a field.
If no available time, we can move to next loop(like next month).
`range_val` can also be set to `None` to indicate that there is no loop.
( Currently, should only used for `year` field )
"""
for i, d in enumerate(to_check):
if d == "l":
if d == "l" and range_val is not None:
# if 'l' then it is the last day of month
# => its value of range_val
d = range_val
if d >= x:
return d - x
# When range_val is None and x not exists in to_check,
# `None` will be returned to suggest no more available time
if range_val is None:
return None
return to_check[0] - x + range_val

def _get_prev_nearest_diff(self, x, to_check, range_val):
"""
`range_val` is the range of a field.
If no available time, we can move to previous loop(like previous month).
Range_val can also be set to `None` to indicate that there is no loop.
( Currently should only used for `year` field )
"""
candidates = to_check[:]
candidates.reverse()
for d in candidates:
if d != 'l' and d <= x:
return d - x
if 'l' in candidates:
return -x
# When range_val is None and x not exists in to_check,
# `None` will be returned to suggest no more available time
if range_val is None:
return None
candidate = candidates[0]
for c in candidates:
# fixed: c < range_val
Expand Down Expand Up @@ -694,9 +747,9 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time
if len(expressions) not in VALID_LEN_EXPRESSION:
raise CroniterBadCronError(cls.bad_length)

if len(expressions) == 6 and second_at_beginning:
# move second to last to process by same logical
expressions.append(expressions.pop(0))
if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
# move second to it's own(6th) field to process by same logical
expressions.insert(SECOND_FIELD, expressions.pop(0))

expanded = []
nth_weekday_of_month = {}
Expand Down Expand Up @@ -816,11 +869,12 @@ def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_time
pass

if t in cls.LOWMAP[i] and not (
# do not support 0 as a month either for classical 5 fields cron
# or 6fields second repeat form
# do not support 0 as a month either for classical 5 fields cron,
# 6fields second repeat form or 7 fields year form
# but still let conversion happen if day field is shifted
(i in [2, 3] and len(expressions) == 5) or
(i in [3, 4] and len(expressions) == 6)
(i in [DAY_FIELD, MONTH_FIELD] and len(expressions) == UNIX_CRON_LEN) or
(i in [MONTH_FIELD, DOW_FIELD] and len(expressions) == SECOND_CRON_LEN) or
(i in [DAY_FIELD, MONTH_FIELD, DOW_FIELD] and len(expressions) == YEAR_CRON_LEN)
):
t = cls.LOWMAP[i][t]

Expand Down
118 changes: 118 additions & 0 deletions src/croniter/tests/test_croniter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,10 @@ def test_match_handle_bad_cron(self):
datetime(2020, 1, 31),
day_or=False
))
self.assertFalse(croniter.match(
'0 0 31 1 * 0 2024/2',
datetime(2020, 1, 31),
))

def test_match_range(self):
self.assertTrue(croniter.match_range(
Expand Down Expand Up @@ -1491,6 +1495,120 @@ def test_invalid_question_mark(self):
self.assertRaises(CroniterBadCronError, croniter, "* ? * * *")
self.assertRaises(CroniterBadCronError, croniter, "* * ?,* * *")

def test_year(self):
itr1 = croniter("0 0 11 * * 0 2060", datetime(2050, 1, 1))
n1 = itr1.get_next(datetime)
self.assertEqual(n1.year, 2060)
self.assertEqual(n1.month, 1)
self.assertEqual(n1.day, 11)
n2 = itr1.get_next(datetime)
self.assertEqual(n2.year, 2060)
self.assertEqual(n2.month, 2)
self.assertEqual(n2.day, 11)

itr2 = croniter("0 0 11 * * 0 2050-2060", datetime(2055, 1, 30))
n3 = itr2.get_next(datetime)
self.assertEqual(n3.year, 2055)
self.assertEqual(n3.month, 2)
self.assertEqual(n3.day, 11)

itr3 = croniter("0 0 29 2 * 0 2025,2021-2023,2028", datetime(2020, 1, 1))
n4 = itr3.get_next(datetime)
self.assertEqual(n4.year, 2028)
self.assertEqual(n4.month, 2)
self.assertEqual(n4.day, 29)

itr4 = croniter("0 0 29 2 * 0 2025,*", datetime(2020, 1, 1))
n5 = itr4.get_next(datetime)
self.assertEqual(n5.year, 2020)
self.assertEqual(n5.month, 2)
self.assertEqual(n5.day, 29)

itr5 = croniter("0 0 29 2 * 0 2022/3", datetime(2020, 1, 1))
n6 = itr5.get_next(datetime)
self.assertEqual(n6.year, 2028)
self.assertEqual(n6.month, 2)
self.assertEqual(n6.day, 29)

itr6 = croniter("0 0 29 2 * 0 2023-2035/3", datetime(2020, 1, 1))
n7 = itr6.get_next(datetime)
self.assertEqual(n7.year, 2032)
self.assertEqual(n7.month, 2)
self.assertEqual(n7.day, 29)

def test_year_with_other_field(self):
itr1 = croniter("0 0 31 11-12 * 0 2023", datetime(2000, 1, 30))
n1 = itr1.get_next(datetime)
self.assertEqual(n1.year, 2023)
self.assertEqual(n1.month, 12)
self.assertEqual(n1.day, 31)

itr2 = croniter("0 0 31 1-2 * 0 2023-2025", datetime(2024, 12, 30))
n2 = itr2.get_next(datetime)
self.assertEqual(n2.year, 2025)
self.assertEqual(n2.month, 1)
self.assertEqual(n2.day, 31)

itr3 = croniter("0 0 1 1 1 0 2020-2030", datetime(2000, 1, 1), day_or=False)
n3 = itr3.get_next(datetime)
self.assertEqual(n3.year, 2024)
self.assertEqual(n3.month, 1)
self.assertEqual(n3.day, 1)

def test_year_get_prev(self):
itr1 = croniter("0 0 11 * * 0 2000", datetime(2010, 1, 1))
p1 = itr1.get_prev(datetime)
self.assertEqual(p1.year, 2000)
self.assertEqual(p1.month, 12)
self.assertEqual(p1.day, 11)

itr2 = croniter("0 0 11 * * 0 2000", datetime(2010, 1, 1))
p2 = itr2.get_prev(datetime)
self.assertEqual(p2.year, 2000)
self.assertEqual(p2.month, 12)
self.assertEqual(p2.day, 11)

itr2 = croniter("0 0 29 2 * 0 2010-2030", datetime(2020, 1, 1))
p2 = itr2.get_prev(datetime)
self.assertEqual(p2.year, 2016)
self.assertEqual(p2.month, 2)
self.assertEqual(p2.day, 29)

def test_year_match(self):
self.assertTrue(croniter.match("* * * * * * 2024", datetime(2024, 1, 1)))
self.assertTrue(croniter.match("59 58 23 31 12 * 2024",
datetime(2024, 12, 31, 23, 58, 59),
second_at_beginning=True))
self.assertFalse(croniter.match("* * * * * * 2024-2026", datetime(2027, 1, 1)))
self.assertFalse(croniter.match("* * * * * * 2024/2", datetime(2025, 1, 1)))

def test_year_bad_date_error(self):
with self.assertRaises(CroniterBadDateError):
itr = croniter("* * * * * * 2020", datetime(2030, 1, 1))
itr.get_next()
with self.assertRaises(CroniterBadDateError):
itr = croniter("* * * * * * 2020", datetime(2000, 1, 1))
itr.get_prev()
with self.assertRaises(CroniterBadDateError):
itr = croniter("* * 29 2 * * 2021-2023", datetime(2000, 1, 1))
itr.get_next()

def test_year_with_second_at_beginning(self):
base = datetime(2050, 1, 1)
itr = croniter("59 58 23 31 12 * 2070", base, second_at_beginning=True)
n = itr.get_next(datetime)
self.assertEqual(n.year, 2070)
self.assertEqual(n.month, 12)
self.assertEqual(n.day, 31)
self.assertEqual(n.hour, 23)
self.assertEqual(n.minute, 58)
self.assertEqual(n.second, 59)

def test_invalid_year(self):
self.assertRaises(CroniterBadCronError, croniter, "0 0 1 * * 0 1000")
self.assertRaises(CroniterBadCronError, croniter, "0 0 1 * * 0 99999")
self.assertRaises(CroniterBadCronError, croniter, "0 0 1 * * 0 2070#3")

def test_issue_47(self):
base = datetime(2021, 3, 30, 4, 0)
itr = croniter('0 6 30 3 *', base)
Expand Down
47 changes: 47 additions & 0 deletions src/croniter/tests/test_croniter_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ def test_hash_second(self):
'H H * * * H', datetime(2020, 1, 1, 11, 10, 32), timedelta(days=1)
)

def test_hash_year(self):
"""Test years
provide a seventh field as year
"""
self._test_iter(
'H H * * * H H', datetime(2066, 1, 1, 11, 10, 32), timedelta(days=1)
)

def test_hash_id_change(self):
"""Test a different hash_id returns different results given same definition and epoch"""
self._test_iter('H H * * *', datetime(2020, 1, 1, 11, 10), timedelta(days=1))
Expand All @@ -90,10 +99,14 @@ def test_hash_range(self):
self._test_iter(
'H H H(3-5) * *', datetime(2020, 1, 5, 11, 10), timedelta(days=31)
)
self._test_iter(
'H H * * * 0 H(2025-2030)', datetime(2029, 1, 1, 11, 10), timedelta(days=1)
)

def test_hash_division(self):
"""Test a hashed division definition"""
self._test_iter('H H/3 * * *', datetime(2020, 1, 1, 2, 10), timedelta(hours=3))
self._test_iter('H H H H * H H/2', datetime(2020, 9, 1, 11, 10, 32), timedelta(days=365 * 2))

def test_hash_range_division(self):
"""Test a hashed range + division definition"""
Expand Down Expand Up @@ -519,3 +532,37 @@ def test_expand_week_days_with_full_range(self):
assert len(days) == self.TOTAL
assert min(days) == self.MIN_VALUE
assert max(days) == self.MAX_VALUE


class CroniterHashExpanderExpandYearsTest(CroniterHashExpanderBase):
def test_expand_years_by_division(self):
years = set()
year_min, year_max = croniter.RANGES[6]
expression = '* * * * * * H/10'
for hash_id in self.HASH_IDS:
expanded = croniter.expand(expression, hash_id=hash_id)
assert len(expanded[0][6]) == 13
years.update(expanded[0][6])
assert len(years) == year_max - year_min + 1
assert min(years) == year_min
assert max(years) == year_max

def test_expand_years_by_range(self):
years = set()
expression = '* * * * * * H(2020-2030)'
for hash_id in self.HASH_IDS:
expanded = croniter.expand(expression, hash_id=hash_id)
years.add(expanded[0][6][0])
assert len(years) == 11
assert min(years) == 2020
assert max(years) == 2030

def test_expand_years_by_range_and_division(self):
years = set()
expression = '* * * * * * H(2020-2050)/10'
for hash_id in self.HASH_IDS:
expanded = croniter.expand(expression, hash_id=hash_id)
years.update(expanded[0][6])
assert len(years) == 31
assert min(years) == 2020
assert max(years) == 2050
Loading

0 comments on commit a5bf116

Please sign in to comment.