patch: change user when other user is scanned
This commit is contained in:
parent
222a335ddc
commit
b24ea7530f
@ -1,3 +1,7 @@
|
|||||||
server:
|
server:
|
||||||
host: "http://localhost"
|
host: "http://localhost"
|
||||||
port: "5000"
|
port: "5000"
|
||||||
|
|
||||||
|
options:
|
||||||
|
barcode:
|
||||||
|
codeid_position: Null
|
||||||
@ -1,14 +1,18 @@
|
|||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from datetime import date
|
from datetime import date
|
||||||
from json import dump as jdump, load as jload
|
from json import dump as jdump, load as jload
|
||||||
from os import remove
|
from os import makedirs, remove
|
||||||
from os.path import exists
|
from os.path import exists
|
||||||
from select import select as timedInput
|
from select import select as timedInput
|
||||||
from sys import stdin
|
from sys import stdin
|
||||||
|
from yaml import safe_load
|
||||||
import connection
|
import connection
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
if not exists("../logs"):
|
||||||
|
makedirs("../logs")
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
LOGGER.setLevel(logging.DEBUG)
|
LOGGER.setLevel(logging.DEBUG)
|
||||||
logFormatter = logging.Formatter(
|
logFormatter = logging.Formatter(
|
||||||
@ -23,56 +27,31 @@ consoleHandler = logging.StreamHandler()
|
|||||||
consoleHandler.setLevel(logging.DEBUG)
|
consoleHandler.setLevel(logging.DEBUG)
|
||||||
LOGGER.addHandler(consoleHandler)
|
LOGGER.addHandler(consoleHandler)
|
||||||
|
|
||||||
|
with open("config.yaml", 'r') as file:
|
||||||
|
data = safe_load(file)['options']
|
||||||
|
CODEID_POS = data['barcode']['codeid_position']
|
||||||
|
|
||||||
TEMPFILE = "scans.json"
|
TEMPFILE = "scans.json"
|
||||||
|
|
||||||
TIMEOUT = 60 # Number of seconds for a timeout after being logged in
|
TIMEOUT = 60 # Number of seconds for a timeout after being logged in
|
||||||
|
|
||||||
|
|
||||||
def main(TEMP: list = None) -> None:
|
def main() -> None:
|
||||||
while True:
|
while True:
|
||||||
user = input("Enter Login: ")
|
user = login()
|
||||||
|
if not user:
|
||||||
|
continue
|
||||||
if user == "quit":
|
if user == "quit":
|
||||||
break
|
break
|
||||||
if not connection.check_login(user):
|
|
||||||
LOGGER.debug("Login failed")
|
|
||||||
continue # Send Error that login wasn't possible
|
|
||||||
LOGGER.debug("Login successful")
|
LOGGER.debug("Login successful")
|
||||||
scanned = scanning()
|
scanning(user)
|
||||||
scanned = group_scanning(scanned)
|
|
||||||
result = connection.send_scan(user, scanned)
|
|
||||||
if result != True:
|
|
||||||
result['date'] = str(date.today())
|
|
||||||
TEMP.append(result)
|
|
||||||
if TEMP:
|
|
||||||
TEMP = group_temp(TEMP)
|
|
||||||
for bought in list(TEMP):
|
|
||||||
result = connection.send_scan(bought['user'], bought['items'], bought['date'])
|
|
||||||
TEMP.remove(bought)
|
|
||||||
if result != True:
|
|
||||||
TEMP.append(result)
|
|
||||||
with open(TEMPFILE, "w") as file:
|
|
||||||
jdump(TEMP, file)
|
|
||||||
elif exists(TEMPFILE):
|
|
||||||
remove(TEMPFILE)
|
|
||||||
LOGGER.info(TEMP)
|
|
||||||
|
|
||||||
|
def delete(scanned: list):
|
||||||
def scanning() -> list:
|
|
||||||
scan, scanned = "", []
|
|
||||||
while True:
|
|
||||||
i, _, _ = timedInput([stdin], [], [], TIMEOUT)
|
i, _, _ = timedInput([stdin], [], [], TIMEOUT)
|
||||||
if not i:
|
if not i:
|
||||||
break # send a short timeout message before break
|
return #TODO send a short timeout message before return
|
||||||
scan = stdin.readline().strip()
|
|
||||||
match scan:
|
|
||||||
case "logout":
|
|
||||||
break
|
|
||||||
case "delete":
|
|
||||||
while True:
|
|
||||||
i, _, _ = timedInput([stdin], [], [], TIMEOUT)
|
|
||||||
if not i:
|
|
||||||
break # send a short timeout message before break
|
|
||||||
scan = stdin.readline().strip()
|
scan = stdin.readline().strip()
|
||||||
|
codeid, scan = split_codeid(scan, "A")
|
||||||
match scan:
|
match scan:
|
||||||
case "delete":
|
case "delete":
|
||||||
try:
|
try:
|
||||||
@ -84,9 +63,15 @@ def scanning() -> list:
|
|||||||
scanned.remove(scan)
|
scanned.remove(scan)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
LOGGER.exception(e)
|
LOGGER.exception(e)
|
||||||
case _:
|
|
||||||
scanned.append(scan)
|
def group_scanning(scanned: list[int]) -> dict[int: int]:
|
||||||
return scanned
|
scan_dict = {}
|
||||||
|
for scan in scanned:
|
||||||
|
if scan not in scan_dict:
|
||||||
|
scan_dict[scan] = 1
|
||||||
|
else:
|
||||||
|
scan_dict[scan] += 1
|
||||||
|
return scan_dict
|
||||||
|
|
||||||
def group_temp(TEMP: list):
|
def group_temp(TEMP: list):
|
||||||
NEWTEMP = []
|
NEWTEMP = []
|
||||||
@ -105,18 +90,80 @@ def group_temp(TEMP: list):
|
|||||||
NEWTEMP.append(deepcopy(temp))
|
NEWTEMP.append(deepcopy(temp))
|
||||||
return NEWTEMP
|
return NEWTEMP
|
||||||
|
|
||||||
def group_scanning(scanned: list[int]) -> dict[int: int]:
|
def login(user: str = None):
|
||||||
scan_dict = {}
|
if not user:
|
||||||
for scan in scanned:
|
user = input("Enter Login: ")
|
||||||
if scan not in scan_dict:
|
codeid, user = split_codeid(user, "D")
|
||||||
scan_dict[scan] = 1
|
|
||||||
else:
|
else:
|
||||||
scan_dict[scan] += 1
|
codeid = "D"
|
||||||
return scan_dict
|
if codeid != "D":
|
||||||
|
return None
|
||||||
|
if not connection.check_login(user):
|
||||||
|
LOGGER.debug("Login failed")
|
||||||
|
if not ("users" in data and user in data['users']):
|
||||||
|
return None
|
||||||
|
LOGGER.debug("Using local login")
|
||||||
|
return user
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def scanning(user: str) -> dict[int: int]:
|
||||||
TEMP = None
|
scan, scanned = "", []
|
||||||
|
while True:
|
||||||
|
i, _, _ = timedInput([stdin], [], [], TIMEOUT)
|
||||||
|
if not i:
|
||||||
|
break # send a short timeout message before break
|
||||||
|
scan = stdin.readline().strip()
|
||||||
|
codeid, scan = split_codeid(scan, "A")
|
||||||
|
match codeid:
|
||||||
|
case "A":
|
||||||
|
scanned.append(scan)
|
||||||
|
case "D":
|
||||||
|
match scan:
|
||||||
|
case "logout":
|
||||||
|
break
|
||||||
|
case "delete":
|
||||||
|
delete(scanned)
|
||||||
|
case _:
|
||||||
|
altuser = login(scan)
|
||||||
|
if not altuser:
|
||||||
|
continue
|
||||||
|
LOGGER.debug("Login successful")
|
||||||
|
scanning(altuser)
|
||||||
|
break
|
||||||
|
case _:
|
||||||
|
LOGGER.debug(f"Unknown barcode scanned: {codeid}_{scan}")
|
||||||
|
scanned = group_scanning(scanned)
|
||||||
|
send_scan(user, scanned)
|
||||||
|
|
||||||
|
def send_scan(user: str, scanned: list, temp: list[dict] = []):
|
||||||
if exists(TEMPFILE):
|
if exists(TEMPFILE):
|
||||||
with open(TEMPFILE, "r") as file:
|
with open(TEMPFILE, "r") as file:
|
||||||
TEMP = jload(file)
|
temp.extend(jload(file))
|
||||||
main(TEMP)
|
result = connection.send_scan(user, scanned)
|
||||||
|
if result != True:
|
||||||
|
result['date'] = str(date.today())
|
||||||
|
temp.append(result)
|
||||||
|
if temp:
|
||||||
|
temp = group_temp(temp)
|
||||||
|
for bought in list(temp):
|
||||||
|
result = connection.send_scan(bought['user'], bought['items'], bought['date'])
|
||||||
|
temp.remove(bought)
|
||||||
|
if result != True:
|
||||||
|
temp.append(result)
|
||||||
|
if temp: # if temp still present, save it
|
||||||
|
with open(TEMPFILE, "w") as file:
|
||||||
|
jdump(temp, file)
|
||||||
|
elif exists(TEMPFILE):
|
||||||
|
remove(TEMPFILE)
|
||||||
|
LOGGER.info(temp)
|
||||||
|
|
||||||
|
def split_codeid(scan: str, default_codeid: str = ""):
|
||||||
|
match CODEID_POS:
|
||||||
|
case "prefix":
|
||||||
|
return(scan[0], scan[1:])
|
||||||
|
case "suffix":
|
||||||
|
return(scan[-1], scan[:-1])
|
||||||
|
case _:
|
||||||
|
return(default_codeid, scan)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -2,9 +2,14 @@ from database import Database
|
|||||||
from flask import Flask, abort, request
|
from flask import Flask, abort, request
|
||||||
from flask.json import jsonify
|
from flask.json import jsonify
|
||||||
from gevent.pywsgi import WSGIServer
|
from gevent.pywsgi import WSGIServer
|
||||||
|
from os import makedirs
|
||||||
|
from os.path import exists
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
if not exists("../logs"):
|
||||||
|
makedirs("../logs")
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
LOGGER.setLevel(logging.DEBUG)
|
LOGGER.setLevel(logging.DEBUG)
|
||||||
logFormatter = logging.Formatter(
|
logFormatter = logging.Formatter(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user