This commit is contained in:
Ēriks K 2021-08-26 10:30:03 +03:00
parent 2221a30fae
commit ce9b1691c3
3 changed files with 142 additions and 100 deletions

View File

@ -255,6 +255,18 @@ events = [
re.compile(r"New taxes for (?P<product>[\w ]+) were proposed"), re.compile(r"New taxes for (?P<product>[\w ]+) were proposed"),
"{current_country} proposed new taxes for {product}", "{current_country} proposed new taxes for {product}",
), ),
EventKind(
"new_welcome_message_proposed",
"New Welcome message has been proposed",
re.compile(r"President of (?P<country>{country}) proposed a new welcome message for new citizens"),
"{country} proposed new welcome message!",
),
EventKind(
"new_welcome_message_approved",
"New Welcome message has been approved",
re.compile(r"(?P<country>{country}) now has a new welcoming message for new citizens"),
"{country} approved new welcome message!",
),
] ]
UTF_FLAG = { UTF_FLAG = {

View File

@ -1,23 +1,22 @@
import asyncio import asyncio
import datetime import datetime
import json
import logging import logging
import os import os
import sys import sys
import time import time
from json import JSONDecodeError from json import JSONDecodeError
from operator import itemgetter
from typing import Union
import discord import discord
import feedparser import feedparser
import pytz import pytz
import requests import requests
from constants import UTF_FLAG, events
from db import DiscordDB
from discord.ext import commands from discord.ext import commands
from erepublik.constants import COUNTRIES from erepublik.constants import COUNTRIES
from constants import events
from db import DiscordDB
from dbot.utils import timestamp, check_battles
APP_NAME = "discord_bot" APP_NAME = "discord_bot"
os.chdir(os.path.abspath(os.path.dirname(sys.argv[0]))) os.chdir(os.path.abspath(os.path.dirname(sys.argv[0])))
@ -60,22 +59,6 @@ __last_battle_response = None
__last_battle_update_timestamp = 0 __last_battle_update_timestamp = 0
def timestamp_to_datetime(timestamp: int) -> datetime.datetime:
return datetime.datetime.fromtimestamp(timestamp)
def timestamp_now() -> int:
return int(datetime.datetime.now().timestamp())
def s_to_human(seconds: Union[int, float]) -> str:
seconds = int(seconds)
h = seconds // 3600
m = (seconds - (h * 3600)) // 60
s = seconds % 60
return f"{h:01d}:{m:02d}:{s:02d}"
def get_battle_page(): def get_battle_page():
global __last_battle_update_timestamp, __last_battle_response global __last_battle_update_timestamp, __last_battle_response
if int(datetime.datetime.now().timestamp()) >= __last_battle_update_timestamp + 60: if int(datetime.datetime.now().timestamp()) >= __last_battle_update_timestamp + 60:
@ -94,13 +77,9 @@ class MyClient(discord.Client):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
# create the background task and run it in the background # create the background task and run it in the background
self.last_event_timestamp = self.timestamp self.last_event_timestamp = timestamp()
self.bg_task = self.loop.create_task(self.report_epics()) self.bg_task = self.loop.create_task(self.report_battle_events())
self.bg_rss_task = self.loop.create_task(self.report_latvian_events()) self.bg_rss_task = self.loop.create_task(self.report_rss_events())
@property
def timestamp(self):
return int(time.time())
async def on_ready(self): async def on_ready(self):
logger.info("Client running") logger.info("Client running")
@ -115,7 +94,7 @@ class MyClient(discord.Client):
else: else:
return logger.debug(f"Sending message to: {channel_id}\nArgs: {args}\nKwargs{kwargs}") return logger.debug(f"Sending message to: {channel_id}\nArgs: {args}\nKwargs{kwargs}")
async def report_latvian_events(self): async def report_rss_events(self):
await self.wait_until_ready() await self.wait_until_ready()
feed_response = None feed_response = None
while not self.is_closed(): while not self.is_closed():
@ -171,57 +150,69 @@ class MyClient(discord.Client):
for channel_id in DB.get_kind_notification_channel_ids("events"): for channel_id in DB.get_kind_notification_channel_ids("events"):
await self.get_channel(channel_id).send(embed=embed) await self.get_channel(channel_id).send(embed=embed)
await asyncio.sleep((self.timestamp // 300 + 1) * 300 - self.timestamp) await asyncio.sleep((timestamp() // 300 + 1) * 300 - timestamp())
except Exception as e: except Exception as e:
logger.error("eRepublik event reader ran into a problem!", exc_info=e) logger.error("eRepublik event reader ran into a problem!", exc_info=e)
try: try:
with open(f"debug/{self.timestamp}.rss", "w") as f: with open(f"debug/{timestamp()}.rss", "w") as f:
f.write(feed_response.text) f.write(feed_response.text)
except (NameError, AttributeError): except (NameError, AttributeError):
logger.error("There was no Response object!", exc_info=e) logger.error("There was no Response object!", exc_info=e)
await asyncio.sleep(10) await asyncio.sleep(10)
async def report_epics(self): async def report_battle_events(self):
await self.wait_until_ready() await self.wait_until_ready()
while not self.is_closed(): while not self.is_closed():
try: try:
r = get_battle_page() r = get_battle_page()
if not isinstance(r.get("battles"), dict): if not isinstance(r.get("battles"), dict):
sleep_seconds = r.get("last_updated") + 60 - self.timestamp sleep_seconds = r.get("last_updated") + 60 - timestamp()
await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0) await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0)
continue continue
for bid, battle in r.get("battles", {}).items():
for div in battle.get("div", {}).values():
if div.get("epic") > 1 and not DB.get_epic(div.get("id")):
with open(f"debug/{self.timestamp}.json", "w") as f:
json.dump(r, f)
invader_id = battle["inv"]["id"]
defender_id = battle["def"]["id"]
embed = discord.Embed(
title=" ".join(div.get("intensity_scale").split("_")).title(),
url=f"https://www.erepublik.com/en/military/battlefield/{battle['id']}",
description=f"Epic battle {UTF_FLAG[invader_id]} vs {UTF_FLAG[defender_id]}!\n" f"Battle for {battle['region']['name']}, Round {battle['zone_id']}",
)
embed.set_footer(text=f"Round time {s_to_human(self.timestamp - battle['start'])}")
logger.debug(
f"Epic battle {UTF_FLAG[invader_id]} vs {UTF_FLAG[defender_id]}! "
f"Round time {s_to_human(self.timestamp - battle['start'])} "
f"https://www.erepublik.com/en/military/battlefield/{battle['id']}"
)
for channel_id in DB.get_kind_notification_channel_ids("epic"):
if role_id := DB.get_role_id_for_channel_division(channel_id, division=div["div"]):
await self.get_channel(channel_id).send(f"<@&{role_id}>", embed=embed)
else:
await self.get_channel(channel_id).send(embed=embed)
DB.add_epic(div.get("id"))
sleep_seconds = r.get("last_updated") + 60 - self.timestamp desc = "'Empty' medals are being guessed based on the division wall. Expect false-positives!"
empty_divisions = {
1: discord.Embed(title="Possibly empty **__last-minute__ D1** medals", description=desc),
2: discord.Embed(title="Possibly empty **__last-minute__ D2** medals", description=desc),
3: discord.Embed(title="Possibly empty **__last-minute__ D3** medals", description=desc),
4: discord.Embed(title="Possibly empty **__last-minute__ D4** medals", description=desc),
11: discord.Embed(title="Possibly empty **__last-minute__ Air** medals", description=desc)
}
for kind, div, data in check_battles(r.get('battles')):
if kind == 'epic' and not DB.get_epic(data['div_id']):
embed = discord.Embed.from_dict(dict(
title=" ".join(data['extra']["intensity_scale"].split("_")).title(),
url=data["url"],
description=f"Epic battle {' vs '.join(data['sides'])}!\nBattle for {data['region']}, Round {data['zone_id']}",
footer=f"Round time {data['round_time']}"
))
logger.debug(f"{embed.title=}, {embed.description=}, {embed.url=}, {embed.footer=}")
for channel_id in DB.get_kind_notification_channel_ids("epic"):
if role_id := DB.get_role_id_for_channel_division(channel_id, division=div):
await self.get_channel(channel_id).send(f"<@&{role_id}>", embed=embed)
else:
await self.get_channel(channel_id).send(embed=embed)
DB.add_epic(data['div_id'])
if kind == 'empty' and data['round_time_s'] >= 87 * 60:
empty_divisions[div].add_field(
name=f"**Battle for {data['region']} {' '.join(data['sides'])}**",
value=f"[R{data['zone_id']} | Time {data['round_time']}]({data['url']})"
)
for d, e in empty_divisions.items():
if e.fields:
for channel_id in DB.get_kind_notification_channel_ids("empty"):
if role_id := DB.get_role_id_for_channel_division(channel_id, division=d):
await self.get_channel(channel_id).send(f"<@&{role_id}>", embed=e)
else:
await self.get_channel(channel_id).send(embed=e)
sleep_seconds = r.get("last_updated") + 60 - timestamp()
await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0) await asyncio.sleep(sleep_seconds if sleep_seconds > 0 else 0)
except Exception as e: except Exception as e:
logger.error("Discord bot's eRepublik epic watcher died!", exc_info=e) logger.error("Discord bot's eRepublik epic watcher died!", exc_info=e)
try: try:
with open(f"debug/{self.timestamp}.json", "w") as f: with open(f"debug/{timestamp()}.json", "w") as f:
f.write(r.text) f.write(f"{r}")
except NameError: except NameError:
logger.error("There was no Response object!", exc_info=e) logger.error("There was no Response object!", exc_info=e)
await asyncio.sleep(10) await asyncio.sleep(10)
@ -269,6 +260,9 @@ async def notify(ctx, kind: str):
elif kind == "events": elif kind == "events":
DB.add_notification_channel(guild_id, channel_id, kind) DB.add_notification_channel(guild_id, channel_id, kind)
await ctx.send("I will notify about eLatvia's events in this channel!") await ctx.send("I will notify about eLatvia's events in this channel!")
elif kind == "empty":
DB.add_notification_channel(guild_id, channel_id, kind)
await ctx.send("I will notify about empty medals in this channel!")
else: else:
await ctx.send(f"Unknown {kind=}") await ctx.send(f"Unknown {kind=}")
else: else:
@ -302,39 +296,8 @@ async def exit(ctx):
return await ctx.send(f"Labs mēģinājums! Mani nogalināt var tikai <@{ADMIN_ID}>") return await ctx.send(f"Labs mēģinājums! Mani nogalināt var tikai <@{ADMIN_ID}>")
def get_empty_medals(division_id: int, minutes: int = 30):
minutes = minutes if minutes > 0 else 60
r = get_battle_page()
if not isinstance(r.get("battles"), dict):
return
for battle in sorted(r.get("battles", {}).values(), key=itemgetter("start")):
if battle["start"] > timestamp_now() - minutes * 60:
continue
battle_url = f"https://www.erepublik.com/en/military/battlefield/{battle['id']}"
invader_id = battle["inv"]["id"]
defender_id = battle["def"]["id"]
for div in battle.get("div", {}).values():
if not div["div"] == division_id or div.get('end'):
continue
domination = div.get("wall", {}).get("dom")
value = f"[Battle for {battle['region']['name']}]({battle_url})"
ret = dict(
region=battle["region"]["name"],
round_time=s_to_human(timestamp_now() - battle["start"]),
sides=[],
url=f"https://www.erepublik.com/en/military/battlefield/{battle['id']}",
zone_id=battle["zone_id"],
)
if domination == 50:
ret["sides"] = [UTF_FLAG[invader_id], UTF_FLAG[defender_id]]
if domination == 100:
ret["sides"] = [UTF_FLAG[invader_id if defender_id == div["wall"]["for"] else defender_id]]
if ret["sides"]:
yield ret
@bot.command() @bot.command()
async def empty(ctx, division, minutes: int = 30): async def empty(ctx, division):
if not ctx.channel.id == 603527159109124096: if not ctx.channel.id == 603527159109124096:
return await ctx.send("Currently unavailable!") return await ctx.send("Currently unavailable!")
try: try:
@ -350,19 +313,20 @@ async def empty(ctx, division, minutes: int = 30):
title=f"Possibly empty {s_div} medals", title=f"Possibly empty {s_div} medals",
description=f"'Empty' medals are being guessed based on the division wall. Expect false-positives!", description=f"'Empty' medals are being guessed based on the division wall. Expect false-positives!",
) )
for med in get_empty_medals(div, minutes): for kind, div_div, data in check_battles(get_battle_page().get('battles')):
embed.add_field( if kind == 'empty' and div_div == div:
name=f"**Battle for {med['region']} {' '.join(med['sides'])}**", embed.add_field(
value=f"[R{med['zone_id']} | Time {med['round_time']}]({med['url']})", name=f"**Battle for {data['region']} {' '.join(data['sides'])}**",
) value=f"[R{data['zone_id']} | Time {data['round_time']}]({data['url']})",
if len(embed.fields) >= 10: )
await ctx.send(embed=embed) if len(embed.fields) >= 10:
embed.clear_fields() return await ctx.send(embed=embed)
if embed.fields: if embed.fields:
return await ctx.send(embed=embed) return await ctx.send(embed=embed)
else: else:
return await ctx.send(f"No empty {s_div} medals found") return await ctx.send(f"No empty {s_div} medals found")
@empty.error @empty.error
async def division_error(ctx, error): async def division_error(ctx, error):
if isinstance(error, (commands.BadArgument, commands.MissingRequiredArgument)): if isinstance(error, (commands.BadArgument, commands.MissingRequiredArgument)):

66
dbot/utils.py Normal file
View File

@ -0,0 +1,66 @@
import datetime
from operator import itemgetter
from typing import Union, Dict, Any, Generator, Tuple, List
from dbot.constants import UTF_FLAG
def timestamp_to_datetime(timestamp: int) -> datetime.datetime:
return datetime.datetime.fromtimestamp(timestamp)
def timestamp() -> int:
return int(datetime.datetime.now().timestamp())
def s_to_human(seconds: Union[int, float]) -> str:
seconds = int(seconds)
h = seconds // 3600
m = (seconds - (h * 3600)) // 60
s = seconds % 60
return f"{h:01d}:{m:02d}:{s:02d}"
def check_battles(battle_json: Dict[str, Dict[str, Any]]) -> Generator[Tuple[str, int, Dict[str, Union[str, List[str], int, Dict[str, Union[str, int]]]]], None, None]:
for battle in sorted(battle_json.values(), key=itemgetter("start")):
if battle["start"] > timestamp():
continue
region_name = battle["region"]["name"]
invader_flag = UTF_FLAG[battle["inv"]["id"]]
defender_flag = UTF_FLAG[battle["def"]["id"]]
for div in battle["div"].values():
if div['end']:
continue
division = div['div']
dom = div["wall"]["dom"]
epic = div["epic"]
division_meta_data = dict(
region=region_name,
round_time=s_to_human(timestamp() - battle["start"]),
round_time_s=int(timestamp() - battle["start"]),
sides=[],
url=f"https://www.erepublik.com/en/military/battlefield/{battle['id']}",
zone_id=battle["zone_id"],
div_id=div['id'],
extra={}
)
if dom == 50:
division_meta_data.update(sides=[invader_flag, defender_flag])
yield 'empty', division, division_meta_data
division_meta_data['sides'].clear()
if dom == 100:
division_meta_data.update(sides=[invader_flag if battle["def"]["id"] == div["wall"]["for"] else defender_flag])
yield 'empty', division, division_meta_data
division_meta_data['sides'].clear()
if epic > 1:
division_meta_data.update(sides=[invader_flag, defender_flag])
division_meta_data['extra'].update(intensity_scale=div['intensity_scale'],
epic_type=epic)
yield 'epic', division, division_meta_data
division_meta_data['sides'].clear()
division_meta_data['extra'].clear()
if dom >= 66.8:
division_meta_data.update(sides=[invader_flag if battle["def"]["id"] == div["wall"]["for"] else defender_flag])
yield 'steal', division, division_meta_data
division_meta_data['sides'].clear()
return