import logging
import re
import warnings
import weakref
from datetime import datetime, time, timedelta
from decimal import Decimal
from itertools import product
from threading import Event
from time import sleep
from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from requests import HTTPError, RequestException, Response
from erepublik import _types as types
from erepublik import access_points, classes, constants, utils
from erepublik._logging import ErepublikErrorHTTTPHandler, ErepublikFileHandler, ErepublikFormatter, ErepublikLogConsoleHandler
class BaseCitizen(access_points.CitizenAPI):
_last_full_update: datetime = constants.min_datetime
_last_inventory_update: datetime = constants.min_datetime
promos: Dict[str, datetime] = None
_inventory: classes.Inventory
ot_points: int = 0
food: Dict[str, int] = dict(q1=0, q2=0, q3=0, q4=0, q5=0, q6=0, q7=0, total=0)
eb_normal: int = 0
eb_double: int = 0
eb_small: int = 0
eb_triple: int = 0
division: int = 0
maverick: bool = False
eday: int = 0
wheel_of_fortune: bool
debug: bool = False
config: classes.Config = None
energy: classes.Energy = None
details: classes.Details = None
politics: classes.Politics = None
my_companies: classes.MyCompanies = None
reporter: classes.Reporter = None
stop_threads: Event = None
telegram: classes.TelegramReporter = None
logger: logging.Logger
r: Response = None
name: str = "Not logged in!"
logged_in: bool = False
restricted_ip: bool = False
def __init__(self, email: str = "", password: str = ""):
super().__init__()
self.config = classes.Config()
self.energy = classes.Energy()
self.details = classes.Details()
self.politics = classes.Politics()
self.my_companies = classes.MyCompanies(self)
self.reporter = classes.Reporter(self)
self.stop_threads = Event()
logger_class = logging.getLoggerClass()
self.logger = logger_class("Citizen")
self.telegram = classes.TelegramReporter(stop_event=self.stop_threads)
self.config.email = email
self.config.password = password
self._inventory = classes.Inventory()
self.wheel_of_fortune = False
def get_csrf_token(self):
"""
get_csrf_token is the function which logs you in, and updates csrf tokens
(after 15min time of inactivity opening page in eRepublik.com redirects to home page),
by explicitly requesting homepage.
"""
# Idiots have fucked up their session manager - after logging in You might be redirected to public homepage instead of authenticated
resp = self._req.get(self.url if self.logged_in else f"{self.url}/economy/myCompanies")
self.r = resp
if self._errors_in_response(resp):
self.get_csrf_token()
return
html = resp.text
self._check_response_for_medals(html)
re_token = re.search(r"var csrfToken = \'(\w{32})\'", html)
re_login_token = re.search(r'', html)
if re_token:
self.token = re_token.group(1)
elif re_login_token:
self.token = re_login_token.group(1)
self._login()
else:
self.report_error("Something went wrong! Can't find token in page!")
raise classes.ErepublikException("Something went wrong! Can't find token in page! Exiting!")
try:
self.update_citizen_info(resp.text)
except (AttributeError, utils.json.JSONDecodeError, ValueError, KeyError):
pass
def get(self, url: str, **kwargs) -> Response:
if (self.now - self._req.last_time).seconds >= 15 * 60:
self.get_csrf_token()
if "params" in kwargs:
if "_token" in kwargs["params"]:
kwargs["params"]["_token"] = self.token
if self.r and url == self.r.url and not url == self.url: # Don't duplicate requests, except for homepage
response = self.r
else:
try:
response = super().get(url, **kwargs)
except RequestException:
self.report_error("Network error while issuing GET request")
self.sleep(60)
return self.get(url, **kwargs)
try:
self.update_citizen_info(response.text)
except (AttributeError, utils.json.JSONDecodeError, ValueError, KeyError):
pass
if self._errors_in_response(response):
self.get_csrf_token()
self.get(url, **kwargs)
else:
self._check_response_for_medals(response.text)
self.r = response
return response
def post(self, url: str, data: dict = None, json: dict = None, **kwargs) -> Response:
if json is None:
json = {}
if data is None:
data = {}
if (self.now - self._req.last_time).seconds >= 14 * 60:
self.get_csrf_token()
if "_token" in data:
data["_token"] = self.token
if "_token" in json:
json["_token"] = self.token
try:
response = super().post(url, data=data, json=json, **kwargs)
except RequestException:
self.report_error("Network error while issuing POST request")
self.sleep(60)
return self.post(url, data=data, json=json, **kwargs)
try:
r_json = response.json()
if (r_json.get("error") or not r_json.get("status")) and r_json.get("message", "") == "captcha":
self.write_warning("Regular captcha must be filled!", extra=r_json)
except (AttributeError, utils.json.JSONDecodeError, ValueError, KeyError):
pass
if self._errors_in_response(response):
self.get_csrf_token()
if data:
data.update({"_token": self.token})
elif json:
json.update({"_token": self.token})
response = self.post(url, data=data, json=json, **kwargs)
else:
self._check_response_for_medals(response.text)
self.r = response
return response
def update_citizen_info(self, html: str = None):
"""
Gets main page and updates most information about player
"""
if html is None:
self._get_main()
return
ugly_js_match = re.search(r'"promotions":\s*(\[{?.*?}?])', html)
ugly_js = ugly_js_match.group(1) if ugly_js_match else "null"
promos = utils.json_loads(utils.normalize_html_json(ugly_js))
if self.promos:
self.promos = {k: v for k, v in self.promos.items() if v > self.now}
else:
self.promos = {}
try:
if promos:
for promo in promos:
kind = promo["typeId"]
time_until = utils.localize_timestamp(promo["expiresAt"])
if kind not in self.promos:
self.reporter.report_promo(kind, time_until)
self.promos[kind] = time_until
except Exception:
self.process_error()
new_date = re.search(r"var new_date = '(\d*)';", html)
if new_date:
self.energy.set_reference_time(utils.good_timedelta(self.now, timedelta(seconds=int(new_date.group(1)))))
ugly_js = re.search(r"var erepublik = ({.*}),\s+", html).group(1)
citizen_js = utils.json.loads(ugly_js)
citizen = citizen_js.get("citizen", {})
self.details.citizen_id = int(citizen["citizenId"])
self.name = citizen["name"]
self.eday = citizen_js.get("settings").get("eDay")
self.division = int(citizen.get("division", 0))
self.energy.interval = citizen.get("energyPerInterval", 0)
self.energy.limit = citizen.get("energyToRecover", 0)
self.energy.energy = citizen.get("energy", 0)
# self.energy.set_reference_time(utils.good_timedelta(self.now,
# timedelta(seconds=int(next_recovery[1]) * 60 + int(next_recovery[2]))))
self.details.current_region = citizen.get("regionLocationId", 0)
self.details.current_country = constants.COUNTRIES.get(citizen.get("countryLocationId", 0)) # country where citizen is located
self.details.residence_region = citizen.get("residence", {}).get("regionId", 0)
self.details.residence_country = constants.COUNTRIES.get(citizen.get("residence", {}).get("countryId", 0))
self.details.citizen_id = citizen.get("citizenId", 0)
self.details.citizenship = constants.COUNTRIES.get(int(citizen.get("country", 0)))
self.details.xp = citizen.get("currentExperiencePoints", 0)
self.details.level = citizen.get("userLevel", 0)
self.details.daily_task_done = citizen.get("dailyTasksDone", False)
self.details.daily_task_reward = citizen.get("hasReward", False)
self.maverick = citizen.get("canSwitchDivisions", False)
if citizen.get("dailyOrderDone", False) and not citizen.get("hasDailyOrderReward", False):
self._post_military_group_missions()
self.details.next_pp.sort()
for skill in citizen.get("terrainSkills", {}).values():
self.details.mayhem_skills.update({int(skill["terrain_id"]): int(skill["skill_points"])})
if citizen.get("party", []):
party = citizen.get("party")
self.politics.is_party_member = True
self.politics.party_id = party.get("party_id")
self.politics.is_party_president = bool(party.get("is_party_president"))
self.politics.party_slug = f"{party.get('stripped_title')}-{party.get('party_id')}"
self.wheel_of_fortune = bool(re.search(r'