205 lines
7.7 KiB
Python
205 lines
7.7 KiB
Python
from collections import namedtuple
|
|
from sqlite3 import IntegrityError
|
|
from typing import List, Union, Dict, Optional, Set
|
|
|
|
from sqlite_utils import Database
|
|
from sqlite_utils.db import NotFoundError
|
|
|
|
|
|
class DiscordDB:
|
|
_name: str
|
|
_db: Database
|
|
|
|
def __init__(self, db_name: str = ""):
|
|
self._name = db_name
|
|
if not self._name:
|
|
self._db = Database(memory=True)
|
|
else:
|
|
self._db = Database(self._name)
|
|
migrate_db = False
|
|
if "member" not in self._db.table_names():
|
|
self._db.create_table("member", {"id": int, "name": str}, pk="id", not_null={"id", "name"})
|
|
else:
|
|
if "mention_number" in self._db.table('member').columns_dict.keys():
|
|
memb = namedtuple('Member', ['id', 'name'])
|
|
members: Set['memb'] = {*[]}
|
|
for row in self._db.table('member').rows:
|
|
members.add(memb(row['mention_number'], row['name']))
|
|
self._db.table('member').drop()
|
|
self._db.create_table("member", {"id": int, "name": str}, pk="id", not_null={"id", "name"})
|
|
self._db.table('member').insert_all([m._asdict() for m in members])
|
|
migrate_db = True
|
|
|
|
if "player" not in self._db.table_names():
|
|
self._db.create_table("player", {"id": int, "name": str}, pk="id", not_null={"id", "name"})
|
|
|
|
if "hunted" not in self._db.table_names() or migrate_db:
|
|
if migrate_db:
|
|
self._db.table('hunted').drop()
|
|
self._db.create_table("hunted", {"id": int, "member_id": int, "player_id": int, 'channel_id': int},
|
|
pk="id", not_null={"id", "member_id", "player_id", "channel_id"})
|
|
self._db['hunted'].create_index(["member_id", "player_id"], unique=True)
|
|
|
|
if "medals" in self._db.table_names():
|
|
self._db.table('medals').drop()
|
|
self._db.create_table("medals", dict(id=int, player_id=int, battle_id=int, division_id=int, side_id=int,
|
|
damage=int), pk="id", defaults={"damage": 0},
|
|
not_null={"id", "player_id", "battle_id", "division_id", "side_id", "damage"})
|
|
self._db['medals'].create_index(["player_id", "battle_id", "division_id", "side_id"], unique=True)
|
|
|
|
if "hunted_players" not in self._db.view_names():
|
|
self._db.create_view("hunted_players", "select distinct player_id from hunted")
|
|
|
|
self._db.add_foreign_keys([("hunted", "member_id", "member", "id"),
|
|
("hunted", "player_id", "player", "id"),
|
|
("medals", "player_id", "player", "id")])
|
|
self._db.vacuum()
|
|
|
|
self.member = self._db.table("member")
|
|
self.player = self._db.table("player")
|
|
self.hunted = self._db.table("hunted")
|
|
self.medals = self._db.table("medals")
|
|
self.hunted_players = self._db.table("hunted_players")
|
|
|
|
# Player methods
|
|
|
|
def get_player(self, pid: int) -> Optional[Dict[str, Union[int, str]]]:
|
|
"""Get Player
|
|
|
|
:param pid: int Player ID
|
|
:return: player id, name if player exists
|
|
"""
|
|
try:
|
|
return self.player.get(pid)
|
|
except NotFoundError:
|
|
return None
|
|
|
|
def add_player(self, pid: int, name: str) -> bool:
|
|
"""Add player.
|
|
|
|
:param pid: int Player ID
|
|
:param name: Player Name
|
|
:return: bool Player added
|
|
"""
|
|
if not self.get_player(pid):
|
|
self.player.insert({"id": pid, "name": name})
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def update_player(self, pid: int, name: str) -> bool:
|
|
"""Update player"s record
|
|
|
|
:param pid: int Player ID
|
|
:param name: Player Name
|
|
:return: bool
|
|
"""
|
|
if self.get_player(pid):
|
|
self.player.update(pid, {"name": name})
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
# Member methods
|
|
|
|
def get_member(self, member_id) -> Dict[str, Union[int, str]]:
|
|
"""Get discord member
|
|
|
|
:param member_id: int Discord Member ID
|
|
:type member_id: int
|
|
:return: local id, name, mention number if discord member exists else None
|
|
:rtype: Union[Dict[str, Union[int, str]], None]
|
|
"""
|
|
try:
|
|
return self.member.get(member_id)
|
|
except NotFoundError:
|
|
raise NotFoundError("Member with given id not found")
|
|
|
|
def add_member(self, id: int, name: str) -> Dict[str, Union[int, str]]:
|
|
"""Add discord member.
|
|
|
|
:param id: int Discord member ID
|
|
:param name: Discord member Name
|
|
"""
|
|
try:
|
|
self.member.insert({"id": id, "name": name})
|
|
finally:
|
|
return self.member.get(id)
|
|
|
|
def update_member(self, member_id: int, name: str) -> bool:
|
|
"""Update discord member"s record
|
|
|
|
:param member_id: Discord Mention ID
|
|
:type member_id: int Discord Mention ID
|
|
:param name: Discord user name
|
|
:type name: str Discord user name
|
|
:return: bool
|
|
"""
|
|
try:
|
|
member = self.get_member(member_id)
|
|
except NotFoundError:
|
|
member = self.add_member(member_id, name)
|
|
self.member.update(member["id"], {"name": name})
|
|
return True
|
|
|
|
def check_medal(self, pid: int, bid: int, div: int, side: int, dmg: int) -> bool:
|
|
"""Check if players (pid) damage (dmg) in battle (bid) for side in division (div) has been registered
|
|
|
|
:param pid: Player ID
|
|
:type pid: int
|
|
:param bid: Battle ID
|
|
:type bid: int
|
|
:param div: Division
|
|
:type div: int
|
|
:param side: Side ID
|
|
:type side: int
|
|
:param dmg: Damage amount
|
|
:type dmg: int
|
|
:return: If medal has been registered
|
|
:rtype: bool
|
|
"""
|
|
medals = self.medals
|
|
record_pk = medals.lookup(dict(player_id=pid, battle_id=bid, division_id=div, side_id=side))
|
|
return medals.get(record_pk)["damage"] == dmg
|
|
|
|
def add_reported_medal(self, pid: int, bid: int, div: int, side: int, dmg: int):
|
|
medals = self.medals
|
|
pk = medals.lookup(dict(player_id=pid, battle_id=bid, division_id=div, side_id=side))
|
|
medals.update(pk, {"damage": dmg})
|
|
return True
|
|
|
|
def delete_medals(self, bid: List[int]):
|
|
self.medals.delete_where("battle_id in (%s)" % "?" * len(bid), bid)
|
|
return True
|
|
|
|
def check_hunt(self, pid: int, member_id: int) -> bool:
|
|
try:
|
|
next(self.hunted.rows_where("player_id=? and member_id=?", [pid, member_id]))
|
|
return True
|
|
except StopIteration:
|
|
return False
|
|
|
|
def add_hunted_player(self, pid: int, member_id: int, channel_id: int) -> bool:
|
|
if self.check_hunt(pid, member_id):
|
|
return False
|
|
else:
|
|
self.hunted.insert(dict(player_id=pid, member_id=member_id, channel_id=channel_id))
|
|
return True
|
|
|
|
def remove_hunted_player(self, pid: int, member_id: int) -> bool:
|
|
if self.check_hunt(pid, member_id):
|
|
self.hunted.delete_where("player_id=? and member_id=?", (pid, member_id))
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def get_member_hunted_players(self, member_id: int) -> List[Dict[str, Union[int, str]]]:
|
|
return [self.get_player(r['player_id']) for r in self.hunted.rows_where("member_id=?",
|
|
(self.get_member(member_id)['id'], ))]
|
|
|
|
def get_hunted_player_ids(self) -> List[int]:
|
|
return [r["player_id"] for r in self.hunted_players.rows]
|
|
|
|
def get_members_to_notify(self, pid: int) -> List[Dict[str, Union[int, str]]]:
|
|
return [r for r in self.hunted.rows_where("player_id = ?", [pid])]
|