diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..8b999a3 --- /dev/null +++ b/.flake8 @@ -0,0 +1,5 @@ +[flake8] +ignore = E203,E722 +exclude = .git,__pycache__,venv,docs,debug,log +max-line-length = 120 +#max-complexity = 10 diff --git a/docs/conf.py b/docs/conf.py index af4b009..e705429 100755 --- a/docs/conf.py +++ b/docs/conf.py @@ -23,10 +23,12 @@ import sys sys.path.insert(0, os.path.abspath("..")) -import erepublik -import edx_theme import datetime +import edx_theme + +import erepublik + # -- General configuration --------------------------------------------- # If your documentation needs a minimal Sphinx version, state it here. @@ -146,5 +148,13 @@ man_pages = [(master_doc, "erepublik", "eRepublik script Documentation", [author # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, "erepublik", "eRepublik script Documentation", author, "erepublik", "One line description of project.", "Miscellaneous"), + ( + master_doc, + "erepublik", + "eRepublik script Documentation", + author, + "erepublik", + "One line description of project.", + "Miscellaneous", + ), ] diff --git a/erepublik/_logging.py b/erepublik/_logging.py index 96ed6b1..b39327e 100644 --- a/erepublik/_logging.py +++ b/erepublik/_logging.py @@ -133,7 +133,9 @@ class ErepublikErrorHTTTPHandler(handlers.HTTPHandler): except: # noqa resp_time = slugify(response.headers.get("date")) return dict( - name=f"{resp_time}_{name}.{ext}", content=html.encode("utf-8"), mimetype="application/json" if ext == "json" else "text/html" + name=f"{resp_time}_{name}.{ext}", + content=html.encode("utf-8"), + mimetype="application/json" if ext == "json" else "text/html", ) def _get_local_vars(self) -> str: diff --git a/erepublik/access_points.py b/erepublik/access_points.py index 17fce96..f152d40 100644 --- a/erepublik/access_points.py +++ b/erepublik/access_points.py @@ -115,7 +115,9 @@ class SlowRequests(Session): firefox_template = "Mozilla/5.0 ({osystem}; rv:{version}.0) Gecko/20100101 Firefox/{version}.0" firefox_versions = range(85, 92) - chrome_template = "Mozilla/5.0 ({osystem}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36" + chrome_template = ( + "Mozilla/5.0 ({osystem}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{version} Safari/537.36" + ) chrome_versions = [ "85.0.4183.121", "86.0.4240.183", @@ -192,7 +194,11 @@ class CitizenBaseAPI: cookies_hash = hashlib.sha256(",".join(env["c"]).encode("utf8")).hexdigest() cookie_kwargs = dict( - expires=int(time.time()) + 120, path="/en/main/sessionUnlock", domain=".www.erepublik.com", secure=True, rest={"HttpOnly": True} + expires=int(time.time()) + 120, + path="/en/main/sessionUnlock", + domain=".www.erepublik.com", + secure=True, + rest={"HttpOnly": True}, ) self._req.cookies.set("sh", session_hash, **cookie_kwargs) self._req.cookies.set("ch", cookies_hash, **cookie_kwargs) @@ -271,7 +277,13 @@ class ErepublikArticleAPI(CitizenBaseAPI): return self.post(f"{self.url}/main/donate-article", data=data) def _post_main_write_article(self, title: str, content: str, country_id: int, kind_id: int) -> Response: - data = dict(_token=self.token, article_name=title, article_body=content, article_location=country_id, article_category=kind_id) + data = dict( + _token=self.token, + article_name=title, + article_body=content, + article_location=country_id, + article_category=kind_id, + ) return self.post(f"{self.url}/main/write-article", data=data) def _post_main_vote_article(self, article_id: int) -> Response: @@ -286,7 +298,9 @@ class ErepublikCompanyAPI(CitizenBaseAPI): def _post_economy_create_company(self, industry_id: int, building_type: int = 1) -> Response: data = {"_token": self.token, "company[industry_id]": industry_id, "company[building_type]": building_type} - return self.post(f"{self.url}/economy/create-company", data=data, headers={"Referer": f"{self.url}/economy/create-company"}) + return self.post( + f"{self.url}/economy/create-company", data=data, headers={"Referer": f"{self.url}/economy/create-company"} + ) def _get_economy_inventory_items(self) -> Response: return self.get(f"{self.url}/economy/inventory-items/") @@ -366,9 +380,13 @@ class ErepublikCountryAPI(CitizenBaseAPI): def _get_country_military(self, country_name: str) -> Response: return self.get(f"{self.url}/country/military/{country_name}") - def _post_main_country_donate(self, country_id: int, action: str, value: Union[int, float], quality: int = None) -> Response: + def _post_main_country_donate( + self, country_id: int, action: str, value: Union[int, float], quality: int = None + ) -> Response: data = dict(countryId=country_id, action=action, _token=self.token, value=value, quality=quality) - return self.post(f"{self.url}/main/country-donate", data=data, headers={"Referer": f"{self.url}/country/economy/Latvia"}) + return self.post( + f"{self.url}/main/country-donate", data=data, headers={"Referer": f"{self.url}/country/economy/Latvia"} + ) class ErepublikEconomyAPI(CitizenBaseAPI): @@ -396,13 +414,17 @@ class ErepublikEconomyAPI(CitizenBaseAPI): def _post_economy_donate_items_action(self, citizen_id: int, amount: int, industry: int, quality: int) -> Response: data = dict(citizen_id=citizen_id, amount=amount, industry_id=industry, quality=quality, _token=self.token) return self.post( - f"{self.url}/economy/donate-items-action", data=data, headers={"Referer": f"{self.url}/economy/donate-items/{citizen_id}"} + f"{self.url}/economy/donate-items-action", + data=data, + headers={"Referer": f"{self.url}/economy/donate-items/{citizen_id}"}, ) def _post_economy_donate_money_action(self, citizen_id: int, amount: float = 0.0, currency: int = 62) -> Response: data = dict(citizen_id=citizen_id, _token=self.token, currency_id=currency, amount=amount) return self.post( - f"{self.url}/economy/donate-money-action", data=data, headers={"Referer": f"{self.url}/economy/donate-money/{citizen_id}"} + f"{self.url}/economy/donate-money-action", + data=data, + headers={"Referer": f"{self.url}/economy/donate-money/{citizen_id}"}, ) def _post_economy_exchange_purchase(self, amount: float, currency: int, offer: int) -> Response: @@ -434,7 +456,12 @@ class ErepublikEconomyAPI(CitizenBaseAPI): def _post_economy_marketplace_actions(self, action: str, **kwargs) -> Response: if action == "buy": data = dict( - _token=self.token, offerId=kwargs["offer"], amount=kwargs["amount"], orderBy="price_asc", currentPage=1, buyAction=1 + _token=self.token, + offerId=kwargs["offer"], + amount=kwargs["amount"], + orderBy="price_asc", + currentPage=1, + buyAction=1, ) elif action == "sell": data = dict( @@ -454,16 +481,24 @@ class ErepublikEconomyAPI(CitizenBaseAPI): class ErepublikLeaderBoardAPI(CitizenBaseAPI): - def _get_main_leaderboards_damage_aircraft_rankings(self, country_id: int, weeks: int = 0, mu_id: int = 0) -> Response: # noqa + def _get_main_leaderboards_damage_aircraft_rankings( + self, country_id: int, weeks: int = 0, mu_id: int = 0 + ) -> Response: # noqa return self.get(f"{self.url}/main/leaderboards-damage-aircraft-rankings/{country_id}/{weeks}/{mu_id}/0") - def _get_main_leaderboards_damage_rankings(self, country_id: int, weeks: int = 0, mu_id: int = 0, div: int = 0) -> Response: # noqa + def _get_main_leaderboards_damage_rankings( + self, country_id: int, weeks: int = 0, mu_id: int = 0, div: int = 0 + ) -> Response: # noqa return self.get(f"{self.url}/main/leaderboards-damage-rankings/{country_id}/{weeks}/{mu_id}/{div}") - def _get_main_leaderboards_kills_aircraft_rankings(self, country_id: int, weeks: int = 0, mu_id: int = 0) -> Response: # noqa + def _get_main_leaderboards_kills_aircraft_rankings( + self, country_id: int, weeks: int = 0, mu_id: int = 0 + ) -> Response: # noqa return self.get(f"{self.url}/main/leaderboards-kills-aircraft-rankings/{country_id}/{weeks}/{mu_id}/0") - def _get_main_leaderboards_kills_rankings(self, country_id: int, weeks: int = 0, mu_id: int = 0, div: int = 0) -> Response: # noqa + def _get_main_leaderboards_kills_rankings( + self, country_id: int, weeks: int = 0, mu_id: int = 0, div: int = 0 + ) -> Response: # noqa return self.get(f"{self.url}/main/leaderboards-kills-rankings/{country_id}/{weeks}/{mu_id}/{div}") @@ -543,7 +578,14 @@ class ErepublikMilitaryAPI(CitizenBaseAPI): return self.post(f"{self.url}/military/battle-console", data=data) def _post_military_deploy_bomb(self, battle_id: int, division_id: int, side_id: int, bomb_id: int) -> Response: - data = dict(battleId=battle_id, battleZoneId=division_id, sideId=side_id, sideCountryId=side_id, bombId=bomb_id, _token=self.token) + data = dict( + battleId=battle_id, + battleZoneId=division_id, + sideId=side_id, + sideCountryId=side_id, + bombId=bomb_id, + _token=self.token, + ) return self.post(f"{self.url}/military/deploy-bomb", data=data) def _post_military_fight_air(self, battle_id: int, side_id: int, zone_id: int) -> Response: @@ -599,7 +641,9 @@ class ErepublikPoliticsAPI(CitizenBaseAPI): return self.get(f"{self.url}/main/presidential-elections/{country_id}/{timestamp}") def _post_propose_president_candidate(self, party_slug: str, citizen_id: int) -> Response: - return self.post(f"{self.url}/propose-president-candidate/{party_slug}", data=dict(_token=self.token, citizen=citizen_id)) + return self.post( + f"{self.url}/propose-president-candidate/{party_slug}", data=dict(_token=self.token, citizen=citizen_id) + ) def _get_auto_propose_president_candidate(self, party_slug: str) -> Response: return self.get(f"{self.url}/auto-propose-president-candidate/{party_slug}") @@ -611,11 +655,24 @@ class ErepublikPresidentAPI(CitizenBaseAPI): return self.post(f"{self.url}/wars/attack-region/{war_id}/{region_id}", data=data) def _post_new_war(self, self_country_id: int, attack_country_id: int, debate: str = "") -> Response: - data = dict(requirments=1, _token=self.token, debate=debate, countryNameConfirm=constants.COUNTRIES[attack_country_id].link) + data = dict( + requirments=1, + _token=self.token, + debate=debate, + countryNameConfirm=constants.COUNTRIES[attack_country_id].link, + ) return self.post(f"{self.url}/{constants.COUNTRIES[self_country_id].link}/new-war", data=data) def _post_new_donation(self, country_id: int, amount: int, org_name: str, debate: str = "") -> Response: - data = dict(requirments=1, _token=self.token, debate=debate, currency=1, value=amount, commit="Propose", type_name=org_name) + data = dict( + requirments=1, + _token=self.token, + debate=debate, + currency=1, + value=amount, + commit="Propose", + type_name=org_name, + ) return self.post(f"{self.url}/{constants.COUNTRIES[country_id].link}/new-donation", data=data) @@ -705,7 +762,12 @@ class ErepublikProfileAPI(CitizenBaseAPI): def _post_main_messages_compose(self, subject: str, body: str, citizens: List[int]) -> Response: url_pk = 0 if len(citizens) > 1 else str(citizens[0]) - data = dict(citizen_name=",".join([str(x) for x in citizens]), citizen_subject=subject, _token=self.token, citizen_message=body) + data = dict( + citizen_name=",".join([str(x) for x in citizens]), + citizen_subject=subject, + _token=self.token, + citizen_message=body, + ) return self.post(f"{self.url}/main/messages-compose/{url_pk}", data=data) def _post_military_group_missions(self) -> Response: @@ -810,7 +872,8 @@ class ErepublikWallPostAPI(CitizenBaseAPI): # ## Medal posting def _post_main_wall_post_automatic(self, message: str, achievement_id: int) -> Response: return self.post( - f"{self.url}/main/wall-post/automatic", data=dict(_token=self.token, message=message, achievementId=achievement_id) + f"{self.url}/main/wall-post/automatic", + data=dict(_token=self.token, message=message, achievementId=achievement_id), ) diff --git a/erepublik/citizen.py b/erepublik/citizen.py index 525f174..5458097 100644 --- a/erepublik/citizen.py +++ b/erepublik/citizen.py @@ -9,11 +9,16 @@ from threading import Event from time import sleep from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, TypedDict, Union -from requests import HTTPError, RequestException, Response +from requests import RequestException, Response from erepublik import _types as types from erepublik import access_points, classes, constants, utils -from erepublik._logging import ErepublikErrorHTTTPHandler, ErepublikFileHandler, ErepublikFormatter, ErepublikLogConsoleHandler +from erepublik._logging import ( + ErepublikErrorHTTTPHandler, + ErepublikFileHandler, + ErepublikFormatter, + ErepublikLogConsoleHandler, +) class BaseCitizen(access_points.CitizenAPI): @@ -77,7 +82,8 @@ class BaseCitizen(access_points.CitizenAPI): (after 15min time of inactivity opening page in eRepublik.com redirects to home page), by explicitly requesting homepage. """ - # Idiots have fucked up their session manager - after logging in You might be redirected to public homepage instead of authenticated + # Idiots have fucked up their session manager - after logging in + # You might be redirected to public homepage instead of authenticated resp = self._req.get(self.url if self.logged_in else f"{self.url}/economy/myCompanies") self.r = resp if self._errors_in_response(resp): @@ -216,7 +222,9 @@ class BaseCitizen(access_points.CitizenAPI): # timedelta(seconds=int(next_recovery[1]) * 60 + int(next_recovery[2])))) self.details.current_region = citizen.get("regionLocationId", 0) - self.details.current_country = constants.COUNTRIES.get(citizen.get("countryLocationId", 0)) # country where citizen is located + self.details.current_country = constants.COUNTRIES.get( + citizen.get("countryLocationId", 0) + ) # country where citizen is located self.details.residence_region = citizen.get("residence", {}).get("regionId", 0) self.details.residence_country = constants.COUNTRIES.get(citizen.get("residence", {}).get("countryId", 0)) self.details.citizen_id = citizen.get("citizenId", 0) @@ -240,7 +248,13 @@ class BaseCitizen(access_points.CitizenAPI): self.politics.is_party_president = bool(party.get("is_party_president")) self.politics.party_slug = f"{party.get('stripped_title')}-{party.get('party_id')}" - self.wheel_of_fortune = bool(re.search(r'', html)) + self.wheel_of_fortune = bool( + re.search(r'', html) + ) + + def update_all(self): + self.update_citizen_info() + self.update_inventory() def update_inventory(self): """ @@ -290,7 +304,8 @@ class BaseCitizen(access_points.CitizenAPI): solve_data = ApiReturn( **self.post( - "https://erep.lv/captcha/api", data=dict(citizen_id=self.details.citizen_id, src=src, password="CaptchaDevAPI") + "https://erep.lv/captcha/api", + data=dict(citizen_id=self.details.citizen_id, src=src, password="CaptchaDevAPI"), ).json() ) if solve_data["status"]: @@ -311,7 +326,8 @@ class BaseCitizen(access_points.CitizenAPI): def _expire_value_to_python(_expire_value: str) -> Dict[str, Union[int, datetime]]: _data = re.search( - r"((?P\d+) item\(s\) )?[eE]xpires? on Day (?P\d,\d{3}), (?P', r.text): + for id_, name in re.findall( + r'', r.text + ): ret.update({id_: name}) return ret @@ -886,7 +928,9 @@ class BaseCitizen(access_points.CitizenAPI): return bool( re.search( - r'body id="error"|Internal Server Error|' r'CSRF attack detected|meta http-equiv="refresh"|' r"not_authenticated", + r'body id="error"|Internal Server Error|' + r'CSRF attack detected|meta http-equiv="refresh"|' + r"not_authenticated", response.text, ) ) @@ -1085,7 +1129,9 @@ class CitizenTravel(BaseCitizen): ) -> Union[List[Any], Dict[str, Dict[str, Any]]]: return ( self._post_main_travel_data( - holdingId=holding.id if holding else 0, battleId=battle.id if battle else 0, countryId=country.id if country else 0 + holdingId=holding.id if holding else 0, + battleId=battle.id if battle else 0, + countryId=country.id if country else 0, ) .json() .get("regions", []) @@ -1094,13 +1140,18 @@ class CitizenTravel(BaseCitizen): def get_travel_countries(self) -> Set[constants.Country]: warnings.simplefilter("always") warnings.warn( - "CitizenTravel.get_travel_countries() are being deprecated, " "please use BaseCitizen.get_countries_with_regions()", + "CitizenTravel.get_travel_countries() are being deprecated, " + "please use BaseCitizen.get_countries_with_regions()", DeprecationWarning, ) return self.get_countries_with_regions() class CitizenCompanies(BaseCitizen): + def update_all(self): + super().update_all() + self.update_companies() + def employ_employees(self) -> bool: self.update_companies() ret = True @@ -1149,7 +1200,9 @@ class CitizenCompanies(BaseCitizen): if sum(employ_factories.values()) > self.my_companies.work_units: employ_factories = {} - response = self._post_economy_work("production", wam=[c.id for c in wam_list], employ=employ_factories).json() + response = self._post_economy_work( + "production", wam=[c.id for c in wam_list], employ=employ_factories + ).json() return response def update_companies(self): @@ -1187,6 +1240,10 @@ class CitizenCompanies(BaseCitizen): class CitizenEconomy(CitizenTravel): + def update_all(self): + super().update_all() + self.update_money() + def update_money(self, page: int = 0, currency: int = 62): """ Gets monetary market offers to get exact amount of CC and Gold available @@ -1269,7 +1326,8 @@ class CitizenEconomy(CitizenTravel): time_left = timedelta(seconds=house["time_left"]) active_until = utils.good_timedelta(self.now, time_left) self._report_action( - "ACTIVATE_HOUSE", f"Activated {house['name']}. Expires at {active_until.strftime('%F %T')} (after {time_left})" + "ACTIVATE_HOUSE", + f"Activated {house['name']}. Expires at {active_until.strftime('%F %T')} (after {time_left})", ) return True return False @@ -1326,10 +1384,14 @@ class CitizenEconomy(CitizenTravel): price = offer["price"] ret = self._post_economy_marketplace_actions("delete", offer_id=offer_id).json() if ret.get("error"): - self._report_action("ECONOMY_DELETE_OFFER", f"Unable to delete offer: '{ret.get('message')}'", kwargs=offer) + self._report_action( + "ECONOMY_DELETE_OFFER", f"Unable to delete offer: '{ret.get('message')}'", kwargs=offer + ) else: self._report_action( - "ECONOMY_DELETE_OFFER", f"Removed offer for {amount} x {industry} q{q} for {price}cc/each", kwargs=offer + "ECONOMY_DELETE_OFFER", + f"Removed offer for {amount} x {industry} q{q} for {price}cc/each", + kwargs=offer, ) return not ret.get("error") else: @@ -1344,7 +1406,9 @@ class CitizenEconomy(CitizenTravel): _inv_qlt = quality if industry in [1, 2, 3, 4, 23] else 0 final_kind = industry in [1, 2, 4, 23] - items = (self.inventory.final if final_kind else self.inventory.raw).get(constants.INDUSTRIES[industry], {_inv_qlt: {"amount": 0}}) + items = (self.inventory.final if final_kind else self.inventory.raw).get( + constants.INDUSTRIES[industry], {_inv_qlt: {"amount": 0}} + ) if items[_inv_qlt]["amount"] < amount: self.update_inventory() items = (self.inventory.final if final_kind else self.inventory.raw).get( @@ -1358,7 +1422,14 @@ class CitizenEconomy(CitizenTravel): ) return False - data = dict(country_id=self.details.citizenship.id, industry=industry, quality=quality, amount=amount, price=price, buy=False) + data = dict( + country_id=self.details.citizenship.id, + industry=industry, + quality=quality, + amount=amount, + price=price, + buy=False, + ) ret = self._post_economy_marketplace_actions("sell", **data).json() message = f"Posted market offer for {amount}q{quality} " f"{constants.INDUSTRIES[industry]} for price {price}cc" self._report_action("ECONOMY_SELL_PRODUCTS", message, kwargs=ret) @@ -1386,7 +1457,9 @@ class CitizenEconomy(CitizenTravel): self.travel_to_residence() return json_ret - def get_market_offers(self, product_name: str, quality: int = None, country: constants.Country = None) -> Dict[str, classes.OfferItem]: + def get_market_offers( + self, product_name: str, quality: int = None, country: constants.Country = None + ) -> Dict[str, classes.OfferItem]: raw_short_names = dict(frm="foodRaw", wrm="weaponRaw", hrm="houseRaw", arm="airplaneRaw") q1_industries = list(raw_short_names.values()) q5_industries = ["house", "aircraft", "ticket"] @@ -1496,7 +1569,9 @@ class CitizenEconomy(CitizenTravel): self._report_action("BUY_GOLD", "Unable to buy gold!", kwargs=response.json()) return False else: - self._report_action("BUY_GOLD", f"New amount {self.details.cc}cc, {self.details.gold}g", kwargs=response.json()) + self._report_action( + "BUY_GOLD", f"New amount {self.details.cc}cc, {self.details.gold}g", kwargs=response.json() + ) return True def donate_money(self, citizen_id: int = 1620414, amount: float = 0.0, currency: int = 62) -> bool: @@ -1528,13 +1603,17 @@ class CitizenEconomy(CitizenTravel): return self.donate_items(citizen_id, int(amount), industry_id, quality) else: if re.search(r"You do not have enough items in your inventory to make this donation", response.text): - self._report_action("DONATE_ITEMS", f"Unable to donate {amount}q{quality} " f"{industry}, not enough left!") + self._report_action( + "DONATE_ITEMS", f"Unable to donate {amount}q{quality} " f"{industry}, not enough left!" + ) return 0 available = re.search( - r"Cannot transfer the items because the user has only (\d+) free slots in (his|her) storage.", response.text + r"Cannot transfer the items because the user has only (\d+) free slots in (his|her) storage.", + response.text, ).group(1) self._report_action( - "DONATE_ITEMS", f"Unable to donate {amount}q{quality}{industry}" f", receiver has only {available} storage left!" + "DONATE_ITEMS", + f"Unable to donate {amount}q{quality}{industry}" f", receiver has only {available} storage left!", ) self.sleep(5) return self.donate_items(citizen_id, int(available), industry_id, quality) @@ -1550,7 +1629,9 @@ class CitizenEconomy(CitizenTravel): self._report_action("CONTRIBUTE_CC", f"Contributed {amount}cc to {country}'s treasury", kwargs=data) return True else: - self._report_action("CONTRIBUTE_CC", f"Unable to contribute {amount}cc to {country}'s" f" treasury", kwargs=r.json()) + self._report_action( + "CONTRIBUTE_CC", f"Unable to contribute {amount}cc to {country}'s" f" treasury", kwargs=r.json() + ) return False def contribute_food_to_country(self, amount, quality, country: constants.Country) -> bool: @@ -1562,11 +1643,15 @@ class CitizenEconomy(CitizenTravel): r = self._post_main_country_donate(country.id, "currency", amount, quality) if r.json().get("status") or not r.json().get("error"): - self._report_action("CONTRIBUTE_FOOD", f"Contributed {amount}q{quality} food to " f"{country}'s treasury", kwargs=data) + self._report_action( + "CONTRIBUTE_FOOD", f"Contributed {amount}q{quality} food to " f"{country}'s treasury", kwargs=data + ) return True else: self._report_action( - "CONTRIBUTE_FOOD", f"Unable to contribute {amount}q{quality} food to " f"{country}'s treasury", kwargs=r.json() + "CONTRIBUTE_FOOD", + f"Unable to contribute {amount}q{quality} food to " f"{country}'s treasury", + kwargs=r.json(), ) return False @@ -1582,7 +1667,9 @@ class CitizenEconomy(CitizenTravel): self._report_action("CONTRIBUTE_GOLD", f"Contributed {amount}g to {country}'s treasury", kwargs=data) return True else: - self._report_action("CONTRIBUTE_GOLD", f"Unable to contribute {amount}g to {country}'s treasury", kwargs=r.json()) + self._report_action( + "CONTRIBUTE_GOLD", f"Unable to contribute {amount}g to {country}'s treasury", kwargs=r.json() + ) return False def report_money_donation(self, citizen_id: int, amount: float, is_currency: bool = True): @@ -1624,7 +1711,9 @@ class CitizenMedia(BaseCitizen): self._report_action("ARTICLE_ENDORSE", f"Endorsed article ({article_id}) with {amount}cc") return True else: - self._report_action("ARTICLE_ENDORSE", f"Unable to endorse article ({article_id}) with {amount}cc", kwargs=resp) + self._report_action( + "ARTICLE_ENDORSE", f"Unable to endorse article ({article_id}) with {amount}cc", kwargs=resp + ) return False else: return False @@ -1679,782 +1768,842 @@ class CitizenMedia(BaseCitizen): article_data = self.get_article(article_id) if article_data and article_data["articleData"]["canDelete"]: self._report_action( - "ARTICLE_DELETE", f"Attempting to delete article '{article_data['article']['title']}' (#{article_id})", kwargs=article_data + "ARTICLE_DELETE", + f"Attempting to delete article '{article_data['article']['title']}' (#{article_id})", + kwargs=article_data, ) self._get_main_delete_article(article_id) else: self.write_warning(f"Unable to delete article (#{article_id})!") -class CitizenMilitary(CitizenTravel): - all_battles: Dict[int, classes.Battle] = None - __last_war_update_data = None - - active_fs: bool = False - - @property - def as_dict(self): - d = super().as_dict - d.update(active_fs=self.active_fs, all_battles=self.all_battles) - return d - - def update_war_info(self): - if self.__last_war_update_data and self.__last_war_update_data.get("last_updated", 0) + 30 > self.now.timestamp(): - r_json = self.__last_war_update_data - else: - r_json = self._get_military_campaigns_json_list().json() - if r_json.get("countries"): - if self.all_battles is None: - self.all_battles = {} - self.__last_war_update_data = r_json - if r_json.get("battles"): - all_battles = {} - for battle_data in r_json.get("battles", {}).values(): - all_battles[battle_data.get("id")] = classes.Battle(battle_data) - # old_all_battles = self.all_battles - self.all_battles = all_battles - # for battle in old_all_battles.values(): - # utils._clear_up_battle_memory(battle) - - def get_battle_for_war(self, war_id: int) -> Optional[classes.Battle]: - self.update_war_info() - war_info = self.get_war_status(war_id) - return self.all_battles.get(war_info.get("battle_id"), None) - - def get_war_status(self, war_id: int) -> Dict[str, Union[bool, Dict[int, str]]]: - r = self._get_wars_show(war_id) - html = r.text - ret = {} - reg_re = re.compile(fr'data-war-id="{war_id}" data-region-id="(\d+)" data-region-name="([- \w]+)"') - if reg_re.findall(html): - ret.update(regions={}, can_attack=True) - for reg in reg_re.findall(html): - ret["regions"].update({int(reg[0]): reg[1]}) - elif re.search( - r'Join', html - ): - battle_id = re.search( - r'Join', html - ).group(1) - ret.update(can_attack=False, battle_id=int(battle_id)) - elif re.search(r"This war is no longer active.", html): - ret.update(can_attack=False, ended=True) - else: - ret.update(can_attack=False) - return ret - - def get_available_weapons(self, battle_id: int): - return self._get_military_show_weapons(battle_id).json() - - def set_default_weapon(self, battle: classes.Battle, division: classes.BattleDivision) -> int: - available_weapons = self._get_military_show_weapons(battle.id).json() - while not isinstance(available_weapons, list): - available_weapons = self._get_military_show_weapons(battle.id).json() - weapon_quality = -1 - weapon_damage = 0 - if not division.is_air: - for weapon in available_weapons: - try: - if weapon["weaponQuantity"] > 30 and weapon["weaponInfluence"] > weapon_damage: - weapon_quality = int(weapon["weaponId"]) - weapon_damage = weapon["weaponInfluence"] - except ValueError: - pass - return self.change_weapon(battle, weapon_quality, division) - - def change_weapon(self, battle: classes.Battle, quality: int, battle_zone: classes.BattleDivision) -> int: - r = self._post_military_change_weapon(battle.id, battle_zone.id, quality) - influence = r.json().get("weaponInfluence") - self._report_action("MILITARY_WEAPON", f"Switched to q{quality} weapon," f" new influence {influence}", kwargs=r.json()) - return influence - - def sorted_battles(self, sort_by_time: bool = True, only_tp=False) -> List[classes.Battle]: - cs_battles_priority_air: List[classes.Battle] = [] - cs_battles_priority_ground: List[classes.Battle] = [] - cs_battles_air: List[classes.Battle] = [] - cs_battles_ground: List[classes.Battle] = [] - deployed_battles_air: List[classes.Battle] = [] - deployed_battles_ground: List[classes.Battle] = [] - ally_battles_air: List[classes.Battle] = [] - ally_battles_ground: List[classes.Battle] = [] - other_battles_air: List[classes.Battle] = [] - other_battles_ground: List[classes.Battle] = [] - - ret_battles: List[classes.Battle] = [] - if sort_by_time: - battle_list = sorted(self.all_battles.values(), key=lambda b: b.start) - battle_list.reverse() - else: - battle_list = sorted(self.all_battles.values(), key=lambda b: b.id) - - contribution_json = self._get_military_campaigns_json_citizen().json() - contributions: List[Dict[str, int]] = contribution_json.get("contributions") or [] - contributions.sort(key=lambda b: -b.get("damage")) - - for contribution_battle in contributions: - if contribution_battle.get("battle_id") and contribution_battle.get("battle_id") in self.all_battles: - ret_battles.append(self.all_battles[contribution_battle.get("battle_id")]) - - for battle in battle_list: - battle_sides = [battle.invader.country, battle.defender.country] - if battle.id in ret_battles: - continue - # CS Battles - elif self.details.citizenship in battle_sides: - if battle.has_air: - if battle.defender.id == self.details.citizenship: - cs_battles_priority_air.append(battle) - else: - cs_battles_air.append(battle) - else: - if battle.defender.id == self.details.citizenship: - cs_battles_priority_ground.append(battle) - else: - cs_battles_ground.append(battle) - - # Current location battles: - elif self.details.current_country in battle_sides: - if battle.has_air: - deployed_battles_air.append(battle) - else: - deployed_battles_ground.append(battle) - - # Deployed battles and allied battles: - elif self.details.current_country in battle.invader.allies + battle.defender.allies + battle_sides: - if self.details.current_country in battle.invader.deployed + battle.defender.deployed: - if battle.has_air: - deployed_battles_air.append(battle) - else: - deployed_battles_ground.append(battle) - # Allied battles: - else: - if battle.has_air: - ally_battles_air.append(battle) - else: - ally_battles_ground.append(battle) - else: - if battle.has_air: - other_battles_air.append(battle) - else: - other_battles_ground.append(battle) - - cs_battles = cs_battles_priority_air + cs_battles_priority_ground + cs_battles_air + cs_battles_ground - if only_tp: - return cs_battles - deployed_battles = deployed_battles_air + deployed_battles_ground - other_battles = ally_battles_air + ally_battles_ground + other_battles_air + other_battles_ground - ret_battles = ret_battles + cs_battles + deployed_battles + other_battles - return ret_battles - - def get_cheap_tp_divisions(self) -> Dict[str, List[Tuple[int, classes.BattleDivision]]]: - air_divs: List[Tuple[int, classes.BattleDivision]] = [] - ground_divs: List[Tuple[int, classes.BattleDivision]] = [] - check_maverick = self.maverick and self.config.maverick - for battle in reversed(self.sorted_battles(True, True)): - for division in battle.div.values(): - is_start_ok = utils.good_timedelta(division.battle.start, timedelta(minutes=-1)) < self.now - if not division.terrain and is_start_ok and not division.div_end: - if division.is_air and self.config.air: - division_medals = self.get_battle_round_data(division) - medal = division_medals[self.details.citizenship == division.battle.defender.country] - if not medal: - air_divs.append((0, division)) - else: - air_divs.append((medal.get("1").get("raw_value"), division)) - elif not division.is_air and self.config.ground: - if not division.div == self.division and not check_maverick: - continue - division_medals = self.get_battle_round_data(division) - medal = division_medals[self.details.citizenship == division.battle.defender.country] - if not medal: - ground_divs.append((0, division)) - else: - ground_divs.append((medal.get("1").get("raw_value"), division)) - - air_divs.sort(key=lambda z: (z[0], z[1].battle.start)) - ground_divs.sort(key=lambda z: (z[0], z[1].battle.start)) - return {"air": air_divs, "ground": ground_divs} - - @property - def has_battle_contribution(self): - return bool(self.__last_war_update_data.get("citizen_contribution", [])) - - def find_battle_to_fight(self, silent: bool = False) -> Tuple[classes.Battle, classes.BattleDivision, classes.BattleSide]: - self.update_war_info() - for battle in self.sorted_battles(self.config.sort_battles_time): - if not isinstance(battle, classes.Battle): - continue - if battle.is_dict_lib: - continue - battle_zone: Optional[classes.BattleDivision] = None - for div in battle.div.values(): - if div.terrain == 0: - if div.div_end: - continue - maverick_ok = self.maverick and self.config.maverick - if self.config.air and div.is_air: - battle_zone = div - break - elif self.config.ground and not div.is_air and (div.div == self.division or maverick_ok): - battle_zone = div - break - else: - continue - if not battle_zone: - continue - allies = battle.invader.deployed + battle.defender.deployed + [battle.invader.country, battle.defender.country] - - travel_needed = self.details.current_country not in allies - - if battle.is_rw: - side = battle.defender if self.config.rw_def_side else battle.invader - else: - defender_side = self.details.current_country in battle.defender.allies + [ - battle.defender.country, - ] - side = battle.defender if defender_side else battle.invader - - if not silent: - self.write_log(str(battle)) - - travel = (self.config.travel_to_fight and self.should_travel_to_fight() or self.config.force_travel) if travel_needed else True - - if not travel: - continue - yield battle, battle_zone, side - - def find_battle_and_fight(self): - count = self.should_fight()[0] - if count: - self.write_log("Checking for battles to fight in...") - for battle, division, side in self.find_battle_to_fight(): - - allies = battle.invader.deployed + battle.defender.deployed + [battle.invader.country, battle.defender.country] - - travel_needed = self.details.current_country not in allies - - if battle.start > self.now: - self.sleep(utils.get_sleep_seconds(battle.start)) - - if travel_needed and not self.change_division(battle, division, side): - break - - if self.change_division(battle, division): - self.set_default_weapon(battle, division) - self.fight(battle, division, side, count) - self.travel_to_residence() - break - - def fight( - self, - battle: classes.Battle, - division: classes.BattleDivision, - side: classes.BattleSide = None, - count: int = None, - use_ebs: bool = False, - ) -> Optional[int]: - """Fight in a battle. - - Will auto activate booster and travel if allowed to do it. - :param battle: Battle battle to fight in - :type battle: Battle - :param division: Division number to fight in available choices - :type division: BattleDivision - :param side: BattleSide or None. Battle side to fight in, If side not == invader id or not in invader deployed - allies list, then defender's side is chosen - :type side: BattleSide - :param count: How many hits to do, if not specified self.should_fight() is called. - :type count: int - :param use_ebs: Should use energy bars if count > 0 and not enough food_fights - :type use_ebs: bool - :return: None if no errors while fighting, otherwise error count. - :rtype: int - """ - if self.restricted_ip: - self.write_warning("Fighting is not allowed from restricted IP!") - self._report_action("IP_BLACKLISTED", "Fighting is not allowed from restricted IP!") - return 1 - if not division.is_air and self.config.boosters: - self.activate_damage_booster(not division.is_air) - if side is None: - side = battle.defender if self.details.citizenship in battle.defender.allies + [battle.defender.country] else battle.invader - if count is None: - count = self.should_fight()[0] - - self.write_log(f"Fighting in battle for {battle.region_name} on {side} side in d{division.div}") - - if self.now < utils.localize_dt(datetime(2021, 2, 8)): - error_count = total_damage = total_hits = 0 - ok_to_fight = True - while ok_to_fight and error_count < 10 and count > 0: - while all((count > 0, error_count < 10, self.energy.energy >= 50)): - hits, error, damage = self._shoot(battle, division, side) - count -= hits - total_hits += hits - total_damage += damage - error_count += error - else: - if self.energy.energy < 50 or error_count >= 10 or count <= 0: - self.write_log(f"Hits: {total_hits:>4} | Damage: {total_damage}") - ok_to_fight = False - if total_damage: - self.report_fighting(battle, not side.is_defender, division, total_damage, total_hits) - return error_count - else: - deployment_id = self.deploy(division, side, count * 10) - self.sleep(count // 3) # TODO: connect to eRepublik's WS and get from there when deploy ends - energy_used = 0 - if deployment_id: - self.write_warning( - "If eRepublik responds with HTTP 500 Internal Error, it is kind of ok, because deployment has not finished yet." - ) - deployment_data = self._post_military_fight_deploy_deploy_report_data(deployment_id).json() - if not deployment_data.get("error"): - data = deployment_data["data"] - total_damage = int(data["damage"].replace(",", "")) - energy_used = int(data["energySpent"].replace(",", "")) - self.details.pp += int(data["rewards"]["prestigePoints"].replace(",", "")) - self.report_fighting(battle, not side.is_defender, division, total_damage, energy_used // 10) - return energy_used - - def _shoot(self, battle: classes.Battle, division: classes.BattleDivision, side: classes.BattleSide): - if division.is_air: - response = self._post_military_fight_air(battle.id, side.id, division.id) - else: - response = self._post_military_fight_ground(battle.id, side.id, division.id) - - if "Zone is not meant for " in response.text: - self.sleep(5) - return 0, 1, 0 - try: - r_json = response.json() - except (ValueError, HTTPError, RequestException): - return 0, 10, 0 - hits = 0 - damage = 0 - err = False - if r_json.get("error"): - if r_json.get("message") == "SHOOT_LOCKOUT": - pass - elif r_json.get("message") == "NOT_ENOUGH_WEAPONS": - self.set_default_weapon(battle, division) - elif r_json.get("message") == "Cannot activate a zone with a non-native division": - self.write_warning("Wrong division!!") - return 0, 10, 0 - elif r_json.get("message") == "ZONE_INACTIVE": - self.write_warning("Wrong division!!") - return 0, 10, 0 - elif r_json.get("message") == "NON_BELLIGERENT": - self.write_warning("Dictatorship/Liberation wars are not supported!") - return 0, 10, 0 - elif r_json.get("message") in ["FIGHT_DISABLED", "DEPLOYMENT_MODE"]: - self._post_main_profile_update("options", params='{"optionName":"enable_web_deploy","optionValue":"off"}') - self.set_default_weapon(battle, division) - else: - if r_json.get("message") == "UNKNOWN_SIDE": - self._rw_choose_side(battle, side) - elif r_json.get("message") == "CHANGE_LOCATION": - countries = [side.country] + side.deployed - self.travel_to_battle(battle, countries) - err = True - elif r_json.get("message") == "ENEMY_KILLED": - # Non-InfantryKit players - if r_json["user"]["earnedXp"]: - hits = r_json["user"]["earnedXp"] - # InfantryKit player - # The almost always safe way (breaks on levelup hit) - elif self.energy.energy >= r_json["details"]["wellness"]: # Haven't reached levelup - hits = (self.energy.energy - r_json["details"]["wellness"]) // 10 - else: - hits = r_json["hits"] - if r_json["user"]["epicBattle"]: - hits /= 1 + r_json["user"]["epicBattle"] - - self.energy.energy = r_json["details"]["wellness"] - self.details.xp = int(r_json["details"]["points"]) - damage = r_json["user"]["givenDamage"] * (1.1 if r_json["oldEnemy"]["isNatural"] else 1) - else: - err = True - - return hits, err, damage - - def deploy_bomb( - self, battle: classes.Battle, division: classes.BattleDivision, bomb_id: int, inv_side: bool, count: int = 1 - ) -> Optional[int]: - """Deploy bombs in a battle for given side. - - :param battle: Battle - :type battle: classes.Battle - :param division: BattleDivision - :type division: classes.BattleDivision - :param bomb_id: int bomb id - :type bomb_id: int - :param inv_side: should deploy on invader side - :type inv_side: bool - :param count: how many bombs to deploy - :type count: int - :return: Deployed count - :rtype: int - """ - - if not isinstance(count, int) or count < 1: - count = 1 - has_traveled = False - if battle.is_rw: - has_traveled = self.travel_to_battle(battle, [battle.defender.country]) - self._rw_choose_side(battle, battle.invader if inv_side else battle.defender) - if inv_side: - good_countries = [battle.invader.country] + battle.invader.deployed - if self.details.current_country not in good_countries: - has_traveled = self.travel_to_battle(battle, good_countries) - else: - involved = [battle.invader.country, battle.defender.country] + battle.invader.deployed + battle.defender.deployed - if self.details.current_country not in involved: - count = 0 - side = battle.invader if inv_side else battle.defender - errors = deployed_count = 0 - while (not deployed_count == count) and errors < 10: - r = self._post_military_deploy_bomb(battle.id, division.id, side.id, bomb_id).json() - if not r.get("error"): - deployed_count += 1 - self.sleep(0.5) - elif r.get("message") == "LOCKED": - self.sleep(0.5) - elif r.get("message") == "INVALID_BOMB": - errors = 10 - else: - errors += 1 - - if has_traveled: - self.travel_to_residence() - - self._report_action("MILITARY_BOMB", f"Deployed {deployed_count} bombs in battle {battle.id}") - return deployed_count - - def change_division(self, battle: classes.Battle, division: classes.BattleDivision, side: classes.BattleSide = None) -> bool: - """Change division. - - :param battle: classes.Battle - :type battle: classes.Battle - :param division: int target division to switch to - :type division: classes.BattleDivision - :param side: Side to choose - :type side: classes.BattleSide - :return: - """ - resp = self._post_main_battlefield_change_division(battle.id, division.id, side.id if side else None) - if resp.json().get("error"): - self.write_warning(resp.json().get("message")) - return False - self._report_action("MILITARY_DIV_SWITCH", f"Switched to d{division.div} in battle {battle.id}", kwargs=resp.json()) - return True - - def get_ground_hit_dmg_value( - self, - rang: int = None, - strength: float = None, - elite: bool = None, - ne: bool = False, - booster_50: bool = False, - booster_100: bool = False, - tp: bool = True, - ) -> Decimal: - if not rang or not strength or elite is None: - r = self._get_main_citizen_profile_json(self.details.citizen_id).json() - if not rang: - rang = r["military"]["militaryData"]["ground"]["rankNumber"] - if not strength: - strength = r["military"]["militaryData"]["ground"]["strength"] - if elite is None: - elite = r["citizenAttributes"]["level"] > 100 - if ne: - tp = True - - return utils.calculate_hit(strength, rang, tp, elite, ne, 50 if booster_50 else 100 if booster_100 else 0) - - def get_air_hit_dmg_value(self, rang: int = None, elite: bool = None, ne: bool = False, weapon: bool = False) -> Decimal: - if not rang or elite is None: - r = self._get_main_citizen_profile_json(self.details.citizen_id).json() - if not rang: - rang = r["military"]["militaryData"]["aircraft"]["rankNumber"] - if elite is None: - elite = r["citizenAttributes"]["level"] > 100 - - return utils.calculate_hit(0, rang, True, elite, ne, 0, 20 if weapon else 0) - - def activate_damage_booster(self, ground: bool = True) -> int: - kind = "damage" if ground else "aircraftDamage" - if self.config.boosters and not self.get_active_damage_booster(ground): - booster: Optional[types.InvFinalItem] = None - for quality, data in sorted(self.inventory.boosters.get(kind, {}).items(), key=lambda x: x[0]): - for _duration, _booster in sorted(data.items(), key=lambda y: y[0]): - critical_amount = 2 if quality < 10 and ground else 10 - if _booster.get("amount") > critical_amount: - booster = _booster - break - break - if booster: - kind = "damage" if ground else "air_damage" - self._report_action("MILITARY_BOOSTER", f"Activated {booster['name']}") - resp = self._post_economy_activate_booster(booster["quality"], booster["durability"], kind).json() - self._update_inventory_data(resp) - return self.get_active_damage_booster(ground) - - def get_active_damage_booster(self, ground: bool = True) -> int: - kind = "damage" if ground else "aircraftDamage" - boosters = self.inventory.active.get(kind, {}) - quality = 0 - for q, boost in boosters.items(): - if boost["quality"] * 10 > quality: - quality = boost["quality"] * 10 - return quality - - def get_active_ground_damage_booster(self) -> int: - return self.get_active_damage_booster(True) - - def get_active_air_damage_booster(self) -> int: - return self.get_active_damage_booster(False) - - def activate_battle_effect(self, battle_id: int, kind: str) -> bool: - self._report_action("MILITARY_BOOSTER", f"Activated {kind} booster") - resp = self._post_main_activate_battle_effect(battle_id, kind, self.details.citizen_id).json() - return not resp.get("error") - - def activate_pp_booster(self, pp_item: types.InvFinalItem) -> bool: - self._report_action("MILITARY_BOOSTER", f'Activated {pp_item["name"]}') - resp = self._post_economy_activate_booster(pp_item["quality"], pp_item["durability"], "prestige_points").json() - self._update_inventory_data(resp) - return pp_item.get("kind") in self.inventory.active - - def _rw_choose_side(self, battle: classes.Battle, side: classes.BattleSide) -> Response: - return self._post_main_battlefield_travel(side.id, battle.id) - - def should_travel_to_fight(self) -> bool: - ret = False - if self.config.always_travel: - ret = True - elif self.should_do_levelup: # Do levelup - ret = True - # Get to next Energy +1 - elif self.next_reachable_energy and self.config.next_energy: - ret = True - # 1h worth of energy - elif self.energy.energy + self.energy.interval * 3 >= self.energy.limit: - ret = True - return ret - - def should_fight(self) -> Tuple[int, str, bool]: - """Checks if citizen should fight at this moment - :rtype: Tuple[int, str, bool] - """ - count = 0 - force_fight = False - msg = "Fighting not allowed!" - if not self.config.fight: - return count, msg, force_fight - - # Do levelup - if self.is_levelup_reachable: - msg = "Level up" - if self.should_do_levelup: - count = (self.energy.limit * 3) // 10 - force_fight = True - else: - self.write_log("Waiting for fully recovered energy before leveling up.") - - # Levelup reachable - elif self.is_levelup_close: - count = self.details.xp_till_level_up - (self.energy.limit // 10) + 5 - msg = "Fighting for close Levelup. Doing %i hits" % count - force_fight = True - - elif self.details.pp < 75: - count = 75 - self.details.pp - msg = "Obligatory fighting for at least 75pp" - force_fight = True - - elif self.config.continuous_fighting and self.has_battle_contribution: - count = self.energy.food_fights - msg = "Continuing to fight in previous battle" - - # All-in (type = all-in and full ff) - elif self.config.all_in and self.energy.energy + self.energy.interval * 3 >= self.energy.limit: - count = self.energy.food_fights - msg = "Fighting all-in. Doing %i hits" % count - - # Get to next Energy +1 - elif self.config.next_energy and self.next_reachable_energy: - count = self.next_reachable_energy - msg = "Fighting for +1 energy. Doing %i hits" % count - - # 1h worth of energy - elif self.energy.energy + self.energy.interval * 3 >= self.energy.limit: - count = self.energy.interval - msg = "Fighting for 1h energy. Doing %i hits" % count - force_fight = True - - return (count if count > 0 else 0), msg, force_fight - - def get_battle_round_data(self, division: classes.BattleDivision) -> Tuple[Any, Any]: - battle = division.battle - - r = self._post_military_battle_console( - battle.id, - "battleStatistics", - 1, - zoneId=battle.zone_id, - round_id=battle.zone_id, - division=division.div, - battleZoneId=division.id, - type="damage", - ) - r_json = r.json() - return r_json.get(str(battle.invader.id)).get("fighterData"), r_json.get(str(battle.defender.id)).get("fighterData") - - def get_battle_division_stats(self, division: classes.BattleDivision) -> Dict[str, Any]: - battle = division.battle - r = self._get_military_battle_stats(battle.id, division.div, division.id) - return r.json() - - def get_division_max_hit(self, division: classes.BattleDivision) -> int: - """Returns max hit in division for current side (if not on either side returns 0) - - :param division: BattleDivision for which to get max hit value - :type division: classes.BattleDivision - :return: max hit value - :rtype: int - """ - return self.get_battle_division_stats(division).get("maxHit", -1) - - def schedule_attack(self, war_id: int, region_id: int, region_name: str, at_time: datetime): - if at_time: - self.sleep(utils.get_sleep_seconds(at_time)) - self.get_csrf_token() - self.launch_attack(war_id, region_id, region_name) - - def get_active_wars(self, country: constants.Country = None) -> List[int]: - r = self._get_country_military(country.link if country else self.details.citizenship.link) - all_war_ids = re.findall(r'//www\.erepublik\.com/en/wars/show/(\d+)"', r.text) - return [int(wid) for wid in all_war_ids] - - def get_last_battle_of_war_end_time(self, war_id: int) -> datetime: - r = self._get_wars_show(war_id) - html = r.text - last_battle_id = int(re.search(r'', html).group(1)) - console = self._post_military_battle_console(last_battle_id, "warList", 1).json() - battle = console.get("list")[0] - return utils.localize_dt(datetime.strptime(battle.get("result").get("end"), "%Y-%m-%d %H:%M:%S")) - - def launch_attack(self, war_id: int, region_id: int, region_name: str): - self._post_wars_attack_region(war_id, region_id, region_name) - self._report_action("MILITARY_QUEUE_ATTACK", f"Battle for *{region_name}* queued") - - def get_country_mus(self, country: constants.Country) -> Dict[int, str]: - ret = {} - r = self._get_main_leaderboards_damage_rankings(country.id) - for data in r.json()["mu_filter"]: - if data["id"]: - ret.update({data["id"]: data["name"]}) - r = self._get_main_leaderboards_damage_aircraft_rankings(country.id) - for data in r.json()["mu_filter"]: - if data["id"]: - ret.update({data["id"]: data["name"]}) - return ret - - def get_mu_members(self, mu_id: int) -> Dict[int, str]: - ret = {} - r = self._get_military_unit_data(mu_id) - - for page in range(int(r.json()["panelContents"]["pages"])): - r = self._get_military_unit_data(mu_id, currentPage=page + 1) - for user in r.json()["panelContents"]["members"]: - if not user["isDead"]: - ret.update({user["citizenId"]: user["name"]}) - return ret - - def get_citizen_weekly_daily_orders_done(self, citizen_id: int = None, weeks_ago: int = 0) -> int: - if citizen_id is None: - citizen_id = self.details.citizen_id - profile = self._get_main_citizen_profile_json(citizen_id).json() - mu_id = profile.get("military", {}).get("militaryUnit", {}).get("id", 0) - if mu_id: - name = profile.get("citizen", {}).get("name") - member = self._get_military_unit_data( - mu_id, currentPage=1, panel="members", sortBy="dailyOrdersCompleted", weekFilter=f"week{weeks_ago}", search=name - ).json() - return member.get("panelContents", {}).get("members", [{}])[0].get("dailyOrdersCompleted") - return 0 - - def get_possibly_empty_medals(self): - self.update_war_info() - for battle in self.all_battles.values(): - for division in battle.div.values(): - if division.wall["dom"] == 50 or division.wall["dom"] > 98: - yield division, division.wall["for"] == battle.invader.country.id - - def report_fighting(self, battle: classes.Battle, invader: bool, division: classes.BattleDivision, damage: float, hits: int): - self.reporter.report_fighting(battle, invader, division, damage, hits) - if self.config.telegram: - self.telegram.report_fight(battle, invader, division, damage, hits) - - def get_deploy_inventory(self, division: classes.BattleDivision, side: classes.BattleSide): - ret = self._post_fight_deploy_get_inventory(division.battle.id, side.id, division.id).json() - # if ret.get('recoverableEnergyBuyFood'): - # self.buy_food() - # return self.get_deploy_inventory(division, side) - if ret.get("captcha"): - self.do_captcha_challenge() - if ret.get("error"): - if ret.get("message") == "Deployment disabled.": - self._post_main_profile_update("options", params='{"optionName":"enable_web_deploy","optionValue":"on"}') - return self.get_deploy_inventory(division, side) - else: - self.report_error(f"Unable to get deployment inventory because: {ret.get('message')}") - return ret - - def deploy(self, division: classes.BattleDivision, side: classes.BattleSide, energy: int, _retry=0) -> int: - _energy = int(energy) - deploy_inv = self.get_deploy_inventory(division, side) - if not deploy_inv["minEnergy"] <= energy <= deploy_inv["maxEnergy"]: - return 0 - energy_sources = {} - source_idx = 0 - recoverable = deploy_inv["recoverableEnergy"] - for source in reversed(sorted(deploy_inv["energySources"], key=lambda s: (s["type"], s.get("quality", 0)))): - if source["type"] == "pool": - _energy -= source["energy"] - elif source["type"] in ["food", "energy_bar"]: - recovers = source["energy"] // source["amount"] - amount = (recoverable if source["type"] == "food" else _energy) // recovers - amount = amount if amount < source["amount"] else source["amount"] - if amount > 0: - energy_sources.update({f"energySources[{source_idx}][quality]": source["quality"]}) - energy_sources.update({f"energySources[{source_idx}][amount]": amount}) - source_idx += 1 - used_energy = amount * recovers - recoverable -= used_energy - _energy -= used_energy - if _energy <= 0: - break - if _energy > 0: - energy -= _energy - weapon_q = -1 - weapon_strength = 0 - if not division.is_air: - for weapon in sorted(deploy_inv["weapons"], key=lambda w: w["damageperHit"]): - if (weapon["damageperHit"] or 0) > weapon_strength and (weapon["amount"] or 0) > 50: - weapon_q = weapon["quality"] - r = self._post_fight_deploy_start_deploy(division.battle.id, side.id, division.id, energy, weapon_q, **energy_sources).json() - if r.get("error"): - self.report_error(f"Deploy failed: '{r.get('message')}'") - if r.get("message") == "Deployment disabled.": - self._post_main_profile_update("options", params='{"optionName":"enable_web_deploy","optionValue":"on"}') - if _retry < 5: - return self.deploy(division, side, energy, _retry + 1) - else: - self.report_error("Unable to deploy 5 times!") - return 0 - return r.get("deploymentId") +# TODO: Fix fighting and reimplement CitizenMilitary module +# class CitizenMilitary(CitizenTravel): +# all_battles: Dict[int, classes.Battle] = None +# __last_war_update_data = None +# +# active_fs: bool = False +# +# @property +# def as_dict(self): +# d = super().as_dict +# d.update(active_fs=self.active_fs, all_battles=self.all_battles) +# return d +# def update_all(): +# super().update_all() +# self.update_war_info() +# +# def update_war_info(self): +# if ( +# self.__last_war_update_data +# and self.__last_war_update_data.get("last_updated", 0) + 30 > self.now.timestamp() +# ): +# r_json = self.__last_war_update_data +# else: +# r_json = self._get_military_campaigns_json_list().json() +# if r_json.get("countries"): +# if self.all_battles is None: +# self.all_battles = {} +# self.__last_war_update_data = r_json +# if r_json.get("battles"): +# all_battles = {} +# for battle_data in r_json.get("battles", {}).values(): +# all_battles[battle_data.get("id")] = classes.Battle(battle_data) +# # old_all_battles = self.all_battles +# self.all_battles = all_battles +# # for battle in old_all_battles.values(): +# # utils._clear_up_battle_memory(battle) +# +# def get_battle_for_war(self, war_id: int) -> Optional[classes.Battle]: +# self.update_war_info() +# war_info = self.get_war_status(war_id) +# return self.all_battles.get(war_info.get("battle_id"), None) +# +# def get_war_status(self, war_id: int) -> Dict[str, Union[bool, Dict[int, str]]]: +# r = self._get_wars_show(war_id) +# html = r.text +# ret = {} +# reg_re = re.compile(fr'data-war-id="{war_id}" data-region-id="(\d+)" data-region-name="([- \w]+)"') +# if reg_re.findall(html): +# ret.update(regions={}, can_attack=True) +# for reg in reg_re.findall(html): +# ret["regions"].update({int(reg[0]): reg[1]}) +# elif re.search( +# r'Join', +# html, +# ): +# battle_id = re.search( +# r'Join', +# html, +# ).group(1) +# ret.update(can_attack=False, battle_id=int(battle_id)) +# elif re.search(r"This war is no longer active.", html): +# ret.update(can_attack=False, ended=True) +# else: +# ret.update(can_attack=False) +# return ret +# +# def get_available_weapons(self, battle_id: int): +# return self._get_military_show_weapons(battle_id).json() +# +# def set_default_weapon(self, battle: classes.Battle, division: classes.BattleDivision) -> int: +# available_weapons = self._get_military_show_weapons(battle.id).json() +# while not isinstance(available_weapons, list): +# available_weapons = self._get_military_show_weapons(battle.id).json() +# weapon_quality = -1 +# weapon_damage = 0 +# if not division.is_air: +# for weapon in available_weapons: +# try: +# if weapon["weaponQuantity"] > 30 and weapon["weaponInfluence"] > weapon_damage: +# weapon_quality = int(weapon["weaponId"]) +# weapon_damage = weapon["weaponInfluence"] +# except ValueError: +# pass +# return self.change_weapon(battle, weapon_quality, division) +# +# def change_weapon(self, battle: classes.Battle, quality: int, battle_zone: classes.BattleDivision) -> int: +# r = self._post_military_change_weapon(battle.id, battle_zone.id, quality) +# influence = r.json().get("weaponInfluence") +# self._report_action( +# "MILITARY_WEAPON", f"Switched to q{quality} weapon," f" new influence {influence}", kwargs=r.json() +# ) +# return influence +# +# def sorted_battles(self, sort_by_time: bool = True, only_tp=False) -> List[classes.Battle]: +# cs_battles_priority_air: List[classes.Battle] = [] +# cs_battles_priority_ground: List[classes.Battle] = [] +# cs_battles_air: List[classes.Battle] = [] +# cs_battles_ground: List[classes.Battle] = [] +# deployed_battles_air: List[classes.Battle] = [] +# deployed_battles_ground: List[classes.Battle] = [] +# ally_battles_air: List[classes.Battle] = [] +# ally_battles_ground: List[classes.Battle] = [] +# other_battles_air: List[classes.Battle] = [] +# other_battles_ground: List[classes.Battle] = [] +# +# ret_battles: List[classes.Battle] = [] +# if sort_by_time: +# battle_list = sorted(self.all_battles.values(), key=lambda b: b.start) +# battle_list.reverse() +# else: +# battle_list = sorted(self.all_battles.values(), key=lambda b: b.id) +# +# contribution_json = self._get_military_campaigns_json_citizen().json() +# contributions: List[Dict[str, int]] = contribution_json.get("contributions") or [] +# contributions.sort(key=lambda b: -b.get("damage")) +# +# for contribution_battle in contributions: +# if contribution_battle.get("battle_id") and contribution_battle.get("battle_id") in self.all_battles: +# ret_battles.append(self.all_battles[contribution_battle.get("battle_id")]) +# +# for battle in battle_list: +# battle_sides = [battle.invader.country, battle.defender.country] +# if battle.id in ret_battles: +# continue +# # CS Battles +# elif self.details.citizenship in battle_sides: +# if battle.has_air: +# if battle.defender.id == self.details.citizenship: +# cs_battles_priority_air.append(battle) +# else: +# cs_battles_air.append(battle) +# else: +# if battle.defender.id == self.details.citizenship: +# cs_battles_priority_ground.append(battle) +# else: +# cs_battles_ground.append(battle) +# +# # Current location battles: +# elif self.details.current_country in battle_sides: +# if battle.has_air: +# deployed_battles_air.append(battle) +# else: +# deployed_battles_ground.append(battle) +# +# # Deployed battles and allied battles: +# elif self.details.current_country in battle.invader.allies + battle.defender.allies + battle_sides: +# if self.details.current_country in battle.invader.deployed + battle.defender.deployed: +# if battle.has_air: +# deployed_battles_air.append(battle) +# else: +# deployed_battles_ground.append(battle) +# # Allied battles: +# else: +# if battle.has_air: +# ally_battles_air.append(battle) +# else: +# ally_battles_ground.append(battle) +# else: +# if battle.has_air: +# other_battles_air.append(battle) +# else: +# other_battles_ground.append(battle) +# +# cs_battles = cs_battles_priority_air + cs_battles_priority_ground + cs_battles_air + cs_battles_ground +# if only_tp: +# return cs_battles +# deployed_battles = deployed_battles_air + deployed_battles_ground +# other_battles = ally_battles_air + ally_battles_ground + other_battles_air + other_battles_ground +# ret_battles = ret_battles + cs_battles + deployed_battles + other_battles +# return ret_battles +# +# def get_cheap_tp_divisions(self) -> Dict[str, List[Tuple[int, classes.BattleDivision]]]: +# air_divs: List[Tuple[int, classes.BattleDivision]] = [] +# ground_divs: List[Tuple[int, classes.BattleDivision]] = [] +# check_maverick = self.maverick and self.config.maverick +# for battle in reversed(self.sorted_battles(True, True)): +# for division in battle.div.values(): +# is_start_ok = utils.good_timedelta(division.battle.start, timedelta(minutes=-1)) < self.now +# if not division.terrain and is_start_ok and not division.div_end: +# if division.is_air and self.config.air: +# division_medals = self.get_battle_round_data(division) +# medal = division_medals[self.details.citizenship == division.battle.defender.country] +# if not medal: +# air_divs.append((0, division)) +# else: +# air_divs.append((medal.get("1").get("raw_value"), division)) +# elif not division.is_air and self.config.ground: +# if not division.div == self.division and not check_maverick: +# continue +# division_medals = self.get_battle_round_data(division) +# medal = division_medals[self.details.citizenship == division.battle.defender.country] +# if not medal: +# ground_divs.append((0, division)) +# else: +# ground_divs.append((medal.get("1").get("raw_value"), division)) +# +# air_divs.sort(key=lambda z: (z[0], z[1].battle.start)) +# ground_divs.sort(key=lambda z: (z[0], z[1].battle.start)) +# return {"air": air_divs, "ground": ground_divs} +# +# @property +# def has_battle_contribution(self): +# return bool(self.__last_war_update_data.get("citizen_contribution", [])) +# +# def find_battle_to_fight( +# self, silent: bool = False +# ) -> Tuple[classes.Battle, classes.BattleDivision, classes.BattleSide]: +# self.update_war_info() +# for battle in self.sorted_battles(self.config.sort_battles_time): +# if not isinstance(battle, classes.Battle): +# continue +# if battle.is_dict_lib: +# continue +# battle_zone: Optional[classes.BattleDivision] = None +# for div in battle.div.values(): +# if div.terrain == 0: +# if div.div_end: +# continue +# maverick_ok = self.maverick and self.config.maverick +# if self.config.air and div.is_air: +# battle_zone = div +# break +# elif self.config.ground and not div.is_air and (div.div == self.division or maverick_ok): +# battle_zone = div +# break +# else: +# continue +# if not battle_zone: +# continue +# allies = ( +# battle.invader.deployed + battle.defender.deployed + [battle.invader.country, battle.defender.country] +# ) +# +# travel_needed = self.details.current_country not in allies +# +# if battle.is_rw: +# side = battle.defender if self.config.rw_def_side else battle.invader +# else: +# defender_side = self.details.current_country in battle.defender.allies + [ +# battle.defender.country, +# ] +# side = battle.defender if defender_side else battle.invader +# +# if not silent: +# self.write_log(str(battle)) +# +# travel = ( +# (self.config.travel_to_fight and self.should_travel_to_fight() or self.config.force_travel) +# if travel_needed +# else True +# ) +# +# if not travel: +# continue +# yield battle, battle_zone, side +# +# def find_battle_and_fight(self): +# count = self.should_fight()[0] +# if count: +# self.write_log("Checking for battles to fight in...") +# for battle, division, side in self.find_battle_to_fight(): +# +# allies = ( +# battle.invader.deployed +# + battle.defender.deployed +# + [battle.invader.country, battle.defender.country] +# ) +# +# travel_needed = self.details.current_country not in allies +# +# if battle.start > self.now: +# self.sleep(utils.get_sleep_seconds(battle.start)) +# +# if travel_needed and not self.change_division(battle, division, side): +# break +# +# if self.change_division(battle, division): +# self.set_default_weapon(battle, division) +# self.fight(battle, division, side, count) +# self.travel_to_residence() +# break +# +# def fight( +# self, +# battle: classes.Battle, +# division: classes.BattleDivision, +# side: classes.BattleSide = None, +# count: int = None, +# ) -> Optional[int]: +# """Fight in a battle. +# +# Will auto activate booster and travel if allowed to do it. +# :param battle: Battle battle to fight in +# :type battle: Battle +# :param division: Division number to fight in available choices +# :type division: BattleDivision +# :param side: BattleSide or None. Battle side to fight in, If side not == invader id or not in invader deployed +# allies list, then defender's side is chosen +# :type side: BattleSide +# :param count: How many hits to do, if not specified self.should_fight() is called. +# :type count: int +# :param use_ebs: Should use energy bars if count > 0 and not enough food_fights +# :type use_ebs: bool +# :return: None if no errors while fighting, otherwise error count. +# :rtype: int +# """ +# if self.restricted_ip: +# self.write_warning("Fighting is not allowed from restricted IP!") +# self._report_action("IP_BLACKLISTED", "Fighting is not allowed from restricted IP!") +# return 1 +# if not division.is_air and self.config.boosters: +# self.activate_damage_booster(not division.is_air) +# if side is None: +# side = ( +# battle.defender +# if self.details.citizenship in battle.defender.allies + [battle.defender.country] +# else battle.invader +# ) +# if count is None: +# count = self.should_fight()[0] +# +# self.write_log(f"Fighting in battle for {battle.region_name} on {side} side in d{division.div}") +# +# if self.now < utils.localize_dt(datetime(2021, 2, 8)): +# error_count = total_damage = total_hits = 0 +# ok_to_fight = True +# while ok_to_fight and error_count < 10 and count > 0: +# while all((count > 0, error_count < 10, self.energy.energy >= 50)): +# hits, error, damage = self._shoot(battle, division, side) +# count -= hits +# total_hits += hits +# total_damage += damage +# error_count += error +# else: +# if self.energy.energy < 50 or error_count >= 10 or count <= 0: +# self.write_log(f"Hits: {total_hits:>4} | Damage: {total_damage}") +# ok_to_fight = False +# if total_damage: +# self.report_fighting(battle, not side.is_defender, division, total_damage, total_hits) +# return error_count +# else: +# deployment_id = self.deploy(division, side, count * 10) +# self.sleep(count // 3) # TODO: connect to eRepublik's WS and get from there when deploy ends +# energy_used = 0 +# if deployment_id: +# self.write_warning( +# "If eRepublik responds with HTTP 500 Internal Error, " +# "it is kind of ok, because deployment has not finished yet." +# ) +# deployment_data = self._post_military_fight_deploy_deploy_report_data(deployment_id).json() +# if not deployment_data.get("error"): +# data = deployment_data["data"] +# total_damage = int(data["damage"].replace(",", "")) +# energy_used = int(data["energySpent"].replace(",", "")) +# self.details.pp += int(data["rewards"]["prestigePoints"].replace(",", "")) +# self.report_fighting(battle, not side.is_defender, division, total_damage, energy_used // 10) +# return energy_used +# +# def _shoot(self, battle: classes.Battle, division: classes.BattleDivision, side: classes.BattleSide): +# if division.is_air: +# response = self._post_military_fight_air(battle.id, side.id, division.id) +# else: +# response = self._post_military_fight_ground(battle.id, side.id, division.id) +# +# if "Zone is not meant for " in response.text: +# self.sleep(5) +# return 0, 1, 0 +# try: +# r_json = response.json() +# except (ValueError, HTTPError, RequestException): +# return 0, 10, 0 +# hits = 0 +# damage = 0 +# err = False +# if r_json.get("error"): +# if r_json.get("message") == "SHOOT_LOCKOUT": +# pass +# elif r_json.get("message") == "NOT_ENOUGH_WEAPONS": +# self.set_default_weapon(battle, division) +# elif r_json.get("message") == "Cannot activate a zone with a non-native division": +# self.write_warning("Wrong division!!") +# return 0, 10, 0 +# elif r_json.get("message") == "ZONE_INACTIVE": +# self.write_warning("Wrong division!!") +# return 0, 10, 0 +# elif r_json.get("message") == "NON_BELLIGERENT": +# self.write_warning("Dictatorship/Liberation wars are not supported!") +# return 0, 10, 0 +# elif r_json.get("message") in ["FIGHT_DISABLED", "DEPLOYMENT_MODE"]: +# self._post_main_profile_update( +# "options", params='{"optionName":"enable_web_deploy","optionValue":"off"}' +# ) +# self.set_default_weapon(battle, division) +# else: +# if r_json.get("message") == "UNKNOWN_SIDE": +# self._rw_choose_side(battle, side) +# elif r_json.get("message") == "CHANGE_LOCATION": +# countries = [side.country] + side.deployed +# self.travel_to_battle(battle, countries) +# err = True +# elif r_json.get("message") == "ENEMY_KILLED": +# # Non-InfantryKit players +# if r_json["user"]["earnedXp"]: +# hits = r_json["user"]["earnedXp"] +# # InfantryKit player +# # The almost always safe way (breaks on levelup hit) +# elif self.energy.energy >= r_json["details"]["wellness"]: # Haven't reached levelup +# hits = (self.energy.energy - r_json["details"]["wellness"]) // 10 +# else: +# hits = r_json["hits"] +# if r_json["user"]["epicBattle"]: +# hits /= 1 + r_json["user"]["epicBattle"] +# +# self.energy.energy = r_json["details"]["wellness"] +# self.details.xp = int(r_json["details"]["points"]) +# damage = r_json["user"]["givenDamage"] * (1.1 if r_json["oldEnemy"]["isNatural"] else 1) +# else: +# err = True +# +# return hits, err, damage +# +# def deploy_bomb( +# self, battle: classes.Battle, division: classes.BattleDivision, bomb_id: int, inv_side: bool, count: int = 1 +# ) -> Optional[int]: +# """Deploy bombs in a battle for given side. +# +# :param battle: Battle +# :type battle: classes.Battle +# :param division: BattleDivision +# :type division: classes.BattleDivision +# :param bomb_id: int bomb id +# :type bomb_id: int +# :param inv_side: should deploy on invader side +# :type inv_side: bool +# :param count: how many bombs to deploy +# :type count: int +# :return: Deployed count +# :rtype: int +# """ +# +# if not isinstance(count, int) or count < 1: +# count = 1 +# has_traveled = False +# if battle.is_rw: +# has_traveled = self.travel_to_battle(battle, [battle.defender.country]) +# self._rw_choose_side(battle, battle.invader if inv_side else battle.defender) +# if inv_side: +# good_countries = [battle.invader.country] + battle.invader.deployed +# if self.details.current_country not in good_countries: +# has_traveled = self.travel_to_battle(battle, good_countries) +# else: +# involved = ( +# [battle.invader.country, battle.defender.country] + battle.invader.deployed + battle.defender.deployed +# ) +# if self.details.current_country not in involved: +# count = 0 +# side = battle.invader if inv_side else battle.defender +# errors = deployed_count = 0 +# while (not deployed_count == count) and errors < 10: +# r = self._post_military_deploy_bomb(battle.id, division.id, side.id, bomb_id).json() +# if not r.get("error"): +# deployed_count += 1 +# self.sleep(0.5) +# elif r.get("message") == "LOCKED": +# self.sleep(0.5) +# elif r.get("message") == "INVALID_BOMB": +# errors = 10 +# else: +# errors += 1 +# +# if has_traveled: +# self.travel_to_residence() +# +# self._report_action("MILITARY_BOMB", f"Deployed {deployed_count} bombs in battle {battle.id}") +# return deployed_count +# +# def change_division( +# self, battle: classes.Battle, division: classes.BattleDivision, side: classes.BattleSide = None +# ) -> bool: +# """Change division. +# +# :param battle: classes.Battle +# :type battle: classes.Battle +# :param division: int target division to switch to +# :type division: classes.BattleDivision +# :param side: Side to choose +# :type side: classes.BattleSide +# :return: +# """ +# resp = self._post_main_battlefield_change_division(battle.id, division.id, side.id if side else None) +# if resp.json().get("error"): +# self.write_warning(resp.json().get("message")) +# return False +# self._report_action( +# "MILITARY_DIV_SWITCH", f"Switched to d{division.div} in battle {battle.id}", kwargs=resp.json() +# ) +# return True +# +# def get_ground_hit_dmg_value( +# self, +# rang: int = None, +# strength: float = None, +# elite: bool = None, +# ne: bool = False, +# booster_50: bool = False, +# booster_100: bool = False, +# tp: bool = True, +# ) -> Decimal: +# if not rang or not strength or elite is None: +# r = self._get_main_citizen_profile_json(self.details.citizen_id).json() +# if not rang: +# rang = r["military"]["militaryData"]["ground"]["rankNumber"] +# if not strength: +# strength = r["military"]["militaryData"]["ground"]["strength"] +# if elite is None: +# elite = r["citizenAttributes"]["level"] > 100 +# if ne: +# tp = True +# +# return utils.calculate_hit(strength, rang, tp, elite, ne, 50 if booster_50 else 100 if booster_100 else 0) +# +# def get_air_hit_dmg_value( +# self, rang: int = None, elite: bool = None, ne: bool = False, weapon: bool = False +# ) -> Decimal: +# if not rang or elite is None: +# r = self._get_main_citizen_profile_json(self.details.citizen_id).json() +# if not rang: +# rang = r["military"]["militaryData"]["aircraft"]["rankNumber"] +# if elite is None: +# elite = r["citizenAttributes"]["level"] > 100 +# +# return utils.calculate_hit(0, rang, True, elite, ne, 0, 20 if weapon else 0) +# +# def activate_damage_booster(self, ground: bool = True) -> int: +# kind = "damage" if ground else "aircraftDamage" +# if self.config.boosters and not self.get_active_damage_booster(ground): +# booster: Optional[types.InvFinalItem] = None +# for quality, data in sorted(self.inventory.boosters.get(kind, {}).items(), key=lambda x: x[0]): +# for _duration, _booster in sorted(data.items(), key=lambda y: y[0]): +# critical_amount = 2 if quality < 10 and ground else 10 +# if _booster.get("amount") > critical_amount: +# booster = _booster +# break +# break +# if booster: +# kind = "damage" if ground else "air_damage" +# self._report_action("MILITARY_BOOSTER", f"Activated {booster['name']}") +# resp = self._post_economy_activate_booster(booster["quality"], booster["durability"], kind).json() +# self._update_inventory_data(resp) +# return self.get_active_damage_booster(ground) +# +# def get_active_damage_booster(self, ground: bool = True) -> int: +# kind = "damage" if ground else "aircraftDamage" +# boosters = self.inventory.active.get(kind, {}) +# quality = 0 +# for q, boost in boosters.items(): +# if boost["quality"] * 10 > quality: +# quality = boost["quality"] * 10 +# return quality +# +# def get_active_ground_damage_booster(self) -> int: +# return self.get_active_damage_booster(True) +# +# def get_active_air_damage_booster(self) -> int: +# return self.get_active_damage_booster(False) +# +# def activate_battle_effect(self, battle_id: int, kind: str) -> bool: +# self._report_action("MILITARY_BOOSTER", f"Activated {kind} booster") +# resp = self._post_main_activate_battle_effect(battle_id, kind, self.details.citizen_id).json() +# return not resp.get("error") +# +# def activate_pp_booster(self, pp_item: types.InvFinalItem) -> bool: +# self._report_action("MILITARY_BOOSTER", f'Activated {pp_item["name"]}') +# resp = self._post_economy_activate_booster( +# pp_item["quality"], pp_item["durability"], "prestige_points" +# ).json() +# self._update_inventory_data(resp) +# return pp_item.get("kind") in self.inventory.active +# +# def _rw_choose_side(self, battle: classes.Battle, side: classes.BattleSide) -> Response: +# return self._post_main_battlefield_travel(side.id, battle.id) +# +# def should_travel_to_fight(self) -> bool: +# ret = False +# if self.config.always_travel: +# ret = True +# elif self.should_do_levelup: # Do levelup +# ret = True +# # Get to next Energy +1 +# elif self.next_reachable_energy and self.config.next_energy: +# ret = True +# # 1h worth of energy +# elif self.energy.energy + self.energy.interval * 3 >= self.energy.limit: +# ret = True +# return ret +# +# def should_fight(self) -> Tuple[int, str, bool]: +# """Checks if citizen should fight at this moment +# :rtype: Tuple[int, str, bool] +# """ +# count = 0 +# force_fight = False +# msg = "Fighting not allowed!" +# if not self.config.fight: +# return count, msg, force_fight +# +# # Do levelup +# if self.is_levelup_reachable: +# msg = "Level up" +# if self.should_do_levelup: +# count = (self.energy.limit * 3) // 10 +# force_fight = True +# else: +# self.write_log("Waiting for fully recovered energy before leveling up.") +# +# # Levelup reachable +# elif self.is_levelup_close: +# count = self.details.xp_till_level_up - (self.energy.limit // 10) + 5 +# msg = "Fighting for close Levelup. Doing %i hits" % count +# force_fight = True +# +# elif self.details.pp < 75: +# count = 75 - self.details.pp +# msg = "Obligatory fighting for at least 75pp" +# force_fight = True +# +# elif self.config.continuous_fighting and self.has_battle_contribution: +# count = self.energy.food_fights +# msg = "Continuing to fight in previous battle" +# +# # All-in (type = all-in and full ff) +# elif self.config.all_in and self.energy.energy + self.energy.interval * 3 >= self.energy.limit: +# count = self.energy.food_fights +# msg = "Fighting all-in. Doing %i hits" % count +# +# # Get to next Energy +1 +# elif self.config.next_energy and self.next_reachable_energy: +# count = self.next_reachable_energy +# msg = "Fighting for +1 energy. Doing %i hits" % count +# +# # 1h worth of energy +# elif self.energy.energy + self.energy.interval * 3 >= self.energy.limit: +# count = self.energy.interval +# msg = "Fighting for 1h energy. Doing %i hits" % count +# force_fight = True +# +# return (count if count > 0 else 0), msg, force_fight +# +# def get_battle_round_data(self, division: classes.BattleDivision) -> Tuple[Any, Any]: +# battle = division.battle +# +# r = self._post_military_battle_console( +# battle.id, +# "battleStatistics", +# 1, +# zoneId=battle.zone_id, +# round_id=battle.zone_id, +# division=division.div, +# battleZoneId=division.id, +# type="damage", +# ) +# r_json = r.json() +# return r_json.get(str(battle.invader.id)).get("fighterData"), r_json.get(str(battle.defender.id)).get( +# "fighterData" +# ) +# +# def get_battle_division_stats(self, division: classes.BattleDivision) -> Dict[str, Any]: +# battle = division.battle +# r = self._get_military_battle_stats(battle.id, division.div, division.id) +# return r.json() +# +# def get_division_max_hit(self, division: classes.BattleDivision) -> int: +# """Returns max hit in division for current side (if not on either side returns 0) +# +# :param division: BattleDivision for which to get max hit value +# :type division: classes.BattleDivision +# :return: max hit value +# :rtype: int +# """ +# return self.get_battle_division_stats(division).get("maxHit", -1) +# +# def schedule_attack(self, war_id: int, region_id: int, region_name: str, at_time: datetime): +# if at_time: +# self.sleep(utils.get_sleep_seconds(at_time)) +# self.get_csrf_token() +# self.launch_attack(war_id, region_id, region_name) +# +# def get_active_wars(self, country: constants.Country = None) -> List[int]: +# r = self._get_country_military(country.link if country else self.details.citizenship.link) +# all_war_ids = re.findall(r'//www\.erepublik\.com/en/wars/show/(\d+)"', r.text) +# return [int(wid) for wid in all_war_ids] +# +# def get_last_battle_of_war_end_time(self, war_id: int) -> datetime: +# r = self._get_wars_show(war_id) +# html = r.text +# last_battle_id = int(re.search( +# r'', html +# ).group(1)) +# console = self._post_military_battle_console(last_battle_id, "warList", 1).json() +# battle = console.get("list")[0] +# return utils.localize_dt(datetime.strptime(battle.get("result").get("end"), "%Y-%m-%d %H:%M:%S")) +# +# def launch_attack(self, war_id: int, region_id: int, region_name: str): +# self._post_wars_attack_region(war_id, region_id, region_name) +# self._report_action("MILITARY_QUEUE_ATTACK", f"Battle for *{region_name}* queued") +# +# def get_country_mus(self, country: constants.Country) -> Dict[int, str]: +# ret = {} +# r = self._get_main_leaderboards_damage_rankings(country.id) +# for data in r.json()["mu_filter"]: +# if data["id"]: +# ret.update({data["id"]: data["name"]}) +# r = self._get_main_leaderboards_damage_aircraft_rankings(country.id) +# for data in r.json()["mu_filter"]: +# if data["id"]: +# ret.update({data["id"]: data["name"]}) +# return ret +# +# def get_mu_members(self, mu_id: int) -> Dict[int, str]: +# ret = {} +# r = self._get_military_unit_data(mu_id) +# +# for page in range(int(r.json()["panelContents"]["pages"])): +# r = self._get_military_unit_data(mu_id, currentPage=page + 1) +# for user in r.json()["panelContents"]["members"]: +# if not user["isDead"]: +# ret.update({user["citizenId"]: user["name"]}) +# return ret +# +# def get_citizen_weekly_daily_orders_done(self, citizen_id: int = None, weeks_ago: int = 0) -> int: +# if citizen_id is None: +# citizen_id = self.details.citizen_id +# profile = self._get_main_citizen_profile_json(citizen_id).json() +# mu_id = profile.get("military", {}).get("militaryUnit", {}).get("id", 0) +# if mu_id: +# name = profile.get("citizen", {}).get("name") +# member = self._get_military_unit_data( +# mu_id, +# currentPage=1, +# panel="members", +# sortBy="dailyOrdersCompleted", +# weekFilter=f"week{weeks_ago}", +# search=name, +# ).json() +# return member.get("panelContents", {}).get("members", [{}])[0].get("dailyOrdersCompleted") +# return 0 +# +# def get_possibly_empty_medals(self): +# self.update_war_info() +# for battle in self.all_battles.values(): +# for division in battle.div.values(): +# if division.wall["dom"] == 50 or division.wall["dom"] > 98: +# yield division, division.wall["for"] == battle.invader.country.id +# +# def report_fighting( +# self, battle: classes.Battle, invader: bool, division: classes.BattleDivision, damage: float, hits: int +# ): +# self.reporter.report_fighting(battle, invader, division, damage, hits) +# if self.config.telegram: +# self.telegram.report_fight(battle, invader, division, damage, hits) +# +# def get_deploy_inventory(self, division: classes.BattleDivision, side: classes.BattleSide): +# ret = self._post_fight_deploy_get_inventory(division.battle.id, side.id, division.id).json() +# # if ret.get('recoverableEnergyBuyFood'): +# # self.buy_food() +# # return self.get_deploy_inventory(division, side) +# if ret.get("captcha"): +# self.do_captcha_challenge() +# if ret.get("error"): +# if ret.get("message") == "Deployment disabled.": +# self._post_main_profile_update( +# "options", params='{"optionName":"enable_web_deploy","optionValue":"on"}' +# ) +# return self.get_deploy_inventory(division, side) +# else: +# self.report_error(f"Unable to get deployment inventory because: {ret.get('message')}") +# return ret +# +# def deploy(self, division: classes.BattleDivision, side: classes.BattleSide, energy: int, _retry=0) -> int: +# _energy = int(energy) +# deploy_inv = self.get_deploy_inventory(division, side) +# if not deploy_inv["minEnergy"] <= energy <= deploy_inv["maxEnergy"]: +# return 0 +# energy_sources = {} +# source_idx = 0 +# recoverable = deploy_inv["recoverableEnergy"] +# for source in reversed(sorted(deploy_inv["energySources"], key=lambda s: (s["type"], s.get("quality", 0)))): +# if source["type"] == "pool": +# _energy -= source["energy"] +# elif source["type"] in ["food", "energy_bar"]: +# recovers = source["energy"] // source["amount"] +# amount = (recoverable if source["type"] == "food" else _energy) // recovers +# amount = amount if amount < source["amount"] else source["amount"] +# if amount > 0: +# energy_sources.update({f"energySources[{source_idx}][quality]": source["quality"]}) +# energy_sources.update({f"energySources[{source_idx}][amount]": amount}) +# source_idx += 1 +# used_energy = amount * recovers +# recoverable -= used_energy +# _energy -= used_energy +# if _energy <= 0: +# break +# if _energy > 0: +# energy -= _energy +# weapon_q = -1 +# weapon_strength = 0 +# if not division.is_air: +# for weapon in sorted(deploy_inv["weapons"], key=lambda w: w["damageperHit"]): +# if (weapon["damageperHit"] or 0) > weapon_strength and (weapon["amount"] or 0) > 50: +# weapon_q = weapon["quality"] +# r = self._post_fight_deploy_start_deploy( +# division.battle.id, side.id, division.id, energy, weapon_q, **energy_sources +# ).json() +# if r.get("error"): +# self.report_error(f"Deploy failed: '{r.get('message')}'") +# if r.get("message") == "Deployment disabled.": +# self._post_main_profile_update( +# "options", params='{"optionName":"enable_web_deploy","optionValue":"on"}' +# ) +# if _retry < 5: +# return self.deploy(division, side, energy, _retry + 1) +# else: +# self.report_error("Unable to deploy 5 times!") +# return 0 +# return r.get("deploymentId") class CitizenPolitics(BaseCitizen): @@ -2470,7 +2619,9 @@ class CitizenPolitics(BaseCitizen): self._report_action("POLITIC_PARTY_PRESIDENT", "Applied for party president elections") return self._get_candidate_party(self.politics.party_slug) else: - self._report_action("POLITIC_CONGRESS", "Unable to apply for party president elections - not a party member") + self._report_action( + "POLITIC_CONGRESS", "Unable to apply for party president elections - not a party member" + ) return None def candidate_for_congress(self, presentation: str = "") -> Optional[Response]: @@ -2481,14 +2632,18 @@ class CitizenPolitics(BaseCitizen): self._report_action("POLITIC_CONGRESS", "Unable to apply for congress elections - not a party member") return None - def get_country_president_election_result(self, country: constants.Country, year: int, month: int) -> Dict[str, int]: + def get_country_president_election_result( + self, country: constants.Country, year: int, month: int + ) -> Dict[str, int]: timestamp = int(constants.erep_tz.localize(datetime(year, month, 5)).timestamp()) resp = self._get_presidential_elections(country.id, timestamp) candidates = re.findall(r'class="candidate_info">(.*?)', resp.text, re.S | re.M) ret = {} for candidate in candidates: name = re.search( - r'', candidate + r'', + candidate, ) name = name.group(1) votes = re.search(r'(\d+) votes', candidate).group(1) @@ -2499,12 +2654,16 @@ class CitizenPolitics(BaseCitizen): class CitizenSocial(BaseCitizen): def send_mail(self, subject: str, msg: str, ids: List[int]): for player_id in ids: - self._report_action("SOCIAL_MESSAGE", f"Sent a message to {player_id}", kwargs=dict(subject=subject, msg=msg, id=player_id)) + self._report_action( + "SOCIAL_MESSAGE", f"Sent a message to {player_id}", kwargs=dict(subject=subject, msg=msg, id=player_id) + ) self._post_main_messages_compose(subject, msg, [player_id]) def write_on_country_wall(self, message: str) -> bool: self._get_main() - post_to_wall_as = re.findall(r'id="post_to_country_as".*?.*', self.r.text, re.S | re.M) + post_to_wall_as = re.findall( + r'id="post_to_country_as".*?.*', self.r.text, re.S | re.M + ) r = self._post_main_country_post_create(message, max(post_to_wall_as, key=int) if post_to_wall_as else 0) self._report_action("SOCIAL_WRITE_WALL_COUNTRY", "Wrote a message to the country wall") @@ -2726,7 +2885,7 @@ class CitizenTasks(CitizenEconomy): class _Citizen( - CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard, CitizenMedia, CitizenPolitics, CitizenSocial, CitizenMilitary, CitizenTasks + CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard, CitizenMedia, CitizenPolitics, CitizenSocial, CitizenTasks ): def __init__(self, email: str = "", password: str = "", auto_login: bool = False): super().__init__(email, password) @@ -2780,7 +2939,8 @@ class _Citizen( self.buy_tg_contract() else: self.write_warning( - "Training ground contract active but " f"don't have enough gold ({self.details.gold}g {self.details.cc}cc)" + "Training ground contract active but " + f"don't have enough gold ({self.details.gold}g {self.details.cc}cc)" ) if self.energy.is_energy_full and self.config.telegram: self.telegram.report_full_energy(self.energy.energy, self.energy.limit, self.energy.interval) @@ -2799,13 +2959,13 @@ class _Citizen( if award_id and title and medal.get("details").get("isWallMaterial"): self._post_main_wall_post_automatic(title.lower(), award_id) - if "ccValue" in params: + if params.get("ccValue"): reward = params.get("ccValue") or 0 currency = "Currency" - elif "goldValue" in params: + elif params.get("goldValue"): reward = params.get("goldValue") or 0 currency = "Gold" - elif "energyValue" in params: + elif params.get("energyValue"): reward = params.get("energyValue") or 0 currency = "Energy" else: @@ -2825,7 +2985,10 @@ class _Citizen( data[(title, reward)]["count"] += count self._post_main_global_alerts_close(medal.get("id")) if data: - msgs = [f"{d['count']} x {d['kind']}, totaling {d['count'] * d['reward']} " f"{d['currency']}" for d in data.values()] + msgs = [ + f"{d['count']} x {d['kind']}, totaling {d['count'] * d['reward']} " f"{d['currency']}" + for d in data.values() + ] msgs = "\n".join(msgs) if self.config.telegram: @@ -2841,11 +3004,7 @@ class _Citizen( # Do full update max every 5 min if utils.good_timedelta(self._last_full_update, timedelta(minutes=5)) < self.now or force_update: self._last_full_update = self.now - self.update_citizen_info() - self.update_war_info() - self.update_inventory() - self.update_companies() - self.update_money() + super().update_all() self.update_weekly_challenge() self.check_for_notification_medals() self.send_state_update() @@ -2863,40 +3022,42 @@ class _Citizen( elif status == "completed": should_collect = True elif reward.get("icon", "") == "energy_booster": - pps = re.search(r"Reach (\d+) Prestige Points to unlock the following reward: \+1 Energy", reward.get("tooltip", "")) + pps = re.search( + r"Reach (\d+) Prestige Points to unlock the following reward: \+1 Energy", reward.get("tooltip", "") + ) if pps: self.details.next_pp.append(int(pps.group(1))) if should_collect: self._post_main_weekly_challenge_collect_all(max_collectable_id) - def should_fight(self, silent: bool = True) -> Tuple[int, str, bool]: - count, log_msg, force_fight = super().should_fight() - - if count > 0 and not force_fight: - if self.energy.food_fights - self.my_companies.ff_lockdown < count: - log_msg = ( - f"Fight count modified (old count: {count} | FF: {self.energy.food_fights} | " - f"WAM ff_lockdown: {self.my_companies.ff_lockdown} |" - f" New count: {count - self.my_companies.ff_lockdown})" - ) - count -= self.my_companies.ff_lockdown - if count <= 0: - count = 0 - log_msg = f"Not fighting because WAM needs {self.my_companies.ff_lockdown} food fights" - - if self.max_time_till_full_ff > self.time_till_week_change: - max_count = (int(self.time_till_week_change.total_seconds()) // 360 * self.energy.interval) // 10 - log_msg = "End for Weekly challenge is near " f"(Recoverable until WC end {max_count}hp | want to do {count}hits)" - count = count if max_count > count else max_count - - if not silent: - self.write_log(log_msg) - - return count, log_msg, force_fight - - def collect_weekly_reward(self): - utils.deprecation(f"Logic moved to {self.__class__.__name__}.update_weekly_challenge()!") - self.update_weekly_challenge() + # def should_fight(self, silent: bool = True) -> Tuple[int, str, bool]: + # if not hasattr(super, "should_fight"): + # return 0, "Unable to fight", False + # count, log_msg, force_fight = super().should_fight() + # + # if count > 0 and not force_fight: + # if self.energy.food_fights - self.my_companies.ff_lockdown < count: + # log_msg = ( + # f"Fight count modified (old count: {count} | FF: {self.energy.food_fights} | " + # f"WAM ff_lockdown: {self.my_companies.ff_lockdown} |" + # f" New count: {count - self.my_companies.ff_lockdown})" + # ) + # count -= self.my_companies.ff_lockdown + # if count <= 0: + # count = 0 + # log_msg = f"Not fighting because WAM needs {self.my_companies.ff_lockdown} food fights" + # + # if self.max_time_till_full_ff > self.time_till_week_change: + # max_count = (int(self.time_till_week_change.total_seconds()) // 360 * self.energy.interval) // 10 + # log_msg = ( + # f"End for Weekly challenge is near (Recoverable until WC end {max_count}hp | want to do {count}hits)" + # ) + # count = count if max_count > count else max_count + # + # if not silent: + # self.write_log(log_msg) + # + # return count, log_msg, force_fight def collect_daily_task(self): self.update_citizen_info() @@ -2961,7 +3122,9 @@ class _Citizen( else: price = lowest_price.price - 0.01 - self.post_market_offer(industry=constants.INDUSTRIES[kind], amount=int(amount), quality=int(quality), price=price) + self.post_market_offer( + industry=constants.INDUSTRIES[kind], amount=int(amount), quality=int(quality), price=price + ) def _wam(self, holding: classes.Holding) -> NoReturn: response = self.work_as_manager_in_holding(holding) @@ -2986,7 +3149,9 @@ class _Citizen( result = response.get("result", {}) amount_needed = round(result.get("consume", 0) - result.get("stock", 0) + 0.5) self._report_action( - "WORK_AS_MANAGER", f"Unable to wam! Missing {amount_needed} {raw_kind}, will try to buy.", kwargs=response + "WORK_AS_MANAGER", + f"Unable to wam! Missing {amount_needed} {raw_kind}, will try to buy.", + kwargs=response, ) start_place = (self.details.current_country, self.details.current_region) while amount_needed > 0: @@ -2996,13 +3161,17 @@ class _Citizen( if not best_offer.country == self.details.current_country: self.travel_to_country(best_offer.country) - self._report_action("ECONOMY_BUY", f"Attempting to buy {amount} {raw_kind} for {best_offer.price * amount}cc") + self._report_action( + "ECONOMY_BUY", f"Attempting to buy {amount} {raw_kind} for {best_offer.price * amount}cc" + ) rj = self.buy_from_market(amount=amount, offer=best_offer.offer_id) if not rj.get("error"): amount_needed -= amount else: self.write_warning(rj.get("message", "")) - self._report_action("ECONOMY_BUY", f"Unable to buy products! Reason: {rj.get('message')}", kwargs=rj) + self._report_action( + "ECONOMY_BUY", f"Unable to buy products! Reason: {rj.get('message')}", kwargs=rj + ) break else: if not start_place == (self.details.current_country, self.details.current_region): @@ -3070,25 +3239,6 @@ class _Citizen( self.update_companies() return bool(self.my_companies.get_total_wam_count()) - def sorted_battles(self, sort_by_time: bool = True, only_tp=False) -> List[classes.Battle]: - battles: List[classes.Battle] = self.reporter.fetch_battle_priorities(self.details.current_country) - return battles + super().sorted_battles(sort_by_time, only_tp) - - def command_central(self): - while not self.stop_threads.is_set(): - try: - tasks = self.reporter.fetch_tasks() - for task, args in tasks: - try: - fn = getattr(self, task) - if callable(fn): - fn(*args) - except AttributeError: - continue - self.stop_threads.wait(90) - except: # noqa - self.report_error("Command central is broken") - class Citizen(_Citizen): _concurrency_lock: Event @@ -3125,17 +3275,6 @@ class Citizen(_Citizen): finally: self._update_lock.set() - def update_war_info(self): - if not self._update_lock.wait(self._update_timeout): - e = f"Update concurrency not freed in {self._update_timeout}sec!" - self.report_error(e) - return None - try: - self._update_lock.clear() - super().update_war_info() - finally: - self._update_lock.set() - def update_job_info(self): if not self._update_lock.wait(self._update_timeout): e = f"Update concurrency not freed in {self._update_timeout}sec!" @@ -3180,37 +3319,6 @@ class Citizen(_Citizen): finally: self._concurrency_lock.set() - def fight( - self, - battle: classes.Battle, - division: classes.BattleDivision, - side: classes.BattleSide = None, - count: int = None, - use_ebs: bool = False, - ) -> Optional[int]: - if not self._concurrency_lock.wait(self._concurrency_timeout): - e = f"Concurrency not freed in {self._concurrency_timeout}sec!" - self.report_error(e) - return None - try: - self._concurrency_lock.clear() - return super().fight(battle, division, side, count, use_ebs) - finally: - self._concurrency_lock.set() - - def deploy_bomb( - self, battle: classes.Battle, division: classes.BattleDivision, bomb_id: int, inv_side: bool, count: int = 1 - ) -> Optional[int]: - if not self._concurrency_lock.wait(self._concurrency_timeout): - e = f"Concurrency not freed in {self._concurrency_timeout}sec!" - self.report_error(e) - return None - try: - self._concurrency_lock.clear() - return super().deploy_bomb(battle, division, bomb_id, inv_side, count) - finally: - self._concurrency_lock.set() - def buy_market_offer(self, offer: classes.OfferItem, amount: int = None) -> Optional[Dict[str, Any]]: if not self._concurrency_lock.wait(self._concurrency_timeout): e = f"Concurrency not freed in {self._concurrency_timeout}sec!" diff --git a/erepublik/classes.py b/erepublik/classes.py index 5608a6f..4f1e86b 100644 --- a/erepublik/classes.py +++ b/erepublik/classes.py @@ -129,7 +129,13 @@ class Holding: @property def as_dict(self) -> Dict[str, Union[str, int, List[Dict[str, Union[str, int, bool, float, Decimal]]]]]: - return dict(name=self.name, id=self.id, region=self.region, companies=[c.as_dict for c in self.companies], wam_count=self.wam_count) + return dict( + name=self.name, + id=self.id, + region=self.region, + companies=[c.as_dict for c in self.companies], + wam_count=self.wam_count, + ) @property def citizen(self): @@ -303,7 +309,13 @@ class MyCompanies: """ for holding in holdings.values(): if holding.get("id") not in self.holdings: - self.holdings.update({int(holding.get("id")): Holding(holding["id"], holding["region_id"], self.citizen, holding["name"])}) + self.holdings.update( + { + int(holding.get("id")): Holding( + holding["id"], holding["region_id"], self.citizen, holding["name"] + ) + } + ) if not self.holdings.get(0): self.holdings.update({0: Holding(0, 0, self.citizen, "Unassigned")}) # unassigned @@ -373,7 +385,12 @@ class MyCompanies: self, ) -> Dict[ str, - Union[str, int, datetime.datetime, Dict[str, Dict[str, Union[str, int, List[Dict[str, Union[str, int, bool, float, Decimal]]]]]]], + Union[ + str, + int, + datetime.datetime, + Dict[str, Dict[str, Union[str, int, List[Dict[str, Union[str, int, bool, float, Decimal]]]]]], + ], ]: return dict( name=str(self), @@ -530,12 +547,16 @@ class Energy: @property def is_recoverable_full(self): - warnings.warn("Deprecated since auto auto-eat! Will be removed soon. Use Energy.is_energy_full", DeprecationWarning) + warnings.warn( + "Deprecated since auto auto-eat! Will be removed soon. Use Energy.is_energy_full", DeprecationWarning + ) return self.is_energy_full @property def is_recovered_full(self): - warnings.warn("Deprecated since auto auto-eat! Will be removed soon. Use Energy.is_energy_full", DeprecationWarning) + warnings.warn( + "Deprecated since auto auto-eat! Will be removed soon. Use Energy.is_energy_full", DeprecationWarning + ) return self.is_energy_full @property @@ -691,7 +712,12 @@ class Reporter: @property def as_dict(self) -> Dict[str, Union[bool, int, str, List[Dict[Any, Any]]]]: return dict( - name=self.name, email=self.email, citizen_id=self.citizen_id, key=self.key, allowed=self.allowed, queue=self.__to_update + name=self.name, + email=self.email, + citizen_id=self.citizen_id, + key=self.key, + allowed=self.allowed, + queue=self.__to_update, ) def __init__(self, citizen): @@ -751,7 +777,10 @@ class Reporter: r = self.__bot_update(dict(key=self.key, check=True, player_id=self.citizen_id)) if r: if not r.json().get("status"): - self._req.post(f"{self.url}/bot/register", json=dict(name=self.name, email=self.email, player_id=self.citizen_id)) + self._req.post( + f"{self.url}/bot/register", + json=dict(name=self.name, email=self.email, player_id=self.citizen_id), + ) self.__registered = True self.allowed = True self.report_action("STARTED", value=utils.now().strftime("%F %T")) @@ -790,7 +819,9 @@ class Reporter: self._bot_update(data) def report_action(self, action: str, json_val: Dict[Any, Any] = None, value: str = None): - json_data = dict(player_id=getattr(self, "citizen_id", None), log={"action": action}, key=getattr(self, "key", None)) + json_data = dict( + player_id=getattr(self, "citizen_id", None), log={"action": action}, key=getattr(self, "key", None) + ) if json_val: json_data["log"].update(dict(json=json_val)) if value: @@ -836,14 +867,18 @@ class Reporter: try: battle_response = self._req.get(f"{self.url}/api/v1/battles/{country.id}") return [ - self.citizen.all_battles[bid] for bid in battle_response.json().get("battle_ids", []) if bid in self.citizen.all_battles + self.citizen.all_battles[bid] + for bid in battle_response.json().get("battle_ids", []) + if bid in self.citizen.all_battles ] except: # noqa return [] def fetch_tasks(self) -> List[Dict[str, Any]]: try: - task_response = self._req.post(f"{self.url}/api/v1/command", data=dict(citizen=self.citizen_id, key=self.key)).json() + task_response = self._req.post( + f"{self.url}/api/v1/command", data=dict(citizen=self.citizen_id, key=self.key) + ).json() if task_response.get("status"): return task_response.get("data") else: @@ -894,7 +929,13 @@ class BattleSide: @property def as_dict(self) -> Dict[str, Union[int, constants.Country, bool, List[constants.Country]]]: - return dict(points=self.points, country=self.country, is_defender=self.is_defender, allies=self.allies, deployed=self.deployed) + return dict( + points=self.points, + country=self.country, + is_defender=self.is_defender, + allies=self.allies, + deployed=self.deployed, + ) @property def battle(self): @@ -917,7 +958,12 @@ class BattleDivision: @property def as_dict(self): return dict( - id=self.id, division=self.div, terrain=(self.terrain, self.terrain_display), wall=self.wall, epic=self.epic, end=self.div_end + id=self.id, + division=self.div, + terrain=(self.terrain, self.terrain_display), + wall=self.wall, + epic=self.epic, + end=self.div_end, ) @property @@ -1210,7 +1256,8 @@ class TelegramReporter: def report_item_donation(self, citizen_id: int, amount: float, product: str): self.send_message( - f"*Donation*: {amount} x {product} to citizen " f"[{citizen_id}](https://www.erepublik.com/en/citizen/profile/{citizen_id})" + f"*Donation*: {amount} x {product} to citizen " + f"[{citizen_id}](https://www.erepublik.com/en/citizen/profile/{citizen_id})" ) def report_money_donation(self, citizen_id: int, amount: float, is_currency: bool = True): @@ -1228,7 +1275,9 @@ class TelegramReporter: message = "\n\n".join(self.__queue) if self.player_name: message = f"Player *{self.player_name}*\n\n" + message - response = post(f"{self.api_url}/sendMessage", json=dict(chat_id=self.chat_id, text=message, parse_mode="Markdown")) + response = post( + f"{self.api_url}/sendMessage", json=dict(chat_id=self.chat_id, text=message, parse_mode="Markdown") + ) self._last_time = utils.now() if response.json().get("ok"): self.__queue.clear() @@ -1275,5 +1324,11 @@ class Inventory: @property def as_dict(self) -> Dict[str, Union[types.InvFinal, types.InvRaw, int]]: return dict( - active=self.active, final=self.final, boosters=self.boosters, raw=self.raw, offers=self.offers, total=self.total, used=self.used + active=self.active, + final=self.final, + boosters=self.boosters, + raw=self.raw, + offers=self.offers, + total=self.total, + used=self.used, ) diff --git a/erepublik/utils.py b/erepublik/utils.py index 70922e2..43dbd94 100644 --- a/erepublik/utils.py +++ b/erepublik/utils.py @@ -182,7 +182,14 @@ def slugify(value, allow_unicode=False) -> str: def calculate_hit( - strength: float, rang: int, tp: bool, elite: bool, ne: bool, booster: int = 0, weapon: int = 200, is_deploy: bool = False + strength: float, + rang: int, + tp: bool, + elite: bool, + ne: bool, + booster: int = 0, + weapon: int = 200, + is_deploy: bool = False, ) -> Decimal: dec = 3 if is_deploy else 0 base_str = 1 + Decimal(str(round(strength, 3))) / 400 @@ -217,7 +224,12 @@ def get_air_hit_dmg_value( def get_final_hit_dmg( - base_dmg: Union[Decimal, float, str], rang: int, tp: bool = False, elite: bool = False, ne: bool = False, booster: int = 0 + base_dmg: Union[Decimal, float, str], + rang: int, + tp: bool = False, + elite: bool = False, + ne: bool = False, + booster: int = 0, ) -> Decimal: dmg = Decimal(str(base_dmg)) @@ -313,7 +325,11 @@ class ErepublikJSONEncoder(json.JSONEncoder): return dict(__type__="date", date=o.strftime("%Y-%m-%d")) elif isinstance(o, datetime.timedelta): return dict( - __type__="timedelta", days=o.days, seconds=o.seconds, microseconds=o.microseconds, total_seconds=o.total_seconds() + __type__="timedelta", + days=o.days, + seconds=o.seconds, + microseconds=o.microseconds, + total_seconds=o.total_seconds(), ) elif isinstance(o, Response): return dict(headers=dict(o.__dict__["headers"]), url=o.url, text=o.text, status_code=o.status_code) diff --git a/examples/eat_work_train.py b/examples/eat_work_train.py index 0816b77..2c2c7d6 100644 --- a/examples/eat_work_train.py +++ b/examples/eat_work_train.py @@ -60,7 +60,9 @@ def main(): player.write_log("Doing task: Work as manager") success = player.work_as_manager() if success: - next_time = utils.good_timedelta(now.replace(hour=14, minute=0, second=0, microsecond=0), timedelta(days=1)) + next_time = utils.good_timedelta( + now.replace(hour=14, minute=0, second=0, microsecond=0), timedelta(days=1) + ) else: next_time = utils.good_timedelta(now.replace(second=0, microsecond=0), timedelta(minutes=30)) @@ -86,7 +88,9 @@ def main(): if sleep_seconds <= 0: player.write_log(f"Loop detected! Offending task: '{next_tasks[0]}'") player.write_log("My next Tasks and there time:\n" + "\n".join(sorted(next_tasks))) - player.write_log(f"Sleeping until (eRep): {closest_next_time.strftime('%F %T')}" f" (sleeping for {sleep_seconds}s)") + player.write_log( + f"Sleeping until (eRep): {closest_next_time.strftime('%F %T')}" f" (sleeping for {sleep_seconds}s)" + ) seconds_to_sleep = sleep_seconds if sleep_seconds > 0 else 0 player.sleep(seconds_to_sleep) except Exception as e: diff --git a/pyproject.toml b/pyproject.toml index 7e28eca..b2c5a8b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,4 +1,11 @@ [tool.black] -line-length = 140 +line-length = 120 target-version = ['py38', 'py39'] +extend-exclude = 'venv' +workers = 4 + +[tool.isort] +profile = "black" +multi_line_output = 3 +line_length = 120 diff --git a/setup.py b/setup.py index d75c9d6..7af355e 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,9 @@ setup_requirements = [] with open("requirements-tests.txt") as test_req_file: test_requirements = test_req_file.read() - test_requirements = [line.strip() for line in test_requirements.split() if line.strip()[:2].strip() not in ("#", "-r")] + test_requirements = [ + line.strip() for line in test_requirements.split() if line.strip()[:2].strip() not in ("#", "-r") + ] setup( author="Eriks Karls",