Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update: to LNbits 1.0.0 #99

Merged
merged 19 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "TPoS",
"short_description": "A shareable PoS terminal!",
"tile": "/tpos/static/image/tpos.png",
"min_lnbits_version": "0.12.11",
"min_lnbits_version": "1.0.0",
"contributors": [
{
"name": "Ben Arc",
Expand Down
113 changes: 41 additions & 72 deletions crud.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import List, Optional, Union
from typing import Optional, Union

from lnbits.db import Database
from lnbits.helpers import urlsafe_short_hash
from lnbits.helpers import insert_query, update_query, urlsafe_short_hash
from loguru import logger

from .models import CreateTposData, LNURLCharge, TPoS, TPoSClean
from .models import CreateTposData, LnurlCharge, Tpos, TposClean

db = Database("ext_tpos")

Expand All @@ -18,43 +18,19 @@ async def get_current_timestamp():
return int(current_timestamp)


async def create_tpos(wallet_id: str, data: CreateTposData) -> TPoS:
async def create_tpos(data: CreateTposData) -> Tpos:
tpos_id = urlsafe_short_hash()
tpos = Tpos(id=tpos_id, **data.dict())
await db.execute(
"""
INSERT INTO tpos.pos
(
id, wallet, name, currency, tip_options, tip_wallet, withdrawlimit,
withdrawpin, withdrawamt, withdrawtime, withdrawbtwn, withdrawtimeopt,
withdrawpindisabled, withdrawpremium
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
tpos_id,
wallet_id,
data.name,
data.currency,
data.tip_options,
data.tip_wallet,
data.withdrawlimit,
data.withdrawpin,
0,
0,
data.withdrawbtwn,
data.withdrawtimeopt,
data.withdrawpindisabled,
data.withdrawpremium,
),
insert_query("tpos.pos", tpos),
tpos.dict(),
)
tpos = await get_tpos(tpos_id)
assert tpos, "Newly created tpos couldn't be retrieved"
return tpos


async def get_tpos(tpos_id: str) -> Optional[TPoS]:
row = await db.fetchone("SELECT * FROM tpos.pos WHERE id = ?", (tpos_id,))
return TPoS(**row) if row else None
async def get_tpos(tpos_id: str) -> Optional[Tpos]:
row = await db.fetchone("SELECT * FROM tpos.pos WHERE id = :id", {"id": tpos_id})
return Tpos(**row) if row else None


async def start_lnurlcharge(tpos_id: str):
Expand All @@ -63,45 +39,40 @@ async def start_lnurlcharge(tpos_id: str):

now = await get_current_timestamp()
withdraw_time_seconds = (
tpos.withdrawbtwn * 60 if tpos.withdrawtimeopt != "secs" else tpos.withdrawbtwn
tpos.withdraw_between * 60
if tpos.withdraw_time_option != "secs"
else tpos.withdraw_between
)
assert (
now - tpos.withdrawtime > withdraw_time_seconds
now - tpos.withdraw_time > withdraw_time_seconds
), f"""
Last withdraw was made too recently, please try again in
{int(withdraw_time_seconds - (now - tpos.withdrawtime))} secs
{int(withdraw_time_seconds - (now - tpos.withdraw_time))} secs
"""

token = urlsafe_short_hash()
await db.execute(
"""
INSERT INTO tpos.withdraws (id, tpos_id)
VALUES (?, ?)
VALUES (:id, :tpos_id)
""",
(token, tpos_id),
{"id": token, "tpos_id": tpos_id},
)
lnurlcharge = await get_lnurlcharge(token)
return lnurlcharge


async def get_lnurlcharge(lnurlcharge_id: str) -> Optional[LNURLCharge]:
async def get_lnurlcharge(lnurlcharge_id: str) -> Optional[LnurlCharge]:
row = await db.fetchone(
"SELECT * FROM tpos.withdraws WHERE id = ?", (lnurlcharge_id,)
"SELECT * FROM tpos.withdraws WHERE id = :id", {"id": lnurlcharge_id}
)
return LNURLCharge(**row) if row else None


async def update_lnurlcharge(data: LNURLCharge) -> LNURLCharge:
# Construct the SET clause for the SQL query
set_clause = ", ".join([f"{field[0]} = ?" for field in data.dict().items()])
return LnurlCharge(**row) if row else None

# Get the values for the SET clause
set_values = list(data.dict().values())
set_values.append(data.id) # Add the ID for the WHERE clause

# Execute the UPDATE statement
async def update_lnurlcharge(data: LnurlCharge) -> LnurlCharge:
await db.execute(
f"UPDATE tpos.withdraws SET {set_clause} WHERE id = ?", tuple(set_values)
update_query("tpos.withdraws", data),
data.dict(),
)

lnurlcharge = await get_lnurlcharge(data.id)
Expand All @@ -110,17 +81,19 @@ async def update_lnurlcharge(data: LNURLCharge) -> LNURLCharge:
return lnurlcharge


async def get_clean_tpos(tpos_id: str) -> Optional[TPoSClean]:
row = await db.fetchone("SELECT * FROM tpos.pos WHERE id = ?", (tpos_id,))
return TPoSClean(**row) if row else None
async def get_clean_tpos(tpos_id: str) -> Optional[TposClean]:
row = await db.fetchone("SELECT * FROM tpos.pos WHERE id = :id", {"id": tpos_id})
return TposClean(**row) if row else None


async def update_tpos_withdraw(data: TPoS, tpos_id: str) -> TPoS:
async def update_tpos_withdraw(data: Tpos, tpos_id: str) -> Tpos:
# Calculate the time between withdrawals in seconds
now = await get_current_timestamp()
time_elapsed = now - data.withdrawtime
time_elapsed = now - data.withdraw_time
withdraw_time_seconds = (
data.withdrawbtwn * 60 if data.withdrawtimeopt != "secs" else data.withdrawbtwn
data.withdraw_between * 60
if data.withdraw_time_option != "secs"
else data.withdraw_between
)

logger.debug(f"Time between: {time_elapsed} seconds")
Expand All @@ -135,34 +108,30 @@ async def update_tpos_withdraw(data: TPoS, tpos_id: str) -> TPoS:

# Update the withdraw time in the database
await db.execute(
"UPDATE tpos.pos SET withdrawtime = ? WHERE id = ?", (now, tpos_id)
"UPDATE tpos.pos SET withdrawtime = :time WHERE id = :id",
{"time": now, "id": tpos_id},
)

tpos = await get_tpos(tpos_id)
assert tpos, "Newly updated tpos couldn't be retrieved"
return tpos


async def update_tpos(tpos_id: str, **kwargs) -> TPoS:
q = ", ".join([f"{field[0]} = ?" for field in kwargs.items()])
async def update_tpos(tpos: Tpos) -> Tpos:
await db.execute(
f"UPDATE tpos.pos SET {q} WHERE id = ?", (*kwargs.values(), tpos_id)
update_query("tpos.pos", tpos),
tpos.dict(),
)
tpos = await get_tpos(tpos_id)
assert tpos, "Newly updated tpos couldn't be retrieved"
return tpos


async def get_tposs(wallet_ids: Union[str, List[str]]) -> List[TPoS]:
async def get_tposs(wallet_ids: Union[str, list[str]]) -> list[Tpos]:
if isinstance(wallet_ids, str):
wallet_ids = [wallet_ids]

q = ",".join(["?"] * len(wallet_ids))
rows = await db.fetchall(
f"SELECT * FROM tpos.pos WHERE wallet IN ({q})", (*wallet_ids,)
)
return [TPoS(**row) for row in rows]
q = ",".join([f"'{wallet_id}'" for wallet_id in wallet_ids])
rows = await db.fetchall(f"SELECT * FROM tpos.pos WHERE wallet IN ({q})")
return [Tpos(**row) for row in rows]


async def delete_tpos(tpos_id: str) -> None:
await db.execute("DELETE FROM tpos.pos WHERE id = ?", (tpos_id,))
await db.execute("DELETE FROM tpos.pos WHERE id = :id", {"id": tpos_id})
25 changes: 25 additions & 0 deletions migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,28 @@ async def m009_tax_inclusive(db):
"ALTER TABLE tpos.pos ADD COLUMN tax_inclusive BOOL NOT NULL DEFAULT true;"
)
await db.execute("ALTER TABLE tpos.pos ADD COLUMN tax_default FLOAT DEFAULT 0;")


async def m010_rename_tpos_withdraw_columns(db):
"""
Add rename tpos withdraw columns
"""
await db.execute(
"""
CREATE TABLE tpos.pos_backup AS
SELECT
id, name, currency, items, wallet, tax_inclusive,
tax_default, tip_wallet, tip_options,
withdrawtime AS withdraw_time,
withdrawbtwn AS withdraw_between,
withdrawlimit AS withdraw_limit,
withdrawamt AS withdraw_amount,
withdrawtimeopt AS withdraw_time_option,
withdrawpremium AS withdraw_premium,
withdrawpindisabled AS withdraw_pin_disabled,
withdrawpin AS withdraw_pin
FROM tpos.pos
"""
)
await db.execute("DROP TABLE tpos.pos")
await db.execute("ALTER TABLE tpos.pos_backup RENAME TO pos")
71 changes: 32 additions & 39 deletions models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from sqlite3 import Row
from typing import List, Optional
from typing import Optional

from fastapi import Request
from lnurl import Lnurl
Expand All @@ -11,61 +10,55 @@ class CreateTposData(BaseModel):
wallet: Optional[str]
name: Optional[str]
currency: Optional[str]
tip_options: str = Field("[]")
tip_wallet: str = Field("")
withdrawlimit: int = Field(None, ge=1)
withdrawpin: int = Field(None, ge=1)
withdrawamt: int = Field(None, ge=0)
withdrawtime: int = Field(0)
withdrawtimeopt: Optional[str]
withdrawbtwn: int = Field(10, ge=1)
withdrawpremium: float = Field(None)
withdrawpindisabled: bool = Field(False)
tax_inclusive: bool = Field(True)
tax_default: float = Field(None)
tip_options: str = Field("[]")
tip_wallet: str = Field("")
withdraw_time: int = Field(0)
withdraw_between: int = Field(10, ge=1)
withdraw_limit: Optional[int] = Field(None, ge=1)
withdraw_pin: Optional[int] = Field(None, ge=1)
withdraw_amount: Optional[int] = Field(None, ge=0)
withdraw_time_option: Optional[str] = Field(None)
withdraw_premium: Optional[float] = Field(None)
withdraw_pin_disabled: bool = Field(False)


class TPoSClean(BaseModel):
class TposClean(BaseModel):
id: str
name: str
currency: str
tip_options: Optional[str]
withdrawlimit: Optional[int]
withdrawamt: int
withdrawtime: int
withdrawtimeopt: Optional[str]
withdrawbtwn: int
withdrawpremium: Optional[float]
withdrawpindisabled: Optional[bool]
items: Optional[str]
tax_inclusive: bool
tax_default: Optional[float]

@classmethod
def from_row(cls, row: Row) -> "TPoSClean":
return cls(**dict(row))
tax_default: Optional[float] = None
withdraw_time: int
withdraw_between: int
withdraw_limit: Optional[int] = None
withdraw_amount: Optional[int] = None
withdraw_time_option: Optional[str] = None
withdraw_premium: Optional[float] = None
withdraw_pin_disabled: Optional[bool] = None
items: Optional[str] = None
tip_options: Optional[str] = None

@property
def withdrawamtposs(self) -> int:
return self.withdrawlimit - self.withdrawamt if self.withdrawlimit else 0
def withdraw_maximum(self) -> int:
if not self.withdraw_amount or not self.withdraw_limit:
return 0
return self.withdraw_limit - self.withdraw_amount


class TPoS(TPoSClean, BaseModel):
class Tpos(TposClean, BaseModel):
wallet: str
tip_wallet: Optional[str]
withdrawpin: Optional[int]
tip_wallet: Optional[str] = None
withdraw_pin: Optional[int] = None


class LNURLCharge(BaseModel):
class LnurlCharge(BaseModel):
id: str
tpos_id: str
amount: int = Field(None)
claimed: bool = Field(False)

@classmethod
def from_row(cls, row: Row) -> "LNURLCharge":
return cls(**dict(row))

def lnurl(self, req: Request) -> Lnurl:
url = str(
req.url_for(
Expand Down Expand Up @@ -93,12 +86,12 @@ class Item(BaseModel):
description: Optional[str]
tax: Optional[float] = Field(0, ge=0.0)
disabled: bool = False
categories: Optional[List[str]] = []
categories: Optional[list[str]] = []

@validator("tax", pre=True, always=True)
def set_default_tax(cls, v):
return v or 0


class CreateUpdateItemData(BaseModel):
items: List[Item]
items: list[Item]
Loading