Fixups and type checks
This commit is contained in:
parent
f10eeec498
commit
b723660f23
@ -115,12 +115,13 @@ class SlowRequests(Session):
|
||||
|
||||
class CitizenBaseAPI:
|
||||
url: str = "https://www.erepublik.com/en"
|
||||
_req: SlowRequests = None
|
||||
token: str = ""
|
||||
_req: SlowRequests
|
||||
token: str
|
||||
|
||||
def __init__(self):
|
||||
""" Class for unifying eRepublik known endpoints and their required/optional parameters """
|
||||
self._req = SlowRequests()
|
||||
self.token = ""
|
||||
|
||||
def post(self, url: str, data=None, json=None, **kwargs) -> Response:
|
||||
return self._req.post(url, data, json, **kwargs)
|
||||
|
@ -833,15 +833,13 @@ class CitizenCompanies(BaseCitizen):
|
||||
def work_as_manager_in_holding(self, holding: classes.Holding) -> Optional[Dict[str, Any]]:
|
||||
return self._work_as_manager(holding)
|
||||
|
||||
def _work_as_manager(self, wam_holding: classes.Holding = None) -> Optional[Dict[str, Any]]:
|
||||
def _work_as_manager(self, wam_holding: classes.Holding) -> Optional[Dict[str, Any]]:
|
||||
if self.restricted_ip:
|
||||
return None
|
||||
self.update_companies()
|
||||
self.update_inventory()
|
||||
data = {"action_type": "production"}
|
||||
extra = {}
|
||||
wam_list = []
|
||||
if wam_holding:
|
||||
raw_factories = wam_holding.get_wam_companies(raw_factory=True)
|
||||
fin_factories = wam_holding.get_wam_companies(raw_factory=False)
|
||||
|
||||
@ -1016,7 +1014,7 @@ class CitizenEconomy(CitizenTravel):
|
||||
self.write_log(f"Trying to sell unsupported industry {industry}")
|
||||
|
||||
data = {
|
||||
"country_id": self.details.citizenship,
|
||||
"country_id": self.details.citizenship.id,
|
||||
"industry": industry,
|
||||
"quality": quality,
|
||||
"amount": amount,
|
||||
@ -1071,7 +1069,7 @@ class CitizenEconomy(CitizenTravel):
|
||||
start_dt = self.now
|
||||
iterable = [countries, [quality] if quality else range(1, max_quality + 1)]
|
||||
for country, q in product(*iterable):
|
||||
r = self._post_economy_marketplace(country, constants.INDUSTRIES[product_name], q).json()
|
||||
r = self._post_economy_marketplace(country.id, constants.INDUSTRIES[product_name], q).json()
|
||||
obj = offers[f"q{q}"]
|
||||
if not r.get("error", False):
|
||||
for offer in r["offers"]:
|
||||
@ -1525,7 +1523,7 @@ class CitizenMilitary(CitizenTravel):
|
||||
if not division.terrain and is_start_ok and not division.div_end:
|
||||
if division.is_air and self.config.air:
|
||||
division_medals = self.get_battle_round_data(division)
|
||||
medal = division_medals[self.details.citizenship == division.battle.defender.country]
|
||||
medal = division_medals[self.details.citizenship == division.battle.is_defender.country]
|
||||
if not medal:
|
||||
air_divs.append((0, division))
|
||||
else:
|
||||
@ -1534,7 +1532,7 @@ class CitizenMilitary(CitizenTravel):
|
||||
if not division.div == self.division and not self.maverick:
|
||||
continue
|
||||
division_medals = self.get_battle_round_data(division)
|
||||
medal = division_medals[self.details.citizenship == division.battle.defender.country]
|
||||
medal = division_medals[self.details.citizenship == division.battle.is_defender.country]
|
||||
if not medal:
|
||||
ground_divs.append((0, division))
|
||||
else:
|
||||
@ -1656,6 +1654,9 @@ class CitizenMilitary(CitizenTravel):
|
||||
self.write_log("Hits: {:>4} | Damage: {}".format(total_hits, total_damage))
|
||||
ok_to_fight = False
|
||||
if total_damage:
|
||||
self.reporter.report_action('FIGHT', dict(battle_id=battle.id, side=side, dmg=total_damage,
|
||||
air=battle.has_air, hits=total_hits,
|
||||
round=battle.zone_id))
|
||||
self.reporter.report_action("FIGHT", dict(battle=str(battle), side=str(side), dmg=total_damage,
|
||||
air=battle.has_air, hits=total_hits))
|
||||
return error_count
|
||||
@ -1905,7 +1906,7 @@ class CitizenMilitary(CitizenTravel):
|
||||
battleZoneId=division.id, type="damage")
|
||||
r_json = r.json()
|
||||
return (r_json.get(str(battle.invader.id)).get("fighterData"),
|
||||
r_json.get(str(battle.defender.id)).get("fighterData"))
|
||||
r_json.get(str(battle.is_defender.id)).get("fighterData"))
|
||||
|
||||
def schedule_attack(self, war_id: int, region_id: int, region_name: str, at_time: datetime):
|
||||
if at_time:
|
||||
@ -2344,11 +2345,11 @@ class Citizen(CitizenAnniversary, CitizenCompanies, CitizenEconomy, CitizenLeade
|
||||
self.reporter.report_action("NEW_MEDAL", info)
|
||||
|
||||
def set_debug(self, debug: bool):
|
||||
self.debug = debug
|
||||
self._req.debug = debug
|
||||
self.debug = bool(debug)
|
||||
self._req.debug = bool(debug)
|
||||
|
||||
def set_pin(self, pin: int):
|
||||
self.details.pin = pin
|
||||
self.details.pin = int(pin)
|
||||
|
||||
def update_all(self, force_update=False):
|
||||
# Do full update max every 5 min
|
||||
@ -2521,7 +2522,8 @@ class Citizen(CitizenAnniversary, CitizenCompanies, CitizenEconomy, CitizenLeade
|
||||
else:
|
||||
if not start_place == (self.details.current_country, self.details.current_region):
|
||||
self.travel_to_holding(holding)
|
||||
return self._wam(holding)
|
||||
self._wam(holding)
|
||||
return
|
||||
elif response.get("message") == "not_enough_health_food":
|
||||
self.buy_food()
|
||||
self._wam(holding)
|
||||
@ -2537,7 +2539,7 @@ class Citizen(CitizenAnniversary, CitizenCompanies, CitizenEconomy, CitizenLeade
|
||||
:rtype: bool
|
||||
"""
|
||||
if self.restricted_ip:
|
||||
self._report_action("IP_BLACKLISTED", "Fighting is not allowed from restricted IP!")
|
||||
self._report_action("IP_BLACKLISTED", "Work as manager is not allowed from restricted IP!")
|
||||
return False
|
||||
self.update_citizen_info()
|
||||
self.update_companies()
|
||||
@ -2554,6 +2556,10 @@ class Citizen(CitizenAnniversary, CitizenCompanies, CitizenEconomy, CitizenLeade
|
||||
self.update_companies()
|
||||
|
||||
for holding in regions.values():
|
||||
raw_usage = holding.get_wam_raw_usage()
|
||||
if (raw_usage['frm'] + raw_usage['wrm']) * 100 + self.inventory['used'] > self.inventory['total']:
|
||||
self._report_action('WAM_UNAVAILABLE', 'Not enough storage!')
|
||||
continue
|
||||
self.travel_to_holding(holding)
|
||||
self._wam(holding)
|
||||
self.update_companies()
|
||||
|
@ -294,7 +294,7 @@ class MyCompanies:
|
||||
|
||||
def __clear_data(self):
|
||||
for holding in self.holdings.values():
|
||||
for company in holding.companies:
|
||||
for company in holding.companies: # noqa
|
||||
del company
|
||||
holding.companies.clear()
|
||||
self.companies.clear()
|
||||
@ -549,7 +549,10 @@ class Reporter:
|
||||
self._citizen = weakref.ref(citizen)
|
||||
self._req = Session()
|
||||
self.url = "https://api.erep.lv"
|
||||
self._req.headers.update({"user-agent": "Bot reporter v2"})
|
||||
self._req.headers.update({"user-agent": "eRepublik Script Reporter v3",
|
||||
'erep-version': utils.__version__,
|
||||
'erep-user-id': self.citizen_id,
|
||||
'erep-user-name': self.citizen.name})
|
||||
self.__to_update = []
|
||||
self.__registered: bool = False
|
||||
|
||||
@ -626,13 +629,13 @@ class Reporter:
|
||||
except: # noqa
|
||||
return []
|
||||
|
||||
def fetch_tasks(self) -> Optional[Tuple[str, Tuple[Any]]]:
|
||||
def fetch_tasks(self) -> List[str, Tuple[Any]]:
|
||||
try:
|
||||
task_response = self._req.get(f'{self.url}/api/v1/command',
|
||||
params=dict(citizen=self.citizen_id, key=self.key))
|
||||
return task_response.json().get('task_collection')
|
||||
except: # noqa
|
||||
return
|
||||
return []
|
||||
|
||||
|
||||
class MyJSONEncoder(utils.json.JSONEncoder):
|
||||
@ -669,7 +672,7 @@ class BattleSide:
|
||||
battle: "Battle"
|
||||
_battle: weakref.ReferenceType
|
||||
country: constants.Country
|
||||
defender: bool
|
||||
is_defender: bool
|
||||
|
||||
def __init__(self, battle: "Battle", country: constants.Country, points: int, allies: List[constants.Country],
|
||||
deployed: List[constants.Country], defender: bool):
|
||||
@ -678,18 +681,18 @@ class BattleSide:
|
||||
self.points = points
|
||||
self.allies = allies
|
||||
self.deployed = deployed
|
||||
self.defender = defender
|
||||
self.is_defender = defender
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
return self.country.id
|
||||
|
||||
def __repr__(self):
|
||||
side_text = "Defender" if self.defender else "Invader "
|
||||
side_text = "Defender" if self.is_defender else "Invader "
|
||||
return f"<BattleSide: {side_text} {self.country.name}|{self.points:02d}p>"
|
||||
|
||||
def __str__(self):
|
||||
side_text = "Defender" if self.defender else "Invader "
|
||||
side_text = "Defender" if self.is_defender else "Invader "
|
||||
return f"{side_text} {self.country.name} - {self.points:02d} points"
|
||||
|
||||
def __format__(self, format_spec):
|
||||
@ -697,8 +700,8 @@ class BattleSide:
|
||||
|
||||
@property
|
||||
def as_dict(self):
|
||||
return dict(points=self.points, country=self.country, defender=self.defender, allies=self.allies,
|
||||
deployed=self.deployed, battle=repr(self.battle))
|
||||
return dict(points=self.points, country=self.country, is_defender=self.is_defender, allies=self.allies,
|
||||
deployed=self.deployed)
|
||||
|
||||
@property
|
||||
def battle(self):
|
||||
@ -721,7 +724,7 @@ class BattleDivision:
|
||||
@property
|
||||
def as_dict(self):
|
||||
return dict(id=self.id, division=self.div, terrain=(self.terrain, self.terrain_display), wall=self.wall,
|
||||
epic=self.epic, battle=str(self.battle), end=self.div_end)
|
||||
epic=self.epic, end=self.div_end)
|
||||
|
||||
@property
|
||||
def is_air(self):
|
||||
@ -788,7 +791,7 @@ class Battle:
|
||||
def as_dict(self):
|
||||
return dict(id=self.id, war_id=self.war_id, divisions=self.div, zone=self.zone_id, rw=self.is_rw,
|
||||
dict_lib=self.is_dict_lib, start=self.start, sides={'inv': self.invader, 'def': self.defender},
|
||||
region=[self.region_id, self.region_name])
|
||||
region=[self.region_id, self.region_name], link=self.link)
|
||||
|
||||
@property
|
||||
def has_air(self) -> bool:
|
||||
@ -797,6 +800,10 @@ class Battle:
|
||||
return True
|
||||
return not bool(self.zone_id % 4)
|
||||
|
||||
@property
|
||||
def has_started(self) -> bool:
|
||||
return self.start <= utils.now()
|
||||
|
||||
@property
|
||||
def has_ground(self) -> bool:
|
||||
for div in self.div.values():
|
||||
@ -920,7 +927,7 @@ class TelegramBot:
|
||||
def as_dict(self):
|
||||
return {'chat_id': self.chat_id, 'api_url': self.api_url, 'player': self.player_name,
|
||||
'last_time': self._last_time, 'next_time': self._next_time, 'queue': self.__queue,
|
||||
'initialized': self.__initialized, 'has_threads': bool(len(self._threads))}
|
||||
'initialized': self.__initialized, 'has_threads': not self._threads}
|
||||
|
||||
def do_init(self, chat_id: int, token: str, player_name: str = ""):
|
||||
self.chat_id = chat_id
|
||||
|
@ -1,4 +1,3 @@
|
||||
import copy
|
||||
import datetime
|
||||
import inspect
|
||||
import os
|
||||
@ -10,7 +9,7 @@ import traceback
|
||||
import unicodedata
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Mapping, Optional, Union, Dict
|
||||
from typing import Any, List, Optional, Union, Dict
|
||||
|
||||
import requests
|
||||
|
||||
@ -43,12 +42,11 @@ def localize_timestamp(timestamp: int) -> datetime.datetime:
|
||||
|
||||
|
||||
def localize_dt(dt: Union[datetime.date, datetime.datetime]) -> datetime.datetime:
|
||||
try:
|
||||
try:
|
||||
if isinstance(dt, datetime.datetime):
|
||||
return constants.erep_tz.localize(dt)
|
||||
except AttributeError:
|
||||
elif isinstance(dt, datetime.date):
|
||||
return constants.erep_tz.localize(datetime.datetime.combine(dt, datetime.time(0, 0, 0)))
|
||||
except ValueError:
|
||||
else:
|
||||
return dt.astimezone(constants.erep_tz)
|
||||
|
||||
|
||||
@ -117,10 +115,12 @@ def _write_log(msg, timestamp: bool = True, should_print: bool = False):
|
||||
|
||||
|
||||
def write_interactive_log(*args, **kwargs):
|
||||
kwargs.pop("should_print", None)
|
||||
_write_log(should_print=True, *args, **kwargs)
|
||||
|
||||
|
||||
def write_silent_log(*args, **kwargs):
|
||||
kwargs.pop("should_print", None)
|
||||
_write_log(should_print=False, *args, **kwargs)
|
||||
|
||||
|
||||
@ -278,16 +278,17 @@ def process_error(log_info: str, name: str, exc_info: tuple, citizen=None, commi
|
||||
elif interactive is not None:
|
||||
write_silent_log(log_info)
|
||||
trace = inspect.trace()
|
||||
local_vars = None
|
||||
if trace:
|
||||
trace = trace[-1][0].f_locals
|
||||
if trace.get('__name__') == '__main__':
|
||||
trace = {'commit_id': trace.get('COMMIT_ID'),
|
||||
'interactive': trace.get('INTERACTIVE'),
|
||||
'version': trace.get('__version__'),
|
||||
'config': trace.get('CONFIG')}
|
||||
trace_local_vars = trace[-1][0].f_locals
|
||||
if trace_local_vars.get('__name__') == '__main__':
|
||||
local_vars = {'commit_id': trace_local_vars.get('COMMIT_ID'),
|
||||
'interactive': trace_local_vars.get('INTERACTIVE'),
|
||||
'version': trace_local_vars.get('__version__'),
|
||||
'config': trace_local_vars.get('CONFIG')}
|
||||
else:
|
||||
trace = dict()
|
||||
send_email(name, content, citizen, local_vars=trace)
|
||||
local_vars = dict()
|
||||
send_email(name, content, citizen, local_vars=local_vars)
|
||||
|
||||
|
||||
def process_warning(log_info: str, name: str, exc_info: tuple, citizen=None, commit_id: str = None):
|
||||
@ -307,10 +308,10 @@ def process_warning(log_info: str, name: str, exc_info: tuple, citizen=None, com
|
||||
|
||||
trace = inspect.trace()
|
||||
if trace:
|
||||
trace = trace[-1][0].f_locals
|
||||
local_vars = trace[-1][0].f_locals
|
||||
else:
|
||||
trace = dict()
|
||||
send_email(name, content, citizen, local_vars=trace)
|
||||
local_vars = dict()
|
||||
send_email(name, content, citizen, local_vars=local_vars)
|
||||
|
||||
|
||||
def slugify(value, allow_unicode=False) -> str:
|
||||
@ -371,8 +372,6 @@ def get_air_hit_dmg_value(citizen_id: int, natural_enemy: bool = False, true_pat
|
||||
|
||||
|
||||
def _clear_up_battle_memory(battle):
|
||||
from . import classes
|
||||
battle: classes.Battle
|
||||
del battle.invader._battle, battle.defender._battle
|
||||
for div_id, division in battle.div.items():
|
||||
del division._battle
|
||||
|
Loading…
x
Reference in New Issue
Block a user