This commit is contained in:
2022-07-04 10:44:45 +03:00
commit 270a9bea57
35 changed files with 8449 additions and 0 deletions

512
ebot/__deprecated.py Normal file
View File

@ -0,0 +1,512 @@
# def task_fight(self):
# if self.restricted_ip:
# self.tasks.pop("fight")
# return
# elif self.now > utils.localize_dt(datetime(2021, 2, 8)):
# self.write_warning("Fight is now disabled")
# self.tasks.pop("fight")
# return
#
# self.write_log(self.health_info)
#
# count, log_msg, force_fight = self.should_fight(False)
#
# if count or force_fight:
# fought = False
# if self.tasks.get_default("cheap_medals", False):
# cheap_medals = self.get_cheap_tp_divisions()
# air_div = ground_div = None
# if self.config.air and cheap_medals["air"]:
# air_div = cheap_medals["air"][0]
# if self.config.ground and cheap_medals["ground"]:
# ground_div = cheap_medals["ground"][0]
# if air_div and (not air_div[0] or not self.config.ground):
# div_data = air_div
# else:
# div_data = ground_div
# if div_data:
# medal_damage, division = div_data
# self.write_log(f"Chose {division} with {medal_damage}dmg medal")
# if self.change_division(division.battle, division):
# side: BattleSide
# if division.battle.defender.country == self.details.citizenship:
# side = division.battle.defender
# else:
# side = division.battle.invader
#
# air = int(division.is_air)
# tgt_dmg = DMG_MAP[division.div][side.is_defender]
# hit = self.get_air_hit_dmg_value() if air else self.get_ground_hit_dmg_value()
#
# tgt_dmg = tgt_dmg if tgt_dmg > medal_damage else medal_damage
#
# hits = min((self.energy.food_fights - 5, max((count, int(tgt_dmg / hit) + 1))))
# self.set_default_weapon(division.battle, division)
#
# if self.details.current_country not in side.deployed + [side.country]:
# self.travel_to_battle(division.battle, side.deployed + [side.country])
# fought = bool(self.fight(division.battle, division, side, hits + 2))
# battle_count = len(cheap_medals["air"]) + len(cheap_medals["ground"])
# self.report_action(
# "CHEAP MEDAL CHOOSER",
# f"Had {battle_count} options and chose {div_data}",
# kwargs=dict(cheap_medals=cheap_medals),
# )
# if not fought:
# self.find_battle_and_fight()
#
# self.travel_to_residence()
# self.update_weekly_challenge()
#
# self.tasks["fight"] = self._get_next_fight_time()
# def _get_required_fight_energy(self) -> int:
# def _check_energy(__ec: List[Tuple[int, str]], __ne: int, __msg: str) -> int:
# __ec.append((__ne, __msg))
# return min(__ec, key=lambda _x: _x[0])[0]
#
# e_checks: List[Tuple[int, str]] = []
# # Default - fight when full energy
# energy = self.energy.limit * 2 - self.energy.interval * 2
# e_checks.append((energy, f"Full energy: {self.energy.limit} * 2 - {self.energy.interval} * 2"))
#
# if self.is_levelup_reachable:
# energy = 2 * (self.energy.limit - self.energy.interval)
# msg = f"Levelup reachable: 2 * ({self.energy.limit} - {self.energy.interval})"
# e_checks.append((energy, msg))
# elif self.is_levelup_close:
# energy = (self.details.xp_till_level_up + 5) * 10 - self.energy.limit
# msg = f"Near level up: ({self.details.xp_till_level_up} + 5) * 10 - {self.energy.limit}"
# e_checks.append((energy, msg))
# else:
# # Obligatory need 75pp
# if self.details.pp < 75:
# energy = _check_energy(e_checks, 75 - self.details.pp, f"Obligatory need 75pp: 75 - {self.details.pp}")
#
# # Continuous fighting
# if self.config.continuous_fighting and self.has_battle_contribution:
# energy = _check_energy(e_checks, self.energy.interval, f"Continuous_fighting: {self.energy.interval}")
#
# # All-in for AIR battles
# if self.config.all_in:
# needed = self.energy.limit * 2 - self.energy.interval * 3
# energy = _check_energy(e_checks, needed, f"All-in: {needed}")
#
# # Get to next Energy +1
# if self.config.next_energy and self.details.next_pp:
# self.details.next_pp.sort()
# energy = _check_energy(
# e_checks,
# (self.details.next_pp[0] - self.details.pp) * 10,
# f"Get to next Energy +1: ({self.details.next_pp}[0] - {self.details.pp}) * 10",
# )
# for e, i in e_checks:
# self.logger.debug(f"{e}hp {i}")
# return energy
# def _get_next_fight_time(self) -> datetime:
# needed_energy = self._get_required_fight_energy() - self.energy.energy
# if needed_energy > 0:
# next_minutes = max([6, needed_energy // self.energy.interval * 6])
# else:
# next_minutes = 0
# next_wc_start_fight_time = _norm(self.next_wc_start + timedelta(minutes=6))
# next_fight_time = _norm(self.energy.reference_time + timedelta(minutes=next_minutes))
#
# return min((next_wc_start_fight_time, next_fight_time))
# def task_epic_hunt(self):
# if self.restricted_ip:
# self.tasks.pop("epic_hunt")
# return
# elif self.now > utils.localize_dt(datetime(2021, 2, 8)):
# self.write_warning("Fight is now disabled")
# self.tasks.pop("fight")
# return
#
# self.update_war_info()
# epic_divs: Generator[BattleDivision, Any, None] = (
# div for battle in self.all_battles.values() for div in battle.div.values() if div.epic
# )
#
# for div in epic_divs:
# if div.div_end:
# continue
# air = bool(div.is_air and self.config.air)
# ground = bool(self.config.ground and bool(div.div == self.division or self.maverick))
# if air or ground:
# target_div = div
# battle = target_div.battle
# invaders = [battle.invader.country] + battle.invader.deployed
# side = battle.invader if self.details.citizenship in invaders else battle.defender
# self.change_division(target_div.battle, target_div)
#
# if self.details.current_country not in side.deployed + [side.country]:
# self.travel_to_battle(battle, side.deployed + [side.country])
# self.fight(target_div.battle, target_div, side, self.energy.food_fights)
# self.travel_to_residence()
# break
# def _get_hits_for_dmg(self, is_tp: bool, damage: int) -> int:
# booster = self.get_active_ground_damage_booster()
# booster_kwargs = {}
# if booster:
# booster_kwargs.update({f"booster_{booster}": True})
# hit_dmg = self.get_ground_hit_dmg_value(tp=is_tp, **booster_kwargs)
# return int(damage / hit_dmg + 1)
# def get_medal_damage(self, division: BattleDivision, side: BattleSide):
# medal = self.get_battle_round_data(division)
# medal = medal[side.is_defender]
# if medal:
# return medal.get("1").get("raw_value")
# return 0
# def _pay_for_hit(self, division: BattleDivision, is_defender: bool):
# hit_dmg = self.get_division_max_hit(division)
# max_hit_fighter_id = None
# hit_price = self._default_pay_table[division.div]
# if not division.div == self.division and hit_dmg != 10_000:
# data = self.get_battle_round_data(division)[is_defender]
# for fighter in data.values():
# hover_card_data = self._get_main_citizen_hovercard(fighter.get("citizenId")).json()
# hc_md = hover_card_data["fighterInfo"]["military"]
# if not hc_md["division"] == division.div:
# continue
# base_hit = hc_md["damagePerHit"]
# manual_base_hit = utils.calculate_hit(
# hc_md["strength"],
# hc_md["rank"],
# hover_card_data["citizenship"]["id"] == self.details.citizenship,
# False,
# False,
# )
# if int(hit_dmg) in [int(base_hit), int(manual_base_hit)] or abs(base_hit - hit_dmg) <= 2:
# max_hit_fighter_id = fighter.get("citizenId")
# break
# else:
# for fighter in data.values():
# hover_card_data = self._get_main_citizen_hovercard(fighter.get("citizenId")).json()
# if hover_card_data["fighterInfo"]["military"]["division"] == division.div:
# max_hit_fighter_id = fighter.get("citizenId")
# break
# else:
# self.report_error("Unable to find player to whom I should pay for the hit!")
# if max_hit_fighter_id:
# if max_hit_fighter_id in self._preset_pay_table:
# hit_price = self._preset_pay_table[max_hit_fighter_id]
# else:
# fighter_profile = self.get_citizen_profile(max_hit_fighter_id)
# about_me_price = re.findall(r"(\d{3,})", fighter_profile["aboutMe"])
# if about_me_price:
# hit_price = int(min(about_me_price, key=int))
# if max_hit_fighter_id in self._preset_pay_table:
# if self._hits_to_pay[max_hit_fighter_id] + hit_price < 10 * hit_price and not self.stop_threads.is_set():
# self._hits_to_pay[max_hit_fighter_id] += hit_price
# hit_price = 0
# self.report_action(
# "PAYMENT_AGGREGATION",
# f"Current debt for player #{max_hit_fighter_id} is {self._hits_to_pay[max_hit_fighter_id]}cc",
# _hits_to_pay=self._hits_to_pay,
# )
# else:
# hit_price = self._hits_to_pay[max_hit_fighter_id] + hit_price
# self.report_action(
# "PAYMENT_AGGREGATION",
# f"Debt for player #{max_hit_fighter_id} is cleared ({hit_price}cc)",
# _hits_to_pay=self._hits_to_pay,
# )
# self._hits_to_pay[max_hit_fighter_id] = 0
# if hit_price and max_hit_fighter_id:
# # self.write_log(f"self.donate_money({max_hit_fighter_id}, {hit_price}, 1)")
# self.donate_money(max_hit_fighter_id, hit_price, 1)
# return True
# return False
# def _sub_task_fight_in_empty_battle(self, queue: List[_EmptyMedal], _processed_divisions: Set[int]):
# while not self.stop_threads.is_set():
# try:
# if self.stop_threads.is_set():
# return
# try:
# queue.sort(key=attrgetter("time", "round", "division_id"))
# next_medal = queue.pop(0)
# except IndexError:
# next_medal = None
#
# if next_medal is not None:
# self.stop_threads.wait(utils.get_sleep_seconds(next_medal.time))
# if self.stop_threads.is_set():
# break
# elif self.now > utils.localize_dt(datetime(2021, 2, 8)):
# self.write_warning("Fight is now disabled")
# return
# try:
# self.update_war_info()
# self._update_lock.clear()
# try:
# battle: Optional[Battle] = self.all_battles[next_medal.battle_id]
# except KeyError:
# self.report_error(
# f"Battle '{next_medal.battle_id}' not found in all_battles "
# f"(Medal data: {next_medal})",
# sentry=False,
# )
# self.report_action(
# "EMPTY_BH_ERROR",
# f"Error while checking {next_medal.battle_id}\n"
# f"https://www.erepublik.com/en/military/battlefield/{next_medal.battle_id}",
# )
# raise ErepublikException("No battle data!")
#
# if battle is None:
# raise ErepublikException("No battle or division data!")
# elif battle.zone_id != next_medal.round:
# raise ErepublikException("Wrong battle round!")
#
# try:
# division: Optional[BattleDivision] = battle.div[next_medal.division_id]
# except KeyError:
# self.report_error(
# f"Division '{next_medal.division_id}' not found in "
# f"battle.div (Medal data: {next_medal})",
# sentry=False,
# )
# self.report_action("EMPTY_BH_ERROR", f"Error while checking {repr(battle)}\n{battle.link}")
# raise ErepublikException("No division data!")
#
# if division is None:
# raise ErepublikException("No battle or division data!")
# elif division.is_air:
# raise ErepublikException("Air battle!")
# elif division.div_end:
# raise ErepublikException("Division finished")
#
# if not self.maverick and not division.div == self.division:
# raise ErepublikException("Don't have a MP can't fight in non native divisions!")
# side = battle.defender if next_medal.defender_side else battle.invader
#
# try:
# if self.details.current_country not in side.deployed + [side.country] or battle.is_rw:
# self.change_division(battle, division, side)
# medal_damage = self.get_medal_damage(division, side)
# except AttributeError:
# battle = self.all_battles.get(next_medal.battle_id)
# side = battle.defender if next_medal.defender_side else battle.invader
# if self.details.current_country not in side.deployed + [side.country] or battle.is_rw:
# self.change_division(battle, division, side)
# medal_damage = self.get_medal_damage(division, side)
#
# if medal_damage <= DMG_MAP[division.div][next_medal.defender_side] / 2:
# damage_amount = DMG_MAP[division.div][next_medal.defender_side]
# if division.div != self.division: # Maverick
# hit = self.get_division_max_hit(division)
# if hit > 200_000:
# my_profile = self._get_main_citizen_profile_json(self.details.citizen_id).json()
# rang = my_profile["military"]["militaryData"]["ground"]["rankNumber"]
# active_booster = self.get_active_ground_damage_booster()
# hit_dmg = utils.get_final_hit_dmg(
# hit, rang, True, self.details.is_elite, booster=active_booster
# )
# ground_hits = int(damage_amount / hit_dmg + Decimal("0.49"))
# else:
# ground_hits = self.energy.limit * 2
#
# else:
# ground_hits = self._get_hits_for_dmg(True, damage_amount)
# fight = False
# hits_ok = self.my_companies.ff_lockdown < ground_hits < self.energy.food_fights
# if not hits_ok and division.div != self.division:
# hits_ok = self.my_companies.ff_lockdown < ground_hits < self.energy.food_fights
# if hits_ok:
# self._update_lock.set()
# self.change_division(battle, division, side)
# fight = bool(self.fight(battle, division, side, ground_hits))
# if fight and not division.div == self.division:
# self._pay_for_hit(division, side.is_defender)
# elif next_medal.second_attempt:
# bombs = self.inventory.final.get("bomb", {})
# if 215 in bombs:
# bomb_id = 215
# bomb_data = bombs.get(215)
# for bomb_id, bomb_data in bombs.items():
# if (bomb_id == 21 and division.div != self.division) or bomb_id == 216:
# continue
# if bomb_data.get("fire_power"):
# bombs_required = int(damage_amount / bomb_data.get("fire_power"))
# if bomb_data.get("amount") >= bombs_required:
# self._update_lock.set()
# fight = True
# if self.details.current_country not in side.deployed + [side.country]:
# self.travel_to_battle(battle, side.deployed + [side.country])
# self.change_division(battle, division)
# bombs_deployed = self.deploy_bomb(
# battle, division, bomb_id, not side.is_defender, bombs_required
# )
# self.report_action(
# "CLEARED_MEDAL",
# f"Deployed {bombs_deployed} bombs in "
# f"{division.div} in battle {battle.id}",
# extra_info=dict(
# battle=battle,
# division=division,
# next_medal=next_medal,
# bombs_deployed=bombs_deployed,
# bomb_data=bomb_data,
# ),
# )
# # break
# if fight:
# self.report_action(
# "EMPTY_BH", f"Cleared empty d{division.div} BH in {battle}", battle=battle.as_dict
# )
# elif not next_medal.second_attempt:
# next_medal.second_attempt = True
# next_medal.time = _norm(self.now + timedelta(minutes=10))
# queue.append(next_medal)
# else:
# if division.id in _processed_divisions:
# _processed_divisions.remove(division.id)
#
# except ErepublikException:
# continue
# except: # noqa
# self.report_error(f"Thread {threading.current_thread().name} ran into an error")
# finally:
# self._update_lock.set()
# else:
# self.travel_to_residence()
# self.stop_threads.wait(60)
# battle = division = side = damage_amount = bombs = ground_hits = None # noqa
# bombs_required = bombs = fight = bombs_deployed = next_medal = None # noqa
# except: # noqa
# self.report_error(f"Thread {threading.current_thread().name} ran into an error")
#
#
# def background_task_clear_bhs(self, minute: Union[int, bool]):
# _processed_divisions: Set[int] = set()
# if self.now > utils.localize_dt(datetime(2021, 2, 8)):
# self.write_warning("Fight is now disabled")
# return
# if minute:
# try:
# minute = 15 if isinstance(minute, bool) else int(minute)
# except ValueError:
# return
# else:
# return
# next_check: datetime = self.now
# next_check: datetime = _norm(next_check.replace(minute=next_check.minute // 5 * 5, second=0))
# queue: List[_EmptyMedal] = []
#
# empty_bh_thread = threading.Thread(
# target=self._sub_task_fight_in_empty_battle,
# args=(queue, _processed_divisions),
# name=self.thread_name("empty_bh_queue"),
# )
# empty_bh_thread.start()
# self._bg_task_queue.append(empty_bh_thread)
#
# while not self.stop_threads.is_set():
# if self.now > utils.localize_dt(datetime(2021, 2, 8)):
# self.write_warning("Fight is now disabled")
# return
# try:
# self.update_war_info()
# # Check TP battles
# tp_battles = self.sorted_battles(True, True)
#
# # Removes old division ids from processed division ids set {1,2,3}.inter_update({3,4,5}) == {3}
# _processed_divisions.intersection_update({d for b in tp_battles for d in b.div.keys()})
# for battle in tp_battles:
# if self.details.citizenship.id in [battle.defender.id, battle.invader.id]:
# if battle.has_air:
# continue
# else:
# fight_time = _norm(battle.start + timedelta(minutes=minute))
# defender_side = battle.defender.id == self.details.citizenship.id
# battle_stats = self.get_battle_division_stats(list(battle.div.values())[0])
# try:
# round_stats = battle_stats.get("stats").get("current").get(f"{battle.zone_id}")
# for div in battle.div.values():
# td = (
# round_stats.get(str(div.div))
# .get(str(self.details.citizenship.id))
# .get(str(div.id))
# .get("top_damage")
# )
# if td and not td[0].get("sector", ""):
# td = td[0]
# div = battle.div[td["battle_zone_id"]]
# empty_medal = _EmptyMedal(
# fight_time, battle.id, div.id, defender_side, battle.zone_id
# )
# if (
# td["damage"] <= DMG_MAP[div.div][defender_side] / 2
# and empty_medal not in queue
# ):
# queue.append(empty_medal)
# queue.sort(
# key=attrgetter(
# "time", "defender_side", "round", "battle_id", "division_id"
# )
# )
# except Exception:
# pass
# for div in battle.div.values():
# if div.id in _processed_divisions:
# continue
# else:
# _processed_divisions.add(div.id)
# empty_medal = _EmptyMedal(fight_time, battle.id, div.id, defender_side, battle.zone_id)
# side = battle.defender if defender_side else battle.invader
# if empty_medal in queue:
# continue
# else:
# try:
# medal_damage = self.get_medal_damage(div, side)
# except AttributeError:
# battle = self.all_battles.get(battle.id)
# div: BattleDivision = battle.div[div.id]
# side = battle.defender if defender_side else battle.invader
# medal_damage = self.get_medal_damage(div, side)
# if medal_damage <= DMG_MAP[div.div][defender_side] / 2 and empty_medal not in queue:
# queue.append(empty_medal)
# queue.sort(
# key=attrgetter("time", "defender_side", "round", "battle_id", "division_id")
# )
# else:
# break
#
# next_check = _norm(next_check + timedelta(minutes=5))
# self.stop_threads.wait(utils.get_sleep_seconds(next_check))
# except Exception as e:
# self.report_error(f"Task error: empty_tp_bh_hunter {e.args}")
# empty_bh_thread.join()
# return
# def get_cheap_tp_divisions(self) -> Dict[str, List[Tuple[int, BattleDivision]]]:
# ret = super().get_cheap_tp_divisions()
# real_return = {"ground": [], "air": []}
# for dmg, division in ret["ground"]:
# if division.div == self.division:
# real_return["ground"].append((dmg, division))
# real_return["air"] = ret["air"]
# return real_return
# def fight(self, battle: classes.Battle, division, side=None, count=None, use_ebs=False) -> Optional[int]:
# if self.inventory.final.get("other", {}).get(3, {}).get("amount", 0) > 10:
# self.activate_battle_effect(battle.id, "snowFight")
# if not self.inventory.active.get("prestige_points"):
# ppb = self.inventory.boosters.get("prestige_points", {})
# for _q in sorted(ppb):
# _qd = ppb.get(_q)
# for _dur in sorted(_qd):
# pp_boost: types.InvFinalItem = _qd[_dur]
# if pp_boost.get("expiration") and pp_boost.get("amount") > 10:
# self.activate_pp_booster(pp_boost)
#
# return super().fight(battle, division, side, count, use_ebs)

22
ebot/__init__.py Normal file
View File

@ -0,0 +1,22 @@
"""Package for automating erepublik tasks and whole gameplay"""
__author__ = """Eriks K"""
__email__ = "ebot@72.lv"
__version__ = "2022.3.3"
__copyright__ = """ eBot
Copyright (C) 2022 Eriks K
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

160
ebot/__main__.py Normal file
View File

@ -0,0 +1,160 @@
""" eBot
Copyright (C) 2022 Eriks K
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import logging
import os
import signal
import sys
import time
import warnings
from typing import Dict, List, Union
import httpx
import sentry_sdk
from erepublik import utils
from erepublik._logging import ErepublikFileHandler, ErepublikFormatter, ErepublikLogConsoleHandler
from ebot import __version__
from ebot.helpers import BotRestart, BotStop, EbotErrorHttpHandler
from ebot.main import main
from ebot.utils import parse_config, pid_alive
json = utils.json
logger = logging.getLogger("Player")
VERSION = __version__
EVERSION = utils.VERSION
sentry_sdk.init(
"https://8ae666ac1cc344b3ab4a8ba6d3d1c420@o334571.ingest.sentry.io/4686978",
release=f"{VERSION} (using eRepublik {EVERSION})",
with_locals=True,
auto_enabling_integrations=False,
)
sentry_sdk.set_tag("eRepublik", EVERSION)
sentry_sdk.set_tag("eBot", VERSION)
formatter = ErepublikFormatter()
file_handler = ErepublikFileHandler()
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
error_handler = EbotErrorHttpHandler()
error_handler.setFormatter(formatter)
error_handler.setLevel(logging.ERROR)
logger.addHandler(error_handler)
logger.setLevel(logging.INFO)
INTERACTIVE = True
CONFIG: Dict[str, Union[str, int, bool, List[str], Dict[str, Dict[str, Union[bool, List[int]]]]]] = {}
def def_sig(a, b): # noqa
raise BotStop()
signal.signal(signal.SIGINT, def_sig)
signal.signal(signal.SIGTERM, def_sig)
signal.signal(signal.SIGABRT, def_sig)
if sys.platform.startswith("win"):
signal.signal(signal.SIGBREAK, def_sig)
else:
signal.signal(signal.SIGUSR1, def_sig)
signal.signal(signal.SIGHUP, def_sig)
if __name__ == "__main__":
os.makedirs("log", exist_ok=True)
if not sys.version_info >= (3, 8):
raise AssertionError(
"This script requires Python version 3.8 and higher\n"
"But Your version is v{}.{}.{}".format(*sys.version_info)
)
try:
# os.chdir(os.path.dirname(os.path.realpath(__file__)))
if os.path.isfile("bot.pid"):
with open("bot.pid") as f:
old_pid = f.read()
if old_pid.isnumeric():
old_pid = int(old_pid)
try:
os.kill(old_pid, 15)
except: # noqa
pass
while pid_alive(old_pid) and not sys.platform.startswith("win"):
time.sleep(1)
with open("bot.pid", "w") as f:
f.write(f"{os.getpid()}")
config_location = "config.json"
if os.path.isfile(config_location):
with open(config_location, "r") as f:
CONFIG.update(json.load(f))
logger.info("Config file found. Checking...")
should_save = parse_config(CONFIG)
else:
try:
should_save = parse_config(CONFIG)
except EOFError:
logger.error("Unable to read input for config file!\nTerminating...")
raise BotStop()
if should_save:
with open(config_location, "w") as f:
json.dump(CONFIG, f, indent=True)
if CONFIG["interactive"]:
console_handler = ErepublikLogConsoleHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
if CONFIG.get("debug"):
logger.setLevel(logging.DEBUG)
for handler in logger.handlers:
if isinstance(handler, (ErepublikLogConsoleHandler, ErepublikFileHandler)):
handler.setLevel(logging.DEBUG)
warnings.simplefilter("default") # Change the filter in this process
os.environ["PYTHONWARNINGS"] = "default" # Also affect subprocesses
logger.info("To quit press [ctrl] + [c]")
logger.info(f"Version: {VERSION} (elib {EVERSION})")
while True:
try:
main(CONFIG)
except BotRestart:
continue
logger.error("Restarting after 1h")
try:
utils.interactive_sleep(60 * 60)
except (OSError, EOFError):
utils.silent_sleep(60 * 60)
except BotStop:
logger.info("Everything is done! Hope You enjoyed!")
utils.silent_sleep(1)
except httpx.ConnectError:
logger.critical("Connection Error! Can't continue, will quit!", exc_info=True)
utils.silent_sleep(1)
except Exception as e: # noqa
logger.critical(
f"Fatal error. {e}",
exc_info=True,
stack_info=True,
extra=dict(ebot_version=VERSION, erep_version=EVERSION),
)
sentry_sdk.capture_exception(e)
finally:
if os.path.isfile("bot.pid"):
os.unlink("bot.pid")
sys.exit(0)

1058
ebot/aviator_support.py Normal file

File diff suppressed because it is too large Load Diff

212
ebot/helpers.py Normal file
View File

@ -0,0 +1,212 @@
""" eBot
Copyright (C) 2022 Eriks K
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import logging
from datetime import datetime
from operator import attrgetter
from typing import Any, Dict, List, NamedTuple, NoReturn, Optional, Tuple, Union
from erepublik._logging import ErepublikErrorHTTTPHandler
from erepublik.classes import ErepublikException
from erepublik.constants import max_datetime
DMG_MAP: Dict[int, Tuple[int, int]] = {
1: (6_000_000, 12_000_000),
2: (12_000_000, 18_000_000),
3: (18_000_000, 24_000_000),
4: (60_000_000, 90_000_000),
11: (80_000, 100_000),
}
class _EmptyMedal:
def __init__(
self,
time: datetime,
battle_id: int,
division_id: int,
defender_side: bool,
zone: int,
second_attempt: bool = False,
):
self.time: datetime = time
self.division_id: int = division_id
self.battle_id: int = battle_id
self.defender_side: bool = bool(defender_side)
self.round: int = zone
self.second_attempt: bool = second_attempt
@property
def _sort_sequence(self):
return self.time, not self.defender_side, -self.round, self.battle_id, self.division_id
def __hash__(self):
return hash((self.battle_id, self.division_id))
@property
def __dict__(self):
return dict(
time=self.time,
battle_id=self.battle_id,
division_id=self.division_id,
round=self.round,
defender_side=self.defender_side,
second_attempt=self.second_attempt,
)
def __repr__(self):
return self.__str__()
def __str__(self):
return (
f"_EmptyMedal(time={self.time.strftime('%F %T')}, battle_id={self.battle_id}, "
f"division_id={self.division_id}, defender_side={self.defender_side}, round={self.round})"
)
def __eq__(self, other):
try:
return (self.battle_id, self.division_id) == (other.battle_id, other.division_id)
except AttributeError:
return False
def __ne__(self, other):
try:
return not self.__eq__(other)
except AttributeError:
return True
@property
def as_dict(self):
return self.__dict__
class Task(NamedTuple):
name: str
time: datetime
priority: bool = False
def __repr__(self):
return f"{self.time.strftime('%F %T')} {self.human_name}" + (" (P)" if self.priority else "")
@property
def human_name(self) -> str:
return " ".join(self.name.split("_")).capitalize()
class Tasks:
__storage: List[Task]
_defaults: Dict[str, any]
def __repr__(self):
return f"<class Tasks: {len(self.__storage)} in queue>"
def __init__(self):
self.__storage = []
self._defaults = {}
def __getitem__(self, key: Union[str, int]) -> Optional[Task]:
try:
if isinstance(key, int):
return self.__storage[key]
for task in self.__storage:
if task.name == key:
return task
except IndexError:
return Task("Do nothing", max_datetime)
def __setitem__(self, key: Union[str, int], value: Union[datetime, Tuple[datetime, bool]]) -> NoReturn:
priority = False
if isinstance(value, tuple):
value, priority = value
if isinstance(key, int) and isinstance(value, datetime):
old_item = self.__storage.pop(key)
item = Task(old_item.name, value, priority)
elif isinstance(key, str) and isinstance(value, datetime):
item = Task(key, value, priority)
task_idx = self.__get_key_index(key)
if task_idx is not None:
self.__storage.pop(task_idx)
else:
raise TypeError(
f"key, value pairs must be of types (int, datetime) or (str, datetime) "
f"not ({type(key)}, {type(value)})"
)
self.__storage.append(item)
def __get_key_index(self, key: str) -> Optional[int]:
for idx, task in enumerate(self.__storage):
if task.name == key:
return idx
return None
def append(self, __object: Task) -> None:
if not isinstance(__object, Task):
raise TypeError(f"object must be of instance Task, not {type(__object)}")
self.__storage.append(__object)
def sort(self) -> NoReturn:
self.__storage.sort(key=attrgetter("name"))
self.__storage.sort(key=attrgetter("priority"), reverse=True)
self.__storage.sort(key=attrgetter("time"))
def pop(self, key: str) -> Optional[Task]:
_idx = self.__get_key_index(key)
if isinstance(_idx, int):
return self.__storage.pop(_idx)
else:
return
def __iter__(self):
for task in self.__storage:
yield task
def set_default(self, key: str, value: Any) -> NoReturn:
self._defaults[key] = value
def get_default(self, key: str, default: Any = None) -> Any:
return self._defaults.get(key, default)
@property
def as_dict(self):
return {"tasks": self.__storage, "defaults": self._defaults}
class BotStop(SystemExit):
pass
class BotRestart(ErepublikException):
pass
class EbotErrorHttpHandler(ErepublikErrorHTTTPHandler):
def __init__(self):
logging.Handler.__init__(self, level=logging.ERROR)
self._reporter = None
self.host = "erep.lv"
self.url = "/ebot/error/"
self.method = "POST"
self.secure = True
self.credentials = ("0", "changeme")
self.context = None
def _get_last_response(self):
return {}
def _get_instance_json(self):
return ""

153
ebot/main.py Normal file
View File

@ -0,0 +1,153 @@
""" eBot
Copyright (C) 2022 Eriks K
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import copy
import os
import random
import sys
from datetime import timedelta
from typing import Dict, List, Union
import httpx
import sentry_sdk
from erepublik import Citizen, classes, utils
from ebot import __version__
from ebot.helpers import BotRestart, BotStop
from ebot.player import Player
from ebot.utils import _norm
VERSION = __version__
EVERSION = utils.VERSION
def main(config: Dict[str, Union[int, bool, str, List[str], Dict[str, Union[str, int]]]]):
player = Player("", "", False)
try: # If errors before player is initialized
while True:
has_dump = os.path.isfile("Player__dump.json")
if has_dump:
player = Player.load_from_dump("Player__dump.json")
else:
player = Player(email=config["email"], password=config["password"], auto_login=False)
player.setup_citizen(config)
player.login()
if player.logged_in:
sentry_sdk.set_user(dict(id=player.details.citizen_id, username=player.name, email=player.config.email))
try:
import setproctitle
setproctitle.setproctitle(f"eBot - {player}")
except ImportError:
pass
break
else:
utils.silent_sleep(2)
while True:
try:
player.update_all()
if not player.restricted_ip:
player.dump_instance()
break
except Exception as e: # noqa
player.report_error(f"Error updating all data on initial update {e}")
utils.silent_sleep(2)
player.write_log(
f"View Your stats at:\n"
f"'https://erep.lv/player/{player.name.replace(' ', '%20')}'\n"
f"Your password is '{player.reporter.key}'"
)
# if player.reporter.allowed:
report = copy.deepcopy(config)
report.pop("email", None)
report.pop("password", None)
report.update(VERSION=VERSION, eRepublik_version=EVERSION)
player.reporter.report_action("ACTIVE_CONFIG", json_val=report)
player.setup_tasks(config)
_run_main_loop(player)
player.set_locks()
player.logger.warning("Too many errors.")
except BotStop as bs:
raise bs
except BotRestart:
pass
except httpx.ConnectError as conn_err:
raise conn_err
except Exception as e: # noqa
if isinstance(player, Citizen):
name = player.name
elif config.get("email", None):
name = config["email"]
else:
name = "Uninitialized"
sentry_sdk.capture_exception(e)
player.report_error(
f"Fatal error. {e}", extra=dict(player_name=name, ebot_version=VERSION, erep_version=EVERSION)
)
finally:
if isinstance(player, Citizen):
player.set_locks()
def _run_main_loop(player: Player):
error_count = 0
while error_count < 3 and not player.stop_threads.is_set():
try:
player.update_all()
player.do_tasks()
next_task = player.get_next_task()
player.travel_to_residence()
random_seconds = random.randint(0, 121) if player.tasks.get_default("random_sleep", True) else 0
sleep_seconds = int(utils.get_sleep_seconds(next_task.time))
if sleep_seconds <= 0:
player.write_warning(f"Loop detected! Offending task: '{next_task.name}'")
next_time = _norm(next_task.time + timedelta(seconds=random_seconds)).strftime("%F %T")
tasks = player.tasks.as_dict["tasks"]
player.write_log("My next Tasks and there time:\n" + "\n".join((str(t) for t in tasks)))
player.write_log(
f"Sleeping until (eRep): {next_time} " f"(sleeping for {sleep_seconds}s + random {random_seconds}s)"
)
seconds_to_sleep = sleep_seconds + random_seconds
player.stop_threads.wait(seconds_to_sleep)
except (classes.ErepublikNetworkException, httpx.ConnectError) as exc:
sentry_sdk.capture_exception(exc)
player.write_warning("Network ERROR detected. Sleeping for 1min...")
player.sleep(60)
except BotRestart:
player.set_locks()
return
except classes.ErepublikException as exc:
sentry_sdk.capture_exception(exc)
player.report_error(f"Known error detected! {exc}")
except (KeyboardInterrupt, BotStop):
player.set_locks()
sys.exit(0)
except Exception as exc:
sentry_sdk.capture_exception(exc)
player.report_error(
f"Unknown error! {exc}",
extra=dict(player_name=player.name, ebot_version=VERSION, erep_version=EVERSION),
)
error_count += 1
if error_count < 3:
player.sleep(60)

769
ebot/player.py Normal file
View File

@ -0,0 +1,769 @@
""" eBot
Copyright (C) 2022 Eriks K
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import os
import re
import signal
import sys
import threading
from collections import defaultdict
from datetime import datetime, timedelta
from random import randint
from typing import Any, Callable, Dict, List, NoReturn, Optional, Set, Tuple, Union
import httpx
import sentry_sdk
from dateutil.parser import parse
from erepublik import Citizen, utils
from erepublik.classes import Battle
from httpx import Response
from ebot import __version__
from ebot.helpers import BotStop, Task, Tasks
from ebot.utils import _norm, jsonify
VERSION = __version__
EVERSION = utils.VERSION
class BasePlayer(Citizen):
do_captcha: bool = False
_last_donation_time: datetime
__last_update_times: Dict[str, Dict[str, Union[datetime, Any]]]
def __init__(self, email, password, auto_login: bool = False):
super().__init__(email, password, auto_login)
self._last_donation_time = self.now
self.__last_update_times = defaultdict(lambda: dict(time=_norm(self.now - timedelta(days=1)), value=None))
def setup_citizen(self, config):
for _key in [
"work",
"train",
"ot",
"wam",
"employees",
"auto_sell_all",
"auto_buy_raw",
"force_wam",
"telegram",
"spin_wheel_of_fortune",
"interactive",
]:
if hasattr(self.config, _key):
setattr(self.config, _key, bool(config.get(_key, False)))
else:
raise AttributeError(f'{self.__class__.__name__}.config has no attribute "{_key}"!')
self.config.auto_sell = config.get("auto_sell", [])
self.config.telegram_token = config.get("telegram_token") or None
self.config.telegram_chat_id = config.get("telegram_chat_id", "")
self.reporter.allowed = not config.get("reporting_is_not_allowed")
self.set_debug(config.get("debug", False))
self.set_interactive(config.get("interactive", False))
if config.get("proxy"):
proxy_data = config.get("proxy")
if hasattr(self, f"set_{proxy_data.get('kind')}_proxy"):
args = (
proxy_data.get("host"),
proxy_data.get("port"),
proxy_data.get("username"),
proxy_data.get("password"),
)
getattr(self, f"set_{proxy_data.get('kind')}_proxy")(*args)
else:
self.logger.error(f"Can't set '{proxy_data.get('kind')}' proxy")
raise LookupError(f"Can't set '{proxy_data.get('kind')}' proxy")
def setup_tasks(self, config: Dict[str, Any]):
pass
def signal_quit(self, sig_num, frame):
if frame:
pass
self.set_locks()
self.sleep(2)
self.logger.debug(f"Received: {sig_num} (Quit)")
if not self.restricted_ip:
self.dump_instance()
raise BotStop(0)
def signal_reload(self, sig_num, frame):
if frame:
pass
self.logger.debug(f"Received: {sig_num} (Update)")
self.logger.warning("Forcing full update...")
self.update_all(True)
self.send_state_update()
self.send_inventory_update()
self.write_log("Forced full update completed!")
def signal_config_reload(self, sig_num, frame):
if frame:
pass
self.logger.debug(f"Received: {sig_num} (Update)")
self.logger.warning("Reloading config file...")
self.load_config()
self.update_all(True)
self.send_state_update()
self.send_inventory_update()
self.write_log("Configs reloaded successfully!")
def report_error(self, msg: str = "", sentry: bool = True, extra: Dict[str, Any] = None):
if sentry:
sentry_sdk.capture_exception()
if extra is None:
extra = {}
extra.update({"ebot_version": VERSION, "erep_version": EVERSION})
super().report_error(msg, extra)
def _report_action(self, action: str, msg: str, **kwargs):
super()._report_action(action, msg, **jsonify(kwargs))
def report_action(self, action: str, msg: str, **kwargs):
self._report_action(action, msg, **jsonify(kwargs))
def _get_main_citizen_profile_json(self, c_id: int):
if _norm(self.__last_update_times[f"q7hit__{c_id}"]["time"] + timedelta(hours=4)) <= self.now:
self.__last_update_times[f"q7hit__{c_id}"]["value"] = super()._get_main_citizen_profile_json(c_id)
self.__last_update_times[f"q7hit__{c_id}"]["time"] = self.now
return self.__last_update_times[f"q7hit__{c_id}"]["value"]
@classmethod
def load_from_dump(cls, dump_name: str = ""):
filename = dump_name if dump_name else f"{cls.__name__}__dump.json"
with open(filename) as f:
data = utils.json.load(f)
instance = cls(data["config"]["email"], "", False)
for cookie in data["cookies"]:
instance._req.cookies.set(cookie["name"], cookie["value"], cookie["domain"], cookie["path"])
if data.get("user_agent"):
instance._req.headers.update({"User-Agent": data["user_agent"]})
instance.load_config("config.json")
instance._resume_session()
instance.login()
return instance
def load_config(self, config_path: str = "config.json"):
if os.path.isfile(config_path):
with open(config_path) as f:
try:
configs = utils.json.load(f)
except utils.json.JSONDecodeError:
self.logger.error(f"Config file '{config_path}' must be a JSON file!")
return
self.setup_citizen(configs)
else:
self.logger.warning(f"Config file '{config_path}' not found!")
def donate_money(self, *args, **kwargs) -> bool:
self.sleep((self._last_donation_time + timedelta(seconds=5) - self.now).total_seconds()) # noqa
ret = super().donate_money(*args, **kwargs)
self._last_donation_time = self.now
return ret
def candidate_for_party_presidency(self) -> Optional[Response]:
if self.politics.is_party_member:
return super().candidate_for_party_presidency()
else:
self.report_action("POLITIC_CONGRESS", "Unable to apply for party president elections - not a party member")
return None
def candidate_for_congress(self, presentation: str = "") -> Optional[Response]:
if self.politics.is_party_member:
return super().candidate_for_congress(presentation)
else:
self.report_action("POLITIC_CONGRESS", "Unable to apply for congress elections - not a party member")
return None
@property
def as_dict(self):
d = super().as_dict
d.update(_last_donation_time=self._last_donation_time, __last_update_times=self.__last_update_times)
return d
def do_captcha_challenge(self, *args, **kwargs):
if self.do_captcha:
return super().do_captcha_challenge(*args, **kwargs)
return False
def solve_captcha(self, src: str) -> List[Dict[str, int]]:
r = self.reporter._req.post(f"{self.reporter.url}/captcha/api", data={"password": "CaptchaDevAPI", "src": src})
r = r.json()
if r["status"]:
return [dict(x=icon["x"] + randint(-4, 4), y=icon["y"] + randint(-4, 4)) for icon in r["result"]["result"]]
return []
class PlayerTasks(BasePlayer):
tasks: Tasks
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tasks = Tasks()
def setup_tasks(self, config):
super().setup_tasks(config)
self._setup_player_tasks(config)
@property
def as_dict(self):
ret = super().as_dict
ret["tasks"] = self.tasks.as_dict
return ret
def do_tasks(self) -> NoReturn:
if self.restricted_ip:
self.tasks.pop("wam")
self.tasks.pop("employ")
self.tasks.sort()
for task in list(self.tasks):
if task.time <= self.now:
task_function_name: str = f"task_{task.name}"
try:
task_function: Callable = getattr(self, task_function_name)
except (AttributeError, TypeError):
self.report_error(f"Task '{task.name}' has no task's function '{task_function_name}' defined!")
raise NotImplementedError(f"Task '{task_function_name}' not implemented!")
if callable(task_function):
self.write_log(f"Doing task: {task.human_name}")
task_function()
else:
raise TypeError(f"Task '{task_function_name}' is not a function!")
def get_next_task(self) -> Task:
self.tasks.sort()
return self.tasks[0]
def enable_task(self, title: str, time: datetime = None, priority: bool = False):
next_time = time if time is not None else self.now
self.tasks[title] = (next_time, priority)
def task_work(self):
self.update_citizen_info()
self.work()
if self.config.ot:
self.task_ot()
self.collect_daily_task()
self.tasks["work"] = _norm(self.now.replace(hour=0, minute=0, second=0) + timedelta(days=1)), True
def task_ot(self):
self.update_inventory()
if self.ot_points >= 24 and self.now >= self.my_companies.next_ot_time:
self.work_ot()
self.update_inventory()
try:
hours = ((24 - self.ot_points) // len(self.inventory.active["House"])) + 1
except ZeroDivisionError:
hours = 24
self.tasks["ot"] = _norm(self.now + timedelta(hours=hours if hours > 1 else 1)), True
def task_train(self):
self.update_citizen_info()
self.train()
self.collect_daily_task()
next_time = _norm(self.now.replace(hour=0, minute=0, second=0) + timedelta(days=1))
self.tasks["train"] = next_time, True
def task_wam(self):
if self.restricted_ip:
self.tasks.pop("wam")
return
has_more_to_wam = self.work_as_manager()
if has_more_to_wam:
next_time = _norm(self.tasks["wam"].time + timedelta(hours=1))
else:
next_time = _norm(
self.now.replace(hour=self.tasks.get_default("wam_hour", 14), minute=0, second=0, microsecond=0)
+ timedelta(days=1)
)
self.tasks["wam"] = next_time
def task_employees(self):
if self.restricted_ip:
self.tasks.pop("employees")
return
if self.employ_employees():
_h = self.tasks.get_default("employ_hour", 8)
next_time = _norm(self.now.replace(hour=_h, minute=0, second=0) + timedelta(days=1))
else:
next_time = _norm(self.tasks["employees"].time + timedelta(minutes=30))
self.tasks["employees"] = next_time
def task_buy_gold(self):
for offer in self.get_monetary_offers():
if offer["amount"] >= 10 and self.details.cc >= 20 * offer["price"]:
# TODO: check allowed amount to buy
self.buy_monetary_market_offer(offer=offer["offer_id"], amount=10, currency=62)
break
self.tasks["buy_gold"] = _norm(self.tasks["buy_gold"].time + timedelta(days=1)), True
def task_congress(self):
now = self.now
if 1 <= now.day < 16:
next_time = _norm(now.replace(day=16))
elif 16 <= now.day < 24:
self.candidate_for_congress()
if not now.month == 12:
next_time = _norm(now.replace(month=now.month + 1, day=16))
else:
next_time = _norm(now.replace(year=now.year + 1, month=1, day=16))
else:
if not now.month == 12:
next_time = _norm(now.replace(month=now.month + 1, day=16))
else:
next_time = _norm(now.replace(year=now.year + 1, month=1, day=16))
self.tasks["congress"] = _norm(next_time.replace(hour=1, minute=30, second=0, microsecond=0))
def task_party_president(self):
next_time = self.now
if next_time.day == 15:
pass
elif next_time.day < 15: # now.day ∈[1;15)
self.candidate_for_party_presidency()
else: # now.day ∈ (15;31]
self.candidate_for_party_presidency()
next_time = next_time.replace(day=16)
if self.now.month == 12:
next_time = next_time.replace(year=self.now.year + 1, month=1)
else:
next_time = next_time.replace(month=self.now.month + 1)
self.tasks["party_president"] = _norm(next_time.replace(day=16, hour=0, minute=1, second=0, microsecond=0))
def task_contribute_cc(self):
if not self.now.weekday():
self.update_money()
cc = (self.details.cc // self.tasks.get_default("contribute_cc")) * self.tasks.get_default("contribute_cc")
self.contribute_cc_to_country(cc, self.details.citizenship)
next_time = _norm((self.now + timedelta(days=7 - self.now.weekday())).replace(hour=2, minute=0, second=0))
self.tasks["contribute_cc"] = next_time
def task_renew_houses(self):
end_times = self.renew_houses()
if end_times:
self.tasks["renew_houses"] = _norm(min(end_times.values()) - timedelta(hours=24))
else:
self.logger.warning("No houses found!")
for q in range(1, self.tasks.get_default("renew_houses", 0) + 1):
self.write_log(f"Buying and activating q{q}...")
if not self.buy_and_activate_house(q):
break
end_times = self.check_house_durability()
if end_times:
next_time = _norm(min(end_times.values()) - timedelta(hours=24))
else:
next_time = _norm(self.now + timedelta(hours=6))
self.tasks["renew_houses"] = next_time
def task_spin_wheel_of_fortune(self):
percents_to_lose = self.tasks.get_default("spin_wheel_of_fortune")
self.update_citizen_info()
if self.wheel_of_fortune and percents_to_lose:
if isinstance(percents_to_lose, bool):
percents_to_lose = 10
else:
percents_to_lose = int(percents_to_lose)
max_spend = ((self.details.cc * (percents_to_lose / 100)) // 100) * 100
max_cost = 0
next_cost = max_cost + 100 if max_cost else 500
current_sum = 0
while current_sum < max_spend:
max_cost = next_cost
next_cost += 100
current_sum += max_cost
self.spin_wheel_of_fortune(max_cost)
self.tasks["spin_wheel_of_fortune"] = _norm(self.now.replace(hour=0, minute=0, second=0) + timedelta(days=1))
def _setup_player_tasks(self, config: Dict[str, Any]):
now = self.now
self.tasks.set_default("random_sleep", config.get("random_sleep"))
for _action in ["work", "train", "ot"]: # "fight", "epic_hunt"
if getattr(self.config, _action):
self.enable_task(_action, now, priority=True)
for _action in ["congress", "party_president"]:
if config.get(_action, True):
self.enable_task(_action)
for _action in ["wam", "employees"]:
if getattr(self.config, _action) and not self.restricted_ip:
_hour = 14
if not isinstance(config[_action], bool):
try:
_hour = abs(int(config[_action])) % 24
except ValueError:
_hour = 14
self.tasks.set_default(f"{_action}_hour", _hour)
self.enable_task(_action, _norm(now.replace(hour=_hour, minute=0, second=0, microsecond=0)))
if config.get("buy_gold"):
self.enable_task("buy_gold", _norm(now.replace(hour=23, minute=57, second=0, microsecond=0)), True)
if config.get("contribute_cc", 0):
self.tasks.set_default("contribute_cc", int(config.get("contribute_cc", 0)))
self.enable_task("contribute_cc", _norm(now.replace(hour=2, minute=0, second=0)))
if config.get("renew_houses"):
self.tasks.set_default("renew_houses", config.get("renew_houses"))
self.enable_task("renew_houses")
if config.get("spin_wheel_of_fortune", False):
spin_value = config.get("spin_wheel_of_fortune")
percent = int(10 if isinstance(spin_value, bool) else spin_value)
self.tasks.set_default("spin_wheel_of_fortune", percent)
self.enable_task("spin_wheel_of_fortune")
class PlayerBackgroundTasks(BasePlayer):
_preset_pay_table: Dict[int, int]
_default_pay_table: Dict[int, int]
_hits_to_pay: Dict[int, int]
_bg_task_queue: List[threading.Thread]
def __init__(self, *args, **kwargs):
self._preset_pay_table = {}
self._default_pay_table = {1: 400, 2: 500, 3: 600}
self._bg_task_queue = []
self._hits_to_pay = defaultdict(int)
super().__init__(*args, **kwargs)
def setup_tasks(self, config):
super().setup_tasks(config)
self._setup_player_bg_tasks(config)
def signal_quit(self, sig_num, frame):
self.set_locks()
if self._hits_to_pay:
self.write_log("Paying for hits, before quitting")
for player_id, amount in self._hits_to_pay.items():
if amount:
self.donate_money(player_id, amount, 1)
self._hits_to_pay[player_id] = 0
for thread in self._bg_task_queue:
if thread.is_alive():
self.write_log(f"Waiting on thread '{thread.name:^32}'")
thread.join()
return super().signal_quit(sig_num, frame)
def signal_reload(self, sig_num, frame):
for thread in self._bg_task_queue:
self.write_log(f"Thread '{thread.name:^32}', alive {thread.is_alive()}")
if self._hits_to_pay:
self.write_log("Clearing hit table...")
for player_id, amount in self._hits_to_pay.items():
if amount:
self.donate_money(player_id, amount, 1)
self._hits_to_pay[player_id] = 0
else:
self._hits_to_pay.clear()
return super().signal_reload(sig_num, frame)
def thread_name(self, name: str) -> str:
return f"{self.name}__{threading.active_count() - 1}__{name}"
def background_task_start_battles(self, wars):
def update_war_info():
rj = self._get_military_campaigns_json_list().json()
if rj.get("countries"):
if rj.get("battles"):
return {battle_data.get("id"): Battle(battle_data) for battle_data in rj.get("battles").values()}
return {}
wars: Dict[str, Dict[str, Union[bool, List[int]]]]
finished_war_ids: Set[int] = {*[]}
war_data: Dict[str, Dict[str, Union[bool, List[int]]]] = wars
war_ids: Set[int] = {int(war_id) for war_id in war_data.keys()}
next_attack_time = self.now
next_attack_time = _norm(next_attack_time.replace(minute=next_attack_time.minute // 5 * 5, second=0))
while not self.stop_threads.is_set():
try:
attacked = False
all_battles = update_war_info()
running_wars = {b.war_id for b in all_battles.values()}
if all_battles:
for war_id in war_ids - finished_war_ids - running_wars:
war = war_data[str(war_id)]
war_regions = set(war.get("regions"))
auto_attack = war.get("auto_attack")
try:
start_h = war.get("attack_time")[0]
end_h = war.get("attack_time")[1]
except (TypeError, IndexError):
start_h = 21
end_h = 3
if start_h <= end_h:
time_check = start_h <= self.now.hour < end_h
else:
time_check = start_h <= self.now.hour or self.now.hour < end_h
status = self.get_war_status(war_id)
if status.get("ended", False):
finished_war_ids.add(war_id)
continue
if not status.get("can_attack"):
continue
if auto_attack or time_check:
for reg in war_regions:
if attacked:
break
if reg in status.get("regions", {}).keys():
reg_name = status.get("regions", {}).get(reg)
self._post_wars_attack_region(war_id, reg, reg_name)
self._report_action("MILITARY_QUEUE_ATTACK", f"Battle for *{reg_name}* queued")
break
if attacked:
break
war_ids -= finished_war_ids
if attacked:
next_attack_time = _norm(next_attack_time + timedelta(hours=1))
else:
next_attack_time = _norm(next_attack_time + timedelta(minutes=5))
self.stop_threads.wait(utils.get_sleep_seconds(next_attack_time))
except Exception as e:
self.report_error(f"Task error: start_battles {e.args}")
def background_task_report_game_token_price(self):
while not self.stop_threads.is_set():
try:
_next_time = _norm(self.now.replace(second=0, microsecond=0))
if _next_time.minute < 50:
_next_time = _norm(_next_time.replace(minute=(_next_time.minute // 10 + 1) * 10))
else:
_next_time = _norm(_next_time.replace(minute=0) + timedelta(hours=1))
offers = self.get_game_token_offers()
httpx.post("https://erep.lv/market/gametoken/add/", json=dict(all_offers=True, **offers))
self.stop_threads.wait(utils.get_sleep_seconds(_next_time))
except Exception as e:
self.report_error(f"Task error: game_token {e}")
def background_task_report_org_accounts(self, org_list: List[int] = None):
if org_list is None:
org_list = []
watch_organisations = org_list
while not self.stop_threads.is_set():
try:
next_report_time = _norm(self.now.replace(minute=0, second=0, microsecond=0))
while not self.stop_threads.is_set():
for organisation_id in watch_organisations:
to_report = {"organisation_id": organisation_id}
account = self.fetch_organisation_account(organisation_id)
if account.get("ok"):
to_report.update(**account)
httpx.post("https://erep.lv/org/account/add/", json=to_report)
next_report_time = _norm(next_report_time + timedelta(hours=1))
to_sleep = (next_report_time - self.now).total_seconds()
self.stop_threads.wait(to_sleep if to_sleep > 0 else 0)
except Exception as e:
self.report_error(f"Task error: report_org_accounts {e}")
def background_task_birthday_journey(self):
priority = [
"anniversary_decoration",
"permanent_energy_house",
"energy_house",
"energy_booster",
"house_pool_bonus",
"ground_vehicle_blueprint",
"air_vehicle_blueprint",
"air_damage_booster_50",
"air_damage_booster_20",
"gold",
"energy_bars",
"winter_treat",
"trump_bomb",
"big_bomb",
"stinger_missile",
"damage_booster_100",
"overtime_points",
"speed_booster_2",
"air_deploy_rank_points_booster_20",
"air_deploy_rank_points_booster_10",
"air_deploy_influence_booster_20",
"air_deploy_influence_booster_10",
"deploy_size_booster_200",
"deploy_size_booster_100",
"ground_deploy_rank_points_booster_20",
"ground_deploy_rank_points_booster_10",
"ground_deploy_influence_booster_20",
"ground_deploy_influence_booster_10",
"vehicle_discharge_document",
"fuel",
]
try:
node_map: Dict[int, Set[int]] = defaultdict(set)
while not self.stop_threads.is_set():
quest_data = self.get_anniversary_quest_data()
# can_claim_extra = quest_data.get("rewards", {}).get("canCollectExtra")
if not node_map:
for pair in quest_data["neighbors"]:
node_map[pair["nodeId"]].add(pair["neighborId"])
node_map[pair["neighborId"]].add(pair["nodeId"])
visited_nodes: List[Tuple[int, datetime, str]] = []
if not quest_data.get("status", {}).get("progress"):
self.start_unlocking_map_quest_node(30219)
quest_data = self.get_anniversary_quest_data()
if quest_data["status"]["progress"]:
for node in quest_data["status"]["progress"].values():
visited_nodes.append((node["nodeId"], parse(node["finishUnlockTime"]), node["nodeStatus"]))
visited_nodes.sort(key=lambda o: o[1])
visited_nodes.reverse()
current_queue = []
available_nodes = {*[]}
for node_id, finish_datetime, status in visited_nodes:
if status in ("claimed", "unclaimed"):
available_nodes.update(node_map[node_id])
# elif status == "unclaimed":
# available_nodes.update(node_map[node_id])
# collection_data = self.collect_map_quest_node(node_id).json()
# if collection_data.get("error"):
# self.logger.error(collection_data.get("error", {}).get("message"))
# claimable_node = quest_data.get("cities").get(str(node_id))
# if quest_data.get("status").get("inventory").get("miles") < claimable_node.get("miles"):
# self.travel_to_region(713)
# self.travel_to_residence()
# else:
# if can_claim_extra:
# self.collect_map_quest_node(node_id, True)
elif status == "unlocking":
current_queue.append(finish_datetime)
if not current_queue:
available_nodes -= {nid for nid, _, __ in visited_nodes}
if available_nodes:
for kind in priority:
for node_id in available_nodes:
if kind in [r["type"] for r in quest_data["cities"][str(node_id)]["rewards"]]:
self.start_unlocking_map_quest_node(node_id)
quest_data = self.get_anniversary_quest_data()
current_queue.append(
parse(quest_data["status"]["progress"][str(node_id)]["finishUnlockTime"])
)
if current_queue:
break
else:
continue
break
current_queue.sort()
if current_queue:
next_finish = min(current_queue)
else:
self.report_error("Task Birthday Journey: Empty queue!", sentry=False)
return
to_sleep = utils.get_sleep_seconds(next_finish)
self.write_log(
f"Task: 'Birthday Journey' sleeping until (eRep): {next_finish} (sleeping for {to_sleep}s)"
)
self.stop_threads.wait(to_sleep if to_sleep > 0 else 0)
except Exception as e:
self.report_error(f"Task error: Birthday Journey {e}")
def _setup_player_bg_tasks(self, config: Dict[str, Any]):
if config.get("state_update_repeater", True):
t = threading.Thread(target=self.state_update_repeater, name=self.thread_name("state_update_repeater"))
t.start()
self._bg_task_queue.append(t)
for bg_task in ["start_battles", "game_tokens", "org_fetcher", "birthday_journey"]: # "clear_bhs",
if config.get(bg_task):
try:
_task_fn = getattr(self, f"background_task_{bg_task}")
except AttributeError:
self.report_error(f"Task '{bg_task}' has no task's function 'background_task_{bg_task}' defined!")
raise NotImplementedError(f"Task 'background_task_{bg_task}' not implemented!")
args = (config.get(bg_task),) if bg_task in ["start_battles", "clear_bhs", "org_fetcher"] else tuple()
t = threading.Thread(target=_task_fn, args=args, name=self.thread_name(bg_task))
t.start()
self._bg_task_queue.append(t)
@property
def as_dict(self):
ret = super().as_dict
ret.update(hits_payment_aggregation=self._hits_to_pay)
return ret
def get_war_status(self, war_id: int) -> Dict[str, Union[bool, Dict[int, str]]]:
r = self._get_wars_show(war_id)
html = r.text
ret = {}
reg_re = re.compile(rf'data-war-id="{war_id}" data-region-id="(\d+)" data-region-name="([- \w]+)"')
if reg_re.findall(html):
ret.update(regions={}, can_attack=True)
for reg in reg_re.findall(html):
ret["regions"].update({int(reg[0]): reg[1]})
elif re.search(
r'<a href="//www.erepublik.com/en/military/battlefield/(\d+)" '
r'class="join" title="Join"><span>Join</span></a>',
html,
):
battle_id = re.search(
r'<a href="//www.erepublik.com/en/military/battlefield/(\d+)" '
r'class="join" title="Join"><span>Join</span></a>',
html,
).group(1)
ret.update(can_attack=False, battle_id=int(battle_id))
elif re.search(r"This war is no longer active.", html):
ret.update(can_attack=False, ended=True)
else:
ret.update(can_attack=False)
return ret
class Player(PlayerTasks, PlayerBackgroundTasks, BasePlayer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
signal.signal(signal.SIGINT, self.signal_quit)
signal.signal(signal.SIGTERM, self.signal_quit)
signal.signal(signal.SIGABRT, self.signal_quit)
if sys.platform.startswith("win"):
signal.signal(signal.SIGBREAK, self.signal_quit)
else:
signal.signal(signal.SIGUSR1, self.signal_reload)
signal.signal(signal.SIGUSR2, self.signal_config_reload)
signal.signal(signal.SIGHUP, self.signal_quit)
pid = os.getpid()
for action, _signal in [("UPDATE", "SIGUSR1"), ("RELOAD", "SIGUSR2"), ("QUIT", "SIGINT")]:
self.logger.debug(
f"To {action:^6}, run: 'kill -{getattr(signal, _signal):<2} {pid}' or 'kill -{_signal:<7} {pid}'"
)
def setup_tasks(self, config: Dict[str, Any]):
super().setup_tasks(config)
def check_house_durability(self) -> Dict[int, datetime]:
ret = {}
for q, dt in super().check_house_durability().items():
if q <= self.tasks.get_default("renew_houses", 0):
ret.update({q: dt})
return ret
def to_json(self, indent: bool = False) -> str:
return utils.json_dumps(self.as_dict, indent=False, sort_keys=True)
def setup_citizen(self, config):
super().setup_citizen(config)
self.tasks.set_default("cheap_medals", config.get("cheap_medals", False))

240
ebot/utils.py Normal file
View File

@ -0,0 +1,240 @@
""" eBot
Copyright (C) 2022 Eriks K
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import json
import logging
import os
import platform
import re
import subprocess
from datetime import datetime
from typing import Any, Dict, List, Union
from erepublik import constants, utils
logger = logging.getLogger("Player")
def _norm(dt: datetime) -> datetime:
return constants.erep_tz.normalize(dt)
def parse_input(msg: str) -> bool:
msg += " (y|n):"
data = None
while data not in ["", "y", "Y", "n", "N"]:
try:
data = input(msg)
except EOFError:
data = "n"
return data in ["", "y", "Y"]
def get_config(
config_location: str = "config.json",
) -> Dict[str, Union[str, int, bool, List[str], Dict[str, Union[int, str, bool]]]]:
CONFIG = {}
if os.path.isfile(config_location):
with open(config_location, "r") as f:
CONFIG.update(json.load(f))
logger.info("Config file found. Checking...")
should_save = parse_config(CONFIG)
else:
should_save = parse_config(CONFIG)
if should_save:
with open(config_location, "w") as f:
json.dump(CONFIG, f, indent=True)
return CONFIG
def parse_config(config: Dict[str, Any]) -> bool:
"""Parse configuration dictionary and fill any missing keys.
:type config: dict
:rtype: bool
:param config: None or dict with configs
:return: boolean if passed dict had to be changed
"""
changed = False
if not config.get("email"):
config["email"] = input("Player email: ")
changed = True
if not config.get("password"):
config["password"] = input("Player password: ")
changed = True
if "employ" in config:
config["employees"] = config.pop("employ")
changed = True
_basic_prompt_dict = dict(
work="Should I work",
train="Should I train",
ot="Should I work overtime",
wam="Should I WAM",
employees="Should I employ employees",
)
for key, prompt in _basic_prompt_dict.items():
if key not in config:
config[key] = parse_input(prompt)
changed = True
if config["wam"] or config["employees"]:
if "auto_sell" not in config or not isinstance(config["auto_sell"], list):
config["auto_sell"] = []
changed = True
if parse_input("Should I auto sell produced products"):
if parse_input("Should I auto sell final products"):
_final_prompt_dict = dict(
food="Should I auto sell Food products",
weapon="Should I auto sell Weapon products",
house="Should I auto sell House products",
aircraft="Should I auto sell Aircraft products",
)
for key, prompt in _final_prompt_dict.items():
if parse_input(prompt):
config["auto_sell"].append(key)
if parse_input("Should I auto sell raw products"):
_raw_prompt_dict = dict(
foodRaw="Should I auto sell Food raw",
weaponRaw="Should I auto sell Weapon raw",
houseRaw="Should I auto sell House raw",
airplaneRaw="Should I auto sell Aircraft raw",
)
for key, prompt in _raw_prompt_dict.items():
if parse_input(prompt):
config["auto_sell"].append(key)
if config["auto_sell"]:
if "auto_sell_all" not in config:
print("When selling produced items should I also sell items already in inventory?")
config["auto_sell_all"] = parse_input("Y - sell all, N - only just produced")
changed = True
else:
config["auto_sell_all"] = False
if "auto_buy_raw" not in config:
config["auto_buy_raw"] = parse_input("Should I auto buy raw deficit at WAM or employ")
changed = True
else:
config["auto_sell"] = []
config["auto_sell_all"] = False
config["auto_buy_raw"] = False
if "fight" not in config:
config["fight"] = False # parse_input("Should I fight")
changed = True
if config.get("fight"):
_fight_prompt_dict = dict(
air="Should I fight in AIR",
ground="Should I fight in GROUND",
all_in="When full energy should i go all in\n Y - all in, N - 1h worth of energy",
next_energy="Should I fight when next WC +1 energy is reachable",
boosters="Should I use +50% dmg boosters, when fighting on ground",
travel_to_fight="Should I travel to fight",
epic_hunt="Should I check for epic battles",
rw_def_side="Should I fight on defenders side in RWs",
continuous_fighting="If already fought in any battle, \nshould I continue to fight all FF in that battle",
maverick="If MaverickPack is active, \nshould I try to fight in non-native divisions?",
)
for key, prompt in _fight_prompt_dict.items():
if key not in config:
config[key] = parse_input(prompt)
changed = True
if not config["epic_hunt"]:
config["epic_hunt_ebs"] = False
elif "epic_hunt_ebs" not in config:
config["epic_hunt_ebs"] = parse_input("Should I eat EBs when fighting in epic battle")
changed = True
else:
config["air"] = False
config["ground"] = False
config["all_in"] = False
config["next_energy"] = False
config["boosters"] = False
config["travel_to_fight"] = False
config["epic_hunt"] = False
config["epic_hunt_ebs"] = False
config["rw_def_side"] = False
config["continuous_fighting"] = False
_other_prompt_dict = dict(
spin_wheel_of_fortune="Should I auto spin WheelOfFortune for 10% cc amount",
congress="Candidate for congress",
party_president="Candidate for party presidency",
debug="Should I generate debug files",
random_sleep="Should I add random amount (0-120sec) to sleep time",
buy_gold="Should I auto buy 10g every day",
interactive="Should I print output to console?",
telegram="Should I send notifications through Telegram",
)
for key, prompt in _other_prompt_dict.items():
if key not in config:
config[key] = parse_input(prompt)
changed = True
if "telegram_chat_id" not in config and config["telegram"]:
config["telegram_chat_id"] = ""
if "telegram_token" not in config and config["telegram"]:
config["telegram_token"] = ""
if "proxy" not in config:
config["_proxy"] = {
"kind": "socks or http",
"host": "localhost",
"port": 8080,
"username": "optional",
"password": "optional",
}
changed = True
return changed
def jsonify(data) -> Any:
return utils.json_loads(dict_to_json(data))
def dict_to_json(data) -> str:
return utils.json_dumps(data)
def pid_alive(pid: int) -> bool:
"""Check For whether a pid is alive"""
system = platform.uname().system
if re.search(r"Linux|Darwin", system, re.IGNORECASE):
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
elif re.search("Windows", system, re.IGNORECASE):
out = subprocess.check_output(["tasklist", "/fi", f"PID eq {pid}"]).strip()
return bool(re.search(b"No tasks", out, re.IGNORECASE))
else:
return False
# raise RuntimeError(f"unsupported system={system}")