Medal protection
This commit is contained in:
parent
1f9e816577
commit
29b1d980ac
105
db.py
105
db.py
@ -1,6 +1,4 @@
|
||||
from collections import namedtuple
|
||||
from sqlite3 import IntegrityError
|
||||
from typing import List, Union, Dict, Optional, Set
|
||||
from typing import List, Union, Dict, Optional
|
||||
|
||||
from sqlite_utils import Database
|
||||
from sqlite_utils.db import NotFoundError
|
||||
@ -40,9 +38,26 @@ class DiscordDB:
|
||||
if "hunted_players" not in self._db.view_names():
|
||||
self._db.create_view("hunted_players", "select distinct player_id from hunted")
|
||||
|
||||
if "protected" not in self._db.table_names() or migrate_db:
|
||||
self._db.create_table("protected", {"id": int, "member_id": int, "player_id": int, 'channel_id': int},
|
||||
pk="id", not_null={"id", "member_id", "player_id", "channel_id"})
|
||||
self._db['protected'].create_index(["member_id", "player_id"], unique=True)
|
||||
|
||||
if "protected_medals" not in self._db.table_names():
|
||||
self._db.create_table("protected_medals",
|
||||
dict(id=int, player_id=int, division_id=int, side_id=int), pk="id",
|
||||
not_null={"id", "player_id", "division_id", "side_id"})
|
||||
self._db['protected_medals'].create_index(["player_id", "division_id", "side_id"], unique=True)
|
||||
|
||||
if "protected_players" not in self._db.view_names():
|
||||
self._db.create_view("protected_players", "select distinct player_id from protected")
|
||||
|
||||
self._db.add_foreign_keys([("hunted", "member_id", "member", "id"),
|
||||
("hunted", "player_id", "player", "id"),
|
||||
("medals", "player_id", "player", "id")])
|
||||
("protected", "member_id", "member", "id"),
|
||||
("protected", "player_id", "player", "id"),
|
||||
("medals", "player_id", "player", "id"),
|
||||
("protected_medals", "player_id", "player", "id")])
|
||||
self._db.vacuum()
|
||||
|
||||
self.member = self._db.table("member")
|
||||
@ -50,6 +65,9 @@ class DiscordDB:
|
||||
self.hunted = self._db.table("hunted")
|
||||
self.medals = self._db.table("medals")
|
||||
self.hunted_players = self._db.table("hunted_players")
|
||||
self.protected = self._db.table("protected")
|
||||
self.protected_medals = self._db.table("protected_medals")
|
||||
self.protected_players = self._db.table("protected_players")
|
||||
|
||||
# Player methods
|
||||
|
||||
@ -192,3 +210,82 @@ class DiscordDB:
|
||||
|
||||
def get_members_to_notify(self, pid: int) -> List[Dict[str, Union[int, str]]]:
|
||||
return [r for r in self.hunted.rows_where("player_id = ?", [pid])]
|
||||
|
||||
'''' MEDAL PROTECTION '''
|
||||
|
||||
def check_protected_medal(self, pid: int, div: int, side: int) -> Optional[bool]:
|
||||
"""Check if player (pid) in battle (bid) for side in division (div) hasn't taken protected medal
|
||||
|
||||
:param pid: Player ID
|
||||
:type pid: int
|
||||
:param bid: Battle ID
|
||||
:type bid: int
|
||||
:param div: Division
|
||||
:type div: int
|
||||
:param side: Side ID
|
||||
:type side: int
|
||||
:return: If medal has been registered
|
||||
:rtype: bool
|
||||
"""
|
||||
try:
|
||||
medal = next(self.protected_medals.rows_where("division_id=? and side_id=?", (div, side)))
|
||||
except StopIteration:
|
||||
return None
|
||||
return medal['player_id'] == pid
|
||||
|
||||
def add_protected_medal(self, pid: int, div: int, side: int):
|
||||
"""Check if players (pid) medal in division (div) for side (sid) has been registered
|
||||
|
||||
:param pid: Player ID
|
||||
:type pid: int
|
||||
:param div: Division
|
||||
:type div: int
|
||||
:param side: Side ID
|
||||
:type side: int
|
||||
"""
|
||||
self.protected_medals.lookup(dict(player_id=pid, division_id=div, side_id=side))
|
||||
|
||||
def get_protected_medal(self, div: int, side: int):
|
||||
""" Get player_id (pid) in division (div) for side (sid)
|
||||
|
||||
:param div: Division
|
||||
:type div: int
|
||||
:param side: Side ID
|
||||
:type side: int
|
||||
"""
|
||||
pk = self.protected_medals.lookup(dict(division_id=div, side_id=side))
|
||||
return self.protected_medals.get(pk)
|
||||
|
||||
def delete_protected_medals(self, div_id: List[int]):
|
||||
self.protected_medals.delete_where("division_id in (%s)" % "?" * len(div_id), div_id)
|
||||
return True
|
||||
|
||||
def check_protected(self, pid: int, member_id: int) -> bool:
|
||||
try:
|
||||
next(self.protected.rows_where("player_id=? and member_id=?", [pid, member_id]))
|
||||
return True
|
||||
except StopIteration:
|
||||
return False
|
||||
|
||||
def add_protected_player(self, pid: int, member_id: int, channel_id: int) -> bool:
|
||||
if self.check_protected(pid, member_id):
|
||||
return False
|
||||
else:
|
||||
self.protected.insert(dict(player_id=pid, member_id=member_id, channel_id=channel_id))
|
||||
return True
|
||||
|
||||
def remove_protected_player(self, pid: int, member_id: int) -> bool:
|
||||
if self.check_protected(pid, member_id):
|
||||
self.protected.delete_where("player_id=? and member_id=?", (pid, member_id))
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_member_protected_players(self, member_id: int) -> List[Dict[str, Union[int, str]]]:
|
||||
return [self.get_player(r['player_id']) for r in self.protected.rows_where("member_id=?", (member_id, ))]
|
||||
|
||||
def get_protected_player_ids(self) -> List[int]:
|
||||
return [r["player_id"] for r in self.protected_players.rows]
|
||||
|
||||
def get_protected_members_to_notify(self, pid: int) -> List[Dict[str, Union[int, str]]]:
|
||||
return [r for r in self.protected.rows_where("player_id = ?", [pid])]
|
||||
|
167
discord_bot.py
167
discord_bot.py
@ -3,9 +3,7 @@ import datetime
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from json import JSONDecodeError
|
||||
from typing import Dict, Set
|
||||
|
||||
import discord
|
||||
import pytz
|
||||
@ -93,7 +91,7 @@ def check_player(player_id: int) -> bool:
|
||||
r = requests.get(f'https://www.erepublik.com/en/main/citizen-profile-json/{player_id}').json()
|
||||
except JSONDecodeError:
|
||||
return False
|
||||
if r.get('error') or not r.get('status'):
|
||||
if r.get('error'):
|
||||
return False
|
||||
DB.add_player(player_id, r.get('citizen').get('name'))
|
||||
|
||||
@ -106,7 +104,7 @@ def get_medals(division: int):
|
||||
request_time = timestamp_to_datetime(r.get('last_updated'))
|
||||
for battle_id, battle in r.get('battles').items():
|
||||
start_time = timestamp_to_datetime(battle.get('start'))
|
||||
if start_time < request_time:
|
||||
if start_time - datetime.timedelta(seconds=30) < request_time:
|
||||
for division_data in battle.get('div', {}).values():
|
||||
if not division_data.get('end') and division_data.get('div') == division:
|
||||
for side, stat in division_data['stats'].items():
|
||||
@ -121,13 +119,11 @@ def get_medals(division: int):
|
||||
|
||||
class MyClient(discord.Client):
|
||||
erep_tz = pytz.timezone('US/Pacific')
|
||||
hunted: Dict[int, Set[discord.Member]] = defaultdict(set)
|
||||
player_mapping: Dict[int, str] = {}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
# create the background task and run it in the background
|
||||
self.bg_task = self.loop.create_task(self.report_hunted_medals())
|
||||
self.bg_task = self.loop.create_task(self.report_medals())
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
@ -135,50 +131,114 @@ class MyClient(discord.Client):
|
||||
|
||||
async def on_ready(self):
|
||||
print('Client loaded')
|
||||
# print(self.user.name)
|
||||
# print(self.user.id)
|
||||
print('------')
|
||||
|
||||
async def report_hunted_medals(self):
|
||||
async def report_medals(self):
|
||||
await self.wait_until_ready()
|
||||
while not self.is_closed():
|
||||
try:
|
||||
r = get_battle_page()
|
||||
hunted_ids = DB.get_hunted_player_ids()
|
||||
protected_ids = DB.get_protected_player_ids()
|
||||
for bid, battle in r.get('battles', {}).items():
|
||||
for div in battle.get('div', {}).values():
|
||||
if div['stats'] and not div['end']:
|
||||
for side, side_data in div['stats'].items():
|
||||
if side_data and side_data['citizenId'] in hunted_ids:
|
||||
if side_data:
|
||||
pid = side_data['citizenId']
|
||||
medal_key = (pid, battle['id'], div['div'], battle[side]['id'], side_data['damage'])
|
||||
if not DB.check_medal(*medal_key):
|
||||
for hunt_row in DB.get_members_to_notify(pid):
|
||||
format_data = dict(author=hunt_row['member_id'], player=DB.get_player(pid)['name'],
|
||||
battle=bid,
|
||||
region=battle.get('region').get('name'),
|
||||
division=div['div'], dmg=side_data['damage'],
|
||||
side=COUNTRIES[battle[side]['id']])
|
||||
if pid in hunted_ids:
|
||||
hunted_medal_key = (pid, battle['id'], div['id'],
|
||||
battle[side]['id'], side_data['damage'])
|
||||
if not DB.check_medal(*hunted_medal_key):
|
||||
for hunt_row in DB.get_members_to_notify(pid):
|
||||
format_data = dict(author=hunt_row['member_id'],
|
||||
player=DB.get_player(pid)['name'],
|
||||
battle=bid,
|
||||
region=battle.get('region').get('name'),
|
||||
division=div['div'], dmg=side_data['damage'],
|
||||
side=COUNTRIES[battle[side]['id']])
|
||||
|
||||
await self.get_channel(hunt_row['channel_id']).send(
|
||||
"<@{author}> **{player}** detected in battle for {region} on {side} side in d{division} with {dmg:,d}dmg\n"
|
||||
await self.get_channel(hunt_row['channel_id']).send(
|
||||
"<@{author}> **{player}** detected in battle for {region} on {side} "
|
||||
"side in d{division} with {dmg:,d}dmg\n"
|
||||
"https://www.erepublik.com/en/military/battlefield/{battle}".format(
|
||||
**format_data))
|
||||
DB.add_reported_medal(*hunted_medal_key)
|
||||
|
||||
protected_medal_key = (pid, div['id'], battle[side]['id'])
|
||||
protected_medal_status = DB.check_protected_medal(*protected_medal_key)
|
||||
if protected_medal_status == False:
|
||||
medal = DB.get_protected_medal(div['id'], battle[side]['id'])
|
||||
for protected in DB.get_protected_members_to_notify(medal['player_id']):
|
||||
await self.get_channel(protected['channel_id']).send(
|
||||
"<@{author}> Medal for **{player}** in battle for {region} on"
|
||||
" {side} side in d{division} has been taken!\n"
|
||||
"https://www.erepublik.com/en/military/battlefield/{battle}".format(
|
||||
**format_data)
|
||||
)
|
||||
DB.add_reported_medal(*medal_key)
|
||||
author=protected['member_id'],
|
||||
player=DB.get_player(medal['player_id'])['name'],
|
||||
battle=bid, region=battle.get('region').get('name'),
|
||||
division=div['div'], side=COUNTRIES[battle[side]['id']]
|
||||
))
|
||||
DB.delete_protected_medals([medal['division_id']])
|
||||
else:
|
||||
if protected_medal_status is None and pid in protected_ids:
|
||||
DB.add_protected_medal(*protected_medal_key)
|
||||
logger.info(f"Added medal for protection {protected_medal_key}")
|
||||
sleep_seconds = r.get('last_updated') + 60 - self.timestamp
|
||||
await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0)
|
||||
except Exception as e:
|
||||
await self.get_channel(603527159109124096).send("<@220849530730577920> Something bad has happened with"
|
||||
" medal hunter!")
|
||||
logger.error("Discord bot's eRepublik medal hunter died!", exc_info=e)
|
||||
logger.error("Discord bot's eRepublik medal watcher died!", exc_info=e)
|
||||
try:
|
||||
with open(f"{self.timestamp}.json", 'w') as f:
|
||||
f.write(r.text)
|
||||
except NameError:
|
||||
logger.error("There was no Response object!", exc_info=e)
|
||||
await asyncio.sleep(10)
|
||||
|
||||
#
|
||||
# async def report_protected_medals(self):
|
||||
# await self.wait_until_ready()
|
||||
# while not self.is_closed():
|
||||
# try:
|
||||
# r = get_battle_page()
|
||||
# protected_ids = DB.get_protected_player_ids()
|
||||
# for bid, battle in r.get('battles', {}).items():
|
||||
# for div in battle.get('div', {}).values():
|
||||
# if div['stats'] and not div['end']:
|
||||
# for side, side_data in div['stats'].items():
|
||||
# if side_data and side_data['citizenId'] in protected_ids:
|
||||
# pid = side_data['citizenId']
|
||||
# medal_key = (pid, div['id'], battle[side]['id'])
|
||||
# if not DB.check_protected_medal(*medal_key):
|
||||
# for protected in DB.get_protected_members_to_notify(pid):
|
||||
# format_data = dict(author=protected['member_id'],
|
||||
# player=DB.get_player(pid)['name'],
|
||||
# battle=bid,
|
||||
# region=battle.get('region').get('name'),
|
||||
# division=div['div'],
|
||||
# side=COUNTRIES[battle[side]['id']])
|
||||
#
|
||||
# await self.get_channel(protected['channel_id']).send(
|
||||
# "<@{author}> Medal for **{player}** in battle for {region} on {side} "
|
||||
# "side in d{division} has been taken!\n"
|
||||
# "https://www.erepublik.com/en/military/battlefield/{battle}".format(
|
||||
# **format_data)
|
||||
# )
|
||||
# DB.add_protected_medal(*medal_key)
|
||||
# logger.info(f"Added medal for protection {medal_key}")
|
||||
# sleep_seconds = r.get('last_updated') + 60 - self.timestamp
|
||||
# await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0)
|
||||
# except Exception as e:
|
||||
# await self.get_channel(603527159109124096).send(
|
||||
# "<@220849530730577920> Something bad has happened with medal protector!")
|
||||
# logger.error("Discord bot's eRepublik medal protector error!", exc_info=e)
|
||||
# try:
|
||||
# with open(f"{self.timestamp}.json", 'w') as f:
|
||||
# f.write(r.text)
|
||||
# except NameError:
|
||||
# logger.error("There was no Response object!", exc_info=e)
|
||||
# await asyncio.sleep(10)
|
||||
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
@ -186,7 +246,6 @@ client = MyClient(loop=loop)
|
||||
bot = commands.Bot(command_prefix='!')
|
||||
|
||||
|
||||
|
||||
@bot.event
|
||||
async def on_ready():
|
||||
print('Bot loaded')
|
||||
@ -274,8 +333,8 @@ async def hunt(ctx, player_id: int):
|
||||
await ctx.send(f"{ctx.author.mention} You are already being notified for **{player_name}** medals")
|
||||
|
||||
|
||||
@bot.command(description="Informēt par spēlētāja mēģinājumiem ņemt medaļas",
|
||||
help="Piereģistrēties uz spēlētāja medaļu paziņošanu", category="Hunting")
|
||||
@bot.command(description="Show list of hunted players",
|
||||
help="Parādīt visus spēlētajus, kurus es medīju", category="Hunting")
|
||||
async def my_hunt(ctx):
|
||||
msgs = []
|
||||
for hunted_player in DB.get_member_hunted_players(ctx.author.id):
|
||||
@ -304,6 +363,56 @@ async def remove_hunt(ctx, player_id: int):
|
||||
await ctx.send(f"{ctx.author.mention} You were not hunting **{player_name}** medals")
|
||||
|
||||
|
||||
@bot.command(description="Informēt par mēģinājiem nozagt medaļu",
|
||||
help="Piereģistrēties uz medaļu sargāšanas paziņošanu", category="Protection")
|
||||
async def protect(ctx, player_id: int):
|
||||
if not check_player(player_id):
|
||||
await ctx.send(f"{ctx.author.mention} didn't find any player with `id: {player_id}`!")
|
||||
else:
|
||||
player_name = DB.get_player(player_id).get('name')
|
||||
try:
|
||||
local_member_id = DB.get_member(ctx.author.id).get('id')
|
||||
except NotFoundError:
|
||||
local_member_id = DB.add_member(ctx.author.id, ctx.author.name).get('id')
|
||||
if ctx.channel.type.value == 1:
|
||||
await ctx.send(f"{ctx.author.mention}, sorry, but currently I'm unable to notify You in DM channel!")
|
||||
elif DB.add_protected_player(player_id, local_member_id, ctx.channel.id):
|
||||
await ctx.send(f"{ctx.author.mention} You'll be notified in this channel when anyone passes "
|
||||
f"**{player_name}**'s medals")
|
||||
else:
|
||||
await ctx.send(f"{ctx.author.mention} You are already being notified for **{player_name}** medals")
|
||||
|
||||
|
||||
@bot.command(description="Show players whose medals I'm protecting",
|
||||
help="Parādīt sargājamo spēlētāju sarakstu ", category="Protection")
|
||||
async def my_protected(ctx):
|
||||
msgs = []
|
||||
for protected_player in DB.get_member_protected_players(ctx.author.id):
|
||||
msgs.append(f"`{protected_player['id']}` - **{protected_player['name']}**")
|
||||
if msgs:
|
||||
msg = "\n".join(msgs)
|
||||
await ctx.send(f"{ctx.author.mention} You are protecting:\n{msg}")
|
||||
else:
|
||||
await ctx.send(f"{ctx.author.mention} You're not protecting anyone!")
|
||||
|
||||
|
||||
@bot.command(description="Beigt informēt par spēlētāju mēģinājumiem noņemt medaļas",
|
||||
help="Atreģistrēties no spēlētāja medaļu sargāšanas paziņošanas", category="Protection")
|
||||
async def remove_protection(ctx, player_id: int):
|
||||
if not check_player(player_id):
|
||||
await ctx.send(f"{ctx.author.mention} didn't find any player with `id: {player_id}`!")
|
||||
else:
|
||||
player_name = DB.get_player(player_id).get('name')
|
||||
try:
|
||||
local_member_id = DB.get_member(ctx.author.id).get('id')
|
||||
except NotFoundError:
|
||||
local_member_id = DB.add_member(ctx.author.id, ctx.author.name).get('id')
|
||||
if DB.remove_protected_player(player_id, local_member_id):
|
||||
await ctx.send(f"{ctx.author.mention} You won't be notified for **{player_name}** medals")
|
||||
else:
|
||||
await ctx.send(f"{ctx.author.mention} You were not protecting **{player_name}** medals")
|
||||
|
||||
|
||||
@hunt.error
|
||||
@remove_hunt.error
|
||||
async def hunt_error(ctx, error):
|
||||
|
31
tests.py
31
tests.py
@ -66,3 +66,34 @@ class TestDatabase(unittest.TestCase):
|
||||
self.assertTrue(self.db.check_hunt(1, member_1['id']))
|
||||
self.assertTrue(self.db.remove_hunted_player(1, member_1['id']))
|
||||
self.assertFalse(self.db.check_hunt(1, member_1['id']))
|
||||
|
||||
'''' MEDAL PROTECTION '''
|
||||
def test_protected_medal(self):
|
||||
medal_data = {"pid": 4229720, "div": 7799071, "side": 71}
|
||||
self.assertFalse(self.db.check_protected_medal(**medal_data))
|
||||
self.assertIsNone(self.db.add_protected_medal(**medal_data))
|
||||
self.assertTrue(self.db.check_protected_medal(**medal_data))
|
||||
self.assertFalse(self.db.check_protected_medal(2, medal_data['div'], medal_data['side']))
|
||||
self.assertTrue(self.db.delete_protected_medals([medal_data['div']]))
|
||||
|
||||
def test_protection(self):
|
||||
member = self.db.add_member(2, name="one")
|
||||
self.db.add_player(2, 'plato')
|
||||
self.db.add_player(1620414, 'inpoc1')
|
||||
|
||||
self.assertFalse(self.db.check_protected(2, member['id']))
|
||||
self.assertFalse(self.db.remove_protected_player(2, member['id']))
|
||||
protected_player_1 = {'id': 1, "member_id": member['id'], 'player_id': 1620414, 'channel_id': 123}
|
||||
self.assertTrue(self.db.add_protected_player(
|
||||
protected_player_1['player_id'], protected_player_1['member_id'], protected_player_1['channel_id']
|
||||
))
|
||||
protected_player_2 = {'id': 2, "member_id": member['id'], 'player_id': 2, 'channel_id': 123}
|
||||
self.assertTrue(self.db.add_protected_player(
|
||||
protected_player_2['player_id'], protected_player_2['member_id'], protected_player_2['channel_id']
|
||||
))
|
||||
|
||||
protected_player_ids = [2, 1620414]
|
||||
self.assertListEqual(self.db.get_protected_player_ids(), protected_player_ids)
|
||||
self.assertListEqual(self.db.get_protected_members_to_notify(1620414), [protected_player_1])
|
||||
self.assertListEqual(self.db.get_protected_members_to_notify(2), [protected_player_2])
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user