import datetime
import re
import sys
import threading
import time
from json import loads, dumps
from typing import Dict, List, Tuple, Any, Union
import requests
from requests import Response, RequestException
from erepublik_script import classes, utils
class Citizen(classes.CitizenAPI):
url: str = "https://www.erepublik.com/en"
division = 0
all_battles: Dict[int, classes.Battle] = dict()
countries: Dict[int, Dict[str, Union[str, List[int]]]] = dict()
__last_war_update_data = {}
__last_full_update: datetime.datetime
active_fs = False
food = {"q1": 0, "q2": 0, "q3": 0, "q4": 0, "q5": 0, "q6": 0, "q7": 0, "total": 0}
inventory = {"used": 0, "total": 0}
boosters = {
"100_damageBoosters_5_7200": 0, # 2h × 60min × 60sec, +50%
"100_damageBoosters_5_28800": 0, # 8h × 60min × 60sec, +50%
"100_damageBoosters_5_86400": 0, # 1d × 24h × 60min × 60sec, +50%
"100_speedBoosters_1_180": 0, # 3min × 60sec, ×2 kills
"100_speedBoosters_2_600": 0, # 10min × 60sec, ×5 kills
"100_catchupBoosters_30_60": 0, # 60sec, +30%
}
candies_normal = 0
candies_double = 0
candies_small = 0
tickets = 0
work_units = 0
ot_points = 0
tg_contract = {}
promos = {}
eday = 0
r: requests.Response
reporter: classes.Reporter
token = ""
name = "Not logged in!"
debug = False
__registered = False
logged_in = False
def __init__(self, email: str = "", password: str = ""):
super().__init__()
self.commit_id = utils.COMMIT_ID
self.config = classes.Config()
self.config.email = email
self.config.password = password
self.energy = classes.Energy()
self.details = classes.Details()
self.politics = classes.Politics()
self.my_companies = classes.MyCompanies()
self.set_debug(True)
self.reporter = classes.Reporter()
self.get_csrf_token()
self.update_citizen_info()
self.reporter.do_init(self.name, email, self.details.citizen_id)
self.stop_threads = threading.Event()
self.__last_full_update = utils.good_timedelta(self.now, - datetime.timedelta(minutes=5))
def write_log(self, *args, **kwargs):
if self.config.interactive:
utils.write_interactive_log(*args, **kwargs)
else:
utils.write_silent_log(*args, **kwargs)
def sleep(self, seconds: int):
if self.config.interactive:
utils.interactive_sleep(seconds)
else:
time.sleep(seconds)
def __str__(self) -> str:
return "Citizen {}".format(self.name)
@property
def __dict__(self):
ret = super().__dict__.copy()
ret.pop('reporter', None)
ret.pop('stop_threads', None)
return ret
def set_debug(self, debug: bool):
self.debug = debug
self._req.debug = debug
def set_pin(self, pin: int):
self.details.pin = pin
def get_csrf_token(self):
"""
get_csrf_token is the function which logs you in, and updates csrf tokens
(after 15min time of inactivity opening page in eRepublik.com redirects to home page),
by explicitly requesting homepage.
"""
resp = self._req.get(self.url)
self.r = resp
try:
self.update_citizen_info(resp.text)
except:
pass
if self._errors_in_response(resp):
self.get_csrf_token()
return
html = resp.text
self.check_for_new_medals(html)
re_token = re.search(r'var csrfToken = \'(\w{32})\'', html)
re_login_token = re.search(r'', html)
if re_token:
self.token = re_token.group(1)
elif re_login_token:
self.token = re_login_token.group(1)
self._login()
else:
raise classes.ErepublikException("Something went wrong! Can't find token in page! Exiting!")
def _login(self):
# MUST BE CALLED TROUGH self.get_csrf_token()
r = self.post_login(self.token, self.config.email, self.config.password)
self.r = r
if r.url == "{}/login".format(self.url):
self.write_log("Citizen email and/or password is incorrect!")
raise KeyboardInterrupt
else:
re_name_id = re.search(r'', r.text)
self.name = re_name_id.group(2)
self.details.citizen_id = re_name_id.group(1)
self.write_log("Logged in as: {}".format(self.name))
self.get_csrf_token()
self.logged_in = True
def _errors_in_response(self, response: requests.Response):
if response.status_code >= 400:
self.r = response
if response.status_code >= 500:
self.write_log("eRepublik servers are having internal troubles. Sleeping for 5 minutes")
self.sleep(5 * 60)
else:
raise classes.ErepublikException("HTTP {} error!".format(response.status_code))
return bool(re.search(r'body id="error"|Internal Server Error|'
r'CSRF attack detected|meta http-equiv="refresh"|not_authenticated', response.text))
def get(self, url: str, *args, **kwargs) -> Response:
if (self.now - self._req.last_time).seconds >= 15 * 60:
self.get_csrf_token()
if "params" in kwargs:
if "_token" in kwargs["params"]:
kwargs["params"]["_token"] = self.token
if url == self.r.url and not url == self.url: # Don't duplicate requests, except for homepage
response = self.r
else:
try:
response = super().get(url, **kwargs)
except RequestException as e:
self.write_log("Network error while issuing GET request", e)
self.sleep(60)
return self.get(url, *args, **kwargs)
try:
self.update_citizen_info(html=response.text)
except:
pass
if self._errors_in_response(response):
self.get_csrf_token()
self.get(url, **kwargs)
else:
self.check_for_new_medals(response.text)
self.r = response
return response
def post(self, url: str, data: dict = None, json: dict = None, **kwargs) -> Response:
if (self.now - self._req.last_time).seconds >= 14 * 60:
self.get_csrf_token()
if "_token" in data:
data["_token"] = self.token
if "_token" in json:
json["_token"] = self.token
try:
response = super().post(url, data=data, json=json, **kwargs)
except RequestException as e:
self.write_log("Network error while issuing POST request", e)
self.sleep(60)
return self.post(url, data, json, **kwargs)
# response = super().post(url, data=data, json=json, **kwargs)
try:
resp_data = response.json()
if (resp_data.get("error") or not resp_data.get("status")) and resp_data.get("message", "") == "captcha":
utils.send_email(self.name, [response.text, ], player=self, captcha=True)
except:
pass
if self._errors_in_response(response):
self.get_csrf_token()
if data:
data.update({"_token": self.token})
elif json:
json.update({"_token": self.token})
response = self.post(url, data=data, json=json, **kwargs)
else:
self.check_for_new_medals(response.text)
self.r = response
return response
def check_for_new_medals(self, html: str):
new_medals = re.findall(r'(New Achievement
.*?
Congratulations, you have reached experience level (\d+)
", html) if levelup: level = levelup.group(1) msg = "Level up! Current level {}".format(level) self.write_log(msg) self.reporter.report_action("LEVEL_UP", value=level) def update_all(self, force_update=False): # Do full update max every 5 min if utils.good_timedelta(self.__last_full_update, datetime.timedelta(minutes=5)) > self.now and not force_update: return else: self.__last_full_update = self.now self.update_citizen_info() self.update_war_info() self.update_inventory() self.update_companies() self.update_money() self.update_weekly_challenge() self.send_state_update() def update_citizen_info(self, html: str = None): """ Gets main page and updates most information about player """ if html is None: self.get_main() html = self.r.text ugly_js = re.search(r"promotions: (\[{?.*}?]),\s+", html).group(1) promos = loads(utils.normalize_html_json(ugly_js)) self.promos = {k: v for k, v in self.promos.items() if v > self.now} send_mail = False for promo in promos: promo_name = promo.get("id") expire = utils.localize_timestamp(int(promo.get("expiresAt"))) if promo_name not in self.promos: send_mail = True self.promos.update({promo_name: expire}) if promo_name == "trainingContract": if not self.tg_contract: self.train() if not self.tg_contract["free_train"] and self.tg_contract.get("active", False): if self.details.gold >= 54: self.buy_tg_contract() else: self.write_log("Training ground contract active but don't have enough gold ({}g {}cc)".format( self.details.gold, self.details.cc )) if send_mail: active_promos = ["{} active until {}".format(k, v.strftime("%F %T")) for k, v in self.promos.items()] utils.send_email(self.name, active_promos, player=self, promo=True) new_date = re.search(r"var new_date = '(\d*)';", html) if new_date: self.energy.set_reference_time( utils.good_timedelta(self.now, datetime.timedelta(seconds=int(new_date.group(1)))) ) ugly_js = re.search(r"var erepublik = ({.*}),\s+", html).group(1) citizen_js = loads(ugly_js) citizen = citizen_js.get("citizen", {}) self.eday = citizen_js.get("settings").get("eDay") self.division = int(citizen.get("division", 0)) self.energy.interval = citizen.get("energyPerInterval", 0) self.energy.limit = citizen.get("energyToRecover", 0) self.energy.recovered = citizen.get("energy", 0) self.energy.recoverable = citizen.get("energyFromFoodRemaining", 0) self.details.current_region = citizen.get("regionLocationId", 0) self.details.current_country = citizen.get("countryLocationId", 0) # country where citizen is located self.details.residence_region = citizen.get("residence", {}).get("regionId", 0) self.details.residence_country = citizen.get("residence", {}).get("countryId", 0) self.details.citizen_id = citizen.get("citizenId", 0) self.details.citizenship = int(citizen.get("country", 0)) self.details.xp = citizen.get("currentExperiencePoints", 0) self.details.daily_task_done = citizen.get("dailyTasksDone", False) self.details.daily_task_reward = citizen.get("hasReward", False) # if citizen.get("dailyOrderDone", False) and not citizen.get("hasDailyOrderReward", False): # self.post_military_group_missions(self.token) # self.get_citizen_daily_assistant() self.details.next_pp.sort() for id_, skill in citizen.get("mySkills", {}).items(): self.details.mayhem_skills.update({int(skill["terrain_id"]): int(skill["skill_points"])}) if citizen.get('party', []): party = citizen.get('party') self.politics.is_party_member = True self.politics.party_id = party.get('party_id') self.politics.is_party_president = bool(party.get('is_party_president')) self.politics.party_slug = "{}-{}".format(party.get("stripped_title"), party.get('party_id')) def update_money(self, page: int = 0, currency: int = 62) -> Response: """ Gets monetary market offers to get exact amount of CC and Gold available """ if currency not in [1, 62]: currency = 62 resp = self.post_economy_exchange_retrieve(self.token, False, page, currency) self.details.cc = float(resp.json().get("ecash").get("value")) self.details.gold = float(resp.json().get("gold").get("value")) return resp def update_job_info(self): ot = self.get_job_data().json().get("overTime", {}) if ot: self.my_companies.next_ot_time = utils.localize_timestamp(int(ot.get("nextOverTime", 0))) self.ot_points = ot.get("points", 0) def update_companies(self): html = self.get_economy_my_companies().text page_details = loads(re.search(r"var pageDetails\s+= ({.*});", html).group(1)) self.my_companies.work_units = int(page_details.get("total_works", 0)) have_holdings = re.search(r"var holdingCompanies\s+= ({.*}});", html) have_companies = re.search(r"var companies\s+= ({.*}});", html) if have_holdings and have_companies: self.my_companies.prepare_companies(loads(have_companies.group(1))) self.my_companies.prepare_holdings(loads(have_holdings.group(1))) self.my_companies.update_holding_companies() def update_inventory(self) -> dict: j = self.get_economy_inventory_items().json() self.inventory.update({"used": j.get("inventoryStatus").get("usedStorage"), "total": j.get("inventoryStatus").get("totalStorage")}) final = j.get("inventoryItems").get("finalProducts").get("items") self.food.update({ "q1": final.get("1_1", {"amount": 0}).get("amount", 0), "q2": final.get("1_2", {"amount": 0}).get("amount", 0), "q3": final.get("1_3", {"amount": 0}).get("amount", 0), "q4": final.get("1_4", {"amount": 0}).get("amount", 0), "q5": final.get("1_5", {"amount": 0}).get("amount", 0), "q6": final.get("1_6", {"amount": 0}).get("amount", 0), "q7": final.get("1_7", {"amount": 0}).get("amount", 0), }) self.boosters.update({ "100_damageBoosters_5_7200": final.get("100_damageBoosters_5_7200", {"amount": 0}).get("amount", 0), "100_damageBoosters_5_28800": final.get("100_damageBoosters_5_28800", {"amount": 0}).get("amount", 0), "100_damageBoosters_5_86400": final.get("100_damageBoosters_5_86400", {"amount": 0}).get("amount", 0), "100_speedBoosters_1_180": final.get("100_speedBoosters_1_180", {"amount": 0}).get("amount", 0), "100_speedBoosters_2_600": final.get("100_speedBoosters_2_600", {"amount": 0}).get("amount", 0), "100_catchupBoosters_30_60": final.get("100_catchupBoosters_30_60", {"amount": 0}).get("amount", 0), }) self.candies_normal = final.get("1_10", {"amount": 0}).get("amount", 0) self.candies_double = final.get("1_11", {"amount": 0}).get("amount", 0) self.candies_small = final.get("1_12", {"amount": 0}).get("amount", 0) self.ot_points = final.get("4_100", {"amount": 0}).get("amount", 0) self.tickets = final.get("3_5", {"amount": 0}).get("amount", 0) self.food["total"] = sum([self.food[q] * utils.FOOD_ENERGY[q] for q in utils.FOOD_ENERGY]) return j def update_weekly_challenge(self): data = self.get_weekly_challenge_data().json() self.details.pp = data.get("player", {}).get("prestigePoints", 0) self.details.next_pp = [] for reward in data.get("rewards", {}).get("normal", {}): status = reward.get("status", "") if status == "rewarded": continue elif status == "completed": data = { "_token": self.token, "rewardId": reward.get("id", 0) } self.post_weekly_challenge_reward(self.token, reward.get("id", 0)) elif reward.get("icon", "") == "energy_booster": pps = re.search(r"Reach (\d+) Prestige Points to unlock the following reward: \+1 Energy", reward.get("tooltip", "")) if pps: self.details.next_pp.append(int(pps.group(1))) def update_war_info(self) -> Dict[Any, Any]: if not self.details.current_country: self.update_citizen_info() resp_json = self.get_military_campaigns().json() if resp_json.get("countries"): for c_id, c_data in resp_json.get("countries").items(): if int(c_id) not in self.countries: self.countries.update({ int(c_id): {"name": c_data.get("name"), "allies": c_data.get("allies")} }) else: self.countries[int(c_id)].update(allies=c_data.get("allies")) self.__last_war_update_data = resp_json if resp_json.get("battles"): for battle_id, battle_data in resp_json.get("battles", {}).items(): self.all_battles.update({int(battle_id): classes.Battle(battle_data)}) return self.__last_war_update_data def eat(self): """ Try to eat food """ self.update_citizen_info() self.update_inventory() if self.details.xp_till_level_up > (self.energy.recovered - 50) // 10: if self.food["total"] > self.energy.interval: if self.energy.limit - self.energy.recovered > self.energy.interval or not self.energy.recoverable % 2: self._eat("blue") else: self.write_log("I don't want to eat right now!") else: self.write_log("I'm out of food! But I'll try to buy some!\n{}".format(self.food)) self.buy_food() self.update_inventory() if self.food["total"] > self.energy.interval: self.eat() else: self.write_log("I failed to buy food") else: self.write_log("I'm not allowed to eat because I have levelup coming up!") self.write_log(self.health_info) def eat_ebs(self): self.write_log("Eating energy bar") self.update_citizen_info() if self.energy.recoverable: self._eat("blue") self._eat("orange") self.write_log(self.health_info) def _eat(self, colour: str = "blue") -> Response: response = self.post_eat(self.token, colour) r_json = response.json() next_recovery = r_json.get("food_remaining_reset").split(":") self.energy.set_reference_time( utils.good_timedelta(self.now, datetime.timedelta(seconds=int(next_recovery[1]) * 60 + int(next_recovery[2]))) ) self.energy.recovered = r_json.get("health") self.energy.recoverable = r_json.get("food_remaining") for q, amount in r_json.get("units_consumed").items(): if "q{}".format(q) in self.food: self.food["q{}".format(q)] -= amount elif q == "10": self.candies_normal -= amount elif q == "11": self.candies_double -= amount elif q == "12": self.candies_small -= amount return response @property def health_info(self): self.update_citizen_info() ret = "{}/{} + {}, {}hp/6m. {}xp until level up".format( self.energy.recovered, self.energy.limit, self.energy.recoverable, self.energy.interval, self.details.xp_till_level_up ) return ret @property def now(self) -> datetime.datetime: """ Returns aware datetime object localized to US/Pacific (eRepublik time) :return: datetime.datetime """ return utils.now() def check_epic_battles(self): active_fs = False for battle_id in self.sorted_battles(self.config.sort_battles_time): battle = self.all_battles.get(battle_id) if not battle.is_air: my_div: classes.BattleDivision = battle.div.get(self.division) if my_div.epic and my_div.end > self.now: if self.energy.food_fights > 50: inv_allies = battle.invader.deployed + [battle.invader.id] def_allies = battle.defender.deployed + [battle.defender.id] all_allies = inv_allies + def_allies if self.details.current_country not in all_allies: self._travel(battle.defender.id, self.get_country_travel_region(battle.defender.id)) side = battle.defender.id else: if self.details.current_country in inv_allies: side = battle.invader.id elif self.details.current_country in def_allies: side = battle.defender.id else: self.write_log( "Country {} not in all allies list ({}) and also not in inv allies ({}) nor def " "allies ({})".format(self.details.current_country, all_allies, inv_allies, def_allies)) break error_count = 0 while self.energy.food_fights > 5 and error_count < 20: errors = self.fight(battle_id, side_id=side, is_air=False, count=self.energy.food_fights - 5) if errors: error_count += errors if self.config.epic_hunt_ebs: self.eat_ebs() self.travel_to_residence() break elif bool(my_div.epic): active_fs = True self.active_fs = active_fs def sorted_battles(self, sort_by_time: bool = False) -> List[int]: r = self.update_war_info() cs_battles_air: List[int] = [] cs_battles_ground: List[int] = [] deployed_battles_air: List[int] = [] deployed_battles_ground: List[int] = [] ally_battles_air: List[int] = [] ally_battles_ground: List[int] = [] other_battles_air: List[int] = [] other_battles_ground: List[int] = [] for bid, battle in sorted(self.all_battles.items(), key=lambda b: b[1].start if sort_by_time else b[0], reverse=sort_by_time): battle_sides = [battle.invader.id, battle.defender.id] # CS Battles if self.details.citizenship in battle_sides: if battle.is_air: cs_battles_ground.append(battle.id) else: cs_battles_air.append(battle.id) # Current location battles: elif self.details.current_country in battle_sides: if battle.is_air: deployed_battles_ground.append(battle.id) else: deployed_battles_air.append(battle.id) # Deployed battles and allied battles: elif self.details.current_country in battle.invader.allies + battle.defender.allies + battle_sides: if self.details.current_country in battle.invader.deployed + battle.defender.deployed: if battle.is_air: deployed_battles_ground.append(battle.id) else: deployed_battles_air.append(battle.id) # Allied battles: else: if battle.is_air: ally_battles_ground.append(battle.id) else: ally_battles_air.append(battle.id) else: if battle.is_air: other_battles_ground.append(battle.id) else: other_battles_air.append(battle.id) ret_battles = [] if r.get("citizen_contribution"): battle_id = r.get("citizen_contribution")[0].get("battle_id", 0) ret_battles.append(battle_id) ret_battles += (cs_battles_air + cs_battles_ground + deployed_battles_air + deployed_battles_ground + ally_battles_air + ally_battles_ground + other_battles_air + other_battles_ground) return ret_battles @property def has_battle_contribution(self): return bool(self.update_war_info().get("citizen_contribution", [])) def find_battle_and_fight(self): if self.should_fight(False): self.write_log("Checking for battles to fight in...") for battle_id in self.sorted_battles(self.config.sort_battles_time): battle = self.all_battles.get(battle_id) div = 11 if battle.is_air else self.division allies = battle.invader.deployed + battle.defender.deployed + [battle.invader.id, battle.defender.id] travel_needed = self.details.current_country not in allies if battle.is_rw: side_id = battle.defender.id if self.config.rw_def_side else battle.invader.id else: side_id = battle.defender.id if (self.details.current_country in battle.defender.allies + [battle.defender.id, ]) else battle.invader.id try: def_points = battle.div.get(div).dom_pts.get('def') inv_points = battle.div.get(div).dom_pts.get('inv') except KeyError: self.report_error(f"Division {div} not available for battle {battle.id}!") def_points = inv_points = 3600 kwargs = { "bid": battle.id, "air": "air" if battle.is_air else "ground", "rw": "True" if battle.is_rw else "False", "def": def_points, "inv": inv_points, "travel": "(TRAVEL)" if travel_needed else "", } self.write_log("Battle {bid}, type: {air:6}, rw: {rw:5}, " "points: {def:4}:{inv:<4} {travel}".format(**kwargs)) points = def_points <= 1700 and inv_points <= 1700 b_type = battle.is_air and self.config.air or not battle.is_air and self.config.ground travel = (self.config.travel_to_fight and self.should_travel_to_fight() or self.config.force_travel) \ if travel_needed else True if not (points and b_type and travel): continue if battle.start > self.now: self.sleep(utils.get_sleep_seconds(battle.start)) if travel_needed: if battle.is_rw: self._travel(battle.defender.id, self.get_country_travel_region(battle.defender.id)) elif self.details.current_country not in battle.invader.allies: self.travel(battle_id=battle.id) side_id = battle.invader.id else: self._travel(battle.defender.id, self.get_country_travel_region(battle.defender.id)) side_id = battle.defender.id self.fight(battle_id, side_id, battle.is_air) self.travel_to_residence() self.collect_weekly_reward() break def fight(self, battle_id: int, side_id: int, is_air: bool = False, count: int = None): data = dict(sideId=side_id, battleId=battle_id, _token=self.token) error_count = 0 ok_to_fight = True if count is None: count = self.should_fight(silent=False) total_damage = 0 total_hits = 0 while ok_to_fight and error_count < 10: while all((count > 0, error_count < 10, self.energy.recovered >= 50)): hits, error, damage = self._shoot(is_air, data) count -= hits total_hits += hits total_damage += damage error_count += error else: self.eat() if self.energy.recovered < 50 or error_count >= 10 or count <= 0: self.write_log("Hits: {:>4} | Damage: {}".format(total_hits, total_damage)) ok_to_fight = False if total_damage: self.reporter.report_action(json_val=dict(battle=battle_id, side=side_id, dmg=total_damage, air=is_air, hits=total_hits), action="FIGHT") if error_count: return error_count def _shoot(self, air: bool, data: dict): if air: response = self.post_military_fight_air(self.token, data['battleId'], data['sideId']) else: response = self.post_military_fight_ground(self.token, data['battleId'], data['sideId']) if "Zone is not meant for " in response.text: self.sleep(5) return 0, 1, 0 try: j_resp = response.json() except: return 0, 10, 0 hits = 0 damage = 0 err = False if j_resp.get("error"): if j_resp.get("message") == "SHOOT_LOCKOUT": pass else: if j_resp.get("message") == "UNKNOWN_SIDE": self._rw_choose_side(data["battleId"], data["sideId"]) err = True elif j_resp.get("message") == "ENEMY_KILLED": hits = (self.energy.recovered - j_resp["details"]["wellness"]) // 10 self.energy.recovered = j_resp["details"]["wellness"] damage = j_resp["user"]["givenDamage"] * (1.1 if j_resp["oldEnemy"]["isNatural"] else 1) else: err = True return hits, err, damage def work_ot(self): # I"m not checking for 1h cooldown. Beware of nightshift work, if calling more than once every 60min self.update_job_info() if self.ot_points >= 24 and self.energy.food_fights > 1: r = self.post_economy_work_overtime(self.token) if not r.json().get("status") and r.json().get("message") == "money": self.resign() self.find_new_job() else: self.reporter.report_action("WORK_OT", r.json()) elif self.energy.food_fights < 1 and self.ot_points >= 24: self._eat("blue") if self.energy.food_fights < 1: large = max(self.energy.reference_time, self.now) small = min(self.energy.reference_time, self.now) self.write_log("I don't have energy to work OT. Will sleep for {}s".format((large - small).seconds)) self.sleep(int((large - small).total_seconds())) self._eat("blue") self.work_ot() def work(self): if self.energy.food_fights >= 1: response = self.post_economy_work(self.token, "work") js = response.json() good_msg = ["already_worked", "captcha"] if not js.get("status") and not js.get("message") in good_msg: self.update_citizen_info() self.work() else: self.reporter.report_action("WORK", json_val=js) else: self._eat("blue") if self.energy.food_fights < 1: large = max(self.energy.reference_time, self.now) small = min(self.energy.reference_time, self.now) self.write_log("I don't have energy to work. Will sleep for {}s".format((large - small).seconds)) self.sleep(int((large - small).total_seconds())) self._eat("blue") self.work() def train(self): r = self.get_training_grounds_json() tg_json = r.json() self.details.gold = tg_json["page_details"]["gold"] self.tg_contract.update({"free_train": tg_json["hasFreeTrain"]}) if tg_json["contracts"]: self.tg_contract.update(**tg_json["contracts"][0]) tgs = [] for data in sorted(tg_json["grounds"], key=lambda k: k["cost"]): if data["default"] and not data["trained"]: tgs.append(data["id"]) if tgs: if self.energy.food_fights >= len(tgs): response = self.post_economy_train(self.token, tgs) if not response.json().get("status"): self.update_citizen_info() self.train() else: self.reporter.report_action("TRAIN", response.json()) else: self._eat("blue") if self.energy.food_fights < len(tgs): large = max(self.energy.reference_time, self.now) small = min(self.energy.reference_time, self.now) self.write_log("I don't have energy to train. Will sleep for {} seconds".format( (large - small).seconds)) self.sleep(int((large - small).total_seconds())) self._eat("blue") self.train() def work_employees(self) -> bool: self.update_companies() ret = True work_units_needed = 0 employee_companies = self.my_companies.get_employable_factories() for c_id, preset_count in employee_companies.items(): work_units_needed += preset_count if work_units_needed: if work_units_needed <= self.my_companies.work_units: self._do_wam_and_employee_work(employee_companies=employee_companies) self.update_companies() if self.my_companies.get_employable_factories(): ret = False else: ret = True return ret def work_wam(self) -> bool: self.update_citizen_info() self.update_companies() # Prevent messing up levelup with wam if not (self.is_levelup_close and self.config.fight) or self.config.force_wam: # Check for current region regions = {} for holding_id, holding in self.my_companies.holdings.items(): if self.my_companies.get_holding_wam_companies(holding_id): regions.update({holding["region_id"]: holding_id}) if self.details.current_region in regions: self._do_wam_and_employee_work(regions.pop(self.details.current_region, None)) for holding_id in regions.values(): self._do_wam_and_employee_work(holding_id) self.travel_to_residence() else: self.write_log("Did not wam because I would mess up levelup!") self.update_companies() return not self.my_companies.get_total_wam_count() def _do_wam_and_employee_work(self, wam_holding_id: int = 0, employee_companies: dict = None) -> bool: self.update_citizen_info() if employee_companies is None: employee_companies = {} data = { "action_type": "production", "_token": self.token, } extra = {} wam_list = [] if wam_holding_id: raw_count = self.my_companies.get_holding_wam_count(wam_holding_id, raw_factory=True) fab_count = self.my_companies.get_holding_wam_count(wam_holding_id, raw_factory=False) if raw_count + fab_count <= self.energy.food_fights: raw_factories = None elif not raw_count and fab_count <= self.energy.food_fights: raw_factories = False else: raw_factories = True free_inventory = self.inventory["total"] - self.inventory["used"] wam_list = self.my_companies.get_holding_wam_companies(wam_holding_id, raw_factory=raw_factories)[:self.energy.food_fights] has_space = False while not has_space and wam_list: extra_needed = self.my_companies.get_needed_inventory_usage(companies=wam_list) has_space = extra_needed < free_inventory if not has_space: inv_w = len(str(self.inventory["total"])) self.write_log( "Inv: {:{inv_w}}/{:{inv_w}} ({:4.2f}), Energy: {}/{} + {} (+{}hp/6min) WAM count {:3}".format( self.inventory["used"], self.inventory["total"], extra_needed, self.energy.recovered, self.energy.limit, self.energy.recoverable, self.energy.interval, len(wam_list), inv_w=inv_w )) wam_list.pop(-1) if wam_list or employee_companies: data.update(extra) if wam_list: wam_holding = self.my_companies.holdings.get(wam_holding_id) if not self.details.current_region == wam_holding['region_id']: self.travel(holding_id=wam_holding_id, region_id=wam_holding['region_id']) response = self.post_economy_work(self.token, "production", wam=wam_list, employ=employee_companies).json() self.reporter.report_action("WORK_WAM_EMPLOYEES", response) if response.get("status"): if self.config.auto_sell: for kind, data in response.get("result", {}).get("production", {}).items(): if kind in self.config.auto_sell and data: if kind in ["food", "weapon", "house", "airplane"]: for quality, amount in data.items(): self.sell_produced_product(kind, quality) elif kind.endswith("Raw"): self.sell_produced_product(kind, 1) else: raise classes.ErepublikException("Unknown kind produced '{kind}'".format(kind=kind)) elif self.config.auto_buy_raw and re.search(r"not_enough_[^_]*_raw", response.get("message")): raw_kind = re.search(r"not_enough_(\w+)_raw", response.get("message")) if raw_kind: raw_kind = raw_kind.group(1) result = response.get("result", {}) amount_remaining = round(result.get("consume") + 0.49) - round(result.get("stock") - 0.49) industry = "{}Raw".format(raw_kind) while amount_remaining > 0: amount = amount_remaining best_offer = self.get_market_offers(self.details.citizenship, industry, 1) amount = best_offer['amount'] if amount >= best_offer['amount'] else amount rj = self.buy_from_market(amount=best_offer['amount'], offer=best_offer['offer_id']).json() if not rj.get('error'): amount_remaining -= amount else: self.write_log(rj.get('message', "")) break else: return self._do_wam_and_employee_work(wam_holding_id, employee_companies) else: self.write_log("I was not able to wam and or employ because:\n{}".format(response)) wam_count = self.my_companies.get_total_wam_count() if wam_count: self.write_log("Wam ff lockdown is now {}, was {}".format(wam_count, self.my_companies.ff_lockdown)) self.my_companies.ff_lockdown = wam_count return bool(wam_count) def sell_produced_product(self, kind: str, quality: int = 1, amount: int = 0): if not amount: inv_resp = self.update_inventory() category = "rawMaterials" if kind.endswith("Raw") else "finalProducts" item = "{}_{}".format(self.available_industries[kind], quality) amount = inv_resp.get("inventoryItems").get(category).get("items").get(item).get("amount", 0) if amount >= 1: lowest_price = self.get_market_offers(country_id=self.details.citizenship, product=kind, quality=int(quality)) if lowest_price["citizen_id"] == self.details.citizen_id: price = lowest_price["price"] else: price = lowest_price["price"] - 0.01 self.post_market_offer(industry=self.available_industries[kind], amount=int(amount), quality=int(quality), price=price) def travel_to_residence(self): self.update_citizen_info() res_r = self.details.residence_region if self.details.residence_country and res_r and not res_r == self.details.current_region: self._travel(self.details.residence_country, self.details.residence_region) def get_country_travel_region(self, country_id: int) -> int or None: r = self.get_travel_regions(country_id=country_id).json() regions = r.get("regions") regs = [] if regions: for region in regions.values(): if region['countryId'] == country_id: # Is not occupied by other country regs.append((region['id'], region['distanceInKm'])) if regs: return min(regs, key=lambda _: int(_[1]))[0] else: return None def travel(self, holding_id=0, battle_id=0, region_id=0): r = self.get_travel_regions(holding_id, battle_id, region_id) regions = r.json()["regions"] closest_region = 99999 country_id = int(r.json()["preselectCountryId"]) region_id = int(r.json()["preselectRegionId"]) if not any((region_id, country_id)): for rid, info in regions.items(): if info.get("distanceInKm", 99999) < closest_region: closest_region = info.get("distanceInKm") country_id = info.get("countryId") region_id = rid self._travel(country_id, region_id) def _travel(self, country_id: int, region_id: int = 0) -> Response: data = { "toCountryId": country_id, "inRegionId": region_id, "battleId": 0, } return self.post_travel(self.token, "moveAction", **data) def get_travel_regions(self, holding_id: int = 0, battle_id: int = 0, region_id: int = 0, country_id: int = 0) -> Response: data = { "holdingId": holding_id, "battleId": battle_id, "regionId": region_id, } data.update(countryId=country_id) return self.post_travel_data(self.token, **data) def parse_notifications(self, page: int = 1) -> list: response = self.get_message_alerts(page) notifications = re.findall(r"(.*?)
", response.text, re.M | re.I | re.S) return notifications def delete_notifications(self): def notification_ids(html): return re.findall(r"id=\"delete_alert_(\d+)\"", html) response = self.get_message_alerts() while notification_ids(response.text): response = self.post_messages_alert(self.token, notification_ids(response.text)) def collect_weekly_reward(self): self.update_weekly_challenge() def collect_daily_task(self) -> Response or None: self.update_citizen_info() if self.details.daily_task_done and not self.details.daily_task_reward: return self.post_daily_task_reward(self.token) def send_mail_to_owner(self) -> Response or None: if not self.details.citizen_id == 1620414: self.send_mail("Started", "time {}".format(self.now.strftime("%Y-%m-%d %H-%M-%S")), [1620414, ]) self.sleep(1) msg_id = re.search(r"", self.r.text).group(1) return self.post_delete_message(self.token, [msg_id]) def get_market_offers(self, country_id: int = None, product: str = None, quality: int = None) -> dict: ret = dict() raw_short_names = dict(frm="foodRaw", wrm="weaponRaw", hrm="houseRaw", arm="airplaneRaw") q1_industries = ["aircraft"] + list(raw_short_names.values()) if product in raw_short_names: quality = 1 product = raw_short_names.get(product) item_data = dict(price=999999., country=0, amount=0, offer_id=0, citizen_id=0) items = {"food": dict(q1=item_data.copy(), q2=item_data.copy(), q3=item_data.copy(), q4=item_data.copy(), q5=item_data.copy(), q6=item_data.copy(), q7=item_data.copy()), "weapon": dict(q1=item_data.copy(), q2=item_data.copy(), q3=item_data.copy(), q4=item_data.copy(), q5=item_data.copy(), q6=item_data.copy(), q7=item_data.copy()), "house": dict(q1=item_data.copy(), q2=item_data.copy(), q3=item_data.copy(), q4=item_data.copy(), q5=item_data.copy()), "aircraft": dict(q1=item_data.copy()), "foodRaw": dict(q1=item_data.copy()), "weaponRaw": dict(q1=item_data.copy()), "houseRaw": dict(q1=item_data.copy()), "airplaneRaw": dict(q1=item_data.copy())} countries = [country_id] if country_id else self.countries if product not in self.available_industries: self.write_log("Industry '{}' not implemented".format(product)) return ret start_dt = self.now for country in countries: if not country_id and not self.get_country_travel_region(country): continue for industry in [product] or items: for q in [quality] if quality else range(1, 8): if (q > 1 and industry in q1_industries) or (q > 5 and industry == "house"): break str_q = "q%i" % q data = {'country': country, 'industry': self.available_industries[industry], 'quality': q} r = self.post_economy_marketplace(self.token, **data) rjson = r.json() obj = items[industry][str_q] if not rjson.get("error", False): for offer in rjson["offers"]: if obj["price"] > float(offer["priceWithTaxes"]): obj["price"] = float(offer["priceWithTaxes"]) obj["country"] = int(offer["country_id"]) obj["amount"] = int(offer["amount"]) obj["offer_id"] = int(offer["id"]) obj["citizen_id"] = int(offer["citizen_id"]) elif obj["price"] == float(offer["priceWithTaxes"]) \ and obj["amount"] < int(offer["amount"]): obj["country"] = int(offer["country_id"]) obj["amount"] = int(offer["amount"]) obj["offer_id"] = int(offer["id"]) self.write_log("Scraped market in {}!".format(self.now - start_dt)) if quality: ret = items[product]["q%i" % quality] elif product: if product in raw_short_names.values(): ret = items[product]["q1"] else: ret = items[product] else: ret = items return ret def buy_food(self) -> Response or None: self.update_money() hp_per_quality = {"q1": 2, "q2": 4, "q3": 6, "q4": 8, "q5": 10, "q6": 12, "q7": 20} hp_needed = 24 * self.energy.interval * 10 - self.food["total"] local_offers = self.get_market_offers(country_id=self.details.current_country, product="food") cheapest_q, cheapest = sorted(local_offers.items(), key=lambda v: v[1]["price"] / hp_per_quality[v[0]])[0] if cheapest["amount"] * hp_per_quality[cheapest_q] < hp_needed: amount = cheapest["amount"] else: amount = hp_needed // hp_per_quality[cheapest_q] if amount * cheapest["price"] < self.details.cc: data = dict(offer=cheapest["offer_id"], amount=amount, price=cheapest["price"], cost=amount * cheapest["price"], quality=cheapest_q, energy=amount * hp_per_quality[cheapest_q]) self.reporter.report_action("BUY_FOOD", json_val=data) return self.buy_from_market(cheapest["offer_id"], amount) else: s = "Don't have enough money! Needed: {}cc, Have: {}cc".format(amount * cheapest["price"], self.details.cc) self.write_log(s) self.reporter.report_action("BUY_FOOD", value=s) return None def get_monetary_offers(self, currency: int = 62) -> list: if currency not in [1, 62]: currency = 62 resp = self.update_money(0, currency).json() ret = [] offers = re.findall(r"id='purchase_(\d+)' data-i18n='Buy for' data-currency='GOLD' " r"data-price='(\d+\.\d+)' data-max='(\d+\.\d+)' trigger='purchase'", resp["buy_mode"], re.M | re.I | re.S) for offer_id, price, amount in offers: ret.append({"offer_id": int(offer_id), "price": float(price), "amount": float(amount)}) return sorted(ret, key=lambda o: (o["price"], -o["amount"])) def buy_monetary_market_offer(self, offer: int, amount: float, currency: int) -> bool: response = self.post_economy_exchange_purchase(self.token, amount, currency, offer) self.details.cc = float(response.json().get("ecash").get("value")) self.details.gold = float(response.json().get("gold").get("value")) self.reporter.report_action("BUY_GOLD", json_val=response.json(), value="New amount {o.cc}cc, {o.gold}g".format(o=self.details)) return not response.json().get("error", False) def activate_dmg_booster(self, battle_id: int) -> Response or None: if self.config.boosters: duration = 0 if self.boosters.get("100_damageBoosters_5_600", 0) > 0: duration = 600 elif self.boosters.get("100_damageBoosters_5_7200", 0) > 1: duration = 7200 elif self.boosters.get("100_damageBoosters_5_28800", 0) > 2: duration = 28800 elif self.boosters.get("100_damageBoosters_5_86400", 0) > 2: duration = 86400 return self.post_fight_activate_booster(self.token, battle_id, 5, duration, "damage") def activate_battle_effect(self, battle_id: int, kind: str): return self.post_activate_battle_effect(self.token, battle_id, kind, self.details.citizen_id) def activate_pp_booster(self, battle_id: int): return self.post_fight_activate_booster(self.token, battle_id, 1, 180, "prestige_points") def donate_money(self, citizen_id: int = 1620414, amount: float = 0.0, currency: int = 62): """ currency: gold = 62, cc = 1 """ return self.post_economy_donate_money_action(self.token, citizen_id, amount, currency) def donate_items(self, citizen_id: int = 1620414, amount: int = 0, industry_id: int = 1, quality: int = 1) -> Response: ind = {v: k for k, v in self.available_industries.items()} self.write_log("D,{},q{},{},{}".format(amount, quality, ind[industry_id], citizen_id)) return self.post_economy_donate_items_action(self.token, citizen_id, amount, industry_id, quality) def candidate_for_congress(self, presentation: str = "") -> Response: return self.post_candidate_for_congress(self.token, presentation) def candidate_for_party_presidency(self) -> Response: return self.get_candidate_party(self.politics.party_slug) def accept_money_donations(self): for notification in self.parse_notifications(): don_id = re.search(r"erepublik.functions.acceptRejectDonation\(\"accept\", (\d+)\)", notification) if don_id: self.get_money_donation_accept(self.token, int(don_id.group(1))) self.sleep(5) def reject_money_donations(self) -> int: r = self.get_message_alerts() count = 0 donation_ids = re.findall(r"erepublik.functions.acceptRejectDonation\(\"reject\", (\d+)\)", r.text) while donation_ids: for don_id in donation_ids: self.get_money_donation_reject(self.token, int(don_id)) count += 1 self.sleep(5) r = self.get_message_alerts() donation_ids = re.findall(r"erepublik.functions.acceptRejectDonation\(\"reject\", (\d+)\)", r.text) return count def _rw_choose_side(self, battle_id: int, side_id: int) -> Response: return self.get_battlefield_choose_side(battle_id, side_id) def should_travel_to_fight(self) -> bool: ret = False if self.config.always_travel: return True if self.should_do_levelup: # Do levelup ret = True elif self.config.all_in and self.energy.available > self.energy.limit * 2 - self.energy.interval * 3: ret = True # Get to next Energy +1 elif self.next_reachable_energy and self.config.next_energy: ret = True # 1h worth of energy elif self.energy.available + self.energy.interval * 3 >= self.energy.limit * 2: ret = True return ret def should_fight(self, silent: bool = True) -> int: if not self.config.fight: return 0 count = 0 log_msg = "" force_fight = False # Do levelup if self.is_levelup_reachable: log_msg = "Level up" if self.should_do_levelup: count = (self.energy.limit * 3) // 10 force_fight = True else: self.write_log("Waiting for fully recovered energy before leveling up.", False) # Levelup reachable elif self.is_levelup_close: count = self.details.xp_till_level_up - (self.energy.limit // 10) + 5 log_msg = "Fighting for close Levelup. Doing %i hits" % count force_fight = True elif self.details.pp < 75: count = 75 - self.details.pp log_msg = "Obligatory fighting for at least 75pp" force_fight = True elif self.config.continuous_fighting and self.has_battle_contribution: count = self.energy.food_fights log_msg = "Continuing to fight in previous battle" # All-in (type = all-in and full ff) elif self.config.all_in and self.energy.available + self.energy.interval * 3 >= self.energy.limit * 2: count = self.energy.food_fights log_msg = "Fighting all-in. Doing %i hits" % count # All-in for AIR battles elif all([self.config.air, self.config.all_in, self.energy.available >= self.energy.limit]): count = self.energy.food_fights log_msg = "Fighting all-in in AIR. Doing %i hits" % count # Get to next Energy +1 elif self.next_reachable_energy and self.config.next_energy: count = self.next_reachable_energy log_msg = "Fighting for +1 energy. Doing %i hits" % count # 1h worth of energy elif self.energy.available + self.energy.interval * 3 >= self.energy.limit * 2: count = self.energy.interval log_msg = "Fighting for 1h energy. Doing %i hits" % count if count > 0 and not force_fight: if self.my_companies.ff_lockdown and self.details.pp > 75: if self.energy.food_fights - self.my_companies.ff_lockdown > 0: log_msg = ("Fight count modified (old count: {} | FF: {} | " "WAM ff_lockdown: {} | New count: {})").format( count, self.energy.food_fights, self.my_companies.ff_lockdown, self.energy.food_fights - self.my_companies.ff_lockdown) count = self.energy.food_fights - self.my_companies.ff_lockdown else: count = 0 if count <= 0: log_msg = "Not fighting because WAM needs {} food fights".format(self.my_companies.ff_lockdown) if self.max_time_till_full_ff > self.time_till_week_change: max_count = int((self.time_till_week_change - self.time_till_full_ff).total_seconds()) // 60 log_msg = "End for Weekly challenge is near ({} | {})".format(max_count, count) count = count if max_count > count else max_count if not silent: self.write_log(log_msg, False) return count if count > 0 else 0 @property def next_reachable_energy(self) -> int: # Return pps for furthest __reachable__ +1 energy else 0 max_pp = 0 for pp_milestone in self.details.next_pp: pp_milestone = int(pp_milestone) if self.details.pp + self.energy.food_fights > pp_milestone: # if reachable set max pp max_pp = pp_milestone else: # rest are only bigger no need break return max_pp - self.details.pp if max_pp else 0 @property def next_wc_start(self) -> datetime.datetime: days = 1 - self.now.weekday() if 1 - self.now.weekday() > 0 else 1 - self.now.weekday() + 7 return utils.good_timedelta(self.now.replace(hour=0, minute=0, second=0, microsecond=0), datetime.timedelta(days=days)) @property def time_till_week_change(self) -> datetime.timedelta: return self.next_wc_start - self.now @property def time_till_full_ff(self) -> datetime.timedelta: energy = self.energy.recoverable + self.energy.recovered if energy >= self.energy.limit * 2: return datetime.timedelta(0) minutes_needed = round((self.energy.limit * 2 - energy) / self.energy.interval) * 6 return (self.energy.reference_time - self.now) + datetime.timedelta(minutes=minutes_needed) @property def max_time_till_full_ff(self) -> datetime.timedelta: """ Max required time for 0 to full energy (0/0 -> limit/limit) (last interval rounded up) :return: """ return datetime.timedelta(minutes=round((self.energy.limit * 2 / self.energy.interval) + 0.49) * 6) @property def is_levelup_close(self) -> bool: """ If Energy limit * 2 >= xp till levelup * 10 :return: bool """ return self.energy.limit * 2 >= self.details.xp_till_level_up * 10 @property def is_levelup_reachable(self) -> bool: """ If Energy limit >= xp till levelup * 10 :return: bool """ return self.energy.limit >= self.details.xp_till_level_up * 10 @property def should_do_levelup(self) -> bool: """ If Energy limit >= xp till levelup * 10 :return: bool """ if self.energy.limit - self.energy.interval <= self.energy.recoverable: if self.is_levelup_reachable: # 45xp till levelup, 50hp/6min, 500+500/500 energy = 500+450 >= 1000 # 40xp till levelup, 50hp/6min, 450+500/500 energy = 500+400 >= 950 # 25xp till levelup, 50hp/6min, 300+500/500 energy = 500+250 >= 800 # 25xp till levelup, 50hp/6min, 300+200/500 energy = 500+250 >= 500 return self.energy.limit + self.details.xp_till_level_up * 10 <= self.energy.available return False def get_article_comments(self, article_id: int = 2645676, page_id: int = 0): return self.post_article_comments(self.token, article_id, page_id) def comment_article(self, article_id: int = 2645676, msg: str = None) -> Response: if msg is None: msg = self.eday r = self.get_article_comments(article_id, 2) r = self.get_article_comments(article_id, r.json()["pages"]) comments = r.json()["comments"] if not comments[max(comments.keys())]["isMyComment"]: r = self.write_article_comment(msg, article_id) return r def write_article_comment(self, message: str, article_id: int, parent_id: int = None): return self.post_article_comments_create(self.token, message, article_id, parent_id) def publish_article(self, title: str, content: str, kind: int): kinds = {1: "First steps in eRepublik", 2: "Battle orders", 3: "Warfare analysis", 4: "Political debates and analysis", 5: "Financial business", 6: "Social interactions and entertainment"} if kind in kinds: return self.post_write_article(self.token, title, content, self.details.citizenship, kind) else: raise classes.ErepublikException( "Article kind must be one of:\n{}\n'{}' is not supported".format( "\n".join(["{}: {}".format(k, v) for k, v in kinds.items()]), kind ) ) def post_market_offer(self, industry: int, quality: int, amount: int, price: float) -> Response: if industry not in self.available_industries.values(): self.write_log("Trying to sell unsupported industry {}".format(industry)) data = { "token": self.token, "country": self.details.citizenship, "industry": industry, "quality": quality, "amount": amount, "price": price, "buy": False, } ret = self.post_economy_marketplace_actions(**data) self.reporter.report_action("SELL_PRODUCT", ret.json()) return ret def buy_from_market(self, offer: int, amount: int) -> Response: ret = self.post_economy_marketplace_actions(self.token, amount, True, offer=offer) json_ret = ret.json() if json_ret.get('error'): return ret else: self.details.cc = ret.json()['currency'] self.details.gold = ret.json()['gold'] r_json = ret.json() r_json.pop("offerUpdate", None) self.reporter.report_action("BUY_PRODUCT", ret.json()) return ret def get_raw_surplus(self) -> (float, float): frm = 0.00 wrm = 0.00 self.update_companies() for cdata in sorted(self.my_companies.companies.values()): if cdata["industry_token"] == "FOOD": raw = frm elif cdata["industry_token"] == "WEAPON": raw = wrm else: continue if cdata["is_raw"]: raw += float(cdata["base_production"]) * cdata["effective_bonus"] / 100 else: raw -= cdata["effective_bonus"] / 100 * cdata["base_production"] * \ cdata["upgrades"][str(cdata["quality"])]["raw_usage"] if cdata["industry_token"] == "FOOD": frm = raw elif cdata["industry_token"] == "WEAPON": wrm = raw return frm, wrm def assign_factory_to_holding(self, factory_id: int, holding_id: int) -> Response: """ Assigns factory to new holding :type factory_id: int :type holding_id: int :return: Response object """ return self.post_economy_assign_to_holding(self.token, factory_id, holding_id) def upgrade_factory(self, factory_id: int, level: int) -> Response: return self.post_economy_upgrade_company(self.token, factory_id, level, self.details.pin) def create_factory(self, industry_id: int, building_type: int = 1) -> Response: """ param industry_ids: FRM={q1:7, q2:8, q3:9, q4:10, q5:11} WRM={q1:12, q2:13, q3:14, q4:15, q5:16} HRM={q1:18, q2:19, q3:20, q4:21, q5:22} ARM={q1:24, q2:25, q3:26, q4:27, q5:28} Factories={Food:1, Weapons:2, House:4, Aircraft:23} <- Building_type 1 Storage={1000: 1, 2000: 2} <- Building_type 2 """ return self.post_economy_create_company(self.token, industry_id, building_type) def dissolve_factory(self, factory_id: int) -> Response: return self.post_economy_sell_company(self.token, factory_id, self.details.pin, sell=False) @property def available_industries(self) -> Dict[str, int]: """ Returns currently available industries as dict(name: id) :return: dict """ return {"food": 1, "weapon": 2, "house": 4, "aircraft": 23, "foodRaw": 7, "weaponRaw": 12, "houseRaw": 17, "airplaneRaw": 24} def get_industry_id(self, industry_name: str) -> int: """ Returns industry id :type industry_name: str :return: int """ return self.available_industries.get(industry_name, 0) def buy_tg_contract(self) -> Response: ret = self.post_buy_gold_items(self.token, 'gold', "TrainingContract2", 1) self.reporter.report_action("BUY_TG_CONTRACT", ret.json()) return ret def resign(self) -> Response or None: self.update_job_info() if self.r.json().get("isEmployee"): self.reporter.report_action("RESIGN", self.r.json()) return self.post_economy_resign(self.token) return None def find_new_job(self) -> Response: r = self.get_economy_job_market_json(self.details.current_country) jobs = r.json().get("jobs") data = dict(token=self.token, citizen=0, salary=10) for posting in jobs: salary = posting.get("netSalary") limit = posting.get("salaryLimit", 0) userid = posting.get("citizen").get("id") if (not limit or salary * 3 < limit) and salary > data["salary"]: data.update({"citizen": userid, "salary": salary}) self.reporter.report_action("APPLYING_FOR_JOB", jobs, str(data['citizen'])) return self.post_economy_job_market_apply(**data) def add_friend(self, player_id: int) -> Response: resp = self.get_citizen_hovercard(player_id) rjson = resp.json() if not any([rjson["isBanned"], rjson["isDead"], rjson["isFriend"], rjson["isOrg"], rjson["isSelf"]]): r = self.post_citizen_add_remove_friend(self.token, int(player_id), True) self.write_log("{:<64} (id:{:>11}) added as friend".format(rjson["name"], player_id)) return r return resp def get_country_parties(self, country_id: int = None) -> dict: if country_id is None: country_id = self.details.citizenship r = self.get_rankings_parties(country_id) ret = {} for name, id_ in re.findall(r'', r.text): ret.update({int(id_): name}) return ret def get_party_members(self, party_id: int) -> Dict[int, str]: ret = {} r = super().get_party_members(party_id) for id_, name in re.findall(r'', r.text): ret.update({id_: name}) return ret def get_country_mus(self, country_id: int) -> Dict[int, str]: ret = {} r = self.get_leaderboards_damage_rankings(country_id) for data in r.json()["mu_filter"]: if data["id"]: ret.update({data["id"]: data["name"]}) r = self.get_leaderboards_damage_aircraft_rankings(country_id) for data in r.json()["mu_filter"]: if data["id"]: ret.update({data["id"]: data["name"]}) return ret def get_mu_members(self, mu_id: int) -> Dict[int, str]: ret = {} r = self.get_military_unit_data(mu_id) for page in range(1, int(r.json()["panelContents"]["pages"]) + 1): r = self.get_military_unit_data(mu_id, page) for user in r.json()["panelContents"]["members"]: if not user["isDead"]: ret.update({user["citizenId"]: user["name"]}) return ret def send_mail(self, subject: str, msg: str, ids: List[int] = None): if ids is None: ids = [1620414, ] for player_id in ids: self.post_messages_compose(self.token, subject, msg, [player_id]) def add_every_player_as_friend(self): cities = [] cities_dict = {} self.write_log("WARNING! This will take a lot of time.") rj = self.post_travel_data(self.token, regionId=662, check="getCountryRegions").json() for region_data in rj.get("regions", {}).values(): cities.append(region_data['cityId']) cities_dict.update({region_data['cityId']: region_data['cityName']}) cities.sort(key=int) for city_id in cities: self.write_log("Adding friends from {} (id: {})".format(cities_dict[city_id], city_id)) resp = self.get_city_data_residents(city_id).json() for resident in resp["widgets"]["residents"]["residents"]: self.add_friend(resident["citizenId"]) for page in range(2, resp["widgets"]["residents"]["numResults"] // 10 + 2): r = self.get_city_data_residents(city_id, page) resp = r.json() for resident in resp["widgets"]["residents"]["residents"]: self.add_friend(resident["citizenId"]) def schedule_attack(self, war_id: int, region_id: int, at_time: datetime) -> None: if at_time: self.sleep(utils.get_sleep_seconds(at_time)) self.get_csrf_token() self._launch_battle(war_id, region_id) def get_active_wars_with_regions(self): self.get_country_military(self.countries.get(self.details.citizen_id)["name"]) raise NotImplementedError def _launch_battle(self, war_id: int, region_id: int) -> Response: return self.post_wars_attack_region(self.token, war_id, region_id) def state_update_repeater(self): try: start_time = self.now.replace(second=0, microsecond=0) if start_time.minute <= 30: start_time = start_time.replace(minute=30) else: start_time = start_time.replace(minute=0) while not self.stop_threads.is_set(): self.update_citizen_info() start_time = utils.good_timedelta(start_time, datetime.timedelta(minutes=30)) self.send_state_update() self.send_inventory_update() sleep_seconds = (start_time - self.now).total_seconds() self.stop_threads.wait(sleep_seconds if sleep_seconds > 0 else 0) except: self.report_error() def send_state_update(self): data = dict(xp=self.details.xp, cc=self.details.cc, gold=self.details.gold, pp=self.details.pp, inv_total=self.inventory['total'], inv=self.inventory['used'], hp_limit=self.energy.limit, hp_interval=self.energy.interval, hp_available=self.energy.available, food=self.food['total'], ) self.reporter.send_state_update(**data) def send_inventory_update(self): j = self.update_inventory() active_items = {} for item in j.get("inventoryItems", {}).get("activeEnhancements", {}).get("items", {}).values(): active_items.update({item['name']: item['active']['time_left']}) final_items = {} for item in j.get("inventoryItems", {}).get("finalProducts", {}).get("items", {}).values(): name = item['name'] if item.get('type') == 'damageBoosters': delta = item['duration'] if delta // 3600: name += f" {delta // 3600}h" if delta // 60 % 60: name += f" {delta // 60 % 60}m" if delta % 60: name += f" {delta % 60}s" if item['amount']: final_items.update({name: item['amount']}) raw_materials = {} for item in j.get("inventoryItems", {}).get("rawMaterials", {}).get("items", {}).values(): name = item['name'] if item['isPartial']: continue if item['amount']: raw_materials.update({name: item['amount']}) to_report = dict(items=dict(active=active_items, final=final_items, raw=raw_materials), status=self.inventory) self.reporter.report_action("INVENTORY", json_val=to_report) def check_house_durability(self) -> Dict[int, datetime.datetime]: ret = {} inv = self.update_inventory() if "activeEnhancements" in inv.get("inventoryItems", {}): active = inv.get("inventoryItems", {}).get("activeEnhancements", {}).get("items", {}) for q in range(1, 6): if "4_%i_active" % q in active: till = utils.good_timedelta(self.now, datetime.timedelta( seconds=active["4_%i_active" % q]["active"]["time_left"])) ret.update({q: till}) return ret def buy_and_activate_house(self, q: int) -> Dict[int, datetime.datetime]: inventory = self.update_inventory() ok_to_activate = False if not inventory.get("inventoryItems").get("finalProducts", {}).get("items", {}).get("4_{}".format(q)): offers = [] countries = [self.details.citizenship, ] if self.details.current_country != self.details.citizenship: countries.append(self.details.current_country) for country in countries: offers += [self.get_market_offers(country, "house", q)] global_cheapest = self.get_market_offers(product="house", quality=q) cheapest_offer = sorted(offers, key=lambda o: o["price"])[0] region = self.get_country_travel_region(global_cheapest['country']) if global_cheapest['price'] + 200 < cheapest_offer['price'] and region: self._travel(global_cheapest['country'], region) buy = self.buy_from_market(global_cheapest['offer_id'], 1) else: buy = self.buy_from_market(cheapest_offer['offer_id'], 1) if buy.json()["error"]: msg = "Unable to buy q{} house! \n{}".format(q, buy.json()['message']) self.write_log(msg) else: ok_to_activate = True else: ok_to_activate = True if ok_to_activate: self.activate_house(q) return self.check_house_durability() def renew_houses(self, forced: bool = False) -> Dict[int, datetime.datetime]: """ Renew all houses which endtime is in next 48h :param forced: if true - renew all houses :return: """ house_durability = self.check_house_durability() for q, active_till in house_durability.items(): if utils.good_timedelta(active_till, - datetime.timedelta(hours=48)) <= self.now or forced: house_durability = self.buy_and_activate_house(q) self.travel_to_residence() return house_durability def activate_house(self, quality: int) -> datetime.datetime: active_until = self.now r = self.post_economy_activate_house(self.token, quality) if r.json().get("status") and not r.json().get("error"): house = r.json()["inventoryItems"]["activeEnhancements"]["items"]["4_%i_active" % quality] active_until = utils.good_timedelta(active_until, datetime.timedelta(seconds=house["active"]["time_left"])) return active_until def collect_anniversary_reward(self) -> Response: return self.post_collect_anniversary_reward(self.token) def get_battle_round_data(self, battle_id: int, round_id: int, division: int = None) -> dict: battle = self.all_battles.get(battle_id) if not battle: return {} r = self.post_battle_console(self.token, battle_id, battle.zone_id, round_id, division, 1, True) return {battle.invader.id: r.json().get(str(battle.invader.id)).get("fighterData"), battle.defender.id: r.json().get(str(battle.defender.id)).get("fighterData")} def contribute_cc_to_country(self, amount: int or float = 0) -> bool: self.update_money() amount = int(amount) if self.details.cc < amount or amount < 20: return False json = dict(country=71, action='currency', value=amount) self.reporter.report_action("CONTRIBUTE_CC", json) r = self.post_country_donate(self.token, **json) return r.json().get('status') or not r.json().get('error') def contribute_food_to_country(self, amount: int = 0, quality: int = 1) -> bool: self.update_inventory() amount = amount // 1 if self.food["q" + str(quality)] < amount or amount < 10: return False json = dict(country=71, action='food', value=amount, quality=quality) self.reporter.report_action("CONTRIBUTE_FOOD", json) r = self.post_country_donate(self.token, **json) return r.json().get('status') or not r.json().get('error') def contribute_gold_to_country(self, amount: int) -> bool: self.update_money() if self.details.cc < amount: return False json = dict(country=71, action='gold', value=amount) self.reporter.report_action("CONTRIBUTE_GOLD", json) r = self.post_country_donate(self.token, **json) return r.json().get('status') or not r.json().get('error') def write_on_country_wall(self, message: str) -> bool: self.get_main() post_to_wall_as = re.findall(r"""id="post_to_country_as".*?.*""", self.r.text, re.S | re.M) if post_to_wall_as: self.post_country_post_create(self.token, message, max(post_to_wall_as)) return True return False def report_error(self, msg: str = ""): utils.process_error(msg, self.name, sys.exc_info(), self, self.commit_id, False) def get_battle_top_10(self, battle_id: int) -> Dict[int, List[Tuple[int, int]]]: battle = self.all_battles.get(battle_id) round_id = battle.get('zone_id') division = self.division if round_id % 4 else 11 resp = self.post_military_battle_console(self.token, battle_id, round_id, division).json() resp.pop('rounds', None) ret = dict() for country_id, data in resp.items(): ret.update({int(country_id): []}) for place in sorted(data.get("fighterData", {}).values(), key=lambda _: -_['raw_value']): ret[int(country_id)].append((place['citizenId'], place['raw_value'])) return ret def to_json(self, indent: bool = False) -> str: return dumps(self.__dict__, cls=utils.MyJSONEncoder, indent=4 if indent else None, sort_keys=True)