Compare commits

...

9 Commits

7 changed files with 181 additions and 95 deletions

View File

@ -43,6 +43,8 @@ clean-pyc: ## remove Python file artifacts
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
find . -name '__pycache__' -exec rm -fr {} +
rm -rf debug/
rm -rf log/
clean-test: ## remove test and coverage artifacts
rm -fr .tox/

View File

@ -4,7 +4,7 @@
__author__ = """Eriks Karls"""
__email__ = 'eriks@72.lv'
__version__ = '0.18.0'
__version__ = '0.18.3'
from erepublik import classes, utils
from erepublik.citizen import Citizen

View File

@ -1,5 +1,6 @@
import re
import sys
import warnings
from collections import defaultdict
from datetime import datetime, timedelta
from itertools import product
@ -13,6 +14,7 @@ from requests import Response, RequestException
from erepublik.classes import (CitizenAPI, Battle, Reporter, Config, Energy, Details, Politics, MyCompanies,
TelegramBot, ErepublikException, BattleDivision, MyJSONEncoder)
from erepublik.utils import *
from erepublik.utils import process_warning
class Citizen(CitizenAPI):
@ -41,7 +43,15 @@ class Citizen(CitizenAPI):
eday = 0
r: Response
energy: Energy = None
details: Details = None
politics: Politics = None
my_companies: MyCompanies = None
reporter: Reporter = None
stop_threads: Event = None
telegram: TelegramBot = None
r: Response = None
name = "Not logged in!"
debug = False
__registered = False
@ -322,8 +332,7 @@ class Citizen(CitizenAPI):
if self.promos is None:
self.promos = {}
else:
self.promos.clear()
self.promos.update({k: v for k, v in self.promos.items() if v > self.now})
self.promos = {k: v for k, v in self.promos.items() if v > self.now}
send_mail = False
for promo in promos:
promo_name = promo.get("id")
@ -705,35 +714,35 @@ class Citizen(CitizenAPI):
# CS Battles
elif self.details.citizenship in battle_sides:
if battle.is_air:
cs_battles_ground.append(battle.id)
else:
cs_battles_air.append(battle.id)
else:
cs_battles_ground.append(battle.id)
# Current location battles:
elif self.details.current_country in battle_sides:
if battle.is_air:
deployed_battles_ground.append(battle.id)
else:
deployed_battles_air.append(battle.id)
else:
deployed_battles_ground.append(battle.id)
# Deployed battles and allied battles:
elif self.details.current_country in battle.invader.allies + battle.defender.allies + battle_sides:
if self.details.current_country in battle.invader.deployed + battle.defender.deployed:
if battle.is_air:
deployed_battles_ground.append(battle.id)
else:
deployed_battles_air.append(battle.id)
else:
deployed_battles_ground.append(battle.id)
# Allied battles:
else:
if battle.is_air:
ally_battles_ground.append(battle.id)
else:
ally_battles_air.append(battle.id)
else:
ally_battles_ground.append(battle.id)
else:
if battle.is_air:
other_battles_ground.append(battle.id)
else:
other_battles_air.append(battle.id)
else:
other_battles_ground.append(battle.id)
ret_battles += (cs_battles_air + cs_battles_ground +
deployed_battles_air + deployed_battles_ground +
@ -807,7 +816,7 @@ class Citizen(CitizenAPI):
self.collect_weekly_reward()
break
def fight(self, battle_id: int, side_id: int = None, count: int = None):
def fight(self, battle_id: int, side_id: int = None, count: int = None) -> int:
"""Fight in a battle.
Will auto activate booster and travel if allowed to do it and
@ -817,6 +826,11 @@ class Citizen(CitizenAPI):
:param count: How many hits to do, if not specified self.should_fight() is called.
:return: None if no errors while fighting, otherwise error count.
"""
if not isinstance(battle_id, int):
self.report_error(f"WARNINNG! Parameter battle_id should be 'int', but it is '{type(battle_id).__name__}'")
battle_id = int(battle_id)
if battle_id not in self.all_battles:
self.update_war_info()
battle = self.all_battles[battle_id]
zone_id = battle.div[11 if battle.is_air else self.division].battle_zone_id
if not battle.is_air and self.config.boosters:
@ -843,10 +857,9 @@ class Citizen(CitizenAPI):
self.write_log("Hits: {:>4} | Damage: {}".format(total_hits, total_damage))
ok_to_fight = False
if total_damage:
self.reporter.report_action(json_val=dict(battle=battle_id, side=side_id, dmg=total_damage,
air=battle.is_air, hits=total_hits), action="FIGHT")
if error_count:
return error_count
self.reporter.report_action("FIGHT", dict(battle=battle_id, side=side_id, dmg=total_damage,
air=battle.is_air, hits=total_hits))
return error_count
def _shoot(self, battle_id: int, inv_side: bool, zone_id: int):
battle = self.all_battles[battle_id]
@ -888,14 +901,15 @@ class Citizen(CitizenAPI):
return hits, err, damage
def deploy_bomb(self, battle_id: int, bomb_id: int, inv_side: bool = None, count: int = 1):
def deploy_bomb(self, battle_id: int, bomb_id: int, inv_side: bool = None, count: int = 1) -> int:
"""Deploy bombs in a battle for given side.
:param battle_id: int battle id
:param bomb_id: int bomb id
:param inv_side: should deploy on invader side, if None then will deploy in currently available side
:param count: int how many bombs to deploy
:return:
:return: Deployed count
:rtype: int
"""
if not isinstance(count, int) or count < 1:
count = 1
@ -926,7 +940,7 @@ class Citizen(CitizenAPI):
return deployed_count
def change_division(self, battle_id: int, division_to: int):
"""Deploy bombs in a battle for given side.
"""Change division.
:param battle_id: int battle id
:param division_to: int target division to switch to
@ -1096,8 +1110,8 @@ class Citizen(CitizenAPI):
if not self.travel_to_region(wam_holding['region_id']):
return False
response = self._post_economy_work("production", wam=wam_list, employ=employee_companies).json()
self.reporter.report_action("WORK_WAM_EMPLOYEES", response)
if response.get("status"):
self.reporter.report_action("WORK_WAM_EMPLOYEES", response)
if self.config.auto_sell:
for kind, data in response.get("result", {}).get("production", {}).items():
if kind in self.config.auto_sell and data:
@ -1133,7 +1147,9 @@ class Citizen(CitizenAPI):
self.buy_food()
return self._do_wam_and_employee_work(wam_holding_id, employee_companies)
else:
self.write_log("I was not able to wam and or employ because:\n{}".format(response))
msg = "I was not able to wam and or employ because:\n{}".format(response)
self.reporter.report_action("WORK_WAM_EMPLOYEES", response, msg)
self.write_log(msg)
wam_count = self.my_companies.get_total_wam_count()
if wam_count:
self.write_log("Wam ff lockdown is now {}, was {}".format(wam_count, self.my_companies.ff_lockdown))
@ -1648,12 +1664,9 @@ class Citizen(CitizenAPI):
if kind in kinds:
return self._post_main_write_article(title, content, self.details.citizenship, kind)
else:
raise ErepublikException(
"Article kind must be one of:\n{}\n'{}' is not supported".format(
"\n".join(["{}: {}".format(k, v) for k, v in kinds.items()]),
kind
)
)
raise ErepublikException("Article kind must be one of:\n{}\n'{}' is not supported".format(
"\n".join(["{}: {}".format(k, v) for k, v in kinds.items()]), kind
))
def post_market_offer(self, industry: int, quality: int, amount: int, price: float) -> Response:
if industry not in self.available_industries.values():
@ -1684,32 +1697,15 @@ class Citizen(CitizenAPI):
self.reporter.report_action("BUY_PRODUCT", ret.json())
return json_ret
def get_raw_surplus(self) -> (float, float):
frm = 0.00
wrm = 0.00
for cdata in sorted(self.my_companies.companies.values()):
if cdata["industry_token"] == "FOOD":
raw = frm
elif cdata["industry_token"] == "WEAPON":
raw = wrm
else:
continue
effective_bonus = cdata["effective_bonus"]
base_prod = float(cdata["base_production"])
if cdata["is_raw"]:
raw += base_prod * effective_bonus / 100
else:
raw -= effective_bonus / 100 * base_prod * cdata["upgrades"][str(cdata["quality"])]["raw_usage"]
if cdata["industry_token"] == "FOOD":
frm = raw
elif cdata["industry_token"] == "WEAPON":
wrm = raw
return frm, wrm
def assign_factory_to_holding(self, factory_id: int, holding_id: int) -> Response:
"""
Assigns factory to new holding
"""
company = self.my_companies.companies[factory_id]
company_name = self.factories[company['industry_id']]
if not company['is_raw']:
company_name += f" q{company['quality']}"
self.write_log(f"{company_name} moved to {holding_id}")
return self._post_economy_assign_to_holding(factory_id, holding_id)
def upgrade_factory(self, factory_id: int, level: int) -> Response:
@ -1723,9 +1719,18 @@ class Citizen(CitizenAPI):
Storage={1000: 1, 2000: 2} <- Building_type 2
"""
company_name = self.factories[industry_id]
if building_type == 2:
company_name = f"Storage"
self.write_log(f"{company_name} created!")
return self._post_economy_create_company(industry_id, building_type)
def dissolve_factory(self, factory_id: int) -> Response:
company = self.my_companies.companies[factory_id]
company_name = self.factories[company['industry_id']]
if not company['is_raw']:
company_name += f" q{company['quality']}"
self.write_log(f"{company_name} dissolved!")
return self._post_economy_sell_company(factory_id, self.details.pin, sell=False)
@property
@ -1737,6 +1742,17 @@ class Citizen(CitizenAPI):
return {"food": 1, "weapon": 2, "house": 4, "aircraft": 23,
"foodRaw": 7, "weaponRaw": 12, "houseRaw": 17, "airplaneRaw": 24}
@property
def factories(self) -> Dict[int, str]:
"""Returns factory industries as dict(id: name)
:return: dict
"""
return {1: "Food", 2: "Weapons", 4: "House", 23: "Aircraft",
7: "FRM q1", 8: "FRM q2", 9: "FRM q3", 10: "FRM q4", 11: "FRM q5",
12: "WRM q1", 13: "WRM q2", 14: "WRM q3", 15: "WRM q4", 16: "WRM q5",
18: "HRM q1", 19: "HRM q2", 20: "HRM q3", 21: "HRM q4", 22: "HRM q5",
24: "ARM q1", 25: "ARM q2", 26: "ARM q3", 27: "ARM q4", 28: "ARM q5", }
def get_industry_id(self, industry_name: str) -> int:
"""
Returns industry id
@ -1872,7 +1888,7 @@ class Citizen(CitizenAPI):
r'class="join" title="Join"><span>Join</span></a>', html):
battle_id = re.search(r'<a href="//www.erepublik.com/en/military/battlefield/(\d+)" '
r'class="join" title="Join"><span>Join</span></a>', html).group(1)
ret.update(can_attack=False, battle_id=battle_id)
ret.update(can_attack=False, battle_id=int(battle_id))
elif re.search(r'This war is no longer active.', html):
ret.update(can_attack=False, ended=True)
else:
@ -1997,7 +2013,7 @@ class Citizen(CitizenAPI):
return False
data = dict(country=71, action='currency', value=amount)
self.telegram.send_message(f"Donated {amount}cc to {COUNTRIES[71]}")
self.reporter.report_action("CONTRIBUTE_CC", data)
self.reporter.report_action("CONTRIBUTE_CC", data, str(amount))
r = self._post_main_country_donate(**data)
return r.json().get('status') or not r.json().get('error')
@ -2007,7 +2023,7 @@ class Citizen(CitizenAPI):
if self.food["q" + str(quality)] < amount or amount < 10:
return False
data = dict(country=71, action='food', value=amount, quality=quality)
self.reporter.report_action("CONTRIBUTE_FOOD", data)
self.reporter.report_action("CONTRIBUTE_FOOD", data, FOOD_ENERGY[quality] * amount)
r = self._post_main_country_donate(**data)
return r.json().get('status') or not r.json().get('error')
@ -2017,7 +2033,7 @@ class Citizen(CitizenAPI):
if self.details.cc < amount:
return False
data = dict(country=71, action='gold', value=amount)
self.reporter.report_action("CONTRIBUTE_GOLD", data)
self.reporter.report_action("CONTRIBUTE_GOLD", data, str(amount))
r = self._post_main_country_donate(**data)
return r.json().get('status') or not r.json().get('error')
@ -2028,8 +2044,11 @@ class Citizen(CitizenAPI):
r = self._post_main_country_post_create(message, max(post_to_wall_as, key=int) if post_to_wall_as else 0)
return r.json()
def report_error(self, msg: str = ""):
process_error(msg, self.name, sys.exc_info(), self, self.commit_id, False)
def report_error(self, msg: str = "", is_warning: bool = False):
if is_warning:
process_warning(msg, self.name, sys.exc_info(), self, self.commit_id)
else:
process_error(msg, self.name, sys.exc_info(), self, self.commit_id, None)
def get_battle_top_10(self, battle_id: int) -> Dict[int, List[Tuple[int, int]]]:
return {}

View File

@ -4,7 +4,7 @@ import hashlib
import random
import threading
import time
from collections import deque
from collections import deque, defaultdict
from json import JSONDecodeError, loads, JSONEncoder
from typing import Any, Dict, List, Union, Mapping, Iterable, Tuple
@ -40,7 +40,7 @@ class MyCompanies:
"""
:param holdings: Parsed JSON to dict from en/economy/myCompanies
"""
self.holdings = {}
self.holdings.clear()
template = dict(id=0, num_factories=0, region_id=0, companies=[])
for holding_id, holding in holdings.items():
@ -57,18 +57,20 @@ class MyCompanies:
"""
:param companies: Parsed JSON to dict from en/economy/myCompanies
"""
self.companies = {}
self.companies.clear()
template = dict(id=None, quality=0, is_raw=False, resource_bonus=0, effective_bonus=0, raw_usage=0,
production=0, base_production=0, wam_enabled=False, can_work_as_manager=False,
base_production=0, wam_enabled=False, can_work_as_manager=False, industry_id=0, todays_works=0,
preset_own_work=0, already_worked=False, can_assign_employees=False, preset_works=0,
todays_works=0, holding_company_id=None, is_assigned_to_holding=False,
cannot_work_as_manager_reason=False, industry_id=0)
holding_company_id=None, is_assigned_to_holding=False, cannot_work_as_manager_reason=False)
for c_id, company in companies.items():
tmp = {}
for key in template.keys():
if key in ['id', 'holding_company_id']:
company[key] = int(company[key])
elif key == "raw_usage":
if not company.get("is_raw") and company.get('upgrades'):
company[key] = company.get('upgrades').get(str(company["quality"])).get('raw_usage')
tmp.update({key: company[key]})
self.companies.update({int(c_id): tmp})
@ -76,9 +78,8 @@ class MyCompanies:
for company_id, company_data in self.companies.items():
if company_id not in self.holdings[company_data['holding_company_id']]['companies']:
self.holdings[company_data['holding_company_id']]['companies'].append(company_id)
else:
for holding_id in self.holdings:
self.holdings[holding_id]['companies'].sort()
for holding_id in self.holdings:
self.holdings[holding_id]['companies'].sort()
def get_employable_factories(self) -> Dict[int, int]:
ret = {}
@ -160,6 +161,31 @@ class MyCompanies:
raise ErepublikException("Wrong function call")
def get_wam_raw_usage(self) -> Dict[str, float]:
frm = 0.00
wrm = 0.00
for company in self.companies.values():
if company['wam_enabled']:
effective_bonus = float(company["effective_bonus"])
base_prod = float(company["base_production"])
raw = base_prod * effective_bonus / 100
if not company["is_raw"]:
raw *= -company["raw_usage"]
if company["industry_id"] in [1, 7, 8, 9, 10, 11]:
frm += raw
elif company["industry_id"] in [2, 12, 13, 14, 15, 16]:
wrm += raw
return {'frm': int(frm * 1000) / 1000, 'wrm': int(wrm * 1000) / 1000}
def __str__(self):
name = []
for holding_id in sorted(self.holdings.keys()):
if not holding_id:
name.append(f"Unassigned - {len(self.holdings[0]['companies'])}")
else:
name.append(f"{holding_id} - {len(self.holdings[holding_id]['companies'])}")
return " | ".join(name)
# @property
# def __dict__(self):
# ret = {}
@ -174,19 +200,19 @@ class SlowRequests(Session):
timeout = datetime.timedelta(milliseconds=500)
uas = [
# Chrome
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.80 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.80 Safari/537.36',
# FireFox
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:68.0) Gecko/20100101 Firefox/68.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:67.0) Gecko/20100101 Firefox/67.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:70.0) Gecko/20100101 Firefox/70.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:69.0) Gecko/20100101 Firefox/69.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0',
]
debug = False
@ -1073,17 +1099,25 @@ class BattleDivision:
def div_end(self) -> bool:
return utils.now() >= self.end
def __init__(
self, div_id: int, end: datetime.datetime, epic: bool, inv_pts: int, def_pts: int,
wall_for: int, wall_dom: float, def_medal: Tuple[int, int], inv_medal: Tuple[int, int]
):
self.battle_zone_id = div_id
self.end = end
self.epic = epic
self.dom_pts = dict({"inv": inv_pts, "def": def_pts})
self.wall = dict({"for": wall_for, "dom": wall_dom})
self.def_medal = {"id": def_medal[0], "dmg": def_medal[1]}
self.inv_medal = {"id": inv_medal[0], "dmg": inv_medal[1]}
def __init__(self, **kwargs):
"""Battle division helper class
:param kwargs: must contain keys:
div_id: int, end: datetime.datetime, epic: bool, inv_pts: int, def_pts: int,
wall_for: int, wall_dom: float, def_medal: Tuple[int, int], inv_medal: Tuple[int, int]
"""
self.battle_zone_id = kwargs.get("div_id", 0)
self.end = kwargs.get("end", 0)
self.epic = kwargs.get("epic", 0)
self.dom_pts = dict({"inv": kwargs.get("inv_pts", 0), "def": kwargs.get("def_pts", 0)})
self.wall = dict({"for": kwargs.get("wall_for", 0), "dom": kwargs.get("wall_dom", 0)})
self.def_medal = {"id": kwargs.get("def_medal", 0)[0], "dmg": kwargs.get("def_medal", 0)[1]}
self.inv_medal = {"id": kwargs.get("inv_medal", 0)[0], "dmg": kwargs.get("inv_medal", 0)[1]}
@property
def id(self):
return self.battle_zone_id
class Battle:
@ -1138,7 +1172,7 @@ class Battle:
[row.get('id') for row in battle.get('def', {}).get('ally_list') if row['deployed']]
)
self.div = {}
self.div = defaultdict(BattleDivision)
for div, data in battle.get('div', {}).items():
div = int(data.get('div'))
if data.get('end'):

View File

@ -8,7 +8,7 @@ import time
import traceback
import unicodedata
from pathlib import Path
from typing import Union, Any, List, NoReturn, Mapping
from typing import Union, Any, List, NoReturn, Mapping, Optional
import pytz
import requests
@ -17,7 +17,7 @@ __all__ = ["FOOD_ENERGY", "COMMIT_ID", "COUNTRIES", "erep_tz", 'COUNTRY_LINK',
"now", "localize_dt", "localize_timestamp", "good_timedelta", "eday_from_date", "date_from_eday",
"get_sleep_seconds", "interactive_sleep", "silent_sleep",
"write_silent_log", "write_interactive_log", "get_file", "write_file",
"send_email", "normalize_html_json", "process_error", 'report_promo', 'calculate_hit']
"send_email", "normalize_html_json", "process_error", "process_warning", 'report_promo', 'calculate_hit']
FOOD_ENERGY = dict(q1=2, q2=4, q3=6, q4=8, q5=10, q6=12, q7=20)
COMMIT_ID = "7b92e19"
@ -288,10 +288,43 @@ def normalize_html_json(js: str) -> str:
def process_error(log_info: str, name: str, exc_info: tuple, citizen=None, commit_id: str = None,
interactive: bool = False):
interactive: Optional[bool] = None):
"""
Process error logging and email sending to developer
:param interactive: Should print interactively
:type interactive: bool
:param log_info: String to be written in output
:type log_info: str
:param name: String Instance name
:type name: str
:param exc_info: tuple output from sys.exc_info()
:type exc_info: tuple
:param citizen: Citizen instance
:type citizen: Citizen
:param commit_id: Code's version by commit id
:type commit_id: str
"""
type_, value_, traceback_ = exc_info
content = [log_info]
if commit_id:
content += ["Commit id: %s" % commit_id]
content += [str(value_), str(type_), ''.join(traceback.format_tb(traceback_))]
if interactive:
write_interactive_log(log_info)
elif interactive is not None:
write_silent_log(log_info)
trace = inspect.trace()
if trace:
trace = trace[-1][0].f_locals
else:
trace = dict()
send_email(name, content, citizen, local_vars=trace)
def process_warning(log_info: str, name: str, exc_info: tuple, citizen=None, commit_id: str = None):
"""
Process error logging and email sending to developer
:param log_info: String to be written in output
:param name: String Instance name
:param exc_info: tuple output from sys.exc_info()
@ -299,19 +332,17 @@ def process_error(log_info: str, name: str, exc_info: tuple, citizen=None, commi
:param commit_id: Code's version by commit id
"""
type_, value_, traceback_ = exc_info
bugtrace = [] if not commit_id else ["Commit id: %s" % commit_id, ]
bugtrace += [str(value_), str(type_), ''.join(traceback.format_tb(traceback_))]
content = [log_info]
if commit_id:
content += ["Commit id: %s" % commit_id]
content += [str(value_), str(type_), ''.join(traceback.format_tb(traceback_))]
if interactive:
write_interactive_log(log_info)
else:
write_silent_log(log_info)
trace = inspect.trace()
if trace:
trace = trace[-1][0].f_locals
else:
trace = dict()
send_email(name, bugtrace, citizen, local_vars=trace)
send_email(name, content, citizen, local_vars=trace)
def report_promo(kind: str, time_untill: datetime.datetime) -> NoReturn:

View File

@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.18.0
current_version = 0.18.3
commit = True
tag = True

View File

@ -42,6 +42,6 @@ setup(
test_suite='tests',
tests_require=test_requirements,
url='https://github.com/eeriks/erepublik/',
version='0.18.0',
version='0.18.3',
zip_safe=False,
)