diff --git a/boxoffice/models/discount_policy.py b/boxoffice/models/discount_policy.py index 5ee4c1e7..7ba28f06 100644 --- a/boxoffice/models/discount_policy.py +++ b/boxoffice/models/discount_policy.py @@ -2,7 +2,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, List, Optional, Sequence from uuid import UUID import secrets import string @@ -203,8 +204,8 @@ def make_bulk(cls, discount_code_base, **kwargs) -> DiscountPolicy: @classmethod def get_from_ticket( - cls, ticket: Item, qty, coupon_codes=() - ) -> List[Tuple[DiscountPolicy, Optional[DiscountCoupon]]]: + cls, ticket: Item, qty, coupon_codes: Sequence[str] = () + ) -> List[PolicyCoupon]: """ Return a list of (discount_policy, discount_coupon) tuples. @@ -214,9 +215,7 @@ def get_from_ticket( cls.discount_type == DiscountTypeEnum.AUTOMATIC, cls.item_quantity_min <= qty, ).all() - policies: List[Tuple[DiscountPolicy, Optional[DiscountCoupon]]] = [ - (discount, None) for discount in automatic_discounts - ] + policies = [PolicyCoupon(discount, None) for discount in automatic_discounts] if not coupon_codes: return policies coupon_policies = ticket.discount_policies.filter( @@ -250,7 +249,7 @@ def get_from_ticket( for coupon in coupons: if coupon.usage_limit > coupon.used_count: - policies.append((coupon.discount_policy, coupon)) + policies.append(PolicyCoupon(coupon.discount_policy, coupon)) return policies @property @@ -348,6 +347,12 @@ def update_used_count(self): ) +@dataclass +class PolicyCoupon: + policy: DiscountPolicy + coupon: Optional[DiscountCoupon] + + create_title_trgm_trigger = sa.DDL( 'CREATE INDEX idx_discount_policy_title_trgm on discount_policy' ' USING gin (title gin_trgm_ops);' diff --git a/boxoffice/models/line_item.py b/boxoffice/models/line_item.py index 5f6a6364..45232bff 100644 --- a/boxoffice/models/line_item.py +++ b/boxoffice/models/line_item.py @@ -1,23 +1,23 @@ from __future__ import annotations -from collections import OrderedDict, namedtuple +from collections import OrderedDict from datetime import date, datetime, timedelta, tzinfo from decimal import Decimal from typing import ( TYPE_CHECKING, Dict, - Iterable, + List, + NamedTuple, Optional, Sequence, Union, cast, - overload, ) from uuid import UUID from flask import current_app from isoweek import Week -from typing_extensions import Literal +from typing_extensions import TypedDict from baseframe import localize_timezone from coaster.utils import isoweek_datetime, midnight_to_utc, utcnow @@ -38,30 +38,20 @@ __all__ = ['LineItem', 'Assignee'] -LineItemTuple = namedtuple( - 'LineItemTuple', - [ - 'ticket_id', - 'id', - 'base_amount', - 'discount_policy_id', - 'discount_coupon_id', - 'discounted_amount', - 'final_amount', - ], -) +class LineItemTuple(NamedTuple): + """Duck-type for LineItem.""" + id: Optional[UUID] # noqa: A003 + ticket_id: UUID + base_amount: Optional[Decimal] + discount_policy_id: Optional[UUID] = None + discount_coupon_id: Optional[UUID] = None + discounted_amount: Optional[Decimal] = Decimal(0) + final_amount: Optional[Decimal] = None -def make_ntuple(ticket_id, base_amount, **kwargs): - return LineItemTuple( - ticket_id, - kwargs.get('line_item_id', None), - base_amount, - kwargs.get('discount_policy_id', None), - kwargs.get('discount_coupon_id', None), - kwargs.get('discounted_amount', Decimal(0)), - kwargs.get('final_amount', None), - ) + +class LineItemDict(TypedDict): + ticket_id: str class Assignee(BaseMixin, Model): @@ -151,77 +141,51 @@ def permissions(self, actor, inherited=None): perms.add('org_admin') return perms - @overload - @classmethod - def calculate( - cls, - line_items: Sequence[LineItem], - realculate: Literal[True], - coupons: Optional[Sequence[str]] = None, - ): - ... - - @overload @classmethod def calculate( cls, - line_items: Sequence[dict], - realculate: Literal[False], + line_items: Sequence[Union[LineItem, LineItemDict]], coupons: Optional[Sequence[str]] = None, - ): - ... - - # TODO: Fix this classmethod's typing - @classmethod - def calculate( - cls, - line_items: Iterable[Union[LineItem, dict]], # FIXME - recalculate=False, - coupons=None, - ): + ) -> Sequence[LineItemTuple]: """ Return line item data tuples. For each line item, returns a tuple of base_amount, discounted_amount, final_amount, discount_policy and discount coupon populated - - If the `recalculate` flag is set to `True`, the line_items will be considered - as SQLAlchemy objects. """ - item_line_items = {} - calculated_line_items = [] + base_amount: Optional[Decimal] + item_line_items: Dict[str, List[LineItemTuple]] = {} + calculated_line_items: List[LineItemTuple] = [] coupon_list = list(set(coupons)) if coupons else [] discounter = LineItemDiscounter() # make named tuples for line items, # assign the base_amount for each of them, None if a ticket is unavailable for line_item in line_items: - if recalculate: + ticket: Optional[Item] + if isinstance(line_item, LineItem): ticket = line_item.ticket - # existing line item, use the original base amount base_amount = line_item.base_amount - line_item_id = line_item.id + line_item_id = cast(UUID, line_item.id) else: ticket = Item.query.get(line_item['ticket_id']) - # new line item, use the current price if ticket is None: - base_amount = None - else: - current_price = ticket.current_price() - base_amount = ( - current_price.amount - if current_price is not None and ticket.is_available - else None - ) + continue + current_price = ticket.current_price() + base_amount = ( + current_price.amount + if current_price is not None and ticket.is_available + else None + ) line_item_id = None if not item_line_items.get(str(ticket.id)): item_line_items[str(ticket.id)] = [] item_line_items[str(ticket.id)].append( - make_ntuple( - ticket_id=ticket.id, + LineItemTuple( + id=line_item_id, + ticket_id=cast(UUID, ticket.id), base_amount=base_amount, - line_item_id=line_item_id, ) ) diff --git a/boxoffice/models/line_item_discounter.py b/boxoffice/models/line_item_discounter.py index cceb0b6d..8e281369 100644 --- a/boxoffice/models/line_item_discounter.py +++ b/boxoffice/models/line_item_discounter.py @@ -1,17 +1,28 @@ from __future__ import annotations from decimal import Decimal -from typing import List, Optional, Tuple +from typing import List, Sequence, Union, cast +from uuid import UUID import itertools +from beartype import beartype + +from .discount_policy import DiscountCoupon, DiscountPolicy, PolicyCoupon +from .line_item import LineItemTuple + __all__ = ['LineItemDiscounter'] class LineItemDiscounter: - def get_discounted_line_items(self, line_items, coupons=()): + @beartype + def get_discounted_line_items( + self, + line_items: Sequence[LineItemTuple], + coupons: Sequence[str] = (), + ) -> Sequence[LineItemTuple]: """Return line items with the maximum possible discount applied.""" if not line_items: - return None + return [] if len({line_item.ticket_id for line_item in line_items}) > 1: raise ValueError("line_items must be of the same ticket_id") @@ -24,9 +35,10 @@ def get_discounted_line_items(self, line_items, coupons=()): return self.apply_discount(valid_discounts[0], line_items) return line_items + @beartype def get_valid_discounts( - self, line_items, coupons - ) -> List[Tuple[DiscountPolicy, Optional[DiscountCoupon]]]: + self, line_items: Sequence[LineItemTuple], coupons: Sequence[str] + ) -> Sequence[PolicyCoupon]: """Return available discounts given line items and coupon codes.""" if not line_items: return [] @@ -38,9 +50,10 @@ def get_valid_discounts( return DiscountPolicy.get_from_ticket(ticket, len(line_items), coupons) + @beartype def calculate_discounted_amount( self, discount_policy: DiscountPolicy, line_item: LineItemTuple - ): + ) -> Decimal: if line_item.base_amount is None or line_item.base_amount == Decimal(0): # Ticket has expired, no discount return Decimal(0) @@ -59,10 +72,17 @@ def calculate_discounted_amount( return line_item.base_amount - discounted_price.amount return (discount_policy.percentage or 0) * line_item.base_amount / Decimal(100) - def is_coupon_usable(self, coupon, applied_to_count): + @beartype + def is_coupon_usable(self, coupon: DiscountCoupon, applied_to_count: int) -> bool: return (coupon.usage_limit - coupon.used_count) > applied_to_count - def apply_discount(self, policy_coupon, line_items, combo=False): + @beartype + def apply_discount( + self, + policy_coupon: PolicyCoupon, + line_items: Sequence[LineItemTuple], + combo: bool = False, + ) -> Sequence[LineItemTuple]: """ Apply discounts on given line items. @@ -72,19 +92,17 @@ def apply_discount(self, policy_coupon, line_items, combo=False): Assumes that the discount policies and discount coupons passed as arguments are valid and usable. """ - discounted_line_items = [] - # unpack (discount_policy, dicount_coupon) - discount_policy, coupon = policy_coupon + discounted_line_items: List[LineItemTuple] = [] applied_to_count = 0 for line_item in line_items: discounted_amount = self.calculate_discounted_amount( - discount_policy, line_item + policy_coupon.policy, line_item ) if ( ( # pylint: disable=too-many-boolean-expressions - coupon - and self.is_coupon_usable(coupon, applied_to_count) - or discount_policy.is_automatic + policy_coupon.coupon + and self.is_coupon_usable(policy_coupon.coupon, applied_to_count) + or policy_coupon.policy.is_automatic ) and discounted_amount > 0 and ( @@ -95,12 +113,14 @@ def apply_discount(self, policy_coupon, line_items, combo=False): # if the line item's assigned discount is lesser than the current # discount, assign the current discount to the line item discounted_line_items.append( - make_ntuple( + LineItemTuple( + id=cast(UUID, line_item.id) if line_item.id else None, ticket_id=line_item.ticket_id, base_amount=line_item.base_amount, - line_item_id=line_item.id if line_item.id else None, - discount_policy_id=discount_policy.id, - discount_coupon_id=coupon.id if coupon else None, + discount_policy_id=cast(UUID, policy_coupon.policy.id), + discount_coupon_id=cast(UUID, policy_coupon.coupon.id) + if policy_coupon.coupon + else None, discounted_amount=discounted_amount, final_amount=line_item.base_amount - discounted_amount, ) @@ -111,7 +131,12 @@ def apply_discount(self, policy_coupon, line_items, combo=False): discounted_line_items.append(line_item) return discounted_line_items - def apply_combo_discount(self, discounts, line_items): + @beartype + def apply_combo_discount( + self, + discounts: List[PolicyCoupon], + line_items: Sequence[LineItemTuple], + ) -> Sequence[LineItemTuple]: """Apply multiple discounts to a list of line items recursively.""" if len(discounts) == 0: return line_items @@ -121,7 +146,17 @@ def apply_combo_discount(self, discounts, line_items): [discounts[0]], self.apply_combo_discount(discounts[1:], line_items) ) - def apply_max_discount(self, discounts, line_items): + @beartype + def apply_max_discount( + self, + discounts: List[ + Union[ + PolicyCoupon, + Sequence[PolicyCoupon], + ] + ], + line_items: Sequence[LineItemTuple], + ) -> Sequence[LineItemTuple]: """ Find and apply the maximum discount available. @@ -130,10 +165,10 @@ def apply_max_discount(self, discounts, line_items): line items. """ discounts.extend(self.get_combos(discounts, len(line_items))) - discounted_line_items_list = [] + discounted_line_items_list: List[Sequence[LineItemTuple]] = [] for discount in discounts: - if isinstance(discount[0], tuple): + if isinstance(discount, list): # Combo discount discounted_line_items_list.append( self.apply_combo_discount(discount, line_items) @@ -149,9 +184,12 @@ def apply_max_discount(self, discounts, line_items): ), ) - def get_combos(self, discounts, qty): + @beartype + def get_combos( + self, discounts: List[PolicyCoupon], qty: int + ) -> List[List[PolicyCoupon]]: """Return valid discount combinations given a list of discount policies.""" - valid_combos = [] + valid_combos: List[List[PolicyCoupon]] = [] if len(discounts) < 2: return valid_combos @@ -160,14 +198,12 @@ def get_combos(self, discounts, qty): for n in range(2, len(discounts) + 1): combos = list(itertools.combinations(discounts, n)) for combo in combos: - if sum(discount.item_quantity_min for discount, coupon in combo) <= qty: + if sum(pc.policy.item_quantity_min for pc in combo) <= qty: # if number of line items is gte to number of items the discount # policies as a combo supports, count it as a valid combo - valid_combos.append(combo) + valid_combos.append(list(combo)) return valid_combos # Tail imports -from .discount_policy import DiscountPolicy, DiscountCoupon # isort:skip from .item import Item # isort:skip -from .line_item import LineItemTuple, make_ntuple # isort:skip diff --git a/boxoffice/views/order.py b/boxoffice/views/order.py index 736879ef..57add44f 100644 --- a/boxoffice/views/order.py +++ b/boxoffice/views/order.py @@ -683,7 +683,6 @@ def update_order_on_line_item_cancellation( ] recalculated_line_item_tups = LineItem.calculate( active_line_items, - recalculate=True, coupons=get_coupon_codes_from_line_items(active_line_items), ) diff --git a/instance/settings-sample.py b/instance/settings-sample.py index 8d85f5f8..560fae20 100644 --- a/instance/settings-sample.py +++ b/instance/settings-sample.py @@ -31,8 +31,6 @@ LASTUSER_CLIENT_ID = '' #: Lastuser client secret LASTUSER_CLIENT_SECRET = '' # nosec -DEBUG = True -ASSET_MANIFEST_PATH = "static/build/manifest.json" #: Cache settings CACHE_TYPE = 'redis' #: RQ settings diff --git a/requirements.txt b/requirements.txt index 8bf992b8..4880612c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ git+https://github.com/hasgeek/baseframe#egg=baseframe +beartype click git+https://github.com/hasgeek/coaster#egg=coaster Flask diff --git a/tests/test_order.py b/tests/test_order.py index ec88c85e..dcb32789 100644 --- a/tests/test_order.py +++ b/tests/test_order.py @@ -369,7 +369,9 @@ def test_free_order(client, all_data) -> None: coupon = DiscountCoupon.query.filter_by(code='coupon2').one() assert coupon.used_count == 1 assert order.status == OrderStatus.SALES_ORDER - assert order.line_items[0].status == LineItemStatus.CONFIRMED + assert order.line_items[0].status == ( # type: ignore[unreachable] + LineItemStatus.CONFIRMED + ) def test_cancel_line_item_in_order(db_session, client, all_data, post_env) -> None: @@ -452,6 +454,7 @@ def test_cancel_line_item_in_order(db_session, client, all_data, post_env) -> No .order_by(PaymentTransaction.created_at.desc()) .first() ) + assert refund_transaction1 is not None assert refund_transaction1.amount == expected_refund_amount @@ -535,6 +538,7 @@ def test_cancel_line_item_in_bulk_order(db_session, client, all_data, post_env) .order_by(PaymentTransaction.created_at.desc()) .first() ) + assert refund_transaction2 is not None assert refund_transaction2.amount == second_line_item.final_amount # test failed cancellation