WIP
This commit is contained in:
parent
b323832a70
commit
6c66fe2b8b
94
db.py
94
db.py
@ -1,4 +1,5 @@
|
||||
from typing import List, Union, Dict, Optional
|
||||
from sqlite3 import IntegrityError
|
||||
from typing import List, Union, Dict, Optional, Set
|
||||
|
||||
from sqlite_utils import Database
|
||||
from sqlite_utils.db import NotFoundError
|
||||
@ -14,24 +15,34 @@ class DiscordDB:
|
||||
self._db = Database(memory=True)
|
||||
else:
|
||||
self._db = Database(self._name)
|
||||
migrate_db = False
|
||||
if "member" not in self._db.table_names():
|
||||
self._db.create_table("member", {"id": int, "name": str, "mention_number": int},
|
||||
pk="id", not_null={"id", "name", "mention_number"})
|
||||
self._db.create_table("member", {"id": int, "name": str}, pk="id", not_null={"id", "name"})
|
||||
else:
|
||||
if "mention_number" in self._db.table('member').columns_dict.keys():
|
||||
members: Set[Dict[str, Union[int, str]]] = {*[]}
|
||||
for row in self._db.table('member').rows:
|
||||
members.add({'id': row['mention_number'], 'name': row['name']})
|
||||
self._db.table('member').drop()
|
||||
self._db.create_table("member", {"id": int, "name": str}, pk="id", not_null={"id", "name"})
|
||||
self._db.table('member').insert_all(list(members))
|
||||
migrate_db = True
|
||||
|
||||
if "player" not in self._db.table_names():
|
||||
self._db.create_table("player", {"id": int, "name": str}, pk="id", not_null={"id", "name"})
|
||||
|
||||
if "hunted" not in self._db.table_names():
|
||||
self._db.create_table("hunted", {"id": int, "member_id": int, "player_id": int},
|
||||
pk="id", not_null={"id", "member_id", "player_id"})
|
||||
if "hunted" not in self._db.table_names() or migrate_db:
|
||||
if migrate_db:
|
||||
self._db.table('hunted').drop()
|
||||
self._db.create_table("hunted", {"id": int, "member_id": int, "player_id": int, 'channel_id': int},
|
||||
pk="id", not_null={"id", "member_id", "player_id", "channel_id"})
|
||||
self._db['hunted'].create_index(["member_id", "player_id"], unique=True)
|
||||
|
||||
if "medals" in self._db.table_names():
|
||||
self._db['medals'].drop()
|
||||
self._db.create_table("medals",
|
||||
dict(id=int, player_id=int, battle_id=int, division_id=int, side_id=int, damage=int),
|
||||
not_null={"id", "player_id", "battle_id", "division_id", "side_id", "damage"},
|
||||
pk="id", defaults={"damage": 0})
|
||||
self._db.table('medals').drop()
|
||||
self._db.create_table("medals", dict(id=int, player_id=int, battle_id=int, division_id=int, side_id=int,
|
||||
damage=int), pk="id", defaults={"damage": 0},
|
||||
not_null={"id", "player_id", "battle_id", "division_id", "side_id", "damage"})
|
||||
self._db['medals'].create_index(["player_id", "battle_id", "division_id", "side_id"], unique=True)
|
||||
|
||||
if "hunted_players" not in self._db.view_names():
|
||||
@ -89,56 +100,45 @@ class DiscordDB:
|
||||
|
||||
# Member methods
|
||||
|
||||
def get_member(self, mention_number: int = None, local_id: int = None) -> Dict[str, Union[int, str]]:
|
||||
def get_member(self, member_id) -> Dict[str, Union[int, str]]:
|
||||
"""Get discord member
|
||||
|
||||
:param mention_number: int Discord Member ID
|
||||
:type mention_number: int
|
||||
:param local_id: Query user by local id
|
||||
:type local_id: int
|
||||
:param member_id: int Discord Member ID
|
||||
:type member_id: int
|
||||
:return: local id, name, mention number if discord member exists else None
|
||||
:rtype: Union[Dict[str, Union[int, str]], None]
|
||||
"""
|
||||
if local_id:
|
||||
return self.member.get(local_id)
|
||||
elif mention_number:
|
||||
try:
|
||||
return next(self.member.rows_where("mention_number = ?", [mention_number]))
|
||||
except StopIteration:
|
||||
raise NotFoundError("Member with given kwargs not found")
|
||||
else:
|
||||
raise NotFoundError("mention_number or local_id must be provided!")
|
||||
return self.member.get(member_id)
|
||||
except NotFoundError:
|
||||
raise NotFoundError("Member with given id not found")
|
||||
|
||||
def add_member(self, mention_number: int, name: str) -> int:
|
||||
def add_member(self, id: int, name: str) -> int:
|
||||
"""Add discord member.
|
||||
|
||||
:param mention_number: int Discord member ID
|
||||
:param id: int Discord member ID
|
||||
:param name: Discord member Name
|
||||
"""
|
||||
return self.member.insert({"mention_number": mention_number, "name": name}).last_pk
|
||||
try:
|
||||
return self.member.insert({"id": id, "name": name}).last_pk
|
||||
except IntegrityError:
|
||||
return id
|
||||
|
||||
def update_member(self, local_id: int, name: str = "", mention_number: int = None) -> bool:
|
||||
def update_member(self, member_id: int, name: str) -> bool:
|
||||
"""Update discord member"s record
|
||||
|
||||
:param mention_number: Discord Mention ID
|
||||
:type mention_number: int Discord Mention ID
|
||||
:param member_id: Discord Mention ID
|
||||
:type member_id: int Discord Mention ID
|
||||
:param name: Discord user name
|
||||
:type name: str Discord user name
|
||||
:param local_id: Local id of Discord mention ID
|
||||
:type local_id: int
|
||||
:return: bool
|
||||
"""
|
||||
member = self.get_member(local_id=local_id)
|
||||
if member and (name or mention_number):
|
||||
if name and not mention_number:
|
||||
try:
|
||||
member = self.get_member(member_id)
|
||||
except NotFoundError:
|
||||
member = self.member.get(self.add_member(member_id, name))
|
||||
self.member.update(member["id"], {"name": name})
|
||||
elif mention_number and not name:
|
||||
self.member.update(member["id"], {"mention_number": mention_number})
|
||||
else:
|
||||
self.member.update(member["id"], {"name": name, "mention_number": mention_number})
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def check_medal(self, pid: int, bid: int, div: int, side: int, dmg: int) -> bool:
|
||||
"""Check if players (pid) damage (dmg) in battle (bid) for side in division (div) has been registered
|
||||
@ -177,11 +177,11 @@ class DiscordDB:
|
||||
except StopIteration:
|
||||
return False
|
||||
|
||||
def add_hunted_player(self, pid: int, member_id: int) -> bool:
|
||||
def add_hunted_player(self, pid: int, member_id: int, channel_id: int) -> bool:
|
||||
if self.check_hunt(pid, member_id):
|
||||
return False
|
||||
else:
|
||||
self.hunted.insert({"player_id": pid, "member_id": member_id})
|
||||
self.hunted.insert(dict(player_id=pid, member_id=member_id, channel_id=channel_id))
|
||||
return True
|
||||
|
||||
def remove_hunted_player(self, pid: int, member_id: int) -> bool:
|
||||
@ -191,12 +191,16 @@ class DiscordDB:
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_member_hunted_players(self, member_id: int) -> List[Dict[str, Union[int, str]]]:
|
||||
return [self.get_player(r['player_id']) for r in self.hunted.rows_where("member_id=?",
|
||||
(self.get_member(member_id)['id'], ))]
|
||||
|
||||
def get_hunted_player_ids(self) -> List[int]:
|
||||
return [r["player_id"] for r in self.hunted_players.rows]
|
||||
|
||||
def get_members_to_notify(self, pid: int) -> List[int]:
|
||||
members = []
|
||||
for row in self.hunted.rows_where("player_id = ?", [pid]):
|
||||
member = self.get_member(local_id=row["member_id"])
|
||||
members.append(member["mention_number"])
|
||||
member = self.get_member(row["member_id"])
|
||||
members.append(member["id"])
|
||||
return members
|
||||
|
@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
from collections import defaultdict
|
||||
from json import JSONDecodeError
|
||||
@ -10,10 +11,13 @@ import pytz
|
||||
import requests
|
||||
from discord.ext import commands
|
||||
from dotenv import load_dotenv
|
||||
from sqlite_utils.db import NotFoundError
|
||||
|
||||
from db import DiscordDB
|
||||
|
||||
load_dotenv()
|
||||
logging.basicConfig(level=logging.WARNING, filename="discord_bot.log",
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
|
||||
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN")
|
||||
DB_NAME = os.getenv('DB_NAME', 'discord.db')
|
||||
@ -52,7 +56,7 @@ __last_battle_update_timestamp = 0
|
||||
def get_battle_page():
|
||||
global __last_battle_update_timestamp, __last_battle_request
|
||||
if int(datetime.datetime.now().timestamp()) >= __last_battle_update_timestamp + 60:
|
||||
r = requests.get('https://www.erepublik.com/en/military/campaignsJson/list').json()
|
||||
r = requests.get('https://erep.lv/battles.json').json()
|
||||
__last_battle_request = r
|
||||
__last_battle_update_timestamp = r.get('last_updated', int(datetime.datetime.now().timestamp()))
|
||||
return __last_battle_request
|
||||
@ -114,8 +118,8 @@ class MyClient(discord.Client):
|
||||
|
||||
async def report_hunted_medals(self):
|
||||
await self.wait_until_ready()
|
||||
|
||||
while not self.is_closed():
|
||||
try:
|
||||
r = get_battle_page()
|
||||
hunted_ids = DB.get_hunted_player_ids()
|
||||
for bid, battle in r.get('battles', {}).items():
|
||||
@ -127,8 +131,8 @@ class MyClient(discord.Client):
|
||||
medal_key = (pid, battle['id'], div['div'], battle[side]['id'], side_data['damage'])
|
||||
if not DB.check_medal(*medal_key):
|
||||
for member in DB.get_members_to_notify(pid):
|
||||
|
||||
format_data = dict(author=member, player=DB.get_player(pid)['name'], battle=bid,
|
||||
format_data = dict(author=member, player=DB.get_player(pid)['name'],
|
||||
battle=bid,
|
||||
region=battle.get('region').get('name'),
|
||||
division=div['div'], dmg=side_data['damage'],
|
||||
side=COUNTRIES[battle[side]['id']])
|
||||
@ -141,10 +145,15 @@ class MyClient(discord.Client):
|
||||
DB.add_reported_medal(*medal_key)
|
||||
sleep_seconds = r.get('last_updated') + 60 - self.timestamp
|
||||
await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0)
|
||||
except Exception as e:
|
||||
await self.get_channel(603527159109124096).send("<@220849530730577920> Something bad has happened with"
|
||||
" medal hunter!")
|
||||
logging.error("Discord bot's eRepublik medal hunter died!", exc_info=e)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
client = MyClient()
|
||||
loop = asyncio.get_event_loop()
|
||||
client = MyClient(loop=loop)
|
||||
bot = commands.Bot(command_prefix='!')
|
||||
|
||||
|
||||
@ -186,7 +195,7 @@ if __name__ == "__main__":
|
||||
|
||||
@bot.command(description="Parādīt lētos SH, kuru dmg ir zem 50k vai Tevis ievadīta vērtībā", help="Lētie SH",
|
||||
category="Cheap medals")
|
||||
async def sh(ctx, min_damage: int = 50000):
|
||||
async def sh(ctx, min_damage: int = 50_000):
|
||||
await _send_medal_info(ctx, 11, min_damage)
|
||||
|
||||
|
||||
@ -227,13 +236,29 @@ if __name__ == "__main__":
|
||||
await ctx.send(f"{ctx.author.mention} didn't find any player with `id: {player_id}`!")
|
||||
else:
|
||||
player_name = DB.get_player(player_id).get('name')
|
||||
try:
|
||||
local_member_id = DB.get_member(ctx.author.id).get('id')
|
||||
except NotFoundError:
|
||||
local_member_id = DB.add_member(ctx.author.id, ctx.author.name)
|
||||
if DB.add_hunted_player(player_id, local_member_id):
|
||||
await ctx.send(f"{ctx.author.mention} You'll be notified for all **{player_name}** medals")
|
||||
else:
|
||||
await ctx.send(f"{ctx.author.mention} You are already being notified for all **{player_name}** medals")
|
||||
|
||||
|
||||
@bot.command(description="Informēt par spēlētāja mēģinājumiem ņemt medaļas",
|
||||
help="Piereģistrēties uz spēlētāja medaļu paziņošanu", category="Hunting")
|
||||
async def my_hunt(ctx):
|
||||
msgs = []
|
||||
for hunt in DB.get_member_hunted_players(ctx.author.id):
|
||||
msgs.append(f"`{hunt['id']}` - **{hunt['name']}**")
|
||||
if msgs:
|
||||
msg = "\n".join(msgs)
|
||||
await ctx.send(f"{ctx.author.mention} You are hunting:\n{msg}")
|
||||
else:
|
||||
await ctx.send(f"{ctx.author.mention} You're not hunting anyone!")
|
||||
|
||||
|
||||
@bot.command(description="Beigt informēt par spēlētāja mēģinājumiem ņemt medaļas",
|
||||
help="Atreģistrēties no spēlētāja medaļu paziņošanas", category="Hunting")
|
||||
async def remove_hunt(ctx, player_id: int):
|
||||
@ -243,12 +268,11 @@ if __name__ == "__main__":
|
||||
player_name = DB.get_player(player_id).get('name')
|
||||
local_member_id = DB.get_member(ctx.author.id).get('id')
|
||||
if DB.remove_hunted_player(player_id, local_member_id):
|
||||
await ctx.send(f"{ctx.author.mention} You won't be notified for all **{player_name}** medals")
|
||||
await ctx.send(f"{ctx.author.mention} You won't be notified for **{player_name}** medals")
|
||||
else:
|
||||
await ctx.send(f"{ctx.author.mention} You weren't being notified for **{player_name}** medals")
|
||||
await ctx.send(f"{ctx.author.mention} You were not hunting **{player_name}** medals")
|
||||
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.create_task(bot.start(DISCORD_TOKEN))
|
||||
loop.create_task(client.start(DISCORD_TOKEN))
|
||||
loop.run_forever()
|
||||
|
0
telegram_bot.py
Normal file
0
telegram_bot.py
Normal file
0
telegram_keyboard.py
Normal file
0
telegram_keyboard.py
Normal file
40
tests.py
40
tests.py
@ -10,20 +10,16 @@ class TestDatabase(unittest.TestCase):
|
||||
self.db = DiscordDB()
|
||||
|
||||
def test_member(self):
|
||||
member = {'mention_number': 1, 'name': 'username'}
|
||||
member.update(id=self.db.add_member(**member))
|
||||
self.assertRaises(NotFoundError, self.db.get_member)
|
||||
self.assertRaises(NotFoundError, self.db.get_member, mention_number=100)
|
||||
self.assertEqual(self.db.get_member(local_id=member['id']), member)
|
||||
self.assertEqual(self.db.get_member(mention_number=member['mention_number']), member)
|
||||
self.assertFalse(self.db.update_member(member['id']))
|
||||
member.update(name="Success", mention_number=2)
|
||||
self.assertTrue(self.db.update_member(member['id'], name=member['name'],
|
||||
mention_number=member['mention_number']))
|
||||
self.assertTrue(self.db.update_member(member['id'], name=member['name']))
|
||||
self.assertTrue(self.db.update_member(member['id'], mention_number=member['mention_number']))
|
||||
self.assertEqual(self.db.get_member(local_id=member['id']), member)
|
||||
self.assertEqual(self.db.get_member(mention_number=member['mention_number']), member)
|
||||
member = {'id': 1200, 'name': 'username'}
|
||||
self.db.add_member(**member)
|
||||
self.assertEqual(self.db.add_member(**member), member['id'])
|
||||
|
||||
self.assertRaises(NotFoundError, self.db.get_member, member_id=100)
|
||||
self.assertEqual(self.db.get_member(member_id=member['id']), member)
|
||||
|
||||
member.update(name="Success")
|
||||
self.assertTrue(self.db.update_member(member['id'], member['name']))
|
||||
self.assertEqual(self.db.get_member(member_id=member['id']), member)
|
||||
|
||||
def test_player(self):
|
||||
player = {'id': 1, 'name': 'plato'}
|
||||
@ -45,20 +41,20 @@ class TestDatabase(unittest.TestCase):
|
||||
self.assertTrue(self.db.delete_medals([kwargs['bid']]))
|
||||
|
||||
def test_hunt(self):
|
||||
member_id = self.db.add_member(mention_number=2, name="username")
|
||||
member_id2 = self.db.add_member(mention_number=3, name="username")
|
||||
member_id3 = self.db.add_member(mention_number=4, name="username")
|
||||
member_id = self.db.add_member(2, name="one")
|
||||
member_id2 = self.db.add_member(3, name="two")
|
||||
member_id3 = self.db.add_member(4, name="three")
|
||||
self.db.add_player(1, 'plato')
|
||||
self.db.add_player(2, 'draco')
|
||||
self.assertFalse(self.db.check_hunt(1, member_id))
|
||||
self.assertFalse(self.db.remove_hunted_player(1, member_id))
|
||||
self.assertTrue(self.db.add_hunted_player(1, member_id))
|
||||
self.assertTrue(self.db.add_hunted_player(1, member_id2))
|
||||
self.assertTrue(self.db.add_hunted_player(1, member_id3))
|
||||
self.assertTrue(self.db.add_hunted_player(2, member_id))
|
||||
self.assertTrue(self.db.add_hunted_player(1, member_id, 123))
|
||||
self.assertTrue(self.db.add_hunted_player(1, member_id2, 234))
|
||||
self.assertTrue(self.db.add_hunted_player(1, member_id3, 345))
|
||||
self.assertTrue(self.db.add_hunted_player(2, member_id, 456))
|
||||
self.assertListEqual(self.db.get_hunted_player_ids(), [1, 2])
|
||||
self.assertListEqual(self.db.get_members_to_notify(1), [2, 3, 4])
|
||||
self.assertListEqual(self.db.get_members_to_notify(2), [2])
|
||||
self.assertFalse(self.db.add_hunted_player(1, member_id))
|
||||
self.assertFalse(self.db.add_hunted_player(1, member_id, 567))
|
||||
self.assertTrue(self.db.check_hunt(1, member_id))
|
||||
self.assertTrue(self.db.remove_hunted_player(1, member_id))
|
Loading…
x
Reference in New Issue
Block a user