minor: add table as save for unknown items

This commit is contained in:
Lunaresk 2024-06-16 16:21:57 +02:00
parent b57dbba5d6
commit 560016cc18
5 changed files with 114 additions and 10 deletions

View File

@ -0,0 +1,40 @@
"""new table for bought entries with unknown items
Revision ID: 24b8e319c0d0
Revises: 015f4256bb4c
Create Date: 2024-06-02 13:14:38.681605
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '24b8e319c0d0'
down_revision = '015f4256bb4c'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('payment', schema=None) as batch_op:
batch_op.alter_column('token',
existing_type=sa.VARCHAR(length=15),
nullable=True)
batch_op.drop_constraint('payment_token_fkey', type_='foreignkey')
batch_op.create_foreign_key(None, 'login_token', ['token'], ['token'])
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('payment', schema=None) as batch_op:
batch_op.drop_constraint(None, type_='foreignkey')
batch_op.create_foreign_key('payment_token_fkey', 'login_token', ['token'], ['token'], onupdate='CASCADE', ondelete='CASCADE')
batch_op.alter_column('token',
existing_type=sa.VARCHAR(length=15),
nullable=False)
# ### end Alembic commands ###

View File

@ -0,0 +1,35 @@
"""new table for bought entries with unknown items 2
Revision ID: 2a64d3b9235a
Revises: 24b8e319c0d0
Create Date: 2024-06-02 13:19:59.901053
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2a64d3b9235a'
down_revision = '24b8e319c0d0'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('bought_with_unknown_item',
sa.Column('token', sa.String(length=15), nullable=False),
sa.Column('item', sa.BigInteger(), nullable=False),
sa.Column('date', sa.Date(), nullable=False),
sa.Column('amount', sa.SmallInteger(), nullable=False),
sa.ForeignKeyConstraint(['token'], ['login_token.token'], ),
sa.PrimaryKeyConstraint('token', 'item', 'date')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('bought_with_unknown_item')
# ### end Alembic commands ###

View File

@ -3,6 +3,7 @@ from .receipt_item import ReceiptItem
from .receipt import Receipt from .receipt import Receipt
from .login_token import LoginToken from .login_token import LoginToken
from .bought import Bought from .bought import Bought
from .bought_with_unknown_item import BoughtWithUnknownItem
from .establishment import Establishment from .establishment import Establishment
from .establishment_candidate import EstablishmentCandidate from .establishment_candidate import EstablishmentCandidate
from .user import User from .user import User

View File

@ -0,0 +1,14 @@
from src import db
from .bought import Bought
class BoughtWithUnknownItem(db.Model):
token = db.Column(db.ForeignKey('login_token.token'),
primary_key=True, server_onupdate=db.FetchedValue())
item = db.Column(db.BigInteger, primary_key=True,
server_onupdate=db.FetchedValue())
date = db.Column(db.Date, primary_key=True)
amount = db.Column(db.SmallInteger, nullable=False)
def __repr__(self) -> str:
return f"<Bought Object>"

View File

@ -8,7 +8,7 @@ from sqlalchemy.dialects.postgresql import insert
from string import ascii_letters, digits from string import ascii_letters, digits
from .view_utils import bought_with_prices as bwp from .view_utils import bought_with_prices as bwp
from src import db, LOGGER from src import db, LOGGER
from models import Bought, Establishment, Item, LoginToken, User from models import Bought, BoughtWithUnknownItem, Establishment, Item, LoginToken, User
def insert_bought_items(token: str, dates: list[dict[str: any]]): def insert_bought_items(token: str, dates: list[dict[str: any]]):
@ -19,21 +19,35 @@ def insert_bought_items(token: str, dates: list[dict[str: any]]):
item['item_id']), date=date['date'], amount=int(item["amount"])) item['item_id']), date=date['date'], amount=int(item["amount"]))
query_insert = query_insert.on_conflict_do_update( query_insert = query_insert.on_conflict_do_update(
"bought_pkey", set_=dict(amount=text(f'bought.amount + {int(item["amount"])}'))) "bought_pkey", set_=dict(amount=text(f'bought.amount + {int(item["amount"])}')))
try: if(_query_successful(query_insert)):
db.session.execute(query_insert) item_index = dates[date_index]['items'].index(item)
db.session.commit() del (dates[date_index]['items'][item_index])
except errors.ForeignKeyViolation as e:
db.session.rollback()
except Exception as e:
db.session.rollback()
LOGGER.exception("")
else: else:
query_insert = insert(BoughtWithUnknownItem).values(token=token, item=int(
item['item_id']), date=date['date'], amount=int(item["amount"]))
query_insert = query_insert.on_conflict_do_update(
"bought_with_unknown_item_pkey", set_=dict(amount=text(f'bought_with_unknown_item.amount + {int(item["amount"])}')))
if(_query_successful(query_insert)):
item_index = dates[date_index]['items'].index(item) item_index = dates[date_index]['items'].index(item)
del (dates[date_index]['items'][item_index]) del (dates[date_index]['items'][item_index])
if len(dates[date_index]['items']) == 0: if len(dates[date_index]['items']) == 0:
del (dates[date_index]) del (dates[date_index])
return {'token': token, 'dates': dates} if dates else {} return {'token': token, 'dates': dates} if dates else {}
def _query_successful(query):
try:
db.session.execute(query)
db.session.commit()
except errors.ForeignKeyViolation as e:
db.session.rollback()
return False
except Exception as e:
db.session.rollback()
LOGGER.exception("")
return False
else:
return True
def generate_token(length=15, allowed_chars=ascii_letters + digits): def generate_token(length=15, allowed_chars=ascii_letters + digits):
new_token = "".join((rndchoice(allowed_chars) for i in range(length))) new_token = "".join((rndchoice(allowed_chars) for i in range(length)))
if not LoginToken.query.filter_by(token=new_token).first(): if not LoginToken.query.filter_by(token=new_token).first():