Compare commits

...

22 Commits

Author SHA1 Message Date
76bd40c655 Bump version: 0.19.0 → 0.19.1 2020-01-13 21:33:58 +02:00
15e6deebda Battle initialization without valid data should be avoided to not run into strange and hard to trace bugs.
Jsonification updates - if simplejson is available some packages are importing simplejson with try-except and later throwing simplejson errors which should be cought when calling .json() on every request.
Fixed error logging
2020-01-13 21:33:50 +02:00
69d0e7df0a Bump version: 0.18.3 → 0.19.0 2020-01-13 10:40:33 +02:00
4f92894ab6 DocUpdate 2020-01-13 10:40:22 +02:00
9c64bfac0f Merge branch 'inventory_updates'
* inventory_updates:
  Python 3.8, isort, requirement update
  Representation of Citizen class
  Created method for current products on sale. Updated inventory to also include products on sale

# Conflicts:
#	erepublik/citizen.py
2020-01-13 10:31:05 +02:00
1f07f2e270 Update:
Citizen.set_default_weapon() - eRepublik should return list with all available weapon qualities, but when a battle is just launched, they return only dict with barehands
Citizen.fight() - no longer calls self.set_default_weapon()
Citizen.find_battle_and_fight() - now calls self.set_default_weapon() just before fighting
Citizen.update_war_info() - returns previous battle list if responses 'last_updated' isn't more than 30s old

New:
Citizen.get_battle_for_war(war_id) - returns Battle instance for specific war, if battle is active for given war
2020-01-13 10:28:42 +02:00
71d204843d Python 3.8, isort, requirement update 2020-01-09 12:03:11 +02:00
d9305214eb Representation of Citizen class 2020-01-07 19:55:31 +02:00
5556d5f772 Created method for current products on sale. Updated inventory to also include products on sale 2020-01-07 16:28:42 +02:00
1c47d169d2 Bump version: 0.18.2 → 0.18.3 2020-01-07 11:32:59 +02:00
ef44787bad make clean-pyc removes log/ and debug/ run 'artifacts' 2020-01-07 11:32:48 +02:00
42431134e1 UA update 2020-01-07 11:30:40 +02:00
bedaeeefd1 Battle division update 2020-01-07 11:15:40 +02:00
bbf304aa99 Bump version: 0.18.1 → 0.18.2 2020-01-05 10:53:39 +02:00
a2447959e7 fight must receive battle id as int, added warnings support 2020-01-05 10:53:26 +02:00
700bd8d98e Bump version: 0.18.0 → 0.18.1 2020-01-02 22:43:01 +02:00
3599dc40fc More logging, Citizen.get_raw_surplus() fixed and moved to Citizen.my_companies.get_wam_raw_usage() 2020-01-02 22:42:40 +02:00
6ba727a781 promo spam loop 2020-01-02 18:49:38 +02:00
7f1829a5d7 Bump version: 0.17.3 → 0.18.0 2019-12-18 16:26:10 +02:00
e374aa8a54 Implemented division switching,
improved multi bomb deploy with auto traveling,
Citizen.fight() simplified battle data gathering logic -> Citizen.shoot logic improved
Citizen.all_battles are now defaultdict with default value of empty/invalid battle, for times when trying to do things with battle which is not in all_battle dict
2019-12-18 16:25:52 +02:00
7edfa3b004 Bump version: 0.17.2 → 0.17.3 2019-12-18 11:45:02 +02:00
12aee23739 Variable and method redeclaration 2019-12-18 11:44:18 +02:00
15 changed files with 686 additions and 211 deletions

View File

@ -19,3 +19,8 @@ insert_final_newline = false
[Makefile]
indent_style = tab
[*.py]
line_length=120
multi_line_output=0
balanced_wrapping=True

2
.gitignore vendored
View File

@ -85,7 +85,7 @@ celerybeat-schedule
# virtualenv
.venv
venv/
venv*/
ENV/
# Spyder project settings

View File

@ -5,7 +5,7 @@ python:
- 3.7
- 3.6
# Command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors
# Command to install dependencies, e.g. pip install -r requirements_dev.txt --use-mirrors
install: pip install -U tox-travis
# Command to run tests, e.g. python setup.py test

View File

@ -23,12 +23,6 @@ If you are reporting a bug, please include:
* Any details about your local setup that might be helpful in troubleshooting.
* Detailed steps to reproduce the bug.
Fix Bugs
~~~~~~~~
Look through the GitHub issues for bugs. Anything tagged with "bug" and "help
wanted" is open to whoever wants to implement it.
Implement Features
~~~~~~~~~~~~~~~~~~
@ -59,7 +53,7 @@ Get Started!
Ready to contribute? Here's how to set up `erepublik` for local development.
1. Fork the `erepublik_script` repo on GitHub.
1. Fork the `erepublik` repo on GitHub.
2. Clone your fork locally::
$ git clone git@github.com:your_name_here/erepublik.git
@ -103,7 +97,7 @@ Before you submit a pull request, check that it meets these guidelines:
your new functionality into a function with a docstring, and add the
feature to the list in README.rst.
3. The pull request should work for Python 3.7.1. Check
https://travis-ci.org/eeriks/erepublik_script/pull_requests
https://travis-ci.org/eeriks/erepublik/pull_requests
and make sure that the tests pass for all supported Python versions.
Tips
@ -112,7 +106,7 @@ Tips
To run a subset of tests::
$ python -m unittest tests.test_erepublik_script
$ python -m unittest tests.test_erepublik
Deploying
---------

View File

@ -2,6 +2,20 @@
History
=======
0.19.0 (2020-01-13)
-------------------
* Created method for current products on sale.
* Updated inventory to also include products on sale
* set_default_weapon() - eRepublik should return list with all available weapon qualities, but when a battle is just launched, they return only dict with barehands
* fight() - no longer calls self.set_default_weapon()
* find_battle_and_fight() - now calls self.set_default_weapon() just before fighting
* update_war_info() - returns previous battle list if responses 'last_updated' isn't more than 30s old
* get_battle_for_war(war_id) - returns Battle instance for specific war, if battle is active for given war
* Citizen.get_raw_surplus() fixed and moved to Citizen.my_companies.get_wam_raw_usage()
* Implemented division switching
* improved multi bomb deploy with auto traveling,
* Citizen.fight() simplified battle data gathering logic -> Citizen.shoot logic improved
0.17.0 (2019-11-21)
-------------------

View File

@ -43,6 +43,8 @@ clean-pyc: ## remove Python file artifacts
find . -name '*.pyo' -exec rm -f {} +
find . -name '*~' -exec rm -f {} +
find . -name '__pycache__' -exec rm -fr {} +
rm -rf debug/
rm -rf log/
clean-test: ## remove test and coverage artifacts
rm -fr .tox/

View File

@ -4,7 +4,7 @@
__author__ = """Eriks Karls"""
__email__ = 'eriks@72.lv'
__version__ = '0.17.2'
__version__ = '0.19.1'
from erepublik import classes, utils
from erepublik.citizen import Citizen

View File

@ -1,20 +1,22 @@
import re
import sys
from threading import Event
from itertools import product
from datetime import datetime, timedelta
from json import loads, dumps
from itertools import product
from threading import Event
from time import sleep
from typing import Dict, List, Tuple, Any, Union, Set
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from requests import Response, RequestException
from erepublik.classes import (CitizenAPI, Battle, Reporter, Config, Energy, Details, Politics, MyCompanies,
TelegramBot, ErepublikException, BattleDivision, MyJSONEncoder)
from requests import RequestException, Response
from erepublik.classes import (Battle, BattleDivision, CitizenAPI, Config, Details, Energy, ErepublikException,
MyCompanies, MyJSONEncoder, Politics, Reporter, TelegramBot)
from erepublik.utils import *
try:
import simplejson as json
except ImportError:
import json
class Citizen(CitizenAPI):
division = 0
@ -38,11 +40,19 @@ class Citizen(CitizenAPI):
ot_points = 0
tg_contract = None
promos = None
promos: Dict[str, datetime] = None
eday = 0
r: Response
energy: Energy = None
details: Details = None
politics: Politics = None
my_companies: MyCompanies = None
reporter: Reporter = None
stop_threads: Event = None
telegram: TelegramBot = None
r: Response = None
name = "Not logged in!"
debug = False
__registered = False
@ -103,6 +113,10 @@ class Citizen(CitizenAPI):
def __str__(self) -> str:
return f"Citizen {self.name}"
def __repr__(self):
return self.__str__()
@property
def __dict__(self):
ret = super().__dict__.copy()
ret.pop('stop_threads', None)
@ -169,7 +183,7 @@ class Citizen(CitizenAPI):
if j['error'] and j["message"] == 'Too many requests':
self.write_log("Made too many requests! Sleeping for 30 seconds.")
self.sleep(30)
except:
except (json.JSONDecodeError, KeyError, TypeError):
pass
if response.status_code >= 400:
self.r = response
@ -319,8 +333,11 @@ class Citizen(CitizenAPI):
self._get_main()
return
ugly_js = re.search(r'"promotions":\s*(\[{?.*?}?])', html).group(1)
promos = loads(normalize_html_json(ugly_js))
self.promos = {k: v for k, v in (self.promos.items() if self.promos else {}) if v > self.now}
promos = json.loads(normalize_html_json(ugly_js))
if self.promos is None:
self.promos = {}
else:
self.promos = {k: v for k, v in self.promos.items() if v > self.now}
send_mail = False
for promo in promos:
promo_name = promo.get("id")
@ -351,7 +368,7 @@ class Citizen(CitizenAPI):
)
ugly_js = re.search(r"var erepublik = ({.*}),\s+", html).group(1)
citizen_js = loads(ugly_js)
citizen_js = json.loads(ugly_js)
citizen = citizen_js.get("citizen", {})
self.eday = citizen_js.get("settings").get("eDay")
@ -407,14 +424,14 @@ class Citizen(CitizenAPI):
def update_companies(self):
html = self._get_economy_my_companies().text
page_details = loads(re.search(r"var pageDetails\s+= ({.*});", html).group(1))
page_details = json.loads(re.search(r"var pageDetails\s+= ({.*});", html).group(1))
self.my_companies.work_units = int(page_details.get("total_works", 0))
have_holdings = re.search(r"var holdingCompanies\s+= ({.*}});", html)
have_companies = re.search(r"var companies\s+= ({.*}});", html)
if have_holdings and have_companies:
self.my_companies.prepare_companies(loads(have_companies.group(1)))
self.my_companies.prepare_holdings(loads(have_holdings.group(1)))
self.my_companies.prepare_companies(json.loads(have_companies.group(1)))
self.my_companies.prepare_holdings(json.loads(have_holdings.group(1)))
self.my_companies.update_holding_companies()
def update_inventory(self) -> Dict[str, Any]:
@ -493,7 +510,21 @@ class Citizen(CitizenAPI):
if kind not in final_items:
final_items[kind] = {}
icon = item['icon'] if item['icon'] else "//www.erepublik.net/images/modules/manager/tab_storage.png"
if item['icon']:
icon = item['icon']
else:
if item['type'] == 'damageBoosters':
icon = "/images/modules/pvp/damage_boosters/damage_booster.png"
elif item['type'] == 'aircraftDamageBoosters':
icon = "/images/modules/pvp/damage_boosters/air_damage_booster.png"
elif item['type'] == 'prestigePointsBoosters':
icon = "/images/modules/pvp/prestige_points_boosters/prestige_booster.png"
elif item['type'] == 'speedBoosters':
icon = "/images/modules/pvp/speed_boosters/speed_booster.png"
elif item['type'] == 'catchupBoosters':
icon = "/images/modules/pvp/ghost_boosters/icon_booster_30_60.png"
else:
icon = "//www.erepublik.net/images/modules/manager/tab_storage.png"
data = dict(kind=kind, quality=item.get('quality', 0), amount=item.get('amount', 0),
durability=item.get('duration', 0), icon=icon, name=name)
if item.get('type') in ('damageBoosters', "aircraftDamageBoosters"):
@ -521,16 +552,29 @@ class Citizen(CitizenAPI):
icon=icon)
)
offers = {}
for offer in self._get_economy_my_market_offers().json():
kind = self.get_industry_name(offer['industryId'])
data = dict(quality=offer.get('quality', 0), amount=offer.get('amount', 0), icon=offer.get('icon'),
kind=kind, name=kind)
data = {data['quality']: data}
if kind not in offers:
offers[kind] = {}
offers[kind].update(data)
self.inventory.update({"used": j.get("inventoryStatus").get("usedStorage"),
"total": j.get("inventoryStatus").get("totalStorage")})
inventory = dict(items=dict(active=active_items, final=final_items, raw=raw_materials), status=self.inventory)
inventory = dict(items=dict(active=active_items, final=final_items,
raw=raw_materials, offers=offers), status=self.inventory)
self.food["total"] = sum([self.food[q] * FOOD_ENERGY[q] for q in FOOD_ENERGY])
return inventory
def update_weekly_challenge(self):
data = self._get_main_weekly_challenge_data().json()
self.details.pp = data.get("player", {}).get("prestigePoints", 0)
self.details.next_pp = []
self.details.next_pp.clear()
for reward in data.get("rewards", {}).get("normal", {}):
status = reward.get("status", "")
if status == "rewarded":
@ -547,10 +591,21 @@ class Citizen(CitizenAPI):
if not self.details.current_country:
self.update_citizen_info()
resp_json = self._get_military_campaigns_json_list().json()
if self.__last_war_update_data and self.__last_war_update_data.get('last_updated', 0) + 30 > self.now.timestamp():
resp_json = self.__last_war_update_data
else:
resp_json = self._get_military_campaigns_json_list().json()
if resp_json.get("countries"):
self.all_battles = {}
self.countries = {}
if self.all_battles is None:
self.all_battles = {}
else:
self.all_battles.clear()
if self.countries is None:
self.countries = {}
else:
self.countries.clear()
for c_id, c_data in resp_json.get("countries").items():
if int(c_id) not in self.countries:
self.countries.update({
@ -561,7 +616,7 @@ class Citizen(CitizenAPI):
self.__last_war_update_data = resp_json
if resp_json.get("battles"):
for battle_data in resp_json.get("battles", {}).values():
self.all_battles.update({battle_data.get('id'): Battle(battle_data)})
self.all_battles[battle_data.get('id')] = Battle(battle_data)
def eat(self):
"""
@ -593,8 +648,7 @@ class Citizen(CitizenAPI):
r_json = response.json()
next_recovery = r_json.get("food_remaining_reset").split(":")
self.energy.set_reference_time(
good_timedelta(self.now,
timedelta(seconds=int(next_recovery[1]) * 60 + int(next_recovery[2])))
good_timedelta(self.now, timedelta(seconds=int(next_recovery[1]) * 60 + int(next_recovery[2])))
)
self.energy.recovered = r_json.get("health")
self.energy.recoverable = r_json.get("food_remaining")
@ -619,7 +673,7 @@ class Citizen(CitizenAPI):
def now(self) -> datetime:
"""
Returns aware datetime object localized to US/Pacific (eRepublik time)
:return: datetime.datetime
:return: datetime
"""
return now()
@ -695,35 +749,35 @@ class Citizen(CitizenAPI):
# CS Battles
elif self.details.citizenship in battle_sides:
if battle.is_air:
cs_battles_ground.append(battle.id)
else:
cs_battles_air.append(battle.id)
else:
cs_battles_ground.append(battle.id)
# Current location battles:
elif self.details.current_country in battle_sides:
if battle.is_air:
deployed_battles_ground.append(battle.id)
else:
deployed_battles_air.append(battle.id)
else:
deployed_battles_ground.append(battle.id)
# Deployed battles and allied battles:
elif self.details.current_country in battle.invader.allies + battle.defender.allies + battle_sides:
if self.details.current_country in battle.invader.deployed + battle.defender.deployed:
if battle.is_air:
deployed_battles_ground.append(battle.id)
else:
deployed_battles_air.append(battle.id)
else:
deployed_battles_ground.append(battle.id)
# Allied battles:
else:
if battle.is_air:
ally_battles_ground.append(battle.id)
else:
ally_battles_air.append(battle.id)
else:
ally_battles_ground.append(battle.id)
else:
if battle.is_air:
other_battles_ground.append(battle.id)
else:
other_battles_air.append(battle.id)
else:
other_battles_ground.append(battle.id)
ret_battles += (cs_battles_air + cs_battles_ground +
deployed_battles_air + deployed_battles_ground +
@ -792,17 +846,30 @@ class Citizen(CitizenAPI):
if not self.travel_to_battle(battle_id, country_ids_to_travel):
break
self.set_default_weapon(battle_id)
self.fight(battle_id, side_id)
self.travel_to_residence()
self.collect_weekly_reward()
break
def fight(self, battle_id: int, side_id: int, count: int = None):
battle = self.all_battles[battle_id]
def fight(self, battle_id: int, side_id: int = None, count: int = None) -> int:
"""Fight in a battle.
Will auto activate booster and travel if allowed to do it.
:param battle_id: int BattleId - battle to fight in
:param side_id: int or None. Battle side to fight in, If side_id not == invader id or not in invader deployed allies list, then defender's side is chosen
:param count: How many hits to do, if not specified self.should_fight() is called.
:return: None if no errors while fighting, otherwise error count.
"""
if not isinstance(battle_id, int):
self.report_error(f"WARNINNG! Parameter battle_id should be 'int', but it is '{type(battle_id).__name__}'")
battle_id = int(battle_id)
if battle_id not in self.all_battles:
self.update_war_info()
battle = self.all_battles.get(battle_id)
zone_id = battle.div[11 if battle.is_air else self.division].battle_zone_id
if not battle.is_air and self.config.boosters:
self.activate_dmg_booster()
self.set_default_weapon(battle_id)
error_count = 0
ok_to_fight = True
if count is None:
@ -810,10 +877,10 @@ class Citizen(CitizenAPI):
total_damage = 0
total_hits = 0
side = battle.invader.id == side_id or side_id in battle.invader.deployed
while ok_to_fight and error_count < 10 and count > 0:
while all((count > 0, error_count < 10, self.energy.recovered >= 50)):
hits, error, damage = self._shoot(battle.is_air, battle_id, side_id, zone_id)
hits, error, damage = self._shoot(battle_id, side, zone_id)
count -= hits
total_hits += hits
total_damage += damage
@ -824,13 +891,14 @@ class Citizen(CitizenAPI):
self.write_log("Hits: {:>4} | Damage: {}".format(total_hits, total_damage))
ok_to_fight = False
if total_damage:
self.reporter.report_action(json_val=dict(battle=battle_id, side=side_id, dmg=total_damage,
air=battle.is_air, hits=total_hits), action="FIGHT")
if error_count:
return error_count
self.reporter.report_action("FIGHT", dict(battle=battle_id, side=side_id, dmg=total_damage,
air=battle.is_air, hits=total_hits))
return error_count
def _shoot(self, air: bool, battle_id: int, side_id: int, zone_id: int):
if air:
def _shoot(self, battle_id: int, inv_side: bool, zone_id: int):
battle = self.all_battles.get(battle_id)
side_id = battle.invader.id if inv_side else battle.defender.id
if battle.is_air:
response = self._post_military_fight_air(battle_id, side_id, zone_id)
else:
response = self._post_military_fight_ground(battle_id, side_id, zone_id)
@ -853,6 +921,9 @@ class Citizen(CitizenAPI):
else:
if j_resp.get("message") == "UNKNOWN_SIDE":
self._rw_choose_side(battle_id, side_id)
elif j_resp.get("message") == "CHANGE_LOCATION":
countries = [side_id] + battle.invader.deployed if inv_side else battle.defender.deployed
self.travel_to_battle(battle_id, countries)
err = True
elif j_resp.get("message") == "ENEMY_KILLED":
hits = (self.energy.recovered - j_resp["details"]["wellness"]) // 10
@ -864,9 +935,53 @@ class Citizen(CitizenAPI):
return hits, err, damage
def deploy_bomb(self, battle_id: int, bomb_id: int):
r = self._post_military_deploy_bomb(battle_id, bomb_id).json()
return not r.get('error')
def deploy_bomb(self, battle_id: int, bomb_id: int, inv_side: bool = None, count: int = 1) -> int:
"""Deploy bombs in a battle for given side.
:param battle_id: int battle id
:param bomb_id: int bomb id
:param inv_side: should deploy on invader side, if None then will deploy in currently available side
:param count: int how many bombs to deploy
:return: Deployed count
:rtype: int
"""
if not isinstance(count, int) or count < 1:
count = 1
has_traveled = False
battle = self.all_battles.get(battle_id)
if inv_side:
good_countries = [battle.invader.id] + battle.invader.deployed
if self.details.current_country not in good_countries:
has_traveled = self.travel_to_battle(battle_id, good_countries)
elif inv_side is not None:
good_countries = [battle.defender.id] + battle.defender.deployed
if self.details.current_country not in good_countries:
has_traveled = self.travel_to_battle(battle_id, good_countries)
else:
involved = [battle.invader.id, battle.defender.id] + battle.invader.deployed + battle.defender.deployed
if self.details.current_country not in involved:
count = 0
errors = deployed_count = 0
while (not deployed_count == count) and errors < 10:
r = self._post_military_deploy_bomb(battle_id, bomb_id).json()
if not r.get('error'):
deployed_count += 1
elif r.get('message') == 'LOCKED':
sleep(0.5)
if has_traveled:
self.travel_to_residence()
return deployed_count
def change_division(self, battle_id: int, division_to: int):
"""Change division.
:param battle_id: int battle id
:param division_to: int target division to switch to
:return:
"""
battle = self.all_battles.get(battle_id)
self._post_main_battlefield_change_division(battle_id, battle.div[division_to].battle_zone_id)
def work_ot(self):
# I"m not checking for 1h cooldown. Beware of nightshift work, if calling more than once every 60min
@ -1029,8 +1144,8 @@ class Citizen(CitizenAPI):
if not self.travel_to_region(wam_holding['region_id']):
return False
response = self._post_economy_work("production", wam=wam_list, employ=employee_companies).json()
self.reporter.report_action("WORK_WAM_EMPLOYEES", response)
if response.get("status"):
self.reporter.report_action("WORK_WAM_EMPLOYEES", response)
if self.config.auto_sell:
for kind, data in response.get("result", {}).get("production", {}).items():
if kind in self.config.auto_sell and data:
@ -1066,7 +1181,9 @@ class Citizen(CitizenAPI):
self.buy_food()
return self._do_wam_and_employee_work(wam_holding_id, employee_companies)
else:
self.write_log("I was not able to wam and or employ because:\n{}".format(response))
msg = "I was not able to wam and or employ because:\n{}".format(response)
self.reporter.report_action("WORK_WAM_EMPLOYEES", response, msg)
self.write_log(msg)
wam_count = self.my_companies.get_total_wam_count()
if wam_count:
self.write_log("Wam ff lockdown is now {}, was {}".format(wam_count, self.my_companies.ff_lockdown))
@ -1488,7 +1605,7 @@ class Citizen(CitizenAPI):
if self.max_time_till_full_ff > self.time_till_week_change:
max_count = (int(self.time_till_week_change.total_seconds()) // 360 * self.energy.interval) // 10
log_msg = ("End for Weekly challenge is near "
log_msg = ("End for Weekly challenge is near "
f"(Recoverable until WC end {max_count}hp | want to do {count}hits)")
count = count if max_count > count else max_count
@ -1511,8 +1628,7 @@ class Citizen(CitizenAPI):
@property
def next_wc_start(self) -> datetime:
days = 1 - self.now.weekday() if 1 - self.now.weekday() > 0 else 1 - self.now.weekday() + 7
return good_timedelta(self.now.replace(hour=0, minute=0, second=0, microsecond=0),
timedelta(days=days))
return good_timedelta(self.now.replace(hour=0, minute=0, second=0, microsecond=0), timedelta(days=days))
@property
def time_till_week_change(self) -> timedelta:
@ -1582,12 +1698,17 @@ class Citizen(CitizenAPI):
if kind in kinds:
return self._post_main_write_article(title, content, self.details.citizenship, kind)
else:
raise ErepublikException(
"Article kind must be one of:\n{}\n'{}' is not supported".format(
"\n".join(["{}: {}".format(k, v) for k, v in kinds.items()]),
kind
)
)
raise ErepublikException("Article kind must be one of:\n{}\n'{}' is not supported".format(
"\n".join(["{}: {}".format(k, v) for k, v in kinds.items()]), kind
))
def get_my_market_offers(self) -> List[Dict[str, Union[int, float, str]]]:
ret = []
for offer in self._get_economy_my_market_offers().json():
line = offer.copy()
line.pop('icon', None)
ret.append(line)
return ret
def post_market_offer(self, industry: int, quality: int, amount: int, price: float) -> Response:
if industry not in self.available_industries.values():
@ -1618,32 +1739,15 @@ class Citizen(CitizenAPI):
self.reporter.report_action("BUY_PRODUCT", ret.json())
return json_ret
def get_raw_surplus(self) -> (float, float):
frm = 0.00
wrm = 0.00
for cdata in sorted(self.my_companies.companies.values()):
if cdata["industry_token"] == "FOOD":
raw = frm
elif cdata["industry_token"] == "WEAPON":
raw = wrm
else:
continue
effective_bonus = cdata["effective_bonus"]
base_prod = float(cdata["base_production"])
if cdata["is_raw"]:
raw += base_prod * effective_bonus / 100
else:
raw -= effective_bonus / 100 * base_prod * cdata["upgrades"][str(cdata["quality"])]["raw_usage"]
if cdata["industry_token"] == "FOOD":
frm = raw
elif cdata["industry_token"] == "WEAPON":
wrm = raw
return frm, wrm
def assign_factory_to_holding(self, factory_id: int, holding_id: int) -> Response:
"""
Assigns factory to new holding
"""
company = self.my_companies.companies[factory_id]
company_name = self.factories[company['industry_id']]
if not company['is_raw']:
company_name += f" q{company['quality']}"
self.write_log(f"{company_name} moved to {holding_id}")
return self._post_economy_assign_to_holding(factory_id, holding_id)
def upgrade_factory(self, factory_id: int, level: int) -> Response:
@ -1657,9 +1761,18 @@ class Citizen(CitizenAPI):
Storage={1000: 1, 2000: 2} <- Building_type 2
"""
company_name = self.factories[industry_id]
if building_type == 2:
company_name = f"Storage"
self.write_log(f"{company_name} created!")
return self._post_economy_create_company(industry_id, building_type)
def dissolve_factory(self, factory_id: int) -> Response:
company = self.my_companies.companies[factory_id]
company_name = self.factories[company['industry_id']]
if not company['is_raw']:
company_name += f" q{company['quality']}"
self.write_log(f"{company_name} dissolved!")
return self._post_economy_sell_company(factory_id, self.details.pin, sell=False)
@property
@ -1671,14 +1784,38 @@ class Citizen(CitizenAPI):
return {"food": 1, "weapon": 2, "house": 4, "aircraft": 23,
"foodRaw": 7, "weaponRaw": 12, "houseRaw": 17, "airplaneRaw": 24}
def get_industry_id(self, industry_name: str) -> int:
@property
def factories(self) -> Dict[int, str]:
"""Returns factory industries as dict(id: name)
:return: Factory id:name dict
":rtype: Dict[int, str]
"""
Returns industry id
return {1: "Food", 2: "Weapons", 4: "House", 23: "Aircraft",
7: "FRM q1", 8: "FRM q2", 9: "FRM q3", 10: "FRM q4", 11: "FRM q5",
12: "WRM q1", 13: "WRM q2", 14: "WRM q3", 15: "WRM q4", 16: "WRM q5",
18: "HRM q1", 19: "HRM q2", 20: "HRM q3", 21: "HRM q4", 22: "HRM q5",
24: "ARM q1", 25: "ARM q2", 26: "ARM q3", 27: "ARM q4", 28: "ARM q5", }
def get_industry_id(self, industry_name: str) -> int:
"""Returns industry id
:type industry_name: str
:return: int
"""
return self.available_industries.get(industry_name, 0)
def get_industry_name(self, industry_id: int) -> str:
"""Returns industry name from industry ID
:type industry_id: int
:return: industry name
:rtype: str
"""
for iname, iid in self.available_industries.items():
if iid == industry_id:
return iname
return ""
def buy_tg_contract(self) -> Response:
ret = self._post_main_buy_gold_items('gold', "TrainingContract2", 1)
self.reporter.report_action("BUY_TG_CONTRACT", ret.json())
@ -1801,12 +1938,12 @@ class Citizen(CitizenAPI):
if reg_re.findall(html):
ret.update(regions={}, can_attack=True)
for reg in reg_re.findall(html):
ret["regions"].update({str(reg[0]): reg[1]})
ret["regions"].update({int(reg[0]): reg[1]})
elif re.search(r'<a href="//www.erepublik.com/en/military/battlefield/(\d+)" '
r'class="join" title="Join"><span>Join</span></a>', html):
battle_id = re.search(r'<a href="//www.erepublik.com/en/military/battlefield/(\d+)" '
r'class="join" title="Join"><span>Join</span></a>', html).group(1)
ret.update(can_attack=False, battle_id=battle_id)
ret.update(can_attack=False, battle_id=int(battle_id))
elif re.search(r'This war is no longer active.', html):
ret.update(can_attack=False, ended=True)
else:
@ -1924,34 +2061,33 @@ class Citizen(CitizenAPI):
return {battle.invader.id: r.json().get(str(battle.invader.id)).get("fighterData"),
battle.defender.id: r.json().get(str(battle.defender.id)).get("fighterData")}
def contribute_cc_to_country(self, amount=0.) -> bool:
def contribute_cc_to_country(self, amount=0., country_id: int = 71) -> bool:
self.update_money()
amount = int(amount)
if self.details.cc < amount or amount < 20:
return False
data = dict(country=71, action='currency', value=amount)
self.telegram.send_message(f"Donated {amount}cc to {COUNTRIES[71]}")
self.reporter.report_action("CONTRIBUTE_CC", data)
data = dict(country=country_id, action='currency', value=amount)
self.reporter.report_action("CONTRIBUTE_CC", data, str(amount))
r = self._post_main_country_donate(**data)
return r.json().get('status') or not r.json().get('error')
def contribute_food_to_country(self, amount: int = 0, quality: int = 1) -> bool:
def contribute_food_to_country(self, amount: int = 0, quality: int = 1, country_id: int = 71) -> bool:
self.update_inventory()
amount = amount // 1
if self.food["q" + str(quality)] < amount or amount < 10:
return False
data = dict(country=71, action='food', value=amount, quality=quality)
self.reporter.report_action("CONTRIBUTE_FOOD", data)
data = dict(country=country_id, action='food', value=amount, quality=quality)
self.reporter.report_action("CONTRIBUTE_FOOD", data, FOOD_ENERGY[quality] * amount)
r = self._post_main_country_donate(**data)
return r.json().get('status') or not r.json().get('error')
def contribute_gold_to_country(self, amount: int) -> bool:
def contribute_gold_to_country(self, amount: int, country_id: int = 71) -> bool:
self.update_money()
if self.details.cc < amount:
return False
data = dict(country=71, action='gold', value=amount)
self.reporter.report_action("CONTRIBUTE_GOLD", data)
data = dict(country=country_id, action='gold', value=amount)
self.reporter.report_action("CONTRIBUTE_GOLD", data, str(amount))
r = self._post_main_country_donate(**data)
return r.json().get('status') or not r.json().get('error')
@ -1962,26 +2098,29 @@ class Citizen(CitizenAPI):
r = self._post_main_country_post_create(message, max(post_to_wall_as, key=int) if post_to_wall_as else 0)
return r.json()
def report_error(self, msg: str = ""):
process_error(msg, self.name, sys.exc_info(), self, self.commit_id, False)
def report_error(self, msg: str = "", is_warning: bool = False):
if is_warning:
process_warning(msg, self.name, sys.exc_info(), self, self.commit_id)
else:
process_error(msg, self.name, sys.exc_info(), self, self.commit_id, None)
def get_battle_top_10(self, battle_id: int) -> Dict[int, List[Tuple[int, int]]]:
battle = self.all_battles.get(battle_id)
round_id = battle.get('zone_id')
division = self.division if round_id % 4 else 11
resp = self._post_military_battle_console(battle_id, round_id, division).json()
resp.pop('rounds', None)
ret = dict()
for country_id, data in resp.items():
ret.update({int(country_id): []})
for place in sorted(data.get("fighterData", {}).values(), key=lambda _: -_['raw_value']):
ret[int(country_id)].append((place['citizenId'], place['raw_value']))
return ret
return {}
# battle = self.all_battles.get(battle_id)
# round_id = battle.zone_id
# division = self.division if round_id % 4 else 11
#
# resp = self._post_military_battle_console(battle_id, 'battleStatistics', round_id, division).json()
# resp.pop('rounds', None)
# ret = dict()
# for country_id, data in resp.items():
# ret.update({int(country_id): []})
# for place in sorted(data.get("fighterData", {}).values(), key=lambda _: -_['raw_value']):
# ret[int(country_id)].append((place['citizenId'], place['raw_value']))
# return ret
def to_json(self, indent: bool = False) -> str:
return dumps(self.__dict__, cls=MyJSONEncoder, indent=4 if indent else None, sort_keys=True)
return json.dumps(self.__dict__, cls=MyJSONEncoder, indent=4 if indent else None, sort_keys=True)
def get_game_token_offers(self):
r = self._post_economy_game_tokens_market('retrieve').json()
@ -2051,9 +2190,10 @@ class Citizen(CitizenAPI):
return self._get_military_show_weapons(battle_id).json()
def set_default_weapon(self, battle_id: int) -> int:
battle = self.all_battles[battle_id]
battle_zone = battle.div[11 if battle.is_air else self.division].battle_zone_id
battle = self.all_battles.get(battle_id)
available_weapons = self._get_military_show_weapons(battle_id).json()
while not isinstance(available_weapons, list):
available_weapons = self._get_military_show_weapons(battle_id).json()
weapon_quality = -1
if not battle.is_air:
for weapon in available_weapons:
@ -2063,7 +2203,12 @@ class Citizen(CitizenAPI):
return self.change_weapon(battle_id, weapon_quality)
def change_weapon(self, battle_id: int, weapon_quality: int) -> int:
battle = self.all_battles[battle_id]
battle = self.all_battles.get(battle_id)
battle_zone = battle.div[11 if battle.is_air else self.division].battle_zone_id
r = self._post_military_change_weapon(battle_id, battle_zone, weapon_quality)
return r.json().get('weaponInfluence')
def get_battle_for_war(self, war_id: int) -> Optional[Battle]:
self.update_war_info()
war_info = self.get_war_status(war_id)
return self.all_battles.get(war_info.get("battle_id"), None)

View File

@ -4,14 +4,18 @@ import hashlib
import random
import threading
import time
from collections import deque
from json import JSONDecodeError, loads, JSONEncoder
from typing import Any, Dict, List, Union, Mapping, Iterable, Tuple
from collections import defaultdict, deque
from typing import Any, Dict, Iterable, List, Mapping, Tuple, Union
from requests import Response, Session, post
from erepublik import utils
try:
import simplejson as json
except ImportError:
import json
class ErepublikException(Exception):
def __init__(self, message):
@ -40,7 +44,7 @@ class MyCompanies:
"""
:param holdings: Parsed JSON to dict from en/economy/myCompanies
"""
self.holdings = {}
self.holdings.clear()
template = dict(id=0, num_factories=0, region_id=0, companies=[])
for holding_id, holding in holdings.items():
@ -57,18 +61,20 @@ class MyCompanies:
"""
:param companies: Parsed JSON to dict from en/economy/myCompanies
"""
self.companies = {}
self.companies.clear()
template = dict(id=None, quality=0, is_raw=False, resource_bonus=0, effective_bonus=0, raw_usage=0,
production=0, base_production=0, wam_enabled=False, can_work_as_manager=False,
base_production=0, wam_enabled=False, can_work_as_manager=False, industry_id=0, todays_works=0,
preset_own_work=0, already_worked=False, can_assign_employees=False, preset_works=0,
todays_works=0, holding_company_id=None, is_assigned_to_holding=False,
cannot_work_as_manager_reason=False, industry_id=0)
holding_company_id=None, is_assigned_to_holding=False, cannot_work_as_manager_reason=False)
for c_id, company in companies.items():
tmp = {}
for key in template.keys():
if key in ['id', 'holding_company_id']:
company[key] = int(company[key])
elif key == "raw_usage":
if not company.get("is_raw") and company.get('upgrades'):
company[key] = company.get('upgrades').get(str(company["quality"])).get('raw_usage')
tmp.update({key: company[key]})
self.companies.update({int(c_id): tmp})
@ -76,9 +82,8 @@ class MyCompanies:
for company_id, company_data in self.companies.items():
if company_id not in self.holdings[company_data['holding_company_id']]['companies']:
self.holdings[company_data['holding_company_id']]['companies'].append(company_id)
else:
for holding_id in self.holdings:
self.holdings[holding_id]['companies'].sort()
for holding_id in self.holdings:
self.holdings[holding_id]['companies'].sort()
def get_employable_factories(self) -> Dict[int, int]:
ret = {}
@ -160,6 +165,31 @@ class MyCompanies:
raise ErepublikException("Wrong function call")
def get_wam_raw_usage(self) -> Dict[str, float]:
frm = 0.00
wrm = 0.00
for company in self.companies.values():
if company['wam_enabled']:
effective_bonus = float(company["effective_bonus"])
base_prod = float(company["base_production"])
raw = base_prod * effective_bonus / 100
if not company["is_raw"]:
raw *= -company["raw_usage"]
if company["industry_id"] in [1, 7, 8, 9, 10, 11]:
frm += raw
elif company["industry_id"] in [2, 12, 13, 14, 15, 16]:
wrm += raw
return {'frm': int(frm * 1000) / 1000, 'wrm': int(wrm * 1000) / 1000}
def __str__(self):
name = []
for holding_id in sorted(self.holdings.keys()):
if not holding_id:
name.append(f"Unassigned - {len(self.holdings[0]['companies'])}")
else:
name.append(f"{holding_id} - {len(self.holdings[holding_id]['companies'])}")
return " | ".join(name)
# @property
# def __dict__(self):
# ret = {}
@ -174,19 +204,19 @@ class SlowRequests(Session):
timeout = datetime.timedelta(milliseconds=500)
uas = [
# Chrome
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.80 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.80 Safari/537.36',
# FireFox
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:69.0) Gecko/20100101 Firefox/69.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:68.0) Gecko/20100101 Firefox/68.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:67.0) Gecko/20100101 Firefox/67.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:70.0) Gecko/20100101 Firefox/70.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:69.0) Gecko/20100101 Firefox/69.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0',
'Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0',
]
debug = False
@ -255,9 +285,9 @@ class SlowRequests(Session):
}
try:
loads(resp.text)
json.loads(resp.text)
file_data.update({"ext": "json"})
except JSONDecodeError:
except json.JSONDecodeError:
file_data.update({"ext": "html"})
filename = 'debug/requests/{time}_{name}{extra}.{ext}'.format(**file_data)
@ -330,6 +360,18 @@ class Config:
self.telegram_chat_id = 0
self.telegram_token = ""
@property
def __dict__(self):
return dict(email=self.email, work=self.work, train=self.train, wam=self.wam,
auto_sell=self.auto_sell, auto_sell_all=self.auto_sell_all, employees=self.employees,
fight=self.fight, air=self.air, ground=self.ground, all_in=self.all_in,
next_energy=self.next_energy, boosters=self.boosters, travel_to_fight=self.travel_to_fight,
always_travel=self.always_travel, epic_hunt=self.epic_hunt, epic_hunt_ebs=self.epic_hunt_ebs,
rw_def_side=self.rw_def_side, interactive=self.interactive,
continuous_fighting=self.continuous_fighting, auto_buy_raw=self.auto_buy_raw,
force_wam=self.force_wam, sort_battles_time=self.sort_battles_time, force_travel=self.force_travel,
telegram=self.telegram, telegram_chat_id=self.telegram_chat_id, telegram_token=self.telegram_token)
class Energy:
limit = 500 # energyToRecover
@ -541,10 +583,6 @@ Class for unifying eRepublik known endpoints and their required/optional paramet
def _get_military_campaigns_json_list(self) -> Response:
return self.get("{}/military/campaignsJson/list".format(self.url))
def _get_military_show_weapons(self, battle_id: int) -> Response:
params = {"_token": self.token, "battleId": battle_id}
return self.get("{}/military/show-weapons".format(self.url), params=params)
def _get_military_unit_data(self, unit_id: int, **kwargs) -> Response:
params = {"groupId": unit_id, "panel": "members", **kwargs}
return self.get("{}/military/military-unit-data/".format(self.url), params=params)
@ -599,6 +637,10 @@ Class for unifying eRepublik known endpoints and their required/optional paramet
data = dict(_token=self.token, sideCountryId=side_id, battleId=battle_id)
return self.post("{}/main/battlefieldTravel".format(self.url), data=data)
def _post_main_battlefield_change_division(self, battle_id: int, division_id: int) -> Response:
data = dict(_token=self.token, battleZoneId=division_id, battleId=battle_id)
return self.post("{}/main/battlefieldTravel".format(self.url), data=data)
def _post_main_buy_gold_items(self, currency: str, item: str, amount: int) -> Response:
data = dict(itemId=item, currency=currency, amount=amount, _token=self.token)
return self.post("{}/main/buyGoldItems".format(self.url), data=data)
@ -796,11 +838,6 @@ Class for unifying eRepublik known endpoints and their required/optional paramet
data.update(page=page)
return self.post("{}/military/battle-console".format(self.url), data=data)
def _post_military_change_weapon(self, battle_id: int, battle_zone_id: int, customization_level: int) -> Response:
data = dict(_token=self.token, battleZoneId=battle_zone_id, battleId=battle_id,
customizationLevel=customization_level)
return self.post("{}/military/change-weapon".format(self.url), data=data)
def _post_military_deploy_bomb(self, battle_id: int, bomb_id: int) -> Response:
data = dict(battleId=battle_id, bombId=bomb_id, _token=self.token)
return self.post("{}/military/deploy-bomb".format(self.url), data=data)
@ -1024,16 +1061,16 @@ class Reporter:
self.__to_update.append(json_data)
class MyJSONEncoder(JSONEncoder):
class MyJSONEncoder(json.JSONEncoder):
def default(self, o):
from erepublik.citizen import Citizen
if isinstance(o, decimal.Decimal):
return float("{:.02f}".format(o))
elif isinstance(o, datetime.datetime):
return dict(__type__='datetime', year=o.year, month=o.month, day=o.day, hour=o.hour, minute=o.minute,
second=o.second, microsecond=o.microsecond, tzinfo=o.tzinfo.zone if o.tzinfo else None)
return dict(__type__='datetime', date=o.strftime("%Y-%m-%d"), time=o.strftime("%H:%M:%S"),
tzinfo=o.tzinfo.zone if o.tzinfo else None)
elif isinstance(o, datetime.date):
return dict(__type__='date', year=o.year, month=o.month, day=o.day)
return dict(__type__='date', date=o.strftime("%Y-%m-%d"))
elif isinstance(o, datetime.timedelta):
return dict(__type__='timedelta', days=o.days, seconds=o.seconds,
microseconds=o.microseconds, total_seconds=o.total_seconds())
@ -1074,17 +1111,25 @@ class BattleDivision:
def div_end(self) -> bool:
return utils.now() >= self.end
def __init__(
self, div_id: int, end: datetime.datetime, epic: bool, inv_pts: int, def_pts: int,
wall_for: int, wall_dom: float, def_medal: Tuple[int, int], inv_medal: Tuple[int, int]
):
self.battle_zone_id = div_id
self.end = end
self.epic = epic
self.dom_pts = dict({"inv": inv_pts, "def": def_pts})
self.wall = dict({"for": wall_for, "dom": wall_dom})
self.def_medal = {"id": def_medal[0], "dmg": def_medal[1]}
self.inv_medal = {"id": inv_medal[0], "dmg": inv_medal[1]}
def __init__(self, **kwargs):
"""Battle division helper class
:param kwargs: must contain keys:
div_id: int, end: datetime.datetime, epic: bool, inv_pts: int, def_pts: int,
wall_for: int, wall_dom: float, def_medal: Tuple[int, int], inv_medal: Tuple[int, int]
"""
self.battle_zone_id = kwargs.get("div_id", 0)
self.end = kwargs.get("end", 0)
self.epic = kwargs.get("epic", 0)
self.dom_pts = dict({"inv": kwargs.get("inv_pts", 0), "def": kwargs.get("def_pts", 0)})
self.wall = dict({"for": kwargs.get("wall_for", 0), "dom": kwargs.get("wall_dom", 0)})
self.def_medal = {"id": kwargs.get("def_medal", 0)[0], "dmg": kwargs.get("def_medal", 0)[1]}
self.inv_medal = {"id": kwargs.get("inv_medal", 0)[0], "dmg": kwargs.get("inv_medal", 0)[1]}
@property
def id(self):
return self.battle_zone_id
class Battle:
@ -1103,23 +1148,32 @@ class Battle:
return not bool(self.zone_id % 4)
def __init__(self, battle: Dict[str, Any]):
self.id = int(battle.get('id', 0))
self.war_id = int(battle.get('war_id', 0))
self.zone_id = int(battle.get('zone_id', 0))
"""Object representing eRepublik battle.
:param battle: Dict object for single battle from '/military/campaignsJson/list' response's 'battles' object
"""
self.id = int(battle.get('id'))
self.war_id = int(battle.get('war_id'))
self.zone_id = int(battle.get('zone_id'))
self.is_rw = bool(battle.get('is_rw'))
self.is_as = bool(battle.get('is_as'))
self.is_dict_lib = bool(battle.get('is_dict')) or bool(battle.get('is_lib'))
self.start = datetime.datetime.fromtimestamp(int(battle.get('start', 0)), tz=utils.erep_tz)
self.invader = BattleSide(battle.get('inv', {}).get('id'), battle.get('inv', {}).get('points'),
[row.get('id') for row in battle.get('inv', {}).get('ally_list')],
[row.get('id') for row in battle.get('inv', {}).get('ally_list') if row['deployed']])
self.invader = BattleSide(
battle.get('inv', {}).get('id'), battle.get('inv', {}).get('points'),
[row.get('id') for row in battle.get('inv', {}).get('ally_list')],
[row.get('id') for row in battle.get('inv', {}).get('ally_list') if row['deployed']]
)
self.defender = BattleSide(battle.get('def', {}).get('id'), battle.get('def', {}).get('points'),
[row.get('id') for row in battle.get('def', {}).get('ally_list')],
[row.get('id') for row in battle.get('def', {}).get('ally_list') if row['deployed']])
self.defender = BattleSide(
battle.get('def', {}).get('id'), battle.get('def', {}).get('points'),
[row.get('id') for row in battle.get('def', {}).get('ally_list')],
[row.get('id') for row in battle.get('def', {}).get('ally_list') if row['deployed']]
)
self.div = {}
self.div = defaultdict(BattleDivision)
for div, data in battle.get('div', {}).items():
div = int(data.get('div'))
if data.get('end'):
@ -1146,13 +1200,14 @@ class Battle:
now = utils.now()
is_started = self.start < utils.now()
if is_started:
time_part = "{}".format(now - self.start)
time_part = " {}".format(now - self.start)
else:
time_part = "- {}".format(self.start - now)
time_part = "-{}".format(self.start - now)
return f"Battle {self.id} | " \
f"{utils.COUNTRIES[self.invader.id]:>21.21}:{utils.COUNTRIES[self.defender.id]:<21.21} | " \
f"{utils.ISO_CC[self.invader.id]} : {utils.ISO_CC[self.defender.id]} | " \
f"Round {self.zone_id:2} | " \
f"Time since start {time_part}"
f"Round time {time_part}"
class EnergyToFight:
@ -1203,6 +1258,7 @@ class TelegramBot:
self.__queue = []
self.__thread_stopper = threading.Event() if stop_event is None else stop_event
@property
def __dict__(self):
return dict(chat_id=self.chat_id, api_url=self.api_url, player=self.player_name, last_time=self._last_time,
next_time=self._next_time, queue=self.__queue, initialized=self.__initialized,

View File

@ -1,6 +1,5 @@
import datetime
import inspect
import json
import os
import re
import sys
@ -8,16 +7,26 @@ import time
import traceback
import unicodedata
from pathlib import Path
from typing import Union, Any, List, NoReturn, Mapping
from typing import Any, List, Mapping, NoReturn, Optional, Union
import pytz
import requests
try:
import simplejson as json
except ImportError:
import json
__all__ = ["FOOD_ENERGY", "COMMIT_ID", "COUNTRIES", "erep_tz", 'COUNTRY_LINK',
"now", "localize_dt", "localize_timestamp", "good_timedelta", "eday_from_date", "date_from_eday",
"get_sleep_seconds", "interactive_sleep", "silent_sleep",
"write_silent_log", "write_interactive_log", "get_file", "write_file",
"send_email", "normalize_html_json", "process_error", 'report_promo', 'calculate_hit']
"send_email", "normalize_html_json", "process_error", "process_warning", 'report_promo', 'calculate_hit']
if not sys.version_info >= (3, 7):
raise AssertionError('This script requires Python version 3.7 and higher\n'
'But Your version is v{}.{}.{}'.format(*sys.version_info))
FOOD_ENERGY = dict(q1=2, q2=4, q3=6, q4=8, q5=10, q6=12, q7=20)
COMMIT_ID = "7b92e19"
@ -93,6 +102,15 @@ COUNTRY_LINK = {1: 'Romania', 9: 'Brazil', 11: 'France', 12: 'Germany', 13: 'Hun
81: 'Republic-of-China-Taiwan', 166: 'United-Arab-Emirates', 167: 'Albania', 69: 'Bosnia-Herzegovina',
169: 'Armenia', 83: 'Belarus', 84: 'New-Zealand', 164: 'Saudi-Arabia', 170: 'Nigeria', }
ISO_CC = {1: 'ROU', 9: 'BRA', 10: 'ITA', 11: 'FRA', 12: 'DEU', 13: 'HUN', 14: 'CHN', 15: 'ESP', 23: 'CAN', 24: 'USA',
26: 'MEX', 27: 'ARG', 28: 'VEN', 29: 'GBR', 30: 'CHE', 31: 'NLD', 32: 'BEL', 33: 'AUT', 34: 'CZE', 35: 'POL',
36: 'SVK', 37: 'NOR', 38: 'SWE', 39: 'FIN', 40: 'UKR', 41: 'RUS', 42: 'BGR', 43: 'TUR', 44: 'GRC', 45: 'JPN',
47: 'KOR', 48: 'IND', 49: 'IDN', 50: 'AUS', 51: 'ZAF', 52: 'MDA', 53: 'PRT', 54: 'IRL', 55: 'DNK', 56: 'IRN',
57: 'PAK', 58: 'ISR', 59: 'THA', 61: 'SVN', 63: 'HRV', 64: 'CHL', 65: 'SRB', 66: 'MYS', 67: 'PHL', 68: 'SGP',
69: 'BiH', 70: 'EST', 71: 'LVA', 72: 'LTU', 73: 'PRK', 74: 'URY', 75: 'PRY', 76: 'BOL', 77: 'PER', 78: 'COL',
79: 'MKD', 80: 'MNE', 81: 'TWN', 82: 'CYP', 83: 'BLR', 84: 'NZL', 164: 'SAU', 165: 'EGY', 166: 'UAE',
167: 'ALB', 168: 'GEO', 169: 'ARM', 170: 'NGA', 171: 'CUB'}
def now() -> datetime.datetime:
return datetime.datetime.now(erep_tz).replace(microsecond=0)
@ -113,6 +131,15 @@ def localize_dt(dt: Union[datetime.date, datetime.datetime]) -> datetime.datetim
def good_timedelta(dt: datetime.datetime, td: datetime.timedelta) -> datetime.datetime:
"""Normalize timezone aware datetime object after timedelta to correct jumps over DST switches
:param dt: Timezone aware datetime object
:type dt: datetime.datetime
:param td: timedelta object
:type td: datetime.timedelta
:return: datetime object with correct timezone when jumped over DST
:rtype: datetime.datetime
"""
return erep_tz.normalize(dt + td)
@ -288,10 +315,43 @@ def normalize_html_json(js: str) -> str:
def process_error(log_info: str, name: str, exc_info: tuple, citizen=None, commit_id: str = None,
interactive: bool = False):
interactive: Optional[bool] = None):
"""
Process error logging and email sending to developer
:param interactive: Should print interactively
:type interactive: bool
:param log_info: String to be written in output
:type log_info: str
:param name: String Instance name
:type name: str
:param exc_info: tuple output from sys.exc_info()
:type exc_info: tuple
:param citizen: Citizen instance
:type citizen: Citizen
:param commit_id: Code's version by commit id
:type commit_id: str
"""
type_, value_, traceback_ = exc_info
content = [log_info]
if commit_id:
content += ["Commit id: %s" % commit_id]
content += [str(value_), str(type_), ''.join(traceback.format_tb(traceback_))]
if interactive:
write_interactive_log(log_info)
elif interactive is not None:
write_silent_log(log_info)
trace = inspect.trace()
if trace:
trace = trace[-1][0].f_locals
else:
trace = dict()
send_email(name, content, citizen, local_vars=trace)
def process_warning(log_info: str, name: str, exc_info: tuple, citizen=None, commit_id: str = None):
"""
Process error logging and email sending to developer
:param log_info: String to be written in output
:param name: String Instance name
:param exc_info: tuple output from sys.exc_info()
@ -299,19 +359,17 @@ def process_error(log_info: str, name: str, exc_info: tuple, citizen=None, commi
:param commit_id: Code's version by commit id
"""
type_, value_, traceback_ = exc_info
bugtrace = [] if not commit_id else ["Commit id: %s" % commit_id, ]
bugtrace += [str(value_), str(type_), ''.join(traceback.format_tb(traceback_))]
content = [log_info]
if commit_id:
content += ["Commit id: %s" % commit_id]
content += [str(value_), str(type_), ''.join(traceback.format_tb(traceback_))]
if interactive:
write_interactive_log(log_info)
else:
write_silent_log(log_info)
trace = inspect.trace()
if trace:
trace = trace[-1][0].f_locals
else:
trace = dict()
send_email(name, bugtrace, citizen, local_vars=trace)
send_email(name, content, citizen, local_vars=trace)
def report_promo(kind: str, time_untill: datetime.datetime) -> NoReturn:

View File

@ -0,0 +1,96 @@
import threading
from datetime import timedelta
from erepublik import Citizen, utils
CONFIG = {
'email': 'player@email.com',
'password': 'Pa$5w0rd!',
'interactive': True,
'fight': True,
'debug': True,
'start_battles': {
121672: {"auto_attack": False, "regions": [661]},
125530: {"auto_attack": False, "regions": [259]},
125226: {"auto_attack": True, "regions": [549]},
124559: {"auto_attack": True, "regions": [176]}
}
}
def _battle_launcher(player: Citizen):
"""Launch battles. Check every 5th minute (0,5,10...45,50,55) if any battle could be started on specified regions
and after launching wait for 90 minutes before starting next attack so that all battles aren't launched at the same
time. If player is allowed to fight, do 100 hits on the first round in players division.
:param player: Logged in Citizen instance
":type player: Citizen
"""
global CONFIG
finished_war_ids = {*[]}
war_data = CONFIG.get('start_battles', {})
war_ids = {int(war_id) for war_id in war_data.keys()}
next_attack_time = player.now
next_attack_time = next_attack_time.replace(minute=next_attack_time.minute // 5 * 5, second=0)
while not player.stop_threads.is_set():
try:
attacked = False
player.update_war_info()
running_wars = {b.war_id for b in player.all_battles.values()}
for war_id in war_ids - finished_war_ids - running_wars:
war = war_data[str(war_id)]
war_regions = set(war.get('regions'))
auto_attack = war.get('auto_attack')
status = player.get_war_status(war_id)
if status.get('ended', False):
CONFIG['start_battles'].pop(str(war_id), None)
finished_war_ids.add(war_id)
continue
elif not status.get('can_attack'):
continue
if auto_attack or (player.now.hour > 20 or player.now.hour < 2):
for reg in war_regions:
if attacked:
break
if reg in status.get('regions', {}).keys():
player.launch_attack(war_id, reg, status.get('regions', {}).get(reg))
attacked = True
hits = 100
if player.energy.food_fights >= hits and player.config.fight:
for _ in range(120):
player.update_war_info()
battle_id = player.get_war_status(war_id).get("battle_id")
if battle_id is not None and battle_id in player.all_battles:
player.fight(battle_id, player.details.citizenship, hits)
break
player.sleep(1)
if attacked:
break
if attacked:
break
war_ids -= finished_war_ids
if attacked:
next_attack_time = utils.good_timedelta(next_attack_time, timedelta(hours=1, minutes=30))
else:
next_attack_time = utils.good_timedelta(next_attack_time, timedelta(minutes=5))
player.stop_threads.wait(utils.get_sleep_seconds(next_attack_time))
except:
player.report_error("Task error: start_battles")
def main():
player = Citizen(email=CONFIG['email'], password=CONFIG['password'], auto_login=False)
player.config.interactive = CONFIG['interactive']
player.config.fight = CONFIG['fight']
player.set_debug(CONFIG.get('debug', False))
player.login()
if CONFIG.get('start_battles'):
name = "{}-start_battles-{}".format(player.name, threading.active_count() - 1)
state_thread = threading.Thread(target=_battle_launcher, args=(player,), name=name)
state_thread.start()
if __name__ == "__main__":
main()

102
examples/eat_work_train.py Normal file
View File

@ -0,0 +1,102 @@
from datetime import timedelta
from erepublik import Citizen, utils
CONFIG = {
'email': 'player@email.com',
'password': 'Pa$5w0rd!',
'interactive': True,
'debug': True
}
def main():
player = Citizen(email=CONFIG['email'], password=CONFIG['password'], auto_login=False)
player.config.interactive = CONFIG['interactive']
player.config.fight = CONFIG['fight']
player.set_debug(CONFIG.get('debug', False))
player.login()
now = player.now.replace(second=0, microsecond=0)
dt_max = now.replace(year=9999)
tasks = {
'eat': now,
}
if player.config.work:
tasks.update({'work': now})
if player.config.train:
tasks.update({'train': now})
if player.config.ot:
tasks.update({'ot': now})
if player.config.wam:
tasks.update({'wam': now.replace(hour=14, minute=0)})
while True:
player.update_all()
if tasks.get('work', dt_max) <= now:
player.write_log("Doing task: work")
player.update_citizen_info()
player.work()
if player.config.ot:
tasks['ot'] = now
player.collect_daily_task()
next_time = utils.good_timedelta(now.replace(hour=0, minute=0, second=0), timedelta(days=1))
tasks.update({'work': next_time})
if tasks.get('train', dt_max) <= now:
player.write_log("Doing task: train")
player.update_citizen_info()
player.train()
player.collect_daily_task()
next_time = utils.good_timedelta(now.replace(hour=0, minute=0, second=0), timedelta(days=1))
tasks.update({'train': next_time})
if tasks.get('wam', dt_max) <= now:
player.write_log("Doing task: Work as manager")
success = player.work_wam()
player.eat()
if success:
next_time = utils.good_timedelta(now.replace(hour=14, minute=0, second=0, microsecond=0),
timedelta(days=1))
else:
next_time = utils.good_timedelta(now.replace(second=0, microsecond=0), timedelta(minutes=30))
tasks.update({'wam': next_time})
if tasks.get('eat', dt_max) <= now:
player.write_log("Doing task: eat")
player.eat()
if player.energy.food_fights > player.energy.limit // 10:
next_minutes = 12
else:
next_minutes = (player.energy.limit - 5 * player.energy.interval) // player.energy.interval * 6
next_time = player.energy.reference_time + timedelta(minutes=next_minutes)
tasks.update({'eat': next_time})
if tasks.get('ot', dt_max) <= now:
player.write_log("Doing task: ot")
if now > player.my_companies.next_ot_time:
player.work_ot()
next_time = now + timedelta(minutes=60)
else:
next_time = player.my_companies.next_ot_time
tasks.update({'ot': next_time})
closest_next_time = dt_max
next_tasks = []
for task, next_time in sorted(tasks.items(), key=lambda s: s[1]):
next_tasks.append("{}: {}".format(next_time.strftime('%F %T'), task))
if next_time < closest_next_time:
closest_next_time = next_time
sleep_seconds = int(utils.get_sleep_seconds(closest_next_time))
if sleep_seconds <= 0:
player.write_log(f"Loop detected! Offending task: '{next_tasks[0]}'")
player.write_log("My next Tasks and there time:\n" + "\n".join(sorted(next_tasks)))
player.write_log("Sleeping until (eRep): {} (sleeping for {}s)".format(
closest_next_time.strftime("%F %T"), sleep_seconds))
seconds_to_sleep = sleep_seconds if sleep_seconds > 0 else 0
player.sleep(seconds_to_sleep)
if __name__ == "__main__":
main()

View File

@ -1,14 +1,16 @@
pip==19.1.1
bumpversion==0.5.3
wheel==0.33.4
watchdog==0.9.0
flake8==3.7.8
tox==3.13.2
coverage==4.5.3
Sphinx==2.2.0
twine==2.0.0
ipython
PyInstaller
pytz==2019.1
coverage==5.0.2
edx-sphinx-theme==1.5.0
flake8==3.7.9
ipython==7.11.1
isort==4.3.21
pip==19.3.1
PyInstaller==3.5
pytz==2019.3
requests==2.22.0
edx-sphinx-theme
setuptools==44.0.0
Sphinx==2.3.1
tox==3.14.3
twine==3.1.1
watchdog==0.9.0
wheel==0.33.6

View File

@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.17.2
current_version = 0.19.1
commit = True
tag = True

View File

@ -11,7 +11,7 @@ with open('README.rst') as readme_file:
with open('HISTORY.rst') as history_file:
history = history_file.read()
requirements = ['pytz>=2019.2', 'requests>=2.22']
requirements = ['pytz==2019.3', 'requests==2.22.0']
setup_requirements = []
@ -27,6 +27,7 @@ setup(
'Natural Language :: English',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
],
description="Python package for automated eRepublik playing",
entry_points={},
@ -42,6 +43,6 @@ setup(
test_suite='tests',
tests_require=test_requirements,
url='https://github.com/eeriks/erepublik/',
version='0.17.2',
version='0.19.1',
zip_safe=False,
)