import re
import sys
import warnings
import weakref
from datetime import datetime, time, timedelta
from decimal import Decimal
from itertools import product
from threading import Event
from time import sleep
from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from requests import HTTPError, RequestException, Response
from . import access_points, classes, constants, types, utils
from .classes import OfferItem
class BaseCitizen(access_points.CitizenAPI):
# Class-level defaults for citizen state. Several of the ``None`` placeholders
# below are replaced with fresh objects in __init__.

# Both start at constants.min_datetime.
# NOTE(review): presumably "never refreshed yet" markers for the full profile
# and inventory updates -- confirm against the code that writes them.
_last_full_update: datetime = constants.min_datetime
_last_inventory_update: datetime = constants.min_datetime
# Promotion id -> expiry datetime; created lazily and pruned of expired entries
# in update_citizen_info().
promos: Dict[str, datetime] = None
# Annotation only; the instance value is assigned in __init__.
_inventory: classes.Inventory
ot_points: int = 0
# Counters keyed "q1".."q7" plus "total".
# NOTE(review): this dict literal lives on the class, so any in-place mutation
# is visible to every instance that has not rebound ``food``.
food: Dict[str, int] = {"q1": 0, "q2": 0, "q3": 0, "q4": 0, "q5": 0, "q6": 0, "q7": 0, "total": 0}
# NOTE(review): presumably energy-bar counts by kind -- confirm with the inventory code.
eb_normal: int = 0
eb_double: int = 0
eb_small: int = 0
# division, maverick and eday are overwritten from the homepage's citizen data
# in update_citizen_info() ("division", "canSwitchDivisions", settings "eDay").
division: int = 0
maverick: bool = False
eday: int = 0
# Annotation only; set to False in __init__ and refreshed in update_citizen_info().
wheel_of_fortune: bool
debug: bool = False
# Placeholders; __init__ assigns a fresh instance of each of the following.
config: classes.Config = None
energy: classes.Energy = None
details: classes.Details = None
politics: classes.Politics = None
my_companies: classes.MyCompanies = None
reporter: classes.Reporter = None
stop_threads: Event = None
telegram: classes.TelegramReporter = None
# Most recent response; stored by get_csrf_token(), get() and post().
# Stays None until the first request has been made.
r: Response = None
# Defaults describing a citizen that has not logged in yet.
name: str = "Not logged in!"
logged_in: bool = False
restricted_ip: bool = False
def __init__(self, email: str = "", password: str = ""):
    """
    Set up per-instance state objects and store the account credentials.

    :param email: account e-mail, stored in ``self.config.email``
    :param password: account password, stored in ``self.config.password``
    """
    super().__init__()
    # ``food`` is declared on the class as a dict literal. Give every citizen
    # its own copy so in-place updates don't leak between instances.
    self.food = dict(self.food)

    # Fresh helper objects per instance; the class-level ``None`` values are
    # only placeholders.
    self.config = classes.Config()
    self.energy = classes.Energy()
    self.details = classes.Details()
    self.politics = classes.Politics()
    self.my_companies = classes.MyCompanies(self)
    self.reporter = classes.Reporter(self)
    # The Telegram reporter receives the same Event object as ``stop_threads``.
    self.stop_threads = Event()
    self.telegram = classes.TelegramReporter(stop_event=self.stop_threads)

    self.config.email = email
    self.config.password = password
    self._inventory = classes.Inventory()
    self.wheel_of_fortune = False
def get_csrf_token(self):
    """
    get_csrf_token is the function which logs you in, and updates csrf tokens
    (after 15min time of inactivity opening page in eRepublik.com redirects to home page),
    by explicitly requesting homepage.

    The homepage response is stored in ``self.r``. When the page carries the
    logged-in ``csrfToken`` variable, only ``self.token`` is updated; when it
    carries the login form's hidden ``_token`` input instead, that value is
    stored and ``self._login()`` is called.

    :raises classes.ErepublikException: if neither token is present in the page
    """
    resp = self._req.get(self.url)
    self.r = resp
    if self._errors_in_response(resp):
        # Start over until the homepage comes back without errors.
        self.get_csrf_token()
        return

    html = resp.text
    self._check_response_for_medals(html)
    re_token = re.search(r'var csrfToken = \'(\w{32})\'', html)
    # An empty pattern here would match every page and expose no group 1, so
    # the login branch could only raise IndexError and the "can't find token"
    # error below would be unreachable.
    # NOTE(review): pattern captures the 32-character value of the hidden
    # ``_token`` form input; confirm against the live login page markup.
    re_login_token = re.search(r'<input[^>]*name="_token"[^>]*value="(\w{32})"', html)
    if re_token:
        self.token = re_token.group(1)
    elif re_login_token:
        self.token = re_login_token.group(1)
        self._login()
    else:
        raise classes.ErepublikException("Something went wrong! Can't find token in page! Exiting!")

    try:
        self.update_citizen_info(resp.text)
    except (AttributeError, utils.json.JSONDecodeError, ValueError, KeyError):
        # Best effort: the page may lack the data update_citizen_info() parses.
        pass
def get(self, url: str, **kwargs) -> Response:
    """
    Issue a GET request, refreshing the session token first after 15 minutes
    without requests.

    The previous response (``self.r``) is reused when ``url`` equals its URL;
    the homepage is always requested again. A freshly fetched response is
    passed to ``update_citizen_info``, checked for errors and medals, and
    stored in ``self.r``.

    :param url: address to request
    :param kwargs: forwarded to the parent ``get``; after a token refresh a
        ``_token`` entry inside ``params`` is replaced with the new token
    :return: the cached, fetched or retried response
    """
    # total_seconds() instead of .seconds: the latter drops whole days, so a
    # pause of 24h and a few minutes used to look like a few minutes.
    if (self.now - self._req.last_time).total_seconds() >= 15 * 60:
        self.get_csrf_token()
        params = kwargs.get("params")
        if params and "_token" in params:
            params["_token"] = self.token

    previous = self.r  # None until the first request has been made
    if previous is not None and url == previous.url and url != self.url:
        # Don't duplicate requests, except for homepage
        response = previous
    else:
        try:
            response = super().get(url, **kwargs)
        except RequestException as e:
            self.write_log("Network error while issuing GET request", e)
            self.sleep(60)
            return self.get(url, **kwargs)

        try:
            self.update_citizen_info(response.text)
        except (AttributeError, utils.json.JSONDecodeError, ValueError, KeyError):
            # Best effort: not every page carries the citizen data.
            pass

        if self._errors_in_response(response):
            self.get_csrf_token()
            # Hand back the retried response (as post() does) rather than the
            # one that was just found to be erroneous.
            response = self.get(url, **kwargs)
        else:
            self._check_response_for_medals(response.text)

        self.r = response
    return response
def post(self, url: str, data: dict = None, json: dict = None, **kwargs) -> Response:
    """
    Issue a POST request, refreshing the session token first after 14 minutes
    without requests.

    If the JSON reply reports a captcha, an e-mail is sent through
    ``utils.send_email``. If the response contains errors, the token is
    refreshed, written into the payload and the request is repeated.

    :param url: address to post to
    :param data: form payload; ``None`` is treated as an empty dict
    :param json: JSON payload; ``None`` is treated as an empty dict
    :param kwargs: forwarded to the parent ``post``
    :return: the final (possibly retried) response, also stored in ``self.r``
    """
    if json is None:
        json = {}
    if data is None:
        data = {}

    # total_seconds() instead of .seconds: the latter drops whole days, so a
    # pause of 24h and a few minutes used to look like a few minutes.
    if (self.now - self._req.last_time).total_seconds() >= 14 * 60:
        self.get_csrf_token()
        # Swap in the refreshed token wherever the caller supplied one.
        if "_token" in data:
            data["_token"] = self.token
        if "_token" in json:
            json["_token"] = self.token

    try:
        response = super().post(url, data=data, json=json, **kwargs)
    except RequestException as e:
        self.write_log("Network error while issuing POST request", e)
        self.sleep(60)
        return self.post(url, data=data, json=json, **kwargs)

    try:
        r_json = response.json()
        if (r_json.get("error") or not r_json.get("status")) and r_json.get("message", "") == "captcha":
            utils.send_email(self.name, [response.text, ], player=self, captcha=True)
    except (AttributeError, utils.json.JSONDecodeError, ValueError, KeyError):
        # Best effort: the reply need not be a JSON object.
        pass

    if self._errors_in_response(response):
        self.get_csrf_token()
        # Only the first non-empty payload receives the new token.
        if data:
            data.update({"_token": self.token})
        elif json:
            json.update({"_token": self.token})
        response = self.post(url, data=data, json=json, **kwargs)
    else:
        self._check_response_for_medals(response.text)

    self.r = response
    return response
def update_citizen_info(self, html: str = None):
"""
Gets main page and updates most information about player
"""
if html is None:
self._get_main()
return
ugly_js = re.search(r'"promotions":\s*(\[{?.*?}?])', html).group(1)
promos = utils.json.loads(utils.normalize_html_json(ugly_js))
if self.promos is None:
self.promos = {}
else:
self.promos = {k: v for k, v in self.promos.items() if v > self.now}
send_mail = False
for promo in promos:
promo_name = promo.get("id")
expire = utils.localize_timestamp(int(promo.get("expiresAt")))
if promo_name not in self.promos:
send_mail = True
self.promos.update({promo_name: expire})
if send_mail:
active_promos = []
for kind, time_until in self.promos.items():
active_promos.append(f"{kind} active until {time_until}")
self.reporter.report_promo(kind, time_until)
utils.send_email(self.name, active_promos, player=self, promo=True)
new_date = re.search(r"var new_date = '(\d*)';", html)
if new_date:
self.energy.set_reference_time(
utils.good_timedelta(self.now, timedelta(seconds=int(new_date.group(1))))
)
ugly_js = re.search(r"var erepublik = ({.*}),\s+", html).group(1)
citizen_js = utils.json.loads(ugly_js)
citizen = citizen_js.get("citizen", {})
self.eday = citizen_js.get("settings").get("eDay")
self.division = int(citizen.get("division", 0))
self.energy.interval = citizen.get("energyPerInterval", 0)
self.energy.limit = citizen.get("energyToRecover", 0)
self.energy.recovered = citizen.get("energy", 0)
self.energy.recoverable = citizen.get("energyFromFoodRemaining", 0)
self.details.current_region = citizen.get("regionLocationId", 0)
self.details.current_country = constants.COUNTRIES.get(
citizen.get("countryLocationId", 0)) # country where citizen is located
self.details.residence_region = citizen.get("residence", {}).get("regionId", 0)
self.details.residence_country = constants.COUNTRIES.get(citizen.get("residence", {}).get("countryId", 0))
self.details.citizen_id = citizen.get("citizenId", 0)
self.details.citizenship = constants.COUNTRIES.get(int(citizen.get("country", 0)))
self.details.xp = citizen.get("currentExperiencePoints", 0)
self.details.daily_task_done = citizen.get("dailyTasksDone", False)
self.details.daily_task_reward = citizen.get("hasReward", False)
self.maverick = citizen.get("canSwitchDivisions", False)
if citizen.get("dailyOrderDone", False) and not citizen.get("hasDailyOrderReward", False):
self._post_military_group_missions()
self.details.next_pp.sort()
for skill in citizen.get("terrainSkills", {}).values():
self.details.mayhem_skills.update({int(skill["terrain_id"]): int(skill["skill_points"])})
if citizen.get('party', []):
party = citizen.get('party')
self.politics.is_party_member = True
self.politics.party_id = party.get('party_id')
self.politics.is_party_president = bool(party.get('is_party_president'))
self.politics.party_slug = f"{party.get('stripped_title')}-{party.get('party_id')}"
self.wheel_of_fortune = bool(
re.search(r'