Compare commits

...

13 Commits

Author SHA1 Message Date
c51337d249 Bump version: 0.24.0.3 → 0.24.0.4 2021-02-03 01:07:21 +02:00
f4896e0b79 Merge branch 'bugfix-v0.23.4'
# Conflicts:
#	erepublik/__init__.py
#	setup.cfg
#	setup.py
2021-02-03 00:50:03 +02:00
13f5c673ad Migrate Citizen.write_log to self.logger.warning where applicable 2021-02-03 00:48:50 +02:00
e95ffbd505 cleanup 2021-02-03 00:48:13 +02:00
5e638806b5 Switch from manual log creation to Python logging 2021-02-03 00:47:52 +02:00
7f3bd9b864 Bump version: 0.24.0.2 → 0.24.0.3 2021-01-28 01:46:11 +02:00
9154d2e493 Captcha solving 2021-01-28 01:46:03 +02:00
8e8b882ace Bump version: 0.24.0.1 → 0.24.0.2 2021-01-28 00:37:12 +02:00
d7b020c7ea bugfix 2021-01-28 00:37:10 +02:00
cb567bf5c0 Bump version: 0.24.0 → 0.24.0.1 2021-01-28 00:31:53 +02:00
bb6d1be1b5 bugfix 2021-01-28 00:31:46 +02:00
c546c8f5eb Bump version: 0.23.4.14 → 0.24.0 2021-01-27 23:43:48 +02:00
9acc2d2e65 Lets deploy! Preperation for 8th of February 2021-01-27 23:43:29 +02:00
9 changed files with 514 additions and 363 deletions

View File

@ -4,7 +4,7 @@
__author__ = """Eriks Karls"""
__email__ = 'eriks@72.lv'
__version__ = '0.23.4.16'
__version__ = '0.24.0.4'
from erepublik import classes, constants, utils
from erepublik.citizen import Citizen

View File

@ -84,10 +84,8 @@ class SlowRequests(Session):
body = f"[{utils.now().strftime('%F %T')}]\tURL: '{url}'\tMETHOD: {method}\tARGS: {args}\n"
with open(self.request_log_name, 'ab') as file:
file.write(body.encode("UTF-8"))
pass
def _log_response(self, url, resp, redirect: bool = False):
from erepublik import Citizen
if self.debug:
if resp.history and not redirect:
for hist_resp in resp.history:
@ -96,7 +94,7 @@ class SlowRequests(Session):
fd_path = 'debug/requests'
fd_time = self.last_time.strftime('%Y/%m/%d/%H-%M-%S')
fd_name = utils.slugify(url[len(Citizen.url):])
fd_name = utils.slugify(url[len(CitizenBaseAPI.url):])
fd_extra = '_REDIRECT' if redirect else ""
try:
@ -107,7 +105,6 @@ class SlowRequests(Session):
filename = f'{fd_path}/{fd_time}_{fd_name}{fd_extra}.{fd_ext}'
utils.write_file(filename, resp.text)
pass
class CitizenBaseAPI:
@ -141,6 +138,27 @@ class CitizenBaseAPI:
url = f'http://{username}:{password}@{host}:{port}' if username and password else f'socks5://{host}:{port}'
self._req.proxies = dict(http=url)
def _get_main_session_captcha(self) -> Response:
return self.get(f'{self.url}/main/sessionCaptcha')
def _get_main_session_unlock_popup(self) -> Response:
return self.get(f'{self.url}/main/sessionUnlockPopup')
def _post_main_session_get_challenge(self, captcha_id: int, image_id: str = "") -> Response:
env = dict(l=['tets', ], s=[], c=["l_chathwe", "l_chatroom"], m=0)
data = dict(_token=self.token, captchaId=captcha_id, env=utils.b64json(env))
if image_id:
data.update(imageId=image_id, isRefresh=True)
return self.post(f'{self.url}/main/sessionGetChallenge', data=data)
def _post_main_session_unlock(
self, captcha: int, image: str, challenge: str, coords: List[Dict[str, int]], src: str
) -> Response:
env = dict(l=['tets', ], s=[], c=["l_chathwe", "l_chatroom"], m=0)
data = dict(_token=self.token, captchaId=captcha, imageId=image, challengeId=challenge,
clickMatrix=coords, isMobile=0, env=utils.b64json(env), src=src)
return self.post(f'{self.url}/main/sessionUnlock', data=data)
class ErepublikAnniversaryAPI(CitizenBaseAPI):
def _post_main_collect_anniversary_reward(self) -> Response:
@ -457,9 +475,20 @@ class ErepublikMilitaryAPI(CitizenBaseAPI):
data = dict(sideId=side_id, battleId=battle_id, _token=self.token, battleZoneId=zone_id)
return self.post(f"{self.url}/military/fight-shooot/{battle_id}", data=data)
def _post_fight_deploy_deploy_report_data(self, deployment_id: int):
def _post_fight_deploy_deploy_report_data(self, deployment_id: int) -> Response:
data = dict(_token=self.token, deploymentId=deployment_id)
return self.post(f"{self.url}/military/fightDeploy-deployReportData", json=data)
return self.post(f"{self.url}/military/fightDeploy-deployReportData", data=data)
def _post_fight_deploy_get_inventory(self, battle_id: int, side_id: int, battle_zone_id: int) -> Response:
data = dict(_token=self.token, battleId=battle_id, sideCountryId=side_id, battleZoneId=battle_zone_id)
return self.post(f"{self.url}/military/fightDeploy-getInventory", data=data)
def _post_fight_deploy_start_deploy(
self, battle_id: int, side_id: int, battle_zone_id: int, energy: int, weapon: int, **kwargs
) -> Response:
data = dict(_token=self.token, battleId=battle_id, battleZoneId=battle_zone_id, sideCountryId=side_id,
weaponQuality=weapon, totalEnergy=energy, **kwargs)
return self.post(f"{self.url}/military/fightDeploy-startDeploy", data=data)
class ErepublikPoliticsAPI(CitizenBaseAPI):

View File

@ -1,5 +1,5 @@
import logging
import re
import sys
import warnings
import weakref
from datetime import datetime, time, timedelta
@ -13,6 +13,7 @@ from requests import HTTPError, RequestException, Response
from . import access_points, classes, constants, types, utils
from .classes import OfferItem
from .logging import ErepublikLogConsoleHandler, ErepublikFormatter, ErepublikFileHandler, ErepublikErrorHTTTPHandler
class BaseCitizen(access_points.CitizenAPI):
@ -27,6 +28,7 @@ class BaseCitizen(access_points.CitizenAPI):
eb_normal: int = 0
eb_double: int = 0
eb_small: int = 0
eb_triple: int = 0
division: int = 0
maverick: bool = False
@ -43,6 +45,8 @@ class BaseCitizen(access_points.CitizenAPI):
stop_threads: Event = None
telegram: classes.TelegramReporter = None
logger: logging.Logger
r: Response = None
name: str = 'Not logged in!'
logged_in: bool = False
@ -57,6 +61,9 @@ class BaseCitizen(access_points.CitizenAPI):
self.my_companies = classes.MyCompanies(self)
self.reporter = classes.Reporter(self)
self.stop_threads = Event()
logger_class = logging.getLoggerClass()
self.logger = logger_class('Citizen')
self.telegram = classes.TelegramReporter(stop_event=self.stop_threads)
self.config.email = email
@ -86,6 +93,7 @@ class BaseCitizen(access_points.CitizenAPI):
self.token = re_login_token.group(1)
self._login()
else:
self.logger.error("Something went wrong! Can't find token in page!")
raise classes.ErepublikException("Something went wrong! Can't find token in page! Exiting!")
try:
self.update_citizen_info(resp.text)
@ -98,13 +106,13 @@ class BaseCitizen(access_points.CitizenAPI):
if 'params' in kwargs:
if '_token' in kwargs['params']:
kwargs['params']['_token'] = self.token
if url == self.r.url and not url == self.url: # Don't duplicate requests, except for homepage
if self.r and url == self.r.url and not url == self.url: # Don't duplicate requests, except for homepage
response = self.r
else:
try:
response = super().get(url, **kwargs)
except RequestException as e:
self.write_log('Network error while issuing GET request', e)
self.logger.error('Network error while issuing GET request', exc_info=e)
self.sleep(60)
return self.get(url, **kwargs)
@ -137,14 +145,14 @@ class BaseCitizen(access_points.CitizenAPI):
try:
response = super().post(url, data=data, json=json, **kwargs)
except RequestException as e:
self.write_log('Network error while issuing POST request', e)
self.logger.error('Network error while issuing POST request', exc_info=e)
self.sleep(60)
return self.post(url, data=data, json=json, **kwargs)
try:
r_json = response.json()
if (r_json.get('error') or not r_json.get('status')) and r_json.get('message', '') == 'captcha':
utils.send_email(self.name, [response.text, ], player=self, captcha=True)
self.logger.warning('Regular captcha must be filled!', extra=r_json)
except (AttributeError, utils.json.JSONDecodeError, ValueError, KeyError):
pass
@ -186,7 +194,6 @@ class BaseCitizen(access_points.CitizenAPI):
for kind, time_until in self.promos.items():
active_promos.append(f"{kind} active until {time_until}")
self.reporter.report_promo(kind, time_until)
utils.send_email(self.name, active_promos, player=self, promo=True)
new_date = re.search(r"var new_date = '(\d*)';", html)
if new_date:
@ -241,6 +248,31 @@ class BaseCitizen(access_points.CitizenAPI):
"""
self._update_inventory_data(self._get_economy_inventory_items().json())
def do_captcha_challenge(self, retry: int = 0) -> bool:
r = self._get_main_session_captcha()
data = re.search(r'\$j\.extend\(SERVER_DATA,([^)]+)\)', r.text)
if data:
data = utils.json_loads(utils.normalize_html_json(data.group(1)))
captcha_id = data.get('sessionValidation', {}).get("captchaId")
captcha_data = self._post_main_session_get_challenge(captcha_id).json()
coordinates = self.solve_captcha(captcha_data.get('src'))
r = self._post_main_session_unlock(
captcha_id, captcha_data['imageId'], captcha_data['challengeId'], coordinates, captcha_data['src']
).json()
if not r.get('error') and r.get('verified'):
return True
else:
self.report_error('Captcha failed!')
if retry < 6:
return self.do_captcha_challenge(retry + 1)
return False
def refresh_captcha_image(self, captcha_id: int, image_id: str):
return self._post_main_session_get_challenge(captcha_id, image_id).json()
def solve_captcha(self, src: str) -> List[Dict[str, int]]:
raise NotImplemented
@property
def inventory(self) -> classes.Inventory:
return self.get_inventory()
@ -271,7 +303,7 @@ class BaseCitizen(access_points.CitizenAPI):
return
self._last_inventory_update = self.now
self.food.update(q1=0, q2=0, q3=0, q4=0, q5=0, q6=0, q7=0)
self.eb_small = self.eb_double = self.eb_normal = 0
self.eb_triple = self.eb_small = self.eb_double = self.eb_normal = 0
active_items: types.InvFinal = {}
if data.get('activeEnhancements', {}).get('items', {}):
for item_data in data.get('activeEnhancements', {}).get('items', {}).values():
@ -289,7 +321,8 @@ class BaseCitizen(access_points.CitizenAPI):
expiration_info = [_expire_value_to_python(v) for v in expire_info['value']]
if not item_data.get('icon') and item_data.get('isPackBooster'):
item_data['icon'] = f"//www.erepublik.com/images/icons/boosters/52px/{item_data.get('type')}.png"
icon = item_data['icon'] if item_data['icon'] else "//www.erepublik.net/images/modules/manager/tab_storage.png"
icon = item_data['icon'] if item_data[
'icon'] else "//www.erepublik.net/images/modules/manager/tab_storage.png"
inv_item: types.InvFinalItem = dict(
name=item_data.get('name'), time_left=item_data['active']['time_left'], icon=icon,
kind=kind, expiration=expiration_info, quality=item_data.get('quality', 0)
@ -333,15 +366,12 @@ class BaseCitizen(access_points.CitizenAPI):
self.eb_normal = amount
elif q == 11:
self.eb_double = amount
elif q == 13:
self.eb_small += amount
elif q == 14:
self.eb_small += amount
elif q == 15:
item_data.update(token='energy_bar')
elif 11 < q < 17:
self.eb_small += amount
item_data.update(token='energy_bar')
elif q == 16:
self.eb_small += amount
elif q == 17:
self.eb_triple = amount
item_data.update(token='energy_bar')
kind = re.sub(r'_q\d\d*', "", item_data.get('token'))
@ -444,17 +474,14 @@ class BaseCitizen(access_points.CitizenAPI):
self._inventory.offers = offers
self.food['total'] = sum([self.food[q] * constants.FOOD_ENERGY[q] for q in constants.FOOD_ENERGY])
def write_log(self, *args, **kwargs):
if self.config.interactive:
utils.write_interactive_log(*args, **kwargs)
else:
utils.write_silent_log(*args, **kwargs)
def write_log(self, msg: str):
self.logger.info(msg)
def report_error(self, msg: str = "", is_warning: bool = False):
if is_warning:
utils.process_warning(msg, self.name, sys.exc_info(), self)
self.logger.warning(msg)
else:
utils.process_error(msg, self.name, sys.exc_info(), self, None, None)
self.logger.error(msg)
def sleep(self, seconds: Union[int, float, Decimal]):
if seconds < 0:
@ -464,12 +491,43 @@ class BaseCitizen(access_points.CitizenAPI):
else:
sleep(seconds)
def set_debug(self, debug: bool):
self.debug = bool(debug)
self._req.debug = bool(debug)
def init_logger(self):
for handler in list(self.logger.handlers):
self.logger.removeHandler(handler)
formatter = ErepublikFormatter()
if self.config.interactive:
console_handler = ErepublikLogConsoleHandler()
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
file_handler = ErepublikFileHandler()
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
error_handler = ErepublikErrorHTTTPHandler(self.reporter)
error_handler.setFormatter(formatter)
self.logger.addHandler(error_handler)
self.logger.setLevel(logging.INFO)
def set_debug(self, enable: bool):
self.debug = bool(enable)
self._req.debug = bool(enable)
self.logger.setLevel(logging.DEBUG if enable else logging.INFO)
for handler in self.logger.handlers:
if isinstance(handler, (ErepublikLogConsoleHandler, ErepublikFileHandler)):
handler.setLevel(logging.DEBUG if enable else logging.INFO)
self.logger.debug(f"Debug messages {'enabled' if enable else 'disabled'}!")
def set_interactive(self, enable: bool):
for handler in self.logger.handlers:
if isinstance(handler, (ErepublikLogConsoleHandler,)):
self.logger.removeHandler(handler)
if enable:
handler = ErepublikLogConsoleHandler()
handler.setLevel(logging.DEBUG if self.debug else logging.INFO)
self.logger.addHandler(handler)
def to_json(self, indent: bool = False) -> str:
return utils.json.dumps(self, cls=classes.ErepublikJSONEncoder, indent=4 if indent else None, sort_keys=True)
return utils.json_dumps(self, indent=4 if indent else None, sort_keys=True)
def get_countries_with_regions(self) -> Set[constants.Country]:
r_json = self._post_main_travel_data().json()
@ -482,9 +540,9 @@ class BaseCitizen(access_points.CitizenAPI):
def dump_instance(self):
filename = f"{self.__class__.__name__}__dump.json"
with open(filename, 'w') as f:
utils.json.dump(dict(config=self.config, cookies=self._req.cookies.get_dict(),
user_agent=self._req.headers.get("User-Agent")), f, cls=classes.ErepublikJSONEncoder)
self.write_log(f"Session saved to: '{filename}'")
utils.json_dump(dict(config=self.config, cookies=self._req.cookies.get_dict(),
user_agent=self._req.headers.get("User-Agent")), f,)
self.logger.debug(f"Session saved to: '{filename}'")
@classmethod
def load_from_dump(cls, dump_name: str):
@ -496,6 +554,7 @@ class BaseCitizen(access_points.CitizenAPI):
for k, v in data.get('config', {}).items():
if hasattr(player.config, k):
setattr(player.config, k, v)
player.init_logger()
player._resume_session()
return player
@ -527,7 +586,7 @@ class BaseCitizen(access_points.CitizenAPI):
ret = super().as_dict
ret.update(
name=self.name, __str__=self.__str__(),
ebs=dict(normal=self.eb_normal, double=self.eb_double, small=self.eb_small),
ebs=dict(normal=self.eb_normal, double=self.eb_double, small=self.eb_small, triple=self.eb_triple),
promos=self.promos, inventory=self._inventory.as_dict, ot_points=self.ot_points, food=self.food,
division=self.division, maveric=self.maverick, eday=self.eday, wheel_of_fortune=self.wheel_of_fortune,
debug=self.debug,
@ -697,12 +756,10 @@ class BaseCitizen(access_points.CitizenAPI):
self.eb_normal -= amount
elif q == '11':
self.eb_double -= amount
elif q == '12':
self.eb_small -= amount
elif q == '15':
self.eb_small -= amount
elif q == '16':
elif 11 < int(q) < 17:
self.eb_small -= amount
elif q == '17':
self.eb_triple -= amount
next_recovery = r_json.get('food_remaining_reset').split(":")
self.energy.set_reference_time(
utils.good_timedelta(self.now, timedelta(seconds=int(next_recovery[1]) * 60 + int(next_recovery[2])))
@ -717,8 +774,7 @@ class BaseCitizen(access_points.CitizenAPI):
self.r = r
if r.url == f"{self.url}/login":
self.write_log("Citizen email and/or password is incorrect!")
raise KeyboardInterrupt
self.logger.error("Citizen email and/or password is incorrect!")
else:
re_name_id = re.search(r'<a data-fblog="profile_avatar" href="/en/citizen/profile/(\d+)" '
r'class="user_avatar" title="(.*?)">', r.text)
@ -737,17 +793,20 @@ class BaseCitizen(access_points.CitizenAPI):
try:
j = response.json()
if j['error'] and j['message'] == 'Too many requests':
self.write_log('Made too many requests! Sleeping for 30 seconds.')
self.logger.warning('Made too many requests! Sleeping for 30 seconds.')
self.sleep(30)
except (utils.json.JSONDecodeError, KeyError, TypeError):
pass
if response.status_code >= 400:
self.r = response
if response.status_code >= 500:
if response.text == 'Please verify your account.' or response.text == 'Forbidden':
self.do_captcha_challenge()
return True
elif response.status_code >= 500:
if self.restricted_ip:
self._req.cookies.clear()
return True
self.write_log('eRepublik servers are having internal troubles. Sleeping for 5 minutes')
self.logger.warning('eRepublik servers are having internal troubles. Sleeping for 5 minutes')
self.sleep(5 * 60)
else:
raise classes.ErepublikException(f"HTTP {response.status_code} error!")
@ -755,12 +814,12 @@ class BaseCitizen(access_points.CitizenAPI):
if re.search(r'Occasionally there are a couple of things which we need to check or to implement in order make '
r'your experience in eRepublik more pleasant. <strong>Don\'t worry about ongoing battles, timer '
r'will be stopped during maintenance.</strong>', response.text):
self.write_log('eRepublik is having maintenance. Sleeping for 5 minutes')
self.logger.warning('eRepublik is having maintenance. Sleeping for 5 mi#nutes')
self.sleep(5 * 60)
return True
if re.search('We are experiencing some tehnical dificulties', response.text):
self.write_log('eRepublik is having technical difficulties. Sleeping for 5 minutes')
elif re.search('We are experiencing some tehnical dificulties', response.text):
self.logger.warning('eRepublik is having technical difficulties. Sleeping for 5 minutes')
self.sleep(5 * 60)
return True
@ -778,9 +837,12 @@ class BaseCitizen(access_points.CitizenAPI):
:param msg: Message about the action
:param kwargs: Extra information regarding action
"""
kwargs = utils.json.loads(utils.json.dumps(kwargs or {}, cls=classes.ErepublikJSONEncoder))
kwargs = utils.json_loads(utils.json_dumps(kwargs or {}))
action = action[:32]
self.write_log(msg)
if msg.startswith('Unable to'):
self.logger.warning(msg)
else:
self.write_log(msg)
if self.reporter.allowed:
self.reporter.report_action(action, kwargs, msg)
if self.config.telegram:
@ -806,7 +868,7 @@ class CitizenAnniversary(BaseCitizen):
def spin_wheel_of_fortune(self, max_cost=0, spin_count=0):
if not self.config.spin_wheel_of_fortune:
self.write_log("Unable to spin wheel of fortune because 'config.spin_wheel_of_fortune' is False")
self.logger.warning("Unable to spin wheel of fortune because 'config.spin_wheel_of_fortune' is False")
return
def _write_spin_data(cost: int, prize: str):
@ -1012,7 +1074,7 @@ class CitizenCompanies(BaseCitizen):
if wam_list:
data.update(extra)
if not self.details.current_region == wam_holding.region:
self.write_log("Unable to work as manager because of location - please travel!")
self.logger.warning("Unable to work as manager because of location - please travel!")
return
employ_factories = self.my_companies.get_employable_factories()
@ -1038,7 +1100,7 @@ class CitizenCompanies(BaseCitizen):
"""
Assigns factory to new holding
"""
self.write_log(f"{company} moved to {holding}")
self.logger.debug(f"{company} moved to {holding}")
company._holding = weakref.ref(holding)
return self._post_economy_assign_to_holding(company.id, holding.id)
@ -1053,7 +1115,7 @@ class CitizenCompanies(BaseCitizen):
company_name = constants.INDUSTRIES[industry_id]
if building_type == 2:
company_name = 'Storage'
self.write_log(f'{company_name} created!')
self.logger.info(f'{company_name} created!')
return self._post_economy_create_company(industry_id, building_type)
@ -1105,8 +1167,7 @@ class CitizenEconomy(CitizenTravel):
if buy is None:
pass
elif buy['error']:
msg = f'Unable to buy q{q} house! \n{buy["message"]}'
self.write_log(msg)
self.logger.warning(f'Unable to buy q{q} house! \n{buy["message"]}')
else:
ok_to_activate = True
else:
@ -1212,7 +1273,7 @@ class CitizenEconomy(CitizenTravel):
if isinstance(industry, str):
industry = constants.INDUSTRIES[industry]
if not constants.INDUSTRIES[industry]:
self.write_log(f"Trying to sell unsupported industry {industry}")
self.logger.warning(f"Trying to sell unsupported industry {industry}")
_inv_qlt = quality if industry in [1, 2, 3, 4, 23] else 0
final_kind = industry in [1, 2, 4, 23]
@ -1267,7 +1328,7 @@ class CitizenEconomy(CitizenTravel):
quality = 1
product_name = raw_short_names[product_name]
elif not constants.INDUSTRIES[product_name]:
self.write_log(f"Industry '{product_name}' not implemented")
self.logger.error(f"Industry '{product_name}' not implemented")
raise classes.ErepublikException(f"Industry '{product_name}' not implemented")
offers: Dict[str, classes.OfferItem] = {}
@ -1300,7 +1361,7 @@ class CitizenEconomy(CitizenTravel):
constants.COUNTRIES[int(offer['country_id'])], int(offer['amount']),
int(offer['id']), int(offer['citizen_id'])
)
self.write_log(f"Scraped market in {self.now - start_dt}!")
self.logger.debug(f"Scraped market in {self.now - start_dt}!")
return offers
@ -1324,7 +1385,7 @@ class CitizenEconomy(CitizenTravel):
self.update_inventory()
else:
s = f"Don't have enough money! Needed: {amount * cheapest.price}cc, Have: {self.details.cc}cc"
self.write_log(s)
self.logger.warning(s)
self._report_action('BUY_FOOD', s)
def get_monetary_offers(self, currency: int = 62) -> List[Dict[str, Union[int, float]]]:
@ -1388,7 +1449,7 @@ class CitizenEconomy(CitizenTravel):
self._report_action('DONATE_ITEMS', msg)
return amount
elif re.search('You must wait 5 seconds before donating again', response.text):
self.write_log('Previous donation failed! Must wait at least 5 seconds before next donation!')
self.logger.warning('Previous donation failed! Must wait at least 5 seconds before next donation!')
self.sleep(5)
return self.donate_items(citizen_id, int(amount), industry_id, quality)
else:
@ -1540,7 +1601,7 @@ class CitizenMedia(BaseCitizen):
kwargs=article_data)
self._get_main_delete_article(article_id)
else:
self.write_log(f"Unable to delete article (#{article_id})!")
self.logger.warning(f"Unable to delete article (#{article_id})!")
class CitizenMilitary(CitizenTravel):
@ -1773,7 +1834,7 @@ class CitizenMilitary(CitizenTravel):
side = battle.defender if defender_side else battle.invader
if not silent:
self.write_log(battle)
self.write_log(str(battle))
travel = (self.config.travel_to_fight and self.should_travel_to_fight() or self.config.force_travel) \
if travel_needed else True
@ -1796,18 +1857,8 @@ class CitizenMilitary(CitizenTravel):
if battle.start > self.now:
self.sleep(utils.get_sleep_seconds(battle.start))
if travel_needed:
if battle.is_rw:
countries_to_travel = [battle.defender.country]
elif self.details.current_country in battle.invader.allies:
countries_to_travel = battle.invader.deployed + [battle.invader.country]
side = battle.invader
else:
countries_to_travel = battle.defender.deployed + [battle.defender.country]
side = battle.defender
if not self.travel_to_battle(battle, countries_to_travel):
break
if travel_needed and not self.change_division(battle, division, side):
break
if self.change_division(battle, division):
self.set_default_weapon(battle, division)
@ -1835,6 +1886,7 @@ class CitizenMilitary(CitizenTravel):
:rtype: int
"""
if self.restricted_ip:
self.logger.warning('Fighting is not allowed from restricted IP!')
self._report_action('IP_BLACKLISTED', 'Fighting is not allowed from restricted IP!')
return 1
if not division.is_air and self.config.boosters:
@ -1847,7 +1899,7 @@ class CitizenMilitary(CitizenTravel):
if count is None:
count = self.should_fight()[0]
self.write_log(f"Fighting in battle for {battle.region_name} on {side} side\n{battle}\n{str(division)}")
self.write_log(f"Fighting in battle for {battle.region_name} on {side} side in d{division.div}")
total_damage = 0
total_hits = 0
@ -1891,13 +1943,13 @@ class CitizenMilitary(CitizenTravel):
elif r_json.get('message') == 'NOT_ENOUGH_WEAPONS':
self.set_default_weapon(battle, division)
elif r_json.get('message') == "Cannot activate a zone with a non-native division":
self.write_log('Wrong division!!')
self.logger.warning('Wrong division!!')
return 0, 10, 0
elif r_json.get('message') == 'ZONE_INACTIVE':
self.write_log('Wrong division!!')
self.logger.warning('Wrong division!!')
return 0, 10, 0
elif r_json.get('message') == 'NON_BELLIGERENT':
self.write_log("Dictatorship/Liberation wars are not supported!")
self.logger.warning("Dictatorship/Liberation wars are not supported!")
return 0, 10, 0
elif r_json.get('message') in ['FIGHT_DISABLED', 'DEPLOYMENT_MODE']:
self._post_main_profile_update('options',
@ -1997,7 +2049,7 @@ class CitizenMilitary(CitizenTravel):
"""
resp = self._post_main_battlefield_change_division(battle.id, division.id, side.id if side else None)
if resp.json().get('error'):
self.write_log(resp.json().get('message'))
self.logger.warning(resp.json().get('message'))
return False
self._report_action('MILITARY_DIV_SWITCH', f"Switched to d{division.div} in battle {battle.id}",
kwargs=resp.json())
@ -2107,7 +2159,7 @@ class CitizenMilitary(CitizenTravel):
count = (self.energy.limit * 3) // 10
force_fight = True
else:
self.write_log('Waiting for fully recovered energy before leveling up.', False)
self.write_log('Waiting for fully recovered energy before leveling up.')
# Levelup reachable
elif self.is_levelup_close:
@ -2232,11 +2284,59 @@ class CitizenMilitary(CitizenTravel):
if division.wall['dom'] == 50 or division.wall['dom'] > 98:
yield division, division.wall['for'] == battle.invader.country.id
def report_fighting(self, battle: classes.Battle, invader: bool, division: classes.BattleDivision, damage: float, hits: int):
def report_fighting(self, battle: classes.Battle, invader: bool, division: classes.BattleDivision, damage: float,
hits: int):
self.reporter.report_fighting(battle, invader, division, damage, hits)
if self.config.telegram:
self.telegram.report_fight(battle, invader, division, damage, hits)
def get_deploy_inventory(self, division: classes.BattleDivision, side: classes.BattleSide):
ret = self._post_fight_deploy_get_inventory(division.battle.id, side.id, division.id).json()
# if ret.get('recoverableEnergyBuyFood'):
# self.buy_food()
# return self.get_deploy_inventory(division, side)
if ret.get('captcha'):
while not self.do_captcha_challenge():
self.sleep(5)
return ret
def deploy(self, division: classes.BattleDivision, side: classes.BattleSide, energy: int):
_energy = int(energy)
deploy_inv = self.get_deploy_inventory(division, side)
if not deploy_inv['minEnergy'] <= energy <= deploy_inv['maxEnergy']:
return 0
energy_sources = {}
source_idx = 0
recoverable = deploy_inv['recoverableEnergy']
for source in reversed(sorted(deploy_inv['energySources'], key=lambda s: (s['type'], s.get('quality', 0)))):
if source['type'] == 'pool':
_energy -= source['energy']
elif source['type'] in ['food', 'energy_bar']:
recovers = source['energy'] / source['amount']
amount = recoverable // recovers
amount = amount if amount < source['amount'] else source['amount']
if amount > 0:
energy_sources.update({f'energySources[{source_idx}][quality]': source['quality']})
energy_sources.update({f'energySources[{source_idx}][amount]': amount})
source_idx += 1
used_energy = amount * recovers
recoverable -= used_energy
_energy -= used_energy
energy -= _energy
weapon_q = -1
weapon_strength = 0
if not division.is_air:
for weapon in sorted(deploy_inv['weapons'], key=lambda w: w['damageperHit']):
if (weapon['damageperHit'] or 0) > weapon_strength and (weapon['amount'] or 0) > 50:
weapon_q = weapon['quality']
r = self._post_fight_deploy_start_deploy(
division.battle.id, side.id, division.id, energy, weapon_q, **energy_sources
).json()
self.write_log(r.get('message'))
if r.get('error'):
self.logger.error(f"Deploy failed: '{r.get('message')}'")
return energy
class CitizenPolitics(BaseCitizen):
def get_country_parties(self, country: constants.Country = None) -> dict:
@ -2308,7 +2408,7 @@ class CitizenSocial(BaseCitizen):
def add_every_player_as_friend(self):
cities = []
cities_dict = {}
self.write_log('WARNING! This will take a lot of time.')
self.logger.warning('his will take a lot of time.')
rj = self._post_main_travel_data(regionId=662, check='getCountryRegions').json()
for region_data in rj.get('regions', {}).values():
cities.append(region_data['cityId'])
@ -2419,7 +2519,7 @@ class CitizenTasks(CitizenEconomy):
self._eat('blue')
if self.energy.food_fights < 1:
seconds = (self.energy.reference_time - self.now).total_seconds()
self.write_log(f"I don't have energy to work. Will sleep for {seconds}s")
self.logger.warning(f"I don't have energy to work. Will sleep for {seconds}s")
self.sleep(seconds)
self._eat('blue')
self.work()
@ -2449,7 +2549,7 @@ class CitizenTasks(CitizenEconomy):
if self.energy.food_fights < len(tgs):
large = max(self.energy.reference_time, self.now)
sleep_seconds = utils.get_sleep_seconds(large)
self.write_log(f"I don't have energy to train. Will sleep for {sleep_seconds} seconds")
self.logger.warning(f"I don't have energy to train. Will sleep for {sleep_seconds} seconds")
self.sleep(sleep_seconds)
self._eat('blue')
self.train()
@ -2473,7 +2573,7 @@ class CitizenTasks(CitizenEconomy):
if self.energy.food_fights < 1:
large = max(self.energy.reference_time, self.now)
sleep_seconds = utils.get_sleep_seconds(large)
self.write_log(f"I don't have energy to work OT. Will sleep for {sleep_seconds}s")
self.logger.warning(f"I don't have energy to work OT. Will sleep for {sleep_seconds}s")
self.sleep(sleep_seconds)
self._eat('blue')
self.work_ot()
@ -2553,7 +2653,7 @@ class _Citizen(CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard,
if hasattr(self.config, key):
setattr(self.config, key, value)
else:
self.write_log(f"Unknown config parameter! ({key}={value})")
self.logger.warning(f"Unknown config parameter! ({key}={value})")
def login(self):
self.get_csrf_token()
@ -2567,6 +2667,11 @@ class _Citizen(CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard,
self.telegram.send_message(f"*Started* {utils.now():%F %T}")
self.update_all(True)
for handler in self.logger.handlers:
if isinstance(handler, ErepublikErrorHTTTPHandler):
self.logger.removeHandler(handler)
break
self.logger.addHandler(ErepublikErrorHTTTPHandler(self.reporter))
def update_citizen_info(self, html: str = None):
"""
@ -2584,8 +2689,8 @@ class _Citizen(CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard,
if self.details.gold >= 54:
self.buy_tg_contract()
else:
self.write_log(f'Training ground contract active but '
f"don't have enough gold ({self.details.gold}g {self.details.cc}cc)")
self.logger.warning('Training ground contract active but '
f"don't have enough gold ({self.details.gold}g {self.details.cc}cc)")
if self.energy.is_energy_full and self.config.telegram:
self.telegram.report_full_energy(self.energy.available, self.energy.limit, self.energy.interval)
@ -2686,7 +2791,7 @@ class _Citizen(CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard,
count = count if max_count > count else max_count
if not silent:
self.write_log(log_msg, False)
self.write_log(log_msg)
return count, log_msg, force_fight
@ -2742,17 +2847,17 @@ class _Citizen(CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard,
if self.energy.limit - self.energy.recovered > self.energy.interval or not self.energy.recoverable % 2:
super().eat()
else:
self.write_log("I don't want to eat right now!")
self.logger.debug("I don't want to eat right now!")
else:
self.write_log(f"I'm out of food! But I'll try to buy some!\n{self.food}")
self.logger.warning(f"I'm out of food! But I'll try to buy some!\n{self.food}")
self.buy_food()
if self.food['total'] > self.energy.interval:
super().eat()
else:
self.write_log('I failed to buy food')
self.logger.warning('I failed to buy food')
def eat_eb(self):
self.write_log('Eating energy bar')
self.logger.warning('Eating energy bar')
if self.energy.recoverable:
self._eat('blue')
self._eat('orange')
@ -2816,7 +2921,7 @@ class _Citizen(CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard,
if not rj.get('error'):
amount_needed -= amount
else:
self.write_log(rj.get('message', ""))
self.logger.warning(rj.get('message', ""))
self._report_action(
'ECONOMY_BUY', f"Unable to buy products! Reason: {rj.get('message')}", kwargs=rj
)
@ -2835,11 +2940,11 @@ class _Citizen(CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard,
self._wam(holding)
elif response.get('message') == 'tax_money':
self._report_action('WORK_AS_MANAGER', 'Not enough money to work as manager!', kwargs=response)
self.write_log('Not enough money to work as manager!')
self.logger.warning('Not enough money to work as manager!')
else:
msg = f'I was not able to wam and or employ because:\n{response}'
self._report_action('WORK_AS_MANAGER', f'Worked as manager failed: {msg}', kwargs=response)
self.write_log(msg)
self.logger.warning(msg)
def work_as_manager(self) -> bool:
""" Does Work as Manager in all holdings with wam. If employees assigned - work them also
@ -2876,12 +2981,12 @@ class _Citizen(CitizenAnniversary, CitizenCompanies, CitizenLeaderBoard,
wam_count = self.my_companies.get_total_wam_count()
if wam_count:
self.write_log(f"Wam ff lockdown is now {wam_count}, was {self.my_companies.ff_lockdown}")
self.logger.debug(f"Wam ff lockdown is now {wam_count}, was {self.my_companies.ff_lockdown}")
self.my_companies.ff_lockdown = wam_count
self.travel_to_residence()
return bool(wam_count)
else:
self.write_log('Did not WAM because I would mess up levelup!')
self.logger.warning('Did not WAM because I would mess up levelup!')
self.my_companies.ff_lockdown = 0
self.update_companies()
@ -2923,9 +3028,7 @@ class Citizen(_Citizen):
def update_weekly_challenge(self):
if not self._update_lock.wait(self._update_timeout):
e = f'Update concurrency not freed in {self._update_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._update_lock.clear()
@ -2936,9 +3039,7 @@ class Citizen(_Citizen):
def update_companies(self):
if not self._update_lock.wait(self._update_timeout):
e = f'Update concurrency not freed in {self._update_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._update_lock.clear()
@ -2949,9 +3050,7 @@ class Citizen(_Citizen):
def update_war_info(self):
if not self._update_lock.wait(self._update_timeout):
e = f'Update concurrency not freed in {self._update_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._update_lock.clear()
@ -2962,9 +3061,7 @@ class Citizen(_Citizen):
def update_job_info(self):
if not self._update_lock.wait(self._update_timeout):
e = f'Update concurrency not freed in {self._update_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._update_lock.clear()
@ -2975,9 +3072,7 @@ class Citizen(_Citizen):
def update_money(self, page: int = 0, currency: int = 62):
if not self._update_lock.wait(self._update_timeout):
e = f'Update concurrency not freed in {self._update_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._update_lock.clear()
@ -2988,9 +3083,7 @@ class Citizen(_Citizen):
def update_inventory(self):
if not self._update_lock.wait(self._update_timeout):
e = f'Update concurrency not freed in {self._update_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._update_lock.clear()
@ -3001,9 +3094,7 @@ class Citizen(_Citizen):
def _work_as_manager(self, wam_holding: classes.Holding) -> Optional[Dict[str, Any]]:
if not self._concurrency_lock.wait(self._concurrency_timeout):
e = f'Concurrency not freed in {self._concurrency_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._concurrency_lock.clear()
@ -3015,9 +3106,7 @@ class Citizen(_Citizen):
count: int = None, use_ebs: bool = False) -> Optional[int]:
if not self._concurrency_lock.wait(self._concurrency_timeout):
e = f'Concurrency not freed in {self._concurrency_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._concurrency_lock.clear()
@ -3029,9 +3118,7 @@ class Citizen(_Citizen):
count: int = 1) -> Optional[int]:
if not self._concurrency_lock.wait(self._concurrency_timeout):
e = f'Concurrency not freed in {self._concurrency_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._concurrency_lock.clear()
@ -3042,9 +3129,7 @@ class Citizen(_Citizen):
def buy_market_offer(self, offer: OfferItem, amount: int = None) -> Optional[Dict[str, Any]]:
if not self._concurrency_lock.wait(self._concurrency_timeout):
e = f'Concurrency not freed in {self._concurrency_timeout}sec!'
self.write_log(e)
if self.debug:
self.report_error(e)
self.report_error(e, not self.debug)
return None
try:
self._concurrency_lock.clear()

View File

@ -10,8 +10,8 @@ from requests import Response, Session, post
from . import constants, types, utils
__all__ = ['Battle', 'BattleDivision', 'BattleSide', 'Company', 'Config', 'Details', 'Energy', 'ErepublikException',
'ErepublikJSONEncoder', 'ErepublikNetworkException', 'EnergyToFight', 'Holding', 'Inventory', 'MyCompanies',
'OfferItem', 'Politics', 'Reporter', 'TelegramReporter', ]
'ErepublikNetworkException', 'EnergyToFight', 'Holding', 'Inventory', 'MyCompanies', 'OfferItem', 'Politics',
'Reporter', 'TelegramReporter', ]
class ErepublikException(Exception):
@ -482,6 +482,8 @@ class Details:
def __init__(self):
self.next_pp = []
self.mayhem_skills = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0}
_default_country = constants.Country(0, 'Unknown', 'Unknown', 'XX')
self.citizenship = self.current_country = self.residence_country = _default_country
@property
def xp_till_level_up(self):
@ -603,10 +605,10 @@ class Reporter:
if self.__to_update:
for unreported_data in self.__to_update:
unreported_data.update(player_id=self.citizen_id, key=self.key)
unreported_data = utils.json.loads(utils.json.dumps(unreported_data, cls=ErepublikJSONEncoder))
unreported_data = utils.json_loads(utils.json_dumps(unreported_data))
self._req.post(f"{self.url}/bot/update", json=unreported_data)
self.__to_update.clear()
data = utils.json.loads(utils.json.dumps(data, cls=ErepublikJSONEncoder))
data = utils.json.loads(utils.json_dumps(data))
r = self._req.post(f"{self.url}/bot/update", json=data)
return r
@ -687,33 +689,6 @@ class Reporter:
return []
class ErepublikJSONEncoder(utils.json.JSONEncoder):
def default(self, o):
from erepublik.citizen import Citizen
if isinstance(o, Decimal):
return float(f"{o:.02f}")
elif isinstance(o, datetime.datetime):
return dict(__type__='datetime', date=o.strftime("%Y-%m-%d"), time=o.strftime("%H:%M:%S"),
tzinfo=str(o.tzinfo) if o.tzinfo else None)
elif isinstance(o, datetime.date):
return dict(__type__='date', date=o.strftime("%Y-%m-%d"))
elif isinstance(o, datetime.timedelta):
return dict(__type__='timedelta', days=o.days, seconds=o.seconds,
microseconds=o.microseconds, total_seconds=o.total_seconds())
elif isinstance(o, Response):
return dict(headers=dict(o.__dict__['headers']), url=o.url, text=o.text, status_code=o.status_code)
elif hasattr(o, 'as_dict'):
return o.as_dict
elif isinstance(o, set):
return list(o)
elif isinstance(o, Citizen):
return o.to_json()
try:
return super().default(o)
except Exception as e: # noqa
return 'Object is not JSON serializable'
class BattleSide:
points: int
deployed: List[constants.Country]

157
erepublik/logging.py Normal file
View File

@ -0,0 +1,157 @@
import base64
import datetime
import inspect
import logging
import os
import sys
import weakref
from pathlib import Path
import requests
from logging import handlers, LogRecord
from typing import Union, Dict, Any
from erepublik.classes import Reporter
from erepublik.constants import erep_tz
from erepublik.utils import slugify, json_loads, json, now, json_dumps
class ErepublikFileHandler(handlers.TimedRotatingFileHandler):
_file_path: Path
def __init__(self, filename: str = 'log/erepublik.log', *args, **kwargs):
log_path = Path(filename)
self._file_path = log_path
log_path.parent.mkdir(parents=True, exist_ok=True)
at_time = erep_tz.localize(datetime.datetime.now()).replace(hour=0, minute=0, second=0, microsecond=0)
kwargs.update(atTime=at_time)
super().__init__(filename, *args, **kwargs)
def doRollover(self) -> None:
self._file_path.parent.mkdir(parents=True, exist_ok=True)
super().doRollover()
def emit(self, record: LogRecord) -> None:
self._file_path.parent.mkdir(parents=True, exist_ok=True)
super().emit(record)
class ErepublikLogConsoleHandler(logging.StreamHandler):
def __init__(self, *_):
super().__init__(sys.stdout)
class ErepublikFormatter(logging.Formatter):
"""override logging.Formatter to use an aware datetime object"""
dbg_fmt = "[%(asctime)s] DEBUG: %(module)s: %(lineno)d: %(msg)s"
info_fmt = "[%(asctime)s] %(msg)s"
default_fmt = "[%(asctime)s] %(levelname)s: %(msg)s"
def converter(self, timestamp: Union[int, float]) -> datetime.datetime:
return datetime.datetime.utcfromtimestamp(timestamp).astimezone(erep_tz)
def format(self, record: logging.LogRecord) -> str:
if record.levelno == logging.DEBUG:
self._fmt = self.dbg_fmt
elif record.levelno == logging.INFO:
self._fmt = self.info_fmt
else:
self._fmt = self.default_fmt
self._style = logging.PercentStyle(self._fmt)
return super().format(record)
def formatTime(self, record, datefmt=None):
dt = self.converter(record.created)
if datefmt:
s = dt.strftime(datefmt)
else:
s = dt.strftime('%Y-%m-%d %H:%M:%S')
return s
class ErepublikErrorHTTTPHandler(handlers.HTTPHandler):
def __init__(self, reporter: Reporter):
logging.Handler.__init__(self, level=logging.ERROR)
self._reporter = weakref.ref(reporter)
self.host = 'localhost:5000'
self.url = '/ebot/error/'
self.method = 'POST'
self.secure = False
self.credentials = (str(reporter.citizen_id), reporter.key)
self.context = None
@property
def reporter(self):
return self._reporter()
def mapLogRecord(self, record: logging.LogRecord) -> Dict[str, Any]:
data = super().mapLogRecord(record)
# Log last response
response = self.reporter.citizen.r
url = response.url
last_index = url.index("?") if "?" in url else len(response.url)
name = slugify(response.url[len(self.reporter.citizen.url):last_index])
html = response.text
try:
json_loads(html)
ext = 'json'
except json.decoder.JSONDecodeError:
ext = 'html'
try:
resp_time = datetime.datetime.strptime(
response.headers.get('date'), '%a, %d %b %Y %H:%M:%S %Z'
).replace(tzinfo=datetime.timezone.utc).astimezone(erep_tz).strftime('%F_%H-%M-%S')
except:
resp_time = slugify(response.headers.get('date'))
resp = dict(name=f"{resp_time}_{name}.{ext}", content=html.encode('utf-8'),
mimetype="application/json" if ext == 'json' else "text/html")
files = [('file', (resp.get('name'), resp.get('content'), resp.get('mimetype'))), ]
filename = f'log/{now().strftime("%F")}.log'
if os.path.isfile(filename):
files.append(('file', (filename[4:], open(filename, 'rb'), 'text/plain')))
trace = inspect.trace()
local_vars = {}
if trace:
local_vars = trace[-1][0].f_locals
if local_vars.get('__name__') == '__main__':
local_vars.update(commit_id=local_vars.get('COMMIT_ID'), interactive=local_vars.get('INTERACTIVE'),
version=local_vars.get('__version__'), config=local_vars.get('CONFIG'))
if local_vars:
if 'state_thread' in local_vars:
local_vars.pop('state_thread', None)
if isinstance(local_vars.get('self'), self.reporter.citizen.__class__):
local_vars['self'] = repr(local_vars['self'])
if isinstance(local_vars.get('player'), self.reporter.citizen.__class__):
local_vars['player'] = repr(local_vars['player'])
if isinstance(local_vars.get('citizen'), self.reporter.citizen.__class__):
local_vars['citizen'] = repr(local_vars['citizen'])
files.append(('file', ('local_vars.json', json_dumps(local_vars), "application/json")))
files.append(('file', ('instance.json', self.reporter.citizen.to_json(indent=True), "application/json")))
data.update(files=files)
return data
def emit(self, record):
"""
Emit a record.
Send the record to the Web server as a percent-encoded dictionary
"""
try:
proto = 'https' if self.secure else 'http'
u, p = self.credentials
s = 'Basic ' + base64.b64encode(f'{u}:{p}'.encode('utf-8')).strip().decode('ascii')
headers = {'Authorization': s}
data = self.mapLogRecord(record)
files = data.pop('files') if 'files' in data else None
requests.post(f"{proto}://{self.host}{self.url}", headers=headers, data=data, files=files)
except Exception:
self.handleError(record)

View File

@ -1,18 +1,18 @@
import datetime
import inspect
import os
import re
import sys
import time
import traceback
import unicodedata
import warnings
from base64 import b64encode
from decimal import Decimal
from pathlib import Path
from typing import Any, Dict, List, Union
import pytz
import requests
from requests import Response
from . import __version__, constants
@ -21,12 +21,12 @@ try:
except ImportError:
import json
__all__ = ['VERSION', 'calculate_hit', 'caught_error', 'date_from_eday', 'eday_from_date', 'deprecation',
'get_air_hit_dmg_value', 'get_file', 'get_ground_hit_dmg_value', 'get_sleep_seconds', 'good_timedelta',
'interactive_sleep', 'json', 'localize_dt', 'localize_timestamp', 'normalize_html_json', 'now',
'process_error', 'process_warning', 'send_email', 'silent_sleep', 'slugify', 'write_file', 'write_request',
'write_interactive_log', 'write_silent_log', 'get_final_hit_dmg', 'wait_for_lock',
'json_decode_object_hook', 'json_load', 'json_loads']
__all__ = [
'VERSION', 'calculate_hit', 'date_from_eday', 'eday_from_date', 'deprecation', 'get_final_hit_dmg', 'write_file',
'get_air_hit_dmg_value', 'get_file', 'get_ground_hit_dmg_value', 'get_sleep_seconds', 'good_timedelta', 'slugify',
'interactive_sleep', 'json', 'localize_dt', 'localize_timestamp', 'normalize_html_json', 'now', 'silent_sleep',
'json_decode_object_hook', 'json_load', 'json_loads', 'json_dump', 'json_dumps', 'b64json', 'ErepublikJSONEncoder',
]
VERSION: str = __version__
@ -102,25 +102,25 @@ def interactive_sleep(sleep_seconds: int):
silent_sleep = time.sleep
def _write_log(msg, timestamp: bool = True, should_print: bool = False):
erep_time_now = now()
txt = f"[{erep_time_now.strftime('%F %T')}] {msg}" if timestamp else msg
if not os.path.isdir('log'):
os.mkdir('log')
with open(f'log/{erep_time_now.strftime("%F")}.log', 'a', encoding='utf-8') as f:
f.write(f'{txt}\n')
if should_print:
print(txt)
def write_interactive_log(*args, **kwargs):
kwargs.pop('should_print', None)
_write_log(should_print=True, *args, **kwargs)
def write_silent_log(*args, **kwargs):
kwargs.pop('should_print', None)
_write_log(should_print=False, *args, **kwargs)
# def _write_log(msg, timestamp: bool = True, should_print: bool = False):
# erep_time_now = now()
# txt = f"[{erep_time_now.strftime('%F %T')}] {msg}" if timestamp else msg
# if not os.path.isdir('log'):
# os.mkdir('log')
# with open(f'log/{erep_time_now.strftime("%F")}.log', 'a', encoding='utf-8') as f:
# f.write(f'{txt}\n')
# if should_print:
# print(txt)
#
#
# def write_interactive_log(*args, **kwargs):
# kwargs.pop('should_print', None)
# _write_log(should_print=True, *args, **kwargs)
#
#
# def write_silent_log(*args, **kwargs):
# kwargs.pop('should_print', None)
# _write_log(should_print=False, *args, **kwargs)
def get_file(filepath: str) -> str:
@ -154,89 +154,6 @@ def write_file(filename: str, content: str) -> int:
return ret
def write_request(response: requests.Response, is_error: bool = False):
from erepublik import Citizen
# Remove GET args from url name
url = response.url
last_index = url.index("?") if "?" in url else len(response.url)
name = slugify(response.url[len(Citizen.url):last_index])
html = response.text
try:
json.loads(html)
ext = 'json'
except json.decoder.JSONDecodeError:
ext = 'html'
if not is_error:
filename = f"debug/requests/{now().strftime('%F_%H-%M-%S')}_{name}.{ext}"
write_file(filename, html)
else:
return dict(name=f"{now().strftime('%F_%H-%M-%S')}_{name}.{ext}", content=html.encode('utf-8'),
mimetype="application/json" if ext == 'json' else "text/html")
def send_email(name: str, content: List[Any], player=None, local_vars: Dict[str, Any] = None,
promo: bool = False, captcha: bool = False):
if local_vars is None:
local_vars = {}
from erepublik import Citizen
file_content_template = '<html><head><title>{title}</title></head><body>{body}</body></html>'
if isinstance(player, Citizen) and player.r:
resp = write_request(player.r, is_error=True)
else:
resp = dict(name='None.html', mimetype='text/html',
content=file_content_template.format(body='<br/>'.join(content), title='Error'))
if promo:
resp = dict(name=f"{name}.html", mimetype='text/html',
content=file_content_template.format(title='Promo', body='<br/>'.join(content)))
subject = f"[eBot][{now().strftime('%F %T')}] Promos: {name}"
elif captcha:
resp = dict(name=f'{name}.html', mimetype='text/html',
content=file_content_template.format(title='ReCaptcha', body='<br/>'.join(content)))
subject = f"[eBot][{now().strftime('%F %T')}] RECAPTCHA: {name}"
else:
subject = f"[eBot][{now().strftime('%F %T')}] Bug trace: {name}"
body = "".join(traceback.format_stack()) + \
"\n\n" + \
"\n".join(content)
data = dict(send_mail=True, subject=subject, bugtrace=body)
if promo:
data.update(promo=True)
elif captcha:
data.update(captcha=True)
else:
data.update(bug=True)
files = [('file', (resp.get('name'), resp.get('content'), resp.get('mimetype'))), ]
filename = f'log/{now().strftime("%F")}.log'
if os.path.isfile(filename):
files.append(('file', (filename[4:], open(filename, 'rb'), 'text/plain')))
if local_vars:
if 'state_thread' in local_vars:
local_vars.pop('state_thread', None)
if isinstance(local_vars.get('self'), Citizen):
local_vars['self'] = repr(local_vars['self'])
if isinstance(local_vars.get('player'), Citizen):
local_vars['player'] = repr(local_vars['player'])
if isinstance(local_vars.get('citizen'), Citizen):
local_vars['citizen'] = repr(local_vars['citizen'])
from erepublik.classes import ErepublikJSONEncoder
files.append(('file', ('local_vars.json', json.dumps(local_vars, cls=ErepublikJSONEncoder),
"application/json")))
if isinstance(player, Citizen):
files.append(('file', ('instance.json', player.to_json(indent=True), "application/json")))
requests.post('https://pasts.72.lv', data=data, files=files)
def normalize_html_json(js: str) -> str:
js = re.sub(r' \'(.*?)\'', lambda a: f'"{a.group(1)}"', js)
js = re.sub(r'(\d\d):(\d\d):(\d\d)', r'\1\2\3', js)
@ -245,72 +162,6 @@ def normalize_html_json(js: str) -> str:
return js
def caught_error(e: Exception):
process_error(str(e), 'Unclassified', sys.exc_info(), interactive=False)
def process_error(log_info: str, name: str, exc_info: tuple, citizen=None, commit_id: str = None,
interactive: bool = None):
"""
Process error logging and email sending to developer
:param interactive: Should print interactively
:type interactive: bool
:param log_info: String to be written in output
:type log_info: str
:param name: String Instance name
:type name: str
:param exc_info: tuple output from sys.exc_info()
:type exc_info: tuple
:param citizen: Citizen instance
:type citizen: Citizen
:param commit_id: Caller's code version's commit id
:type commit_id: str
"""
type_, value_, traceback_ = exc_info
content = [log_info]
content += [f"eRepublik version {VERSION}"]
if commit_id:
content += [f"Commit id {commit_id}"]
content += [str(value_), str(type_), ''.join(traceback.format_tb(traceback_))]
if interactive:
write_interactive_log(log_info)
elif interactive is not None:
write_silent_log(log_info)
trace = inspect.trace()
if trace:
local_vars = trace[-1][0].f_locals
if local_vars.get('__name__') == '__main__':
local_vars.update(commit_id=local_vars.get('COMMIT_ID'), interactive=local_vars.get('INTERACTIVE'),
version=local_vars.get('__version__'), config=local_vars.get('CONFIG'))
else:
local_vars = dict()
send_email(name, content, citizen, local_vars=local_vars)
def process_warning(log_info: str, name: str, exc_info: tuple, citizen=None, commit_id: str = None):
"""
Process error logging and email sending to developer
:param log_info: String to be written in output
:param name: String Instance name
:param exc_info: tuple output from sys.exc_info()
:param citizen: Citizen instance
:param commit_id: Code's version by commit id
"""
type_, value_, traceback_ = exc_info
content = [log_info]
if commit_id:
content += [f'Commit id: {commit_id}']
content += [str(value_), str(type_), ''.join(traceback.format_tb(traceback_))]
trace = inspect.trace()
if trace:
local_vars = trace[-1][0].f_locals
else:
local_vars = dict()
send_email(name, content, citizen, local_vars=local_vars)
def slugify(value, allow_unicode=False) -> str:
"""
Function copied from Django2.2.1 django.utils.text.slugify
@ -377,25 +228,25 @@ def deprecation(message):
warnings.warn(message, DeprecationWarning, stacklevel=2)
def wait_for_lock(function):
def wrapper(instance, *args, **kwargs):
if not instance.concurrency_available.wait(600):
e = 'Concurrency not freed in 10min!'
instance.write_log(e)
if instance.debug:
instance.report_error(e)
return None
else:
instance.concurrency_available.clear()
try:
ret = function(instance, *args, **kwargs)
except Exception as e:
instance.concurrency_available.set()
raise e
instance.concurrency_available.set()
return ret
return wrapper
# def wait_for_lock(function):
# def wrapper(instance, *args, **kwargs):
# if not instance.concurrency_available.wait(600):
# e = 'Concurrency not freed in 10min!'
# instance.write_log(e)
# if instance.debug:
# instance.report_error(e)
# return None
# else:
# instance.concurrency_available.clear()
# try:
# ret = function(instance, *args, **kwargs)
# except Exception as e:
# instance.concurrency_available.set()
# raise e
# instance.concurrency_available.set()
# return ret
#
# return wrapper
def json_decode_object_hook(
@ -429,3 +280,56 @@ def json_load(f, **kwargs):
def json_loads(s: str, **kwargs):
kwargs.update(object_hook=json_decode_object_hook)
return json.loads(s, **kwargs)
def json_dump(obj, fp, *args, **kwargs):
if not kwargs.get('cls'):
kwargs.update(cls=ErepublikJSONEncoder)
return json.dump(obj, fp, *args, **kwargs)
def json_dumps(obj, *args, **kwargs):
if not kwargs.get('cls'):
kwargs.update(cls=ErepublikJSONEncoder)
return json.dumps(obj, *args, **kwargs)
def b64json(obj: Union[Dict[str, Union[int, List[str]]], List[str]]):
if isinstance(obj, list):
return b64encode(json.dumps(obj).encode('utf-8')).decode('utf-8')
elif isinstance(obj, (int, str)):
return obj
elif isinstance(obj, dict):
for k, v in obj.items():
obj[k] = b64json(v)
else:
from .classes import ErepublikException
raise ErepublikException(f'Unhandled object type! obj is {type(obj)}')
return b64encode(json.dumps(obj).encode('utf-8')).decode('utf-8')
class ErepublikJSONEncoder(json.JSONEncoder):
def default(self, o):
from erepublik.citizen import Citizen
if isinstance(o, Decimal):
return float(f"{o:.02f}")
elif isinstance(o, datetime.datetime):
return dict(__type__='datetime', date=o.strftime("%Y-%m-%d"), time=o.strftime("%H:%M:%S"),
tzinfo=str(o.tzinfo) if o.tzinfo else None)
elif isinstance(o, datetime.date):
return dict(__type__='date', date=o.strftime("%Y-%m-%d"))
elif isinstance(o, datetime.timedelta):
return dict(__type__='timedelta', days=o.days, seconds=o.seconds,
microseconds=o.microseconds, total_seconds=o.total_seconds())
elif isinstance(o, Response):
return dict(headers=dict(o.__dict__['headers']), url=o.url, text=o.text, status_code=o.status_code)
elif hasattr(o, 'as_dict'):
return o.as_dict
elif isinstance(o, set):
return list(o)
elif isinstance(o, Citizen):
return o.to_json()
try:
return super().default(o)
except Exception as e: # noqa
return 'Object is not JSON serializable'

View File

@ -1,19 +1,20 @@
bump2version==1.0.1
coverage==5.3.1
coverage==5.4
edx-sphinx-theme==1.6.1
flake8==3.8.4
ipython>=7.19.0
jedi!=0.18.0
isort==5.7.0
pip==20.3.3
pip==21.0
pre-commit==2.9.3
pur==5.3.0
PyInstaller==4.2
PySocks==1.7.1
pytest==6.2.1
pytest==6.2.2
pytz>=2020.5
requests>=2.25.1
responses==0.12.1
setuptools==51.3.3
setuptools==52.0.0
Sphinx==3.4.3
twine==3.3.0
wheel==0.36.2

View File

@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.23.4.16
current_version = 0.24.0.4
commit = True
tag = True
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)\.?(?P<dev>\d+)?

View File

@ -50,6 +50,6 @@ setup(
test_suite='tests',
tests_require=test_requirements,
url='https://github.com/eeriks/erepublik/',
version='0.23.4.16',
version='0.24.0.4',
zip_safe=False,
)