""" eBot
    Copyright (C) 2022 Eriks K

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation version 3 of the License.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
import datetime
import os
import pathlib
import re
import sys
from io import BytesIO
from itertools import product
from operator import attrgetter
from typing import Any, Dict, List, Optional, Set, Union

import httpx
from cairosvg import svg2png
from erepublik import classes, constants, utils
from erepublik.citizen import CitizenEconomy, CitizenLeaderBoard, CitizenMedia, CitizenSocial
from erepublik.constants import AIR_RANKS, GROUND_RANKS, Rank

# Citizen ids that are never allowed to apply for supplies.
forbidden_ids = []


class MyOfferItem:
    """Plain record describing a single marketplace offer.

    Only keyword arguments that match one of the class-level defaults below
    are accepted by the constructor; unknown keywords are silently ignored.
    """

    price: float = 999_999_999.0
    country: constants.Country = constants.Country(0, "", "", "")
    amount: int = 0  # number of items to take from the offer
    offer_id: int = 0
    citizen_id: int = 0
    quality: int = 0
    price_per_use: float = 0.0  # price divided by energy per item for food, plain price otherwise
    true_amount: int = 0  # amount expressed in "uses" (energy for food, items otherwise)

    def __init__(self, **kwargs):
        for k, v in kwargs.items():
            if hasattr(self, k):
                setattr(self, k, v)

    def __repr__(self):
        return (
            f"MyOfferItem(price={self.price}, country={repr(self.country)}, amount={self.amount}, "
            f"offer_id={self.offer_id}, citizen_id={self.citizen_id}, quality={self.quality}, "
            f"price_per_use={self.price_per_use}, true_amount={self.true_amount})"
        )


class _OfferHolder:
    """Keeps the cheapest offers whose combined ``true_amount`` covers ``limit``.

    Every ``append`` re-sorts the content, drops the most expensive offers that
    are no longer needed and trims the last kept offer so that the total does
    not exceed the limit.
    """

    _content: List[MyOfferItem]
    _limit: int

    def __init__(self, limit: int):
        self._content = []
        self._limit = limit

    def append(self, obj: MyOfferItem):
        """Add an offer and immediately re-balance the kept offers."""
        self._content.append(obj)
        self.check()

    def sort(self):
        """Order offers by price per use; ties are broken by the largest amount first."""
        # list.sort is stable, so the secondary key is applied first
        self._content.sort(key=attrgetter("true_amount"), reverse=True)
        self._content.sort(key=attrgetter("price_per_use"))

    @property
    def total_amount(self) -> int:
        return sum(o.true_amount for o in self._content)

    def check(self):
        """Drop surplus offers and trim the last one so the total matches the limit."""
        self.sort()
        # Guard against an emptied list: with a non-positive limit even the last
        # remaining offer is surplus and indexing [-1] afterwards would raise.
        while self._content and (self.total_amount - self._content[-1].true_amount) >= self._limit:
            self._content.pop(-1)
        if not self._content:
            return
        overshoot = self.total_amount - self._limit
        last = self._content[-1]
        if last.true_amount > overshoot > 0:
            # uses per item; lets us convert the reduced true_amount back to items
            coef = last.true_amount / last.amount
            last.true_amount -= overshoot
            last.amount = int(last.true_amount / coef)

    @property
    def list(self) -> List[MyOfferItem]:
        return self._content

    def __iter__(self):
        for i in self._content:
            yield i


class Contestant:
    """A citizen who applied for supplies, together with the data used to size the reward."""

    id: int
    name: str
    air_kill_count: int
    ground_kill_count: int
    air_rank: Rank
    ground_rank: Rank
    residence: int
    military_unit: int
    has_house: bool
    done_7_dos: bool
    factory_count_increased: bool
    factory_count: int
    extra: List[Any]  # human readable reasons why the contestant is (partly) rejected

    def __init__(self, _id: int):
        self.id = _id
        self.name = ""
        self.air_kill_count = 0
        self.ground_kill_count = 0
        self.air_rank = AIR_RANKS[1]
        self.ground_rank = GROUND_RANKS[1]
        self.residence = 0
        self.factory_count = 0
        self.factory_count_increased = False
        self.done_7_dos = False
        self.has_house = False
        self.extra = []
        self.military_unit = 0

    @property
    def as_dict(self) -> Dict[str, Any]:
        """Return the instance attributes plus the computed values under ``_props``.

        A copy of ``__dict__`` is returned so that reading this property does
        not add a ``_props`` attribute to the instance itself.
        """
        ret = dict(self.__dict__)
        ret.update(
            _props=dict(
                link=self.link,
                air_rank_ok=self._air_rank_ok,
                ground_rank_penalty=self.ground_rank_penalty,
                air_health=self.air_health,
                air_health_total=self.air_health_total,
                ground_health=self.ground_health,
                ground_weapons=self.ground_weapons,
                air_rank_s=self.air_rank_s,
                ground_rank_s=self.ground_rank_s,
                total_health=self.total_health,
                total_kills=self.total_kills,
            )
        )
        return ret

    @property
    def link(self):
        return f"https://www.erepublik.com/en/citizen/profile/{self.id}"

    @property
    def _air_rank_ok(self) -> bool:
        return self.air_rank.id < 56

    @property
    def ground_rank_penalty(self) -> int:
        """Penalty in percent: 0 below ground rank id 70, then growing by 5 per rank."""
        if self.ground_rank.id < 70:
            return 0
        else:
            return 100 - (89 - self.ground_rank.id) * 5

    @property
    def air_health(self):
        """Energy earned for air kills: 30 per kill below rank id 50, 20 below 56, else 0."""
        if self.has_house and self._air_rank_ok:
            return self.air_kill_count * (30 if self.air_rank.id < 50 else 20 if self.air_rank.id < 56 else 0)
        else:
            return 0

    @property
    def air_health_total(self):
        """Air energy doubled when the factory count increased, plus 3500 for done daily orders."""
        if self.has_house and self._air_rank_ok:
            return self.air_health * (1 + self.factory_count_increased) + 3500 * self.done_7_dos
        else:
            return 0

    @property
    def ground_health(self):
        """Energy earned for ground kills after the rank penalty, rounded down to an even number."""
        amount = self.ground_kill_count * 20 if self.has_house else 0
        return int((amount * (100 - self.ground_rank_penalty)) / 100) // 2 * 2

    @property
    def ground_weapons(self):
        """Weapons earned for ground kills (one per four kills) after the rank penalty."""
        amount = self.ground_kill_count // 4 if self.has_house else 0
        return int((amount * (100 - self.ground_rank_penalty)) / 100)

    @property
    def air_rank_s(self) -> str:
        return self.air_rank.name

    @property
    def ground_rank_s(self) -> str:
        return self.ground_rank.name

    @property
    def total_health(self) -> int:
        return (self.air_health_total + self.ground_health) if self.has_house else 0

    @property
    def total_kills(self) -> int:
        return self.air_kill_count + self.ground_kill_count

    @classmethod
    def from_dict(cls, **data) -> "Contestant":
        """Rebuild a contestant from the mapping produced by ``as_dict`` (``_props`` is ignored)."""
        obj: Contestant = cls(data["id"])
        obj.name = data["name"]
        obj.air_kill_count = data["air_kill_count"]
        obj.ground_kill_count = data["ground_kill_count"]
        obj.air_rank = AIR_RANKS[data["air_rank"]["id"]]
        obj.ground_rank = GROUND_RANKS[data["ground_rank"]["id"]]
        obj.residence = data["residence"]
        obj.factory_count = data["factory_count"]
        obj.factory_count_increased = data["factory_count_increased"]
        obj.done_7_dos = data["done_7_dos"]
        obj.has_house = data["has_house"]
        obj.extra = data["extra"]
        obj.military_unit = data["military_unit"]
        return obj


class SupplierCitizen(CitizenLeaderBoard, CitizenMedia, CitizenEconomy, CitizenSocial):
    def __init__(self, email: str, password: str):
        """Instantiate Stripped down version of erepublik.Citizen adjusted for aviator support.

        :param email: Citizen's email address
        :param password: Citizen's password
        :type email: str
        :type password: str
        """
        super().__init__(email, password)
        self.stop_threads.set()
        self._req.debug = True
        self.config.interactive = True
        self.set_debug(True)
        self.init_logger()

    def update(self):
        """Refresh token, profile, inventory and money; announce the start outside of tests."""
        self.get_csrf_token()
        self.update_citizen_info()
        self.update_inventory()
        self.update_money()
        if not os.environ.get("PYTHON_TESTS"):
            self.reporter.do_init()
            # NOTE(review): hard-coded Telegram chat id and bot token - consider loading from config/env
            self.telegram.do_init(417412798, "363081107:AAE4MmIz_TJh_mFkNDA5MDkwZTY4YjI1ZWJ", self.name)  # noqa
            self.telegram.send_message(f"*Started aviator supply* {utils.now():%F %T}")

    def get_weekly_daily_orders_done(self, name: str, mu_id: int, provisional: bool) -> bool:
        """Tell whether the first member found by ``name`` in the military unit completed all daily orders.

        :param name: Citizen name used as the search term
        :param mu_id: Military unit id
        :param provisional: when true the current week ("week0") is checked, otherwise "week1"
        :return: ``False`` when the response does not have the expected structure
        """
        weeks_ago = int(not bool(provisional))
        params = dict(
            currentPage=1,
            panel="members",
            sortBy="dailyOrdersCompleted",
            weekFilter=f"week{weeks_ago}",
            search=name,
        )
        member = self._get_military_unit_data(mu_id, **params).json()
        try:
            return bool(member.get("panelContents", {}).get("members", [{}])[0].get("allDailyOrdersCompleted"))
        except (AttributeError, IndexError, KeyError, TypeError):
            # unexpected payload shape (empty member list, list instead of dict, ...)
            return False

    def get_citizen_profile(self, player_id: Optional[int] = None):
        return self._get_main_citizen_profile_json(player_id).json()

    def direct_get_citizen_residency_data(self, name: str, city_id: int):
        return self._get_main_city_data_residents(city_id, params={"search": name}).json()

    def get_multiple_market_offers(
        self, prod: str, amount: int, q: Optional[int] = None, glob: bool = False
    ) -> List[MyOfferItem]:
        """Collect the cheapest (per use) market offers that together cover ``amount``.

        :param prod: industry name or raw material short name (frm/wrm/hrm/arm)
        :param amount: required amount; for food (industry id 1) it is counted in energy
        :param q: quality to scan; all qualities up to the industry maximum when falsy
        :param glob: scan every country returned by ``get_countries_with_regions``
            instead of only the citizenship country
        :raises classes.ErepublikException: when the industry is not known
        """
        raw_short_names = dict(frm="foodRaw", wrm="weaponRaw", hrm="houseRaw", arm="airplaneRaw")
        q1_industries = list(raw_short_names.values())
        q5_industries = ["house", "aircraft", "ticket"]
        industry_id = constants.INDUSTRIES[prod]
        if prod in raw_short_names:
            q = 1
            prod = raw_short_names[prod]
        elif not industry_id:
            self.report_error(f"Industry '{prod}' not implemented")
            raise classes.ErepublikException(f"Industry '{prod}' not implemented")

        max_quality = 1 if prod in q1_industries else 5 if prod.lower() in q5_industries else 7
        q = max_quality if q and q > max_quality else q

        if glob:
            countries: Set[constants.Country] = self.get_countries_with_regions()
        else:
            countries: Set[constants.Country] = {self.details.citizenship}

        ret_list: _OfferHolder = _OfferHolder(amount)
        start_dt = self.now
        iterable = [countries, [q] if q else range(1, max_quality + 1)]
        for country, qlt in product(*iterable):
            r = self._post_economy_marketplace(country.id, industry_id, qlt).json()
            if r.get("error"):
                self.write_warning(f"{country}: {r.get('message', '')}", extra={"resp": r})
                continue
            for offer in r["offers"]:
                price = float(offer["priceWithTaxes"])
                # named offer_amount so the ``amount`` parameter is not shadowed
                offer_amount = int(offer["amount"])
                true_amount = offer_amount * constants.FOOD_ENERGY[f"q{qlt}"] if industry_id == 1 else offer_amount
                ppu = price / (constants.FOOD_ENERGY[f"q{qlt}"] if industry_id == 1 else 1)
                ret_list.append(
                    MyOfferItem(
                        price=price,
                        country=country,
                        amount=offer_amount,
                        offer_id=int(offer["id"]),
                        citizen_id=int(offer["citizen_id"]),
                        quality=qlt,
                        price_per_use=ppu,
                        true_amount=true_amount,
                    )
                )
        self.logger.debug(f"Scraped market in {self.now - start_dt}!")
        return ret_list.list

    def buy_multiple_market_offers(self, offers: Union[List[MyOfferItem], _OfferHolder]):
        """Buy every offer, travelling country by country, then return to the residence.

        :raises classes.ErepublikException: when three travel attempts to a country fail
        """
        cur_country = self.details.current_country
        # grouping by country keeps the number of travels minimal
        for offer in sorted(offers, key=attrgetter("country.id")):
            if offer.country != cur_country:
                for x in range(3):
                    if self.travel_to_country(offer.country):
                        break
                else:
                    raise classes.ErepublikException(f"Unable to travel to {offer.country}!")
                cur_country = offer.country
            self.buy_market_offer(offer, offer.amount)  # noqa
        self.travel_to_residence()


class LatvianSupply:
    """Weekly supply run: collect applicants, buy food/weapons, donate them and publish the article.

    The run is split into numbered steps; after each step the intermediate
    state is written to a checkpoint file so that a later call can resume from
    ``checkpoint_id``.
    """

    citizen: SupplierCitizen = None
    free_energy: int = 0
    organisation: SupplierCitizen = None
    CONSOLE_TABLE_FORMAT: str = "{:>9} | {:<28} | {:26} | {:6} | {:26} | {:6} | {:^4} | {:^10} | {}"

    contestants: Dict[int, Contestant]
    air_top: Dict[int, int]
    ground_top: Dict[int, int]
    already_sent: Dict[int, Dict[str, int]]
    context: Dict[str, Union[str, int, float]]
    debug: bool
    provisional: bool
    checkpoint_id: int = 0

    def __init__(self, citizen: SupplierCitizen, debug: bool = False, provisional: bool = False):
        """
        :param citizen: logged in citizen that buys and donates the supplies
        :param debug: nothing is bought, donated or published; actions are only logged
        :param provisional: like debug, but rankings/daily orders of the current week are used
        """
        self.citizen = citizen
        self.contestants = {}
        self.air_top = {}
        self.ground_top = {}
        self.already_sent = {}
        self.context = dict(
            PLAYER_COUNT=0,
            TABLE_AIR="",
            TABLE_GROUND="",
            TABLE_TOTAL="",
            STARTING_ENERGY=0,
            TOTAL_CC=0,
            TOTAL_ENERGY=0,
            END_ENERGY=0,
        )
        self.citizen.config.interactive = True
        self.citizen._req.debug = True  # noqa
        self.citizen.set_debug(True)
        self.debug = debug
        self.provisional = provisional
        if debug:
            citizen.write_log("Running in DEBUG mode!")
        if provisional:
            citizen.write_log("Running in PROVISIONAL mode!")

    def __call__(self, *args, **kwargs):
        """Execute the supply run.

        :keyword checkpoint_id: steps with a number not greater than this value are
            not executed; their results are loaded from the checkpoint files instead
        """
        starting_checkpoint_id = kwargs.get("checkpoint_id") or 0

        # Step 1: Setup primary data
        if starting_checkpoint_id < 1:
            latest_article = self.get_latest_article_data()
            self.context["STARTING_ENERGY"] = sum(
                amount * constants.FOOD_ENERGY[q] for q, amount in latest_article.get("free_food", {}).items()
            )
            self.free_energy = self.context["STARTING_ENERGY"]
            self.context.update(TOTAL_WEEK=latest_article.get("week", 0) + 1)
            self.context.update(WEEK=self.context["TOTAL_WEEK"] - 184)
            article_id = latest_article.get("article_id")
            self.dump_checkpoint(1, **{"article_id": article_id, "context": self.context})
        else:
            # NOTE(review): self.free_energy is not restored when resuming, although
            # steps 4, 5 and 9 read it - confirm whether that is intended
            data = self.load_checkpoint(1)
            article_id = data["article_id"]
            self.context.update(**data["context"])

        # Step 2: Setup ranking data
        if starting_checkpoint_id < 2:
            for top_data in self.citizen.get_aircraft_kill_rankings(71, 0 if self.provisional else 1, 0).get("top"):
                self.air_top.update({int(top_data.get("id")): int(top_data["values"])})
            for top_data in self.citizen.get_ground_kill_rankings(71, 0 if self.provisional else 1, 0, 0).get("top"):
                self.ground_top.update({int(top_data.get("id")): int(top_data["values"])})
            self.dump_checkpoint(2, **{"air_top": self.air_top, "ground_top": self.ground_top})
        else:
            data = self.load_checkpoint(2)
            # JSON object keys are strings, but the tops are looked up by int citizen id
            self.air_top = {int(k): v for k, v in data["air_top"].items()}
            self.ground_top = {int(k): v for k, v in data["ground_top"].items()}

        # Step 3: Get contestants
        if starting_checkpoint_id < 3:
            comments: Dict[str, Any] = self.citizen.get_article_comments(article_id, 1)
            if not comments.get("comments", {}):
                raise classes.ErepublikException("No comments found")
            self.citizen.write_log(
                self.CONSOLE_TABLE_FORMAT.format(
                    "ID", "Vārds", "G rangs", "G kili", "A rangs", "A kili", "7do", "Fabrikas", "Papildu"
                )
            )
            self.set_contestants_from_comments(comments.get("comments", {}))
            self.context["PLAYER_COUNT"] = len(self.contestants)
            self.dump_checkpoint(3, **dict(contestants=self.contestants, context=self.context))
        else:
            data = self.load_checkpoint(3)
            self.context.update(**data["context"])
            for contestant in data["contestants"].values():
                cont = Contestant.from_dict(**contestant)
                self.contestants.update({cont.id: cont})

        # Step 4: calculate expenses and buy necessary items
        if starting_checkpoint_id < 4:
            cc_spent_on_food = cc_spent_on_tanks = 0
            previously_sent = dict(food=0, tanks=0)
            for a in self.already_sent.values():
                previously_sent["food"] += a["food"]
                previously_sent["tanks"] += a["tanks"]

            total_food_requirement = sum(c.total_health for c in self.contestants.values())
            self.context["TOTAL_ENERGY"] = total_food_requirement
            total_food_requirement -= self.free_energy
            if total_food_requirement - previously_sent["food"] > 0:
                food_offers = self.citizen.get_multiple_market_offers(
                    "food", total_food_requirement - previously_sent["food"], glob=True
                )
                cc_spent_on_food = round(sum(o.amount * o.price for o in food_offers), 2)
                if self.debug or self.provisional:
                    for offer in food_offers:
                        self.citizen.logger.debug(repr(offer))
                else:
                    self.citizen.buy_multiple_market_offers(food_offers)

            total_tank_requirement = sum(c.ground_weapons for c in self.contestants.values())
            self.context["TOTAL_TANKS"] = total_tank_requirement
            if total_tank_requirement - previously_sent["tanks"] > 0:
                tank_offers = self.citizen.get_multiple_market_offers(
                    "weapon",
                    total_tank_requirement - previously_sent["tanks"],
                    7,
                    glob=(not self.debug and not self.provisional),
                )
                cc_spent_on_tanks = round(sum(o.amount * o.price for o in tank_offers), 2)
                if self.debug or self.provisional:
                    for offer in tank_offers:
                        self.citizen.logger.debug(repr(offer))
                else:
                    self.citizen.buy_multiple_market_offers(tank_offers)

            self.context["TOTAL_CC"] = round(cc_spent_on_food + cc_spent_on_tanks * 1.1, 2)
            self.citizen.update_inventory()
            self.dump_checkpoint(4, **{"context": self.context})
        else:
            data = self.load_checkpoint(4)
            self.context.update(**data["context"])

        # Step 5: send supplies
        if starting_checkpoint_id < 5:
            sent_data = []
            for contestant in sorted(
                self.contestants.values(),
                key=attrgetter("total_health", "total_kills"),
                reverse=True,
            ):
                self.send_food_supplies(contestant.id, sent_data)
                self.send_tank_supplies(contestant.id, sent_data)
            with open(utils.get_file(f"{self.citizen.eday}.json"), "w") as f:
                utils.json_dump(sent_data, f)
            self.citizen.write_log(f"Not used energy: {self.free_energy:,d}hp")
            self.context["END_ENERGY"] = self.free_energy
            self.dump_checkpoint(5, **{"context": self.context})
        else:
            data = self.load_checkpoint(5)
            self.context.update(**data["context"])

        # Step 6: Generate images for article
        if starting_checkpoint_id < 6:
            image_title = f"Day {self.start_eday}-{self.end_eday}"
            air_url = self.get_air_image_url(image_title)
            ground_url = self.get_ground_image_url(image_title)
            total_url = self.get_total_image_url(image_title)
            self.context.update(AIR_URL=air_url, GROUND_URL=ground_url, TOTAL_URL=total_url)
            self.dump_checkpoint(6, **{"context": self.context})
        else:
            data = self.load_checkpoint(6)
            self.context.update(**data["context"])

        # Step 7: Generate article data
        if starting_checkpoint_id < 7:
            for k, v in self.prepare_article_image().items():
                self.context[f"TABLE_{k.upper()}"] = v
            article_body = self.generate_article_body()
            article_data = dict(
                content=article_body,
                from_eday=self.start_eday,
                till_eday=self.end_eday,
                title=f'[KM] eLatviešu apgāde [d{self.article_eday} {self.citizen.now.strftime("%H:%M")}]',
                comment=(
                    f"★★★★ APGĀDE PAR NEDĒĻU [DAY {self.start_eday}-{self.end_eday}] IZDALĪTA ★★★★\n"
                    "★ Apgādei piesakies šī komentāra atbildes komentāros ar saucienu - piesakos! ★"
                ),
            )
            self.citizen.write_log(
                f"Publishing info:\n\n### Article ###\n{article_data['title']}\n\n{article_data['comment']}\n\n"
            )
            self.dump_checkpoint(7, **{"context": self.context, "article_data": article_data})
        else:
            data = self.load_checkpoint(7)
            self.context.update(**data["context"])
            article_data = data["article_data"]

        self.organisation = SupplierCitizen("organisation.email@example.com", "")
        self.organisation.set_debug(True)
        self.organisation.update()
        while not self.organisation.token:
            self.organisation.update_citizen_info()

        # Step 8: refund
        if starting_checkpoint_id < 8:
            if not self.debug and not self.provisional:
                self.organisation.donate_money(self.citizen.details.citizen_id, int(self.context["TOTAL_CC"] + 0.5), 1)
                self.organisation.update_money()
            # amount missing from the organisation account to reach 250 000 cc
            refund_cc = int(250_000.5 - self.organisation.details.cc)
            if refund_cc > 0:
                wall_body = (
                    "★★★ [ PAZIŅOJUMS KONGRESAM ] ★★★\n\n"
                    f"Veikta apgāde par d{self.start_eday}-{self.end_eday} {refund_cc}cc apmērā!\n\n"
                    f"Ierosiniet naudas pārskaitījumu uz {self.organisation.name}\n"
                )
                self.citizen.write_log("### Wall ###\n" f"{wall_body}")
                self.citizen.telegram.send_message(wall_body)

        # Step 9: Publish article
        if starting_checkpoint_id < 9 and not self.debug and not self.provisional:
            comment_data = dict(message=article_data.pop("comment"))
            new_article_id = self.organisation.publish_article(article_data["title"], article_data["content"], 3)
            comment_data.update(article_id=new_article_id)
            self.organisation.write_article_comment(**comment_data)
            self.organisation.vote_article(new_article_id)
            self.citizen.vote_article(new_article_id)
            self.citizen.endorse_article(new_article_id, 100)
            httpx.post(
                "https://erep.lv/aviator/latest_article/",
                json=dict(
                    week=self.context["TOTAL_WEEK"], article_id=new_article_id, free_food={"q1": self.free_energy // 2}
                ),
            )
            httpx.post(
                "https://erep.lv/aviator/set/",
                json=[
                    dict(
                        id=aviator.id, name=aviator.name, factory_count=aviator.factory_count, rank=aviator.air_rank.id
                    )
                    for aviator in self.contestants.values()
                ],
            )

        with open(f'aviator_support_{self.citizen.eday}_{self.citizen.now.strftime("%F %T")}.json', "w") as f:
            utils.json_dump(self, f)

    @property
    def offset(self):
        """1 only in pure debug mode (debug without provisional), otherwise 0."""
        return int(self.debug and not self.provisional)

    @property
    def as_dict(self):
        return self.__dict__

    def get_latest_article_data(self):
        """Fetch the latest article record from erep.lv.

        :raises classes.ErepublikException: when the response has no truthy ``status``
        """
        data = httpx.get(f"https://erep.lv/aviator/latest_article/{self.offset}/").json()
        if not data.get("status"):
            raise classes.ErepublikException("Article ID and week problem")
        return data

    def set_contestants_from_comments(self, comments):
        """Register every citizen who replied "piesakos" to the announcement comment in time.

        Only comments written by author 1954361 are considered. Replies created
        after the first Tuesday 00:00 following the comment are ignored.

        :raises classes.ErepublikException: when the announcement comment has no replies
        """
        time_string = "%Y-%m-%d %H:%M:%S"
        for comment_data in comments.values():
            if comment_data.get("authorId") == 1954361:
                start_dt = utils.localize_dt(datetime.datetime.strptime(comment_data.get("createdAt"), time_string))
                # weekday() == 1 is Tuesday; always pick a Tuesday strictly in the future
                days_ahead = 1 - start_dt.weekday()
                if days_ahead <= 0:
                    days_ahead += 7
                end_dt = utils.good_timedelta(start_dt, datetime.timedelta(days_ahead)).replace(
                    hour=0, minute=0, second=0
                )
                if not comment_data.get("replies", {}):
                    raise classes.ErepublikException("No replies found")
                for reply_data in comment_data.get("replies").values():
                    if utils.localize_dt(datetime.datetime.strptime(reply_data.get("createdAt"), time_string)) > end_dt:
                        continue
                    if re.search(r"piesakos", reply_data.get("message"), re.I):
                        self._setup_contestant(reply_data)

    @property
    def start_eday(self):
        """eRepublik day six days before the Monday of the current week."""
        now = utils.now() - datetime.timedelta(days=utils.now().weekday())
        return utils.eday_from_date(constants.erep_tz.normalize(now - datetime.timedelta(days=6)))

    @property
    def end_eday(self):
        return self.start_eday + 6

    @property
    def article_eday(self):
        return self.end_eday + 1

    def send_food_supplies(self, contestant_id: int, actions: List[Any]):
        """Donate food worth the contestant's earned energy, best quality first.

        Every donation (or, in debug/provisional mode, every simulated donation)
        is appended to ``actions``.
        """
        contestant = self.contestants[contestant_id]
        food = {q: self.citizen.food[q] for q in reversed(list(constants.FOOD_ENERGY.keys())) if self.citizen.food[q]}
        health = (contestant.air_health_total + contestant.ground_health) // 2 * 2
        if contestant.id in self.already_sent:
            sent = self.already_sent[contestant.id]["food"]
            health -= sent
            if health < 0:
                health = 0
            actions.append(dict(player_id=contestant.id, name=contestant.name, amount=sent, industry=1))
            self.free_energy -= sent
            if self.free_energy < 0:
                self.free_energy = 0
        if not health:
            # actions.append(dict(player_id=contestant.id, name=contestant.name, amount=0, industry=1))
            return
        while health > 0:
            # drop qualities that ran out of stock
            food = {
                q: food.get(q) for q in reversed(list(constants.FOOD_ENERGY.keys())) if food.get(q) and food.get(q) > 0
            }
            for quality, amount in food.items():
                if constants.FOOD_ENERGY[quality] <= health:
                    break
            else:
                # no available food item is small enough for the remaining energy
                self.citizen.write_warning(f"{contestant.name} ({contestant.id}) needs to receive extra {health}hp")
                break
            if amount * constants.FOOD_ENERGY[quality] > health:
                amount = health // constants.FOOD_ENERGY[quality]
            q = int(quality[1])
            if self.debug or self.provisional:
                self.citizen.logger.debug(
                    f"citizen.donate_items(citizen_id={contestant.id}, "
                    f"amount={amount}, industry_id=1, quality={q})"
                )
            else:
                donated = self.citizen.donate_items(citizen_id=contestant.id, amount=amount, industry_id=1, quality=q)
                if donated != amount:
                    health -= donated * constants.FOOD_ENERGY[quality]
                    self.citizen.write_warning(f"{contestant.name} ({contestant.id}) needs to receive extra {health}hp")
                    actions.append(
                        dict(
                            player_id=contestant.id,
                            name=contestant.name,
                            amount=donated * constants.FOOD_ENERGY[quality],
                            industry=1,
                        )
                    )
                    break
            food[quality] -= amount
            self.citizen.food[quality] -= amount
            self.citizen.inventory.final["Food"][q]["amount"] -= amount
            health -= amount * constants.FOOD_ENERGY[quality]
            actions.append(
                dict(
                    player_id=contestant.id,
                    name=contestant.name,
                    amount=amount * constants.FOOD_ENERGY[quality],
                    industry=1,
                )
            )

    def send_tank_supplies(self, contestant_id: int, actions: List[Any]):
        """Donate the contestant's earned q7 weapons and record the donation in ``actions``."""
        contestant = self.contestants[contestant_id]
        tank_amount = contestant.ground_weapons
        if contestant.id in self.already_sent:
            sent = self.already_sent[contestant_id]["tanks"]
            tank_amount -= sent
            if tank_amount < 0:
                tank_amount = 0
            actions.append(dict(player_id=contestant.id, name=contestant.name, amount=sent, industry=2))
        if not tank_amount:
            # actions.append(dict(player_id=contestant.id, name=contestant.name, amount=0, industry=2))
            return
        if self.debug or self.provisional:
            self.citizen.logger.debug(
                f"citizen.donate_items(citizen_id={contestant.id}, " f"amount={tank_amount}, industry_id=2, quality=7)"
            )
            donated = tank_amount
        else:
            donated = self.citizen.donate_items(citizen_id=contestant.id, amount=tank_amount, industry_id=2, quality=7)
        if donated != tank_amount:
            tank_amount -= donated
            self.citizen.write_log(
                f"{contestant.name} ({contestant.id}) needs to receive extra {tank_amount} q7 tanks"
            )
        actions.append(dict(player_id=contestant.id, name=contestant.name, amount=donated, industry=2))

    def get_air_image_url(self, title: str) -> str:
        return self.upload_image_and_get_url(self._create_aviator_png(title))

    def get_ground_image_url(self, title: str) -> str:
        return self.upload_image_and_get_url(self._create_ground_png(title))

    def get_total_image_url(self, title: str) -> str:
        return self.upload_image_and_get_url(self._create_total_png(title))

    def prepare_article_image(self) -> Dict[str, str]:
        """Build the BBCode snippets (title, linked image, caption) for the air/ground/total tables."""
        contestant_captions = {"ground": [], "air": [], "total": []}
        for contestant in sorted(
            self.contestants.values(), key=attrgetter("total_kills", "total_health"), reverse=True
        ):
            bb_link = f"[url={contestant.link}]{contestant.name}[/url]"
            if contestant.total_health:
                contestant_captions["total"].append(bb_link)
        for k, v in contestant_captions.items():
            v = ", ".join(v)
            if k == "total":
                v = f"Apgādi saņēma: {v}."
            contestant_captions[k] = v
        title_trans = {"air": "Gaisa apgāde", "ground": "Zemes apgāde", "total": ""}
        return {
            name: (
                f"[b]{title_trans[name]}[/b]\n"
                f"[center][url={self.context[url_key]}][img]{self.context[url_key]}[/img][/url][/center]\n"
                f"{contestant_captions[name]}"
            )
            for name, url_key in (
                ("air", "AIR_URL"),
                ("ground", "GROUND_URL"),
                ("total", "TOTAL_URL"),
            )
        }

    def generate_article_body(self) -> str:
        """Render ``scripts/KM_apgade.txt`` (next to the running script) with the context.

        The rendered article is also written to a file. An empty string is
        returned when the template does not exist.
        """
        filename = f"{os.path.abspath(os.path.dirname(sys.argv[0]))}/scripts/KM_apgade.txt"
        if os.path.isfile(filename):
            with open(filename) as article_template_file:
                template = article_template_file.read()
            article = template.format(**self.context)
            with open(utils.get_file(f"{self.article_eday}.txt"), "w") as article_file:
                article_file.write(article)
            return article
        return ""

    def dump_checkpoint(self, checkpoint_id: int, **data_to_dump):
        """Write ``data_to_dump`` as JSON to ``aviator_dumps/d<article_eday>/<NN>_checkpoint.json``."""
        dump_path = pathlib.Path(f"aviator_dumps/d{self.article_eday}/")
        dump_path.mkdir(parents=True, exist_ok=True)
        with open(dump_path / f"{checkpoint_id:02d}_checkpoint.json", "w") as f:
            utils.json_dump(data_to_dump, f)

    def load_checkpoint(self, checkpoint_id: int):
        """Read back the data written by ``dump_checkpoint`` for the same article day."""
        load_path = pathlib.Path(f"aviator_dumps/d{self.article_eday}/")
        with open(load_path / f"{checkpoint_id:02d}_checkpoint.json", "r") as f:
            data = utils.json_load(f)
        return data

    @staticmethod
    def upload_image_and_get_url(image: BytesIO, title: Optional[str] = None) -> str:
        """Upload a PNG to erep.lv and return its URL.

        :raises ValueError: when the upload response has no truthy ``status``
        """
        if title is None:
            title = "aviator_top"
        title += ".png"
        p = httpx.post(
            "https://erep.lv/image/upload",
            files=[("file", (title, image, "image/png"))],
            data=dict(password=""),
        )
        if p.json().get("status"):
            return p.json().get("url")
        else:
            raise ValueError("Unable to upload table image!")

    def _create_aviator_png(self, title: str) -> BytesIO:
        """Convert data to png table.

        :param title: String containing table name
        :rtype: PNG image containing
        """
        # NOTE(review): `anchor`, `x`, `bold`, `height` and `i` are computed but never
        # used and several f-strings are empty - the SVG markup looks like it was lost;
        # confirm against the original source before relying on the rendered image.
        data = list(row for row in self.contestants.values() if row.air_kill_count)
        formatted_data = ""
        col_headers = [
            ("name", "Spēlētājs", 10),
            ("air_kill_count", "Kili", 350),
            ("air_health", "Enerģija", 440),
            ("done_7_dos", "7 DO", 470),
            ("factory_count_increased", "Fabrikas", 540),
            ("air_health_total", "Kopā", 650),
        ]
        for col, col_value, x in col_headers:
            anchor = (
                "start" if col == "name" else "middle" if col in ["done_7_dos", "factory_count_increased"] else "end"
            )
            formatted_data += f"\n"
            formatted_data += f"{col_value}\n"
            bold = " font-weight='bold'" if col == "name" else ""
            for row in sorted(data, key=attrgetter("air_kill_count", "air_health_total"), reverse=True):
                value = getattr(row, col, None)
                if col == "air_health_total":
                    if row.extra:
                        value = str(row.extra[0])
                    else:
                        value = row.air_health_total
                # bool is tested first because bool is a subclass of int
                if isinstance(value, bool):
                    value = "+" if value else "–"
                elif isinstance(value, int):
                    value = f"{value:,.0f}".replace(",", " ")
                elif isinstance(value, str):
                    if len(value) > 29:
                        value = value[:26] + ".."
                formatted_data += f"{value}\n"
            formatted_data += "\n"

        height = 33 + len(data) * 24
        row_highlight_count = (len(data) + 1) // 2
        row_highlights = ""
        for i in range(row_highlight_count):
            row_highlights += f""

        svg_template = (
            f""  # noqa
            f"{title}"  # noqa
            f"{row_highlights}"
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f"{formatted_data}"
        )
        small_svg = "".join([row.strip() for row in svg_template.split("\n")])
        icon_file = BytesIO()
        svg2png(small_svg, write_to=icon_file, scale=2)
        icon_file.seek(0)
        return icon_file

    def _setup_contestant(self, reply_data):
        """Create a contestant from an application reply and collect all eligibility data.

        Reasons for rejection are appended to ``contestant.extra``. When any
        reason is found from the profile alone, the residence, factory and
        daily order checks are skipped.
        """
        contestant = Contestant(int(reply_data.get("authorId")))
        contestant.air_kill_count = self.air_top.get(contestant.id, 0)
        contestant.ground_kill_count = self.ground_top.get(contestant.id, 0)
        self.contestants.update({contestant.id: contestant})

        profile = self.citizen.get_citizen_profile(contestant.id)
        contestant.name = profile["citizen"]["name"]
        contestant.air_rank = AIR_RANKS[profile["military"]["militaryData"]["aircraft"]["rankNumber"]]
        contestant.ground_rank = GROUND_RANKS[profile["military"]["militaryData"]["ground"]["rankNumber"]]
        contestant.military_unit = (
            profile["military"]["militaryUnit"]["id"] if profile["military"]["militaryUnit"] else 0
        )

        if profile.get("isBanned"):
            contestant.extra.append("BANNED")
        elif not profile.get("location", {}).get("citizenshipCountry", {}).get("id") == 71:
            contestant.extra.append("Nav pilsonis")
        elif not profile["city"]["residenceCityId"]:
            contestant.extra.append("Nav rezidences")
        else:
            contestant.residence = profile["city"]["residenceCityId"]
        if contestant.id in forbidden_ids:
            contestant.extra.append("Aizliegta pieteikšanās")
        if contestant.extra:
            return

        residence_data = self.citizen.direct_get_citizen_residency_data(contestant.name, contestant.residence)
        # a missing "residents" key yields None; treat it as "no residents found"
        residents = residence_data.get("widgets", {}).get("residents", {}).get("residents") or []
        contestant.factory_count = 0
        contestant.has_house = False
        for resident in residents:
            if int(resident.get("citizenId")) == contestant.id:
                contestant.factory_count = resident.get("numFactories", 0)
                contestant.has_house = bool(resident.get("activeHouses"))
                break

        resp = httpx.post(
            f"https://erep.lv/aviator/check/{contestant.id}",
            data={"current_count": contestant.factory_count},
        )
        contestant.factory_count_increased = resp.json()["status"]
        if not contestant.has_house:
            contestant.extra.append("Nav māju")
        if contestant.military_unit:
            contestant.done_7_dos = self.citizen.get_weekly_daily_orders_done(
                contestant.name, contestant.military_unit, self.provisional
            )
        self.citizen.write_log(
            self.CONSOLE_TABLE_FORMAT.format(
                contestant.id,
                contestant.name,
                contestant.ground_rank_s,
                contestant.ground_kill_count,
                contestant.air_rank_s,
                contestant.air_kill_count,
                contestant.done_7_dos,
                contestant.factory_count_increased,
                ", ".join(contestant.extra),
            )
        )

    def _create_ground_png(self, title: str) -> BytesIO:
        """Convert data to png table.

        :param title: String containing table name
        :rtype: PNG image containing
        """
        # NOTE(review): `anchor`, `x`, `bold`, `height` and `i` are computed but never
        # used and several f-strings are empty - the SVG markup looks like it was lost;
        # confirm against the original source before relying on the rendered image.
        data = list(row for row in self.contestants.values() if row.ground_kill_count)
        formatted_data = ""
        col_headers = [
            ("name", "Spēlētājs", 10),
            ("ground_kill_count", "Kili", 350),
            ("penalty", "", 450),
            ("ground_health", "Enerģija", 550),
            ("ground_weapons", "q7", 650),
        ]
        for col, col_value, x in col_headers:
            anchor = "start" if col == "name" else "end"
            formatted_data += f" \n"
            formatted_data += f" {col_value}\n"
            bold = " font-weight='bold'" if col == "name" else ""
            for row in sorted(data, key=attrgetter("ground_kill_count", "ground_health"), reverse=True):
                value = getattr(row, col, None)
                if col == "penalty":
                    if not row.ground_health and not row.ground_rank_penalty == 100 and row.extra:
                        value = row.extra[0]
                    elif row.ground_rank_penalty:
                        value = f"-{row.ground_rank_penalty}%"
                    else:
                        value = ""
                # bool is tested first because bool is a subclass of int
                if isinstance(value, bool):
                    value = "+" if value else "–"
                elif isinstance(value, int):
                    value = f"{value:,.0f}".replace(",", " ")
                elif isinstance(value, str):
                    if len(value) > 29:
                        value = value[:26] + ".."
                formatted_data += f" {value}\n"
            formatted_data += " \n"

        height = 33 + len(data) * 24
        row_highlight_count = (len(data) + 1) // 2
        row_highlights = ""
        for i in range(row_highlight_count):
            row_highlights += f""

        svg_template = (
            f""  # noqa
            f"{title}"  # noqa
            f"{row_highlights}"
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f"{formatted_data}"
        )
        small_svg = "".join([row.strip() for row in svg_template.split("\n")])
        icon_file = BytesIO()
        svg2png(small_svg, write_to=icon_file, scale=2)
        icon_file.seek(0)
        return icon_file

    def _create_total_png(self, title: str) -> BytesIO:
        """Convert data to png table.

        :param title: String containing table name
        :rtype: PNG image containing
        """
        # NOTE(review): `anchor`, `x`, `bold`, `height` and `i` are computed but never
        # used and several f-strings are empty - the SVG markup looks like it was lost;
        # confirm against the original source before relying on the rendered image.
        data = list(self.contestants.values())
        formatted_data = ""
        col_headers = [
            ("name", "Spēlētājs", 10),
            ("total_kills", "Kili kopā", 400),
            ("total_health", "Enerģija kopā", 550),
            ("ground_weapons", "Tanki", 650),
        ]
        for col, col_value, x in col_headers:
            anchor = "start" if col == "name" else "end"
            formatted_data += f""
            formatted_data += f"{col_value}"
            bold = " font-weight='bold'" if col == "name" else ""
            for row in sorted(data, key=attrgetter("total_kills", "total_health"), reverse=True):
                value = getattr(row, col, None)
                if col == "total_health" and not value:
                    if row.extra:
                        value = row.extra[0]
                # bool is tested first because bool is a subclass of int
                if isinstance(value, bool):
                    value = "+" if value else "–"
                elif isinstance(value, int):
                    value = f"{value:,.0f}".replace(",", " ")
                elif isinstance(value, str):
                    if len(value) > 29:
                        value = value[:26] + ".."
                formatted_data += f"{value}"
            formatted_data += ""

        height = 33 + len(data) * 24
        row_highlight_count = (len(data) + 1) // 2
        row_highlights = ""
        for i in range(row_highlight_count):
            row_highlights += f""

        svg_template = (
            f""  # noqa
            f"{title}"  # noqa
            f"{row_highlights}"
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f""  # noqa
            f"{formatted_data}"
        )
        small_svg = "".join([row.strip() for row in svg_template.split("\n")])
        icon_file = BytesIO()
        svg2png(small_svg, write_to=icon_file, scale=2)
        icon_file.seek(0)
        return icon_file