Skip to content

Commit

Permalink
Merge pull request #30677 from ankush/refactor/stock_balance
Browse files Browse the repository at this point in the history
refactor: stock balance report
  • Loading branch information
mergify[bot] authored Apr 14, 2022
2 parents e6aa28e + 9af2d68 commit 41ec5ca
Show file tree
Hide file tree
Showing 2 changed files with 308 additions and 110 deletions.
244 changes: 134 additions & 110 deletions erpnext/stock/report/stock_balance/stock_balance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,41 @@


from operator import itemgetter
from typing import Any, Dict, List, Optional, TypedDict

import frappe
from frappe import _
from frappe.query_builder.functions import CombineDatetime
from frappe.utils import cint, date_diff, flt, getdate
from frappe.utils.nestedset import get_descendants_of
from pypika.terms import ExistsCriterion

import erpnext
from erpnext.stock.report.stock_ageing.stock_ageing import FIFOSlots, get_average_age
from erpnext.stock.report.stock_ledger.stock_ledger import get_item_group_condition
from erpnext.stock.utils import add_additional_uom_columns, is_reposting_item_valuation_in_progress


def execute(filters=None):
class StockBalanceFilter(TypedDict):
company: Optional[str]
from_date: str
to_date: str
item_group: Optional[str]
item: Optional[str]
warehouse: Optional[str]
warehouse_type: Optional[str]
include_uom: Optional[str] # include extra info in converted UOM
show_stock_ageing_data: bool
show_variant_attributes: bool


SLEntry = Dict[str, Any]


def execute(filters: Optional[StockBalanceFilter] = None):
is_reposting_item_valuation_in_progress()
if not filters:
filters = {}

to_date = filters.get("to_date")

if filters.get("company"):
company_currency = erpnext.get_company_currency(filters.get("company"))
else:
Expand Down Expand Up @@ -48,6 +65,7 @@ def execute(filters=None):

_func = itemgetter(1)

to_date = filters.get("to_date")
for (company, item, warehouse) in sorted(iwb_map):
if item_map.get(item):
qty_dict = iwb_map[(company, item, warehouse)]
Expand Down Expand Up @@ -92,7 +110,7 @@ def execute(filters=None):
return columns, data


def get_columns(filters):
def get_columns(filters: StockBalanceFilter):
"""return columns"""
columns = [
{
Expand Down Expand Up @@ -215,66 +233,77 @@ def get_columns(filters):
return columns


def get_conditions(filters):
conditions = ""
def apply_conditions(query, filters):
sle = frappe.qb.DocType("Stock Ledger Entry")
warehouse_table = frappe.qb.DocType("Warehouse")

if not filters.get("from_date"):
frappe.throw(_("'From Date' is required"))

if filters.get("to_date"):
conditions += " and sle.posting_date <= %s" % frappe.db.escape(filters.get("to_date"))
if to_date := filters.get("to_date"):
query = query.where(sle.posting_date <= to_date)
else:
frappe.throw(_("'To Date' is required"))

if filters.get("company"):
conditions += " and sle.company = %s" % frappe.db.escape(filters.get("company"))

if filters.get("warehouse"):
warehouse_details = frappe.db.get_value(
"Warehouse", filters.get("warehouse"), ["lft", "rgt"], as_dict=1
)
if warehouse_details:
conditions += (
" and exists (select name from `tabWarehouse` wh \
where wh.lft >= %s and wh.rgt <= %s and sle.warehouse = wh.name)"
% (warehouse_details.lft, warehouse_details.rgt)
if company := filters.get("company"):
query = query.where(sle.company == company)

if warehouse := filters.get("warehouse"):
lft, rgt = frappe.db.get_value("Warehouse", warehouse, ["lft", "rgt"])
chilren_subquery = (
frappe.qb.from_(warehouse_table)
.select(warehouse_table.name)
.where(
(warehouse_table.lft >= lft)
& (warehouse_table.rgt <= rgt)
& (warehouse_table.name == sle.warehouse)
)

if filters.get("warehouse_type") and not filters.get("warehouse"):
conditions += (
" and exists (select name from `tabWarehouse` wh \
where wh.warehouse_type = '%s' and sle.warehouse = wh.name)"
% (filters.get("warehouse_type"))
)
query = query.where(ExistsCriterion(chilren_subquery))
elif warehouse_type := filters.get("warehouse_type"):
query = (
query.join(warehouse_table)
.on(warehouse_table.name == sle.warehouse)
.where(warehouse_table.warehouse_type == warehouse_type)
)

return conditions

return query


def get_stock_ledger_entries(filters: StockBalanceFilter, items: List[str]) -> List[SLEntry]:
sle = frappe.qb.DocType("Stock Ledger Entry")

query = (
frappe.qb.from_(sle)
.select(
sle.item_code,
sle.warehouse,
sle.posting_date,
sle.actual_qty,
sle.valuation_rate,
sle.company,
sle.voucher_type,
sle.qty_after_transaction,
sle.stock_value_difference,
sle.item_code.as_("name"),
sle.voucher_no,
sle.stock_value,
sle.batch_no,
)
.where((sle.docstatus < 2) & (sle.is_cancelled == 0))
.orderby(CombineDatetime(sle.posting_date, sle.posting_time))
.orderby(sle.creation)
.orderby(sle.actual_qty)
)

def get_stock_ledger_entries(filters, items):
item_conditions_sql = ""
if items:
item_conditions_sql = " and sle.item_code in ({})".format(
", ".join(frappe.db.escape(i, percent=False) for i in items)
)
query = query.where(sle.item_code.isin(items))

conditions = get_conditions(filters)

return frappe.db.sql(
"""
select
sle.item_code, warehouse, sle.posting_date, sle.actual_qty, sle.valuation_rate,
sle.company, sle.voucher_type, sle.qty_after_transaction, sle.stock_value_difference,
sle.item_code as name, sle.voucher_no, sle.stock_value, sle.batch_no
from
`tabStock Ledger Entry` sle
where sle.docstatus < 2 %s %s
and is_cancelled = 0
order by sle.posting_date, sle.posting_time, sle.creation, sle.actual_qty"""
% (item_conditions_sql, conditions), # nosec
as_dict=1,
)
query = apply_conditions(query, filters)
return query.run(as_dict=True)


def get_item_warehouse_map(filters, sle):
def get_item_warehouse_map(filters: StockBalanceFilter, sle: List[SLEntry]):
iwb_map = {}
from_date = getdate(filters.get("from_date"))
to_date = getdate(filters.get("to_date"))
Expand Down Expand Up @@ -332,7 +361,7 @@ def get_item_warehouse_map(filters, sle):
return iwb_map


def filter_items_with_no_transactions(iwb_map, float_precision):
def filter_items_with_no_transactions(iwb_map, float_precision: float):
for (company, item, warehouse) in sorted(iwb_map):
qty_dict = iwb_map[(company, item, warehouse)]

Expand All @@ -349,60 +378,58 @@ def filter_items_with_no_transactions(iwb_map, float_precision):
return iwb_map


def get_items(filters):
def get_items(filters: StockBalanceFilter) -> List[str]:
"Get items based on item code, item group or brand."
conditions = []
if filters.get("item_code"):
conditions.append("item.name=%(item_code)s")
if item_code := filters.get("item_code"):
return [item_code]
else:
if filters.get("item_group"):
conditions.append(get_item_group_condition(filters.get("item_group")))
if filters.get("brand"): # used in stock analytics report
conditions.append("item.brand=%(brand)s")

items = []
if conditions:
items = frappe.db.sql_list(
"""select name from `tabItem` item where {}""".format(" and ".join(conditions)), filters
)
return items
item_filters = {}
if item_group := filters.get("item_group"):
children = get_descendants_of("Item Group", item_group, ignore_permissions=True)
item_filters["item_group"] = ("in", children + [item_group])
if brand := filters.get("brand"):
item_filters["brand"] = brand

return frappe.get_all("Item", filters=item_filters, pluck="name", order_by=None)

def get_item_details(items, sle, filters):

def get_item_details(items: List[str], sle: List[SLEntry], filters: StockBalanceFilter):
item_details = {}
if not items:
items = list(set(d.item_code for d in sle))

if not items:
return item_details

cf_field = cf_join = ""
if filters.get("include_uom"):
cf_field = ", ucd.conversion_factor"
cf_join = (
"left join `tabUOM Conversion Detail` ucd on ucd.parent=item.name and ucd.uom=%s"
% frappe.db.escape(filters.get("include_uom"))
item_table = frappe.qb.DocType("Item")

query = (
frappe.qb.from_(item_table)
.select(
item_table.name,
item_table.item_name,
item_table.description,
item_table.item_group,
item_table.brand,
item_table.stock_uom,
)

res = frappe.db.sql(
"""
select
item.name, item.item_name, item.description, item.item_group, item.brand, item.stock_uom %s
from
`tabItem` item
%s
where
item.name in (%s)
"""
% (cf_field, cf_join, ",".join(["%s"] * len(items))),
items,
as_dict=1,
.where(item_table.name.isin(items))
)

for item in res:
item_details.setdefault(item.name, item)
if uom := filters.get("include_uom"):
uom_conv_detail = frappe.qb.DocType("UOM Conversion Detail")
query = (
query.left_join(uom_conv_detail)
.on((uom_conv_detail.parent == item_table.name) & (uom_conv_detail.uom == uom))
.select(uom_conv_detail.conversion_factor)
)

result = query.run(as_dict=1)

if filters.get("show_variant_attributes", 0) == 1:
for item_table in result:
item_details.setdefault(item_table.name, item_table)

if filters.get("show_variant_attributes"):
variant_values = get_variant_values_for(list(item_details))
item_details = {k: v.update(variant_values.get(k, {})) for k, v in item_details.items()}

Expand All @@ -413,36 +440,33 @@ def get_item_reorder_details(items):
item_reorder_details = frappe._dict()

if items:
item_reorder_details = frappe.db.sql(
"""
select parent, warehouse, warehouse_reorder_qty, warehouse_reorder_level
from `tabItem Reorder`
where parent in ({0})
""".format(
", ".join(frappe.db.escape(i, percent=False) for i in items)
),
as_dict=1,
item_reorder_details = frappe.get_all(
"Item Reorder",
["parent", "warehouse", "warehouse_reorder_qty", "warehouse_reorder_level"],
filters={"parent": ("in", items)},
)

return dict((d.parent + d.warehouse, d) for d in item_reorder_details)


def get_variants_attributes():
def get_variants_attributes() -> List[str]:
"""Return all item variant attributes."""
return [i.name for i in frappe.get_all("Item Attribute")]
return frappe.get_all("Item Attribute", pluck="name")


def get_variant_values_for(items):
"""Returns variant values for items."""
attribute_map = {}
for attr in frappe.db.sql(
"""select parent, attribute, attribute_value
from `tabItem Variant Attribute` where parent in (%s)
"""
% ", ".join(["%s"] * len(items)),
tuple(items),
as_dict=1,
):

attribute_info = frappe.get_all(
"Item Variant Attribute",
["parent", "attribute", "attribute_value"],
{
"parent": ("in", items),
},
)

for attr in attribute_info:
attribute_map.setdefault(attr["parent"], {})
attribute_map[attr["parent"]].update({attr["attribute"]: attr["attribute_value"]})

Expand Down
Loading

0 comments on commit 41ec5ca

Please sign in to comment.