Updated to only report epics

Requirement update

Requirement update
This commit is contained in:
Eriks K 2021-07-17 00:03:12 +03:00
parent bf8899b8fb
commit 9f23253232
9 changed files with 70 additions and 525 deletions

3
.gitignore vendored
View File

@ -1,4 +1,5 @@
venv
*.db
.env
__pycache__
__pycache__
.idea

7
Dockerfile Normal file
View File

@ -0,0 +1,7 @@
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install -r requirements.txt
COPY . /app
CMD python discord_bot.py

192
db.py
View File

@ -1,4 +1,4 @@
from typing import List, Union, Dict, Optional
from typing import Union, Dict, Optional
from sqlite_utils import Database
from sqlite_utils.db import NotFoundError
@ -14,60 +14,20 @@ class DiscordDB:
self._db = Database(memory=True)
else:
self._db = Database(self._name)
migrate_db = False
if "member" not in self._db.table_names():
self._db.create_table("member", {"id": int, "name": str}, pk="id", not_null={"id", "name"})
if "player" not in self._db.table_names():
self._db.create_table("player", {"id": int, "name": str}, pk="id", not_null={"id", "name"})
if "hunted" not in self._db.table_names() or migrate_db:
if migrate_db:
self._db.table('hunted').drop()
self._db.create_table("hunted", {"id": int, "member_id": int, "player_id": int, 'channel_id': int},
pk="id", not_null={"id", "member_id", "player_id", "channel_id"})
self._db['hunted'].create_index(["member_id", "player_id"], unique=True)
if "epic" not in self._db.table_names():
self._db.create_table("epic", {"id": int, }, pk="id", not_null={"id"})
if "medals" in self._db.table_names():
self._db.table('medals').drop()
self._db.create_table("medals", dict(id=int, player_id=int, battle_id=int, division_id=int, side_id=int,
damage=int), pk="id", defaults={"damage": 0},
not_null={"id", "player_id", "battle_id", "division_id", "side_id", "damage"})
self._db['medals'].create_index(["player_id", "battle_id", "division_id", "side_id"], unique=True)
if "hunted_players" not in self._db.view_names():
self._db.create_view("hunted_players", "select distinct player_id from hunted")
if "protected" not in self._db.table_names() or migrate_db:
self._db.create_table("protected", {"id": int, "member_id": int, "player_id": int, 'channel_id': int},
pk="id", not_null={"id", "member_id", "player_id", "channel_id"})
self._db['protected'].create_index(["member_id", "player_id"], unique=True)
if "protected_medals" not in self._db.table_names():
self._db.create_table("protected_medals",
dict(id=int, player_id=int, division_id=int, side_id=int), pk="id",
not_null={"id", "player_id", "division_id", "side_id"})
self._db['protected_medals'].create_index(["player_id", "division_id", "side_id"], unique=True)
if "protected_players" not in self._db.view_names():
self._db.create_view("protected_players", "select distinct player_id from protected")
self._db.add_foreign_keys([("hunted", "member_id", "member", "id"),
("hunted", "player_id", "player", "id"),
("protected", "member_id", "member", "id"),
("protected", "player_id", "player", "id"),
("medals", "player_id", "player", "id"),
("protected_medals", "player_id", "player", "id")])
self._db.vacuum()
self.member = self._db.table("member")
self.player = self._db.table("player")
self.hunted = self._db.table("hunted")
self.medals = self._db.table("medals")
self.hunted_players = self._db.table("hunted_players")
self.protected = self._db.table("protected")
self.protected_medals = self._db.table("protected_medals")
self.protected_players = self._db.table("protected_players")
self.epic = self._db.table("epic")
# Player methods
@ -150,142 +110,26 @@ class DiscordDB:
self.member.update(member["id"], {"name": name})
return True
def check_medal(self, pid: int, bid: int, div: int, side: int, dmg: int) -> bool:
"""Check if players (pid) damage (dmg) in battle (bid) for side in division (div) has been registered
# Epic Methods
:param pid: Player ID
:type pid: int
:param bid: Battle ID
:type bid: int
:param div: Division
:type div: int
:param side: Side ID
:type side: int
:param dmg: Damage amount
:type dmg: int
:return: If medal has been registered
:rtype: bool
"""
medals = self.medals
record_pk = medals.lookup(dict(player_id=pid, battle_id=bid, division_id=div, side_id=side))
return medals.get(record_pk)["damage"] == dmg
def get_epic(self, division_id: int) -> Optional[Dict[str, Union[int, str]]]:
"""Get Epic division
def add_reported_medal(self, pid: int, bid: int, div: int, side: int, dmg: int):
medals = self.medals
pk = medals.lookup(dict(player_id=pid, battle_id=bid, division_id=div, side_id=side))
medals.update(pk, {"damage": dmg})
return True
def delete_medals(self, bid: List[int]):
self.medals.delete_where("battle_id in (%s)" % "?" * len(bid), bid)
return True
def check_hunt(self, pid: int, member_id: int) -> bool:
try:
next(self.hunted.rows_where("player_id=? and member_id=?", [pid, member_id]))
return True
except StopIteration:
return False
def add_hunted_player(self, pid: int, member_id: int, channel_id: int) -> bool:
if self.check_hunt(pid, member_id):
return False
else:
self.hunted.insert(dict(player_id=pid, member_id=member_id, channel_id=channel_id))
return True
def remove_hunted_player(self, pid: int, member_id: int) -> bool:
if self.check_hunt(pid, member_id):
self.hunted.delete_where("player_id=? and member_id=?", (pid, member_id))
return True
else:
return False
def get_member_hunted_players(self, member_id: int) -> List[Dict[str, Union[int, str]]]:
return [self.get_player(r['player_id']) for r in self.hunted.rows_where("member_id=?",
(self.get_member(member_id)['id'], ))]
def get_hunted_player_ids(self) -> List[int]:
return [r["player_id"] for r in self.hunted_players.rows]
def get_members_to_notify(self, pid: int) -> List[Dict[str, Union[int, str]]]:
return [r for r in self.hunted.rows_where("player_id = ?", [pid])]
'''' MEDAL PROTECTION '''
def check_protected_medal(self, pid: int, div: int, side: int) -> Optional[bool]:
"""Check if player (pid) in battle (bid) for side in division (div) hasn't taken protected medal
:param pid: Player ID
:type pid: int
:param bid: Battle ID
:type bid: int
:param div: Division
:type div: int
:param side: Side ID
:type side: int
:return: If medal has been registered
:rtype: bool
:param division_id: int Division ID
:return: division id
"""
try:
medal = next(self.protected_medals.rows_where("division_id=? and side_id=?", (div, side)))
except StopIteration:
return self.epic.get(division_id)
except NotFoundError:
return None
return medal['player_id'] == pid
def add_protected_medal(self, pid: int, div: int, side: int):
"""Check if players (pid) medal in division (div) for side (sid) has been registered
def add_epic(self, division_id: int) -> bool:
"""Add Epic division.
:param pid: Player ID
:type pid: int
:param div: Division
:type div: int
:param side: Side ID
:type side: int
:param division_id: int Epic division ID
:return: bool Epic division added
"""
self.protected_medals.lookup(dict(player_id=pid, division_id=div, side_id=side))
def get_protected_medal(self, div: int, side: int):
""" Get player_id (pid) in division (div) for side (sid)
:param div: Division
:type div: int
:param side: Side ID
:type side: int
"""
pk = self.protected_medals.lookup(dict(division_id=div, side_id=side))
return self.protected_medals.get(pk)
def delete_protected_medals(self, div_id: List[int]):
self.protected_medals.delete_where("division_id in (%s)" % "?" * len(div_id), div_id)
return True
def check_protected(self, pid: int, member_id: int) -> bool:
try:
next(self.protected.rows_where("player_id=? and member_id=?", [pid, member_id]))
if not self.get_epic(division_id):
self.epic.insert({"id": division_id})
return True
except StopIteration:
return False
def add_protected_player(self, pid: int, member_id: int, channel_id: int) -> bool:
if self.check_protected(pid, member_id):
return False
else:
self.protected.insert(dict(player_id=pid, member_id=member_id, channel_id=channel_id))
return True
def remove_protected_player(self, pid: int, member_id: int) -> bool:
if self.check_protected(pid, member_id):
self.protected.delete_where("player_id=? and member_id=?", (pid, member_id))
return True
else:
return False
def get_member_protected_players(self, member_id: int) -> List[Dict[str, Union[int, str]]]:
return [self.get_player(r['player_id']) for r in self.protected.rows_where("member_id=?", (member_id, ))]
def get_protected_player_ids(self) -> List[int]:
return [r["player_id"] for r in self.protected_players.rows]
def get_protected_members_to_notify(self, pid: int) -> List[Dict[str, Union[int, str]]]:
return [r for r in self.protected.rows_where("player_id = ?", [pid])]
return False

View File

@ -4,13 +4,12 @@ import logging
import os
import sys
from json import JSONDecodeError
from typing import Union
import discord
import pytz
import requests
from discord.ext import commands
from dotenv import load_dotenv
from sqlite_utils.db import NotFoundError
from db import DiscordDB
@ -28,28 +27,16 @@ fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
keep_fds = [fh.stream.fileno()]
pidfile = f"pid"
pidfile = "pid"
with open(pidfile, 'w') as f:
f.write(str(os.getpid()))
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN")
DEFAULT_CHANNEL_ID = os.getenv("DEFAULT_CHANNEL_ID", 603527159109124096)
ADMIN_ID = os.getenv("DEFAULT_CHANNEL_ID", 220849530730577920)
DB_NAME = os.getenv('DB_NAME', 'discord.db')
DB = DiscordDB(DB_NAME)
COUNTRIES = {1: 'Romania', 9: 'Brazil', 10: 'Italy', 11: 'France', 12: 'Germany', 13: 'Hungary', 14: 'China',
15: 'Spain', 23: 'Canada', 24: 'USA', 26: 'Mexico', 27: 'Argentina', 28: 'Venezuela', 29: 'United Kingdom',
30: 'Switzerland', 31: 'Netherlands', 32: 'Belgium', 33: 'Austria', 34: 'Czech Republic', 35: 'Poland',
36: 'Slovakia', 37: 'Norway', 38: 'Sweden', 39: 'Finland', 40: 'Ukraine', 41: 'Russia', 42: 'Bulgaria',
43: 'Turkey', 44: 'Greece', 45: 'Japan', 47: 'South Korea', 48: 'India', 49: 'Indonesia', 50: 'Australia',
51: 'South Africa', 52: 'Republic of Moldova', 53: 'Portugal', 54: 'Ireland', 55: 'Denmark', 56: 'Iran',
57: 'Pakistan', 58: 'Israel', 59: 'Thailand', 61: 'Slovenia', 63: 'Croatia', 64: 'Chile', 65: 'Serbia',
66: 'Malaysia', 67: 'Philippines', 68: 'Singapore', 69: 'Bosnia and Herzegovina', 70: 'Estonia',
71: 'Latvia', 72: 'Lithuania', 73: 'North Korea', 74: 'Uruguay', 75: 'Paraguay', 76: 'Bolivia', 77: 'Peru',
78: 'Colombia', 79: 'Republic of Macedonia (FYROM)', 80: 'Montenegro', 81: 'Republic of China (Taiwan)',
82: 'Cyprus', 83: 'Belarus', 84: 'New Zealand', 164: 'Saudi Arabia', 165: 'Egypt',
166: 'United Arab Emirates', 167: 'Albania', 168: 'Georgia', 169: 'Armenia', 170: 'Nigeria', 171: 'Cuba'}
FLAGS = {1: 'flag_ro', 9: 'flag_br', 10: 'flag_it', 11: 'flag_fr', 12: 'flag_de', 13: 'flag_hu', 14: 'flag_cn',
15: 'flag_es', 23: 'flag_ca', 24: 'flag_us', 26: 'flag_mx', 27: 'flag_ar', 28: 'flag_ve', 29: 'flag_gb',
30: 'flag_ch', 31: 'flag_nl', 32: 'flag_be', 33: 'flag_at', 34: 'flag_cz', 35: 'flag_pl', 36: 'flag_sk',
@ -62,6 +49,7 @@ FLAGS = {1: 'flag_ro', 9: 'flag_br', 10: 'flag_it', 11: 'flag_fr', 12: 'flag_de'
82: 'flag_cy', 83: 'flag_by', 84: 'flag_nz', 164: 'flag_sa', 165: 'flag_eg', 166: 'flag_ae', 167: 'flag_al',
168: 'flag_ge', 169: 'flag_am', 170: 'flag_ng', 171: 'flag_cu'}
MENTION_MAPPING = {1: "@D1", 2: "@D2", 3: "@D3", 4: "@D4", 11: "@AIR"}
__last_battle_response = None
__last_battle_update_timestamp = 0
@ -71,67 +59,33 @@ def timestamp_to_datetime(timestamp: int) -> datetime.datetime:
return datetime.datetime.fromtimestamp(timestamp)
def s_to_human(seconds: Union[int, float]) -> str:
seconds = int(seconds)
h = seconds // 3600
m = (seconds - (h * 3600)) // 60
s = seconds % 60
return f'{h:01d}:{m:02d}:{s:02d}'
def get_battle_page():
global __last_battle_update_timestamp, __last_battle_response
if int(datetime.datetime.now().timestamp()) >= __last_battle_update_timestamp + 60:
dt = datetime.datetime.now()
r = requests.get('https://erep.lv/battles.json')
os.makedirs(f"{dt:%F/%H}/", exist_ok=True)
with open(f"{dt:%F/%H}/{int(dt.timestamp())}.json", 'w') as f:
f.write(r.text)
try:
__last_battle_response = r.json()
except JSONDecodeError:
logger.warning(f"Received non json response from erep.lv/battles.json! "
f"Located at '{dt:%F/%H}/{int(dt.timestamp())}.json'")
logger.warning("Received non json response from erep.lv/battles.json!")
return get_battle_page()
__last_battle_update_timestamp = __last_battle_response.get('last_updated', int(dt.timestamp()))
return __last_battle_response
def check_player(player_id: int) -> bool:
try:
player_id = int(player_id)
except ValueError:
return False
if not DB.get_player(player_id):
try:
r = requests.get(f'https://www.erepublik.com/en/main/citizen-profile-json/{player_id}').json()
except JSONDecodeError:
return False
if r.get('error'):
return False
DB.add_player(player_id, r.get('citizen').get('name'))
return True
def get_medals(division: int):
r = get_battle_page()
if r.get('battles'):
request_time = timestamp_to_datetime(r.get('last_updated'))
for battle_id, battle in r.get('battles').items():
start_time = timestamp_to_datetime(battle.get('start'))
if start_time - datetime.timedelta(seconds=30) < request_time:
for division_data in battle.get('div', {}).values():
if not division_data.get('end') and division_data.get('div') == division:
for side, stat in division_data['stats'].items():
data = dict(id=battle.get('id'), country_id=battle.get(side).get('id'),
time=request_time - start_time, dmg=0)
if stat:
data.update(dmg=division_data['stats'][side]['damage'])
yield data
else:
yield data
class MyClient(discord.Client):
erep_tz = pytz.timezone('US/Pacific')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# create the background task and run it in the background
self.bg_task = self.loop.create_task(self.report_medals())
self.bg_task = self.loop.create_task(self.report_epics())
@property
def timestamp(self):
@ -142,77 +96,38 @@ class MyClient(discord.Client):
print('------')
async def on_error(self, event_method, *args, **kwargs):
logger.warning('Ignoring exception in {}'.format(event_method))
logger.warning(f'Ignoring exception in {event_method}')
async def report_medals(self):
async def report_epics(self):
await self.wait_until_ready()
while not self.is_closed():
try:
r = get_battle_page()
hunted_ids = DB.get_hunted_player_ids()
protected_ids = DB.get_protected_player_ids()
if not isinstance(r.get('battles'), dict):
sleep_seconds = r.get('last_updated') + 60 - self.timestamp
await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0)
continue
for bid, battle in r.get('battles', {}).items():
for div in battle.get('div', {}).values():
if div['stats'] and not div['end']:
for side, side_data in div['stats'].items():
if side_data:
pid = side_data['citizenId']
if pid in hunted_ids:
hunted_medal_key = (pid, battle['id'], div['id'],
battle[side]['id'], side_data['damage'])
if not DB.check_medal(*hunted_medal_key):
for hunt_row in DB.get_members_to_notify(pid):
format_data = dict(author=hunt_row['member_id'],
player=DB.get_player(pid)['name'],
battle=bid,
region=battle.get('region').get('name'),
division=div['div'], dmg=side_data['damage'],
side=COUNTRIES[battle[side]['id']])
if div.get('epic') and not DB.get_epic(div.get('div')):
await self.get_channel(DEFAULT_CHANNEL_ID).send(
f"<@{MENTION_MAPPING[div['div']]}> Epic battle! Round time {s_to_human(self.timestamp - battle['start'])}\n"
f"https://www.erepublik.com/en/military/battlefield/{battle['id']}")
DB.add_epic(div.get('div'))
await self.get_channel(hunt_row['channel_id']).send(
"<@{author}> **{player}** detected in battle for {region} on {side} "
"side in d{division} with {dmg:,d}dmg\n"
"https://www.erepublik.com/en/military/battlefield/{battle}".format(
**format_data))
DB.add_reported_medal(*hunted_medal_key)
protected_medal_key = (pid, div['id'], battle[side]['id'])
protected_medal_status = DB.check_protected_medal(*protected_medal_key)
if protected_medal_status == False:
medal = DB.get_protected_medal(div['id'], battle[side]['id'])
for protected in DB.get_protected_members_to_notify(medal['player_id']):
await self.get_channel(protected['channel_id']).send(
"<@{author}> Medal for **{player}** in battle for {region} on"
" {side} side in d{division} has been taken!\n"
"https://www.erepublik.com/en/military/battlefield/{battle}".format(
author=protected['member_id'],
player=DB.get_player(medal['player_id'])['name'],
battle=bid, region=battle.get('region').get('name'),
division=div['div'], side=COUNTRIES[battle[side]['id']]
))
DB.delete_protected_medals([medal['division_id']])
else:
if protected_medal_status is None and pid in protected_ids:
DB.add_protected_medal(*protected_medal_key)
logger.info(f"Added medal for protection {protected_medal_key}")
sleep_seconds = r.get('last_updated') + 60 - self.timestamp
await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0)
except Exception as e:
await self.get_channel(603527159109124096).send("<@220849530730577920> Something bad has happened with"
" medal hunter!")
logger.error("Discord bot's eRepublik medal watcher died!", exc_info=e)
await self.get_channel(DEFAULT_CHANNEL_ID).send(
f"<@{ADMIN_ID}> Something bad has happened with epic notifier!")
logger.error("Discord bot's eRepublik epic watcher died!", exc_info=e)
try:
with open(f"{self.timestamp}.json", 'w') as f:
f.write(r.text)
except NameError:
logger.error("There was no Response object!", exc_info=e)
await asyncio.sleep(10)
await self.get_channel(603527159109124096).send("<@220849530730577920> I've stopped, please restart")
await self.get_channel(DEFAULT_CHANNEL_ID).send(f"<@{ADMIN_ID}> I've stopped, please restart")
loop = asyncio.get_event_loop()
@ -228,172 +143,13 @@ async def on_ready():
print('------')
@bot.command(description="Parādīt lētos d1 BH, kuru dmg ir zem 5m vai Tevis ievadīta vērtībā", help="Lētie d1 BH",
category="Cheap medals")
async def bh1(ctx, max_damage: int = 5_000_000):
await _send_medal_info(ctx, 1, max_damage)
@bot.command(description="Parādīt lētos d2 BH, kuru dmg ir zem 10m vai Tevis ievadīta vērtībā", help="Lētie d2 BH",
category="Cheap medals")
async def bh2(ctx, max_damage: int = 10_000_000):
await _send_medal_info(ctx, 2, max_damage)
@bot.command(description="Parādīt lētos d3 BH, kuru dmg ir zem 15m vai Tevis ievadīta vērtībā", help="Lētie d3 BH",
category="Cheap medals")
async def bh3(ctx, max_damage: int = 15_000_000):
await _send_medal_info(ctx, 3, max_damage)
@bot.command(description="Parādīt lētos d4 BH, kuru dmg ir zem 50m vai Tevis ievadīta vērtībā", help="Lētie d4 BH",
category="Cheap medals")
async def bh4(ctx, max_damage: int = 50_000_000):
await _send_medal_info(ctx, 4, max_damage)
@bot.command(description="Parādīt lētos SH, kuru dmg ir zem 50k vai Tevis ievadīta vērtībā", help="Lētie SH",
category="Cheap medals")
async def sh(ctx, min_damage: int = 50_000):
await _send_medal_info(ctx, 11, min_damage)
@bh1.error
@bh2.error
@bh3.error
@bh4.error
@sh.error
async def damage_error(ctx, error):
if isinstance(error, commands.BadArgument):
await ctx.send('Damage vērtībai ir jābūt veselam skaitlim')
async def _send_medal_info(ctx, division: int, damage: int):
cheap_bhs = [] # Battle id, Side, damage, round time
for division_data in get_medals(division):
if division_data['dmg'] < damage:
division_data['flag'] = FLAGS[division_data['country_id']]
division_data['country'] = COUNTRIES[division_data['country_id']]
cheap_bhs.append(division_data)
if cheap_bhs:
cheap_bhs = sorted(cheap_bhs, key=lambda _: _['time'])
cheap_bhs.reverse()
msg = "\n".join(["{dmg:,d}dmg for :{flag}: {country}, {time} round time "
"https://www.erepublik.com/en/military/battlefield/{id}".format(**bh) for bh in cheap_bhs])
if len(msg) > 2000:
msg = "\n".join(msg[:2000].split('\n')[:-1])
await ctx.send(msg)
@bot.command()
async def kill(ctx):
if ctx.author.id == ADMIN_ID:
await ctx.send(f"{ctx.author.mention} Bye!")
sys.exit(1)
else:
await ctx.send("No medals under {:,d} damage found!".format(damage))
@bot.command(description="Informēt par spēlētāja mēģinājumiem ņemt medaļas",
help="Piereģistrēties uz spēlētāja medaļu paziņošanu", category="Hunting")
async def hunt(ctx, player_id: int):
if not check_player(player_id):
await ctx.send(f"{ctx.author.mention} didn't find any player with `id: {player_id}`!")
else:
player_name = DB.get_player(player_id).get('name')
try:
local_member_id = DB.get_member(ctx.author.id).get('id')
except NotFoundError:
local_member_id = DB.add_member(ctx.author.id, ctx.author.name).get('id')
if ctx.channel.type.value == 1:
await ctx.send(f"{ctx.author.mention}, sorry, but currently I'm unable to notify You in DM channel!")
elif DB.add_hunted_player(player_id, local_member_id, ctx.channel.id):
await ctx.send(f"{ctx.author.mention} You'll be notified for **{player_name}** medals in this channel")
else:
await ctx.send(f"{ctx.author.mention} You are already being notified for **{player_name}** medals")
@bot.command(description="Show list of hunted players",
help="Parādīt visus spēlētajus, kurus es medīju", category="Hunting")
async def my_hunt(ctx):
msgs = []
for hunted_player in DB.get_member_hunted_players(ctx.author.id):
msgs.append(f"`{hunted_player['id']}` - **{hunted_player['name']}**")
if msgs:
msg = "\n".join(msgs)
await ctx.send(f"{ctx.author.mention} You are hunting:\n{msg}")
else:
await ctx.send(f"{ctx.author.mention} You're not hunting anyone!")
@bot.command(description="Beigt informēt par spēlētāja mēģinājumiem ņemt medaļas",
help="Atreģistrēties no spēlētāja medaļu paziņošanas", category="Hunting")
async def remove_hunt(ctx, player_id: int):
if not check_player(player_id):
await ctx.send(f"{ctx.author.mention} didn't find any player with `id: {player_id}`!")
else:
player_name = DB.get_player(player_id).get('name')
try:
local_member_id = DB.get_member(ctx.author.id).get('id')
except NotFoundError:
local_member_id = DB.add_member(ctx.author.id, ctx.author.name).get('id')
if DB.remove_hunted_player(player_id, local_member_id):
await ctx.send(f"{ctx.author.mention} You won't be notified for **{player_name}** medals")
else:
await ctx.send(f"{ctx.author.mention} You were not hunting **{player_name}** medals")
@bot.command(description="Informēt par mēģinājiem nozagt medaļu",
help="Piereģistrēties uz medaļu sargāšanas paziņošanu", category="Protection")
async def protect(ctx, player_id: int):
if not check_player(player_id):
await ctx.send(f"{ctx.author.mention} didn't find any player with `id: {player_id}`!")
else:
player_name = DB.get_player(player_id).get('name')
try:
local_member_id = DB.get_member(ctx.author.id).get('id')
except NotFoundError:
local_member_id = DB.add_member(ctx.author.id, ctx.author.name).get('id')
if ctx.channel.type.value == 1:
await ctx.send(f"{ctx.author.mention}, sorry, but currently I'm unable to notify You in DM channel!")
elif DB.add_protected_player(player_id, local_member_id, ctx.channel.id):
await ctx.send(f"{ctx.author.mention} You'll be notified in this channel when anyone passes "
f"**{player_name}**'s medals")
else:
await ctx.send(f"{ctx.author.mention} You are already being notified for **{player_name}** medals")
@bot.command(description="Show players whose medals I'm protecting",
help="Parādīt sargājamo spēlētāju sarakstu ", category="Protection")
async def my_protected(ctx):
msgs = []
for protected_player in DB.get_member_protected_players(ctx.author.id):
msgs.append(f"`{protected_player['id']}` - **{protected_player['name']}**")
if msgs:
msg = "\n".join(msgs)
await ctx.send(f"{ctx.author.mention} You are protecting:\n{msg}")
else:
await ctx.send(f"{ctx.author.mention} You're not protecting anyone!")
@bot.command(description="Beigt informēt par spēlētāju mēģinājumiem noņemt medaļas",
help="Atreģistrēties no spēlētāja medaļu sargāšanas paziņošanas", category="Protection")
async def remove_protection(ctx, player_id: int):
if not check_player(player_id):
await ctx.send(f"{ctx.author.mention} didn't find any player with `id: {player_id}`!")
else:
player_name = DB.get_player(player_id).get('name')
try:
local_member_id = DB.get_member(ctx.author.id).get('id')
except NotFoundError:
local_member_id = DB.add_member(ctx.author.id, ctx.author.name).get('id')
if DB.remove_protected_player(player_id, local_member_id):
await ctx.send(f"{ctx.author.mention} You won't be notified for **{player_name}** medals")
else:
await ctx.send(f"{ctx.author.mention} You were not protecting **{player_name}** medals")
@hunt.error
@remove_hunt.error
@protect.error
@remove_protection.error
async def hunt_error(ctx, error):
if isinstance(error, commands.BadArgument):
await ctx.send('spēlētāja identifikators jāpadod kā skaitliska vērtība, piemēram, 1620414')
await ctx.send(f"Labs mēģinājums! Mani nogalināt var tikai <@{ADMIN_ID}>")
def main():
@ -405,6 +161,3 @@ def main():
if __name__ == "__main__":
main()
# daemon = daemonize.Daemonize(APP_NAME, pidfile, main)
# daemon.start()

View File

@ -1,6 +1,4 @@
discord.py
requests
pytz
python-dotenv
sqlite_utils
daemonize
discord.py==1.7.3
requests==2.26.0
python-dotenv==0.18.0
sqlite_utils==3.12

3
run.sh
View File

@ -1,10 +1,11 @@
#!/bin/bash
python -m venv venv
source venv/bin/activate
echo "Checking queries..."
python -m unittest
echo "Starting Discord bot..."
python discord_bot.py &
sleep 10
disown -h %1
sleep 10
echo "Done!"

View File

View File

View File

@ -33,67 +33,8 @@ class TestDatabase(unittest.TestCase):
self.assertTrue(self.db.update_player(player["id"], player["name"]))
self.assertEqual(self.db.get_player(player['id']), player)
def test_medal(self):
kwargs = {"pid": 1, "bid": 235837, "div": 4, "side": 71, "dmg": 1}
self.assertFalse(self.db.check_medal(**kwargs))
self.assertTrue(self.db.add_reported_medal(**kwargs))
self.assertTrue(self.db.check_medal(**kwargs))
self.assertTrue(self.db.delete_medals([kwargs['bid']]))
def test_hunt(self):
member_1 = self.db.add_member(2, name="one")
member_2 = self.db.add_member(3, name="two")
member_3 = self.db.add_member(4, name="three")
self.db.add_player(1, 'plato')
self.db.add_player(2, 'draco')
self.assertFalse(self.db.check_hunt(1, member_1['id']))
self.assertFalse(self.db.remove_hunted_player(1, member_1['id']))
player_1_hunt = [{'id': 1, "member_id": member_1['id'], 'player_id': 1, 'channel_id': 123},
{'id': 2, "member_id": member_2['id'], 'player_id': 1, 'channel_id': 234},
{'id': 3, "member_id": member_3['id'], 'player_id': 1, 'channel_id': 345}]
for hunt in player_1_hunt:
self.assertTrue(self.db.add_hunted_player(hunt['player_id'], hunt['member_id'], hunt['channel_id']))
player_2_hunt = [{'id': 4, "member_id": member_1['id'], 'player_id': 2, 'channel_id': 456}]
for hunt in player_2_hunt:
self.assertTrue(self.db.add_hunted_player(hunt['player_id'], hunt['member_id'], hunt['channel_id']))
self.assertListEqual(self.db.get_hunted_player_ids(), [1, 2])
self.assertListEqual(self.db.get_members_to_notify(1), player_1_hunt)
self.assertListEqual(self.db.get_members_to_notify(2), player_2_hunt)
self.assertFalse(self.db.add_hunted_player(1, member_1['id'], 567))
self.assertTrue(self.db.check_hunt(1, member_1['id']))
self.assertTrue(self.db.remove_hunted_player(1, member_1['id']))
self.assertFalse(self.db.check_hunt(1, member_1['id']))
'''' MEDAL PROTECTION '''
def test_protected_medal(self):
medal_data = {"pid": 4229720, "div": 7799071, "side": 71}
self.assertFalse(self.db.check_protected_medal(**medal_data))
self.assertIsNone(self.db.add_protected_medal(**medal_data))
self.assertTrue(self.db.check_protected_medal(**medal_data))
self.assertFalse(self.db.check_protected_medal(2, medal_data['div'], medal_data['side']))
self.assertTrue(self.db.delete_protected_medals([medal_data['div']]))
def test_protection(self):
member = self.db.add_member(2, name="one")
self.db.add_player(2, 'plato')
self.db.add_player(1620414, 'inpoc1')
self.assertFalse(self.db.check_protected(2, member['id']))
self.assertFalse(self.db.remove_protected_player(2, member['id']))
protected_player_1 = {'id': 1, "member_id": member['id'], 'player_id': 1620414, 'channel_id': 123}
self.assertTrue(self.db.add_protected_player(
protected_player_1['player_id'], protected_player_1['member_id'], protected_player_1['channel_id']
))
protected_player_2 = {'id': 2, "member_id": member['id'], 'player_id': 2, 'channel_id': 123}
self.assertTrue(self.db.add_protected_player(
protected_player_2['player_id'], protected_player_2['member_id'], protected_player_2['channel_id']
))
protected_player_ids = [2, 1620414]
self.assertListEqual(self.db.get_protected_player_ids(), protected_player_ids)
self.assertListEqual(self.db.get_protected_members_to_notify(1620414), [protected_player_1])
self.assertListEqual(self.db.get_protected_members_to_notify(2), [protected_player_2])
def test_epic(self):
self.assertFalse(self.db.get_epic(123456))
self.assertTrue(self.db.add_epic(123456))
self.assertFalse(self.db.add_epic(123456))
self.assertTrue(self.db.get_epic(123456))