""" eBot Copyright (C) 2022 Eriks K This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation version 3 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import os import re import signal import sys import threading from collections import defaultdict from datetime import datetime, timedelta from random import randint from typing import Any, Callable, Dict, List, NoReturn, Optional, Set, Tuple, Union import httpx import sentry_sdk from dateutil.parser import parse from erepublik import Citizen, utils from erepublik.classes import Battle from httpx import Response from ebot import __version__ from ebot.helpers import BotStop, Task, Tasks from ebot.utils import _norm, jsonify VERSION = __version__ EVERSION = utils.VERSION class BasePlayer(Citizen): do_captcha: bool = False _last_donation_time: datetime __last_update_times: Dict[str, Dict[str, Union[datetime, Any]]] def __init__(self, email, password, auto_login: bool = False): super().__init__(email, password, auto_login) self._last_donation_time = self.now self.__last_update_times = defaultdict(lambda: dict(time=_norm(self.now - timedelta(days=1)), value=None)) def setup_citizen(self, config): for _key in [ "work", "train", "ot", "wam", "employees", "auto_sell_all", "auto_buy_raw", "force_wam", "telegram", "spin_wheel_of_fortune", "interactive", ]: if hasattr(self.config, _key): setattr(self.config, _key, bool(config.get(_key, False))) else: raise AttributeError(f'{self.__class__.__name__}.config has no attribute "{_key}"!') self.config.auto_sell = config.get("auto_sell", []) self.config.telegram_token = config.get("telegram_token") or None self.config.telegram_chat_id = config.get("telegram_chat_id", "") self.reporter.allowed = not config.get("reporting_is_not_allowed") self.set_debug(config.get("debug", False)) self.set_interactive(config.get("interactive", False)) if config.get("proxy"): proxy_data = config.get("proxy") if hasattr(self, f"set_{proxy_data.get('kind')}_proxy"): args = ( proxy_data.get("host"), proxy_data.get("port"), proxy_data.get("username"), proxy_data.get("password"), ) getattr(self, f"set_{proxy_data.get('kind')}_proxy")(*args) else: self.logger.error(f"Can't set '{proxy_data.get('kind')}' proxy") raise LookupError(f"Can't set '{proxy_data.get('kind')}' proxy") def setup_tasks(self, config: Dict[str, Any]): pass def signal_quit(self, sig_num, frame): if frame: pass self.set_locks() self.sleep(2) self.logger.debug(f"Received: {sig_num} (Quit)") if not self.restricted_ip: self.dump_instance() raise BotStop(0) def signal_reload(self, sig_num, frame): if frame: pass self.logger.debug(f"Received: {sig_num} (Update)") self.logger.warning("Forcing full update...") self.update_all(True) self.send_state_update() self.send_inventory_update() self.write_log("Forced full update completed!") def signal_config_reload(self, sig_num, frame): if frame: pass self.logger.debug(f"Received: {sig_num} (Update)") self.logger.warning("Reloading config file...") self.load_config() self.update_all(True) self.send_state_update() self.send_inventory_update() self.write_log("Configs reloaded successfully!") def report_error(self, msg: str = "", sentry: bool = True, extra: Dict[str, Any] = None): if sentry: sentry_sdk.capture_exception() if extra is None: extra = {} extra.update({"ebot_version": VERSION, "erep_version": EVERSION}) super().report_error(msg, extra) def _report_action(self, action: str, msg: str, **kwargs): super()._report_action(action, msg, **jsonify(kwargs)) def report_action(self, action: str, msg: str, **kwargs): self._report_action(action, msg, **jsonify(kwargs)) def _get_main_citizen_profile_json(self, c_id: int): if _norm(self.__last_update_times[f"q7hit__{c_id}"]["time"] + timedelta(hours=4)) <= self.now: self.__last_update_times[f"q7hit__{c_id}"]["value"] = super()._get_main_citizen_profile_json(c_id) self.__last_update_times[f"q7hit__{c_id}"]["time"] = self.now return self.__last_update_times[f"q7hit__{c_id}"]["value"] @classmethod def load_from_dump(cls, dump_name: str = ""): filename = dump_name if dump_name else f"{cls.__name__}__dump.json" with open(filename) as f: data = utils.json.load(f) instance = cls(data["config"]["email"], "", False) for cookie in data["cookies"]: instance._req.cookies.set(cookie["name"], cookie["value"], cookie["domain"], cookie["path"]) if data.get("user_agent"): instance._req.headers.update({"User-Agent": data["user_agent"]}) instance.load_config("config.json") instance._resume_session() instance.login() return instance def load_config(self, config_path: str = "config.json"): if os.path.isfile(config_path): with open(config_path) as f: try: configs = utils.json.load(f) except utils.json.JSONDecodeError: self.logger.error(f"Config file '{config_path}' must be a JSON file!") return self.setup_citizen(configs) else: self.logger.warning(f"Config file '{config_path}' not found!") def donate_money(self, *args, **kwargs) -> bool: self.sleep((self._last_donation_time + timedelta(seconds=5) - self.now).total_seconds()) # noqa ret = super().donate_money(*args, **kwargs) self._last_donation_time = self.now return ret def candidate_for_party_presidency(self) -> Optional[Response]: if self.politics.is_party_member: return super().candidate_for_party_presidency() else: self.report_action("POLITIC_CONGRESS", "Unable to apply for party president elections - not a party member") return None def candidate_for_congress(self, presentation: str = "") -> Optional[Response]: if self.politics.is_party_member: return super().candidate_for_congress(presentation) else: self.report_action("POLITIC_CONGRESS", "Unable to apply for congress elections - not a party member") return None @property def as_dict(self): d = super().as_dict d.update(_last_donation_time=self._last_donation_time, __last_update_times=self.__last_update_times) return d def do_captcha_challenge(self, *args, **kwargs): if self.do_captcha: return super().do_captcha_challenge(*args, **kwargs) return False def solve_captcha(self, src: str) -> List[Dict[str, int]]: r = self.reporter._req.post(f"{self.reporter.url}/captcha/api", data={"password": "CaptchaDevAPI", "src": src}) r = r.json() if r["status"]: return [dict(x=icon["x"] + randint(-4, 4), y=icon["y"] + randint(-4, 4)) for icon in r["result"]["result"]] return [] class PlayerTasks(BasePlayer): tasks: Tasks def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.tasks = Tasks() def setup_tasks(self, config): super().setup_tasks(config) self._setup_player_tasks(config) @property def as_dict(self): ret = super().as_dict ret["tasks"] = self.tasks.as_dict return ret def do_tasks(self) -> NoReturn: if self.restricted_ip: self.tasks.pop("wam") self.tasks.pop("employ") self.tasks.sort() for task in list(self.tasks): if task.time <= self.now: task_function_name: str = f"task_{task.name}" try: task_function: Callable = getattr(self, task_function_name) except (AttributeError, TypeError): self.report_error(f"Task '{task.name}' has no task's function '{task_function_name}' defined!") raise NotImplementedError(f"Task '{task_function_name}' not implemented!") if callable(task_function): self.write_log(f"Doing task: {task.human_name}") task_function() else: raise TypeError(f"Task '{task_function_name}' is not a function!") def get_next_task(self) -> Task: self.tasks.sort() return self.tasks[0] def enable_task(self, title: str, time: datetime = None, priority: bool = False): next_time = time if time is not None else self.now self.tasks[title] = (next_time, priority) def task_work(self): self.update_citizen_info() self.work() if self.config.ot: self.task_ot() self.collect_daily_task() self.tasks["work"] = _norm(self.now.replace(hour=0, minute=0, second=0) + timedelta(days=1)), True def task_ot(self): self.update_inventory() if self.ot_points >= 24 and self.now >= self.my_companies.next_ot_time: self.work_ot() self.update_inventory() try: hours = ((24 - self.ot_points) // len(self.inventory.active["House"])) + 1 except ZeroDivisionError: hours = 24 self.tasks["ot"] = _norm(self.now + timedelta(hours=hours if hours > 1 else 1)), True def task_train(self): self.update_citizen_info() self.train() self.collect_daily_task() next_time = _norm(self.now.replace(hour=0, minute=0, second=0) + timedelta(days=1)) self.tasks["train"] = next_time, True def task_wam(self): if self.restricted_ip: self.tasks.pop("wam") return has_more_to_wam = self.work_as_manager() if has_more_to_wam: next_time = _norm(self.tasks["wam"].time + timedelta(hours=1)) else: next_time = _norm( self.now.replace(hour=self.tasks.get_default("wam_hour", 14), minute=0, second=0, microsecond=0) + timedelta(days=1) ) self.tasks["wam"] = next_time def task_employees(self): if self.restricted_ip: self.tasks.pop("employees") return if self.employ_employees(): _h = self.tasks.get_default("employ_hour", 8) next_time = _norm(self.now.replace(hour=_h, minute=0, second=0) + timedelta(days=1)) else: next_time = _norm(self.tasks["employees"].time + timedelta(minutes=30)) self.tasks["employees"] = next_time def task_buy_gold(self): for offer in self.get_monetary_offers(): if offer["amount"] >= 10 and self.details.cc >= 20 * offer["price"]: # TODO: check allowed amount to buy self.buy_monetary_market_offer(offer=offer["offer_id"], amount=10, currency=62) break self.tasks["buy_gold"] = _norm(self.tasks["buy_gold"].time + timedelta(days=1)), True def task_congress(self): now = self.now if 1 <= now.day < 16: next_time = _norm(now.replace(day=16)) elif 16 <= now.day < 24: self.candidate_for_congress() if not now.month == 12: next_time = _norm(now.replace(month=now.month + 1, day=16)) else: next_time = _norm(now.replace(year=now.year + 1, month=1, day=16)) else: if not now.month == 12: next_time = _norm(now.replace(month=now.month + 1, day=16)) else: next_time = _norm(now.replace(year=now.year + 1, month=1, day=16)) self.tasks["congress"] = _norm(next_time.replace(hour=1, minute=30, second=0, microsecond=0)) def task_party_president(self): next_time = self.now if next_time.day == 15: pass elif next_time.day < 15: # now.day ∈[1;15) self.candidate_for_party_presidency() else: # now.day ∈ (15;31] self.candidate_for_party_presidency() next_time = next_time.replace(day=16) if self.now.month == 12: next_time = next_time.replace(year=self.now.year + 1, month=1) else: next_time = next_time.replace(month=self.now.month + 1) self.tasks["party_president"] = _norm(next_time.replace(day=16, hour=0, minute=1, second=0, microsecond=0)) def task_contribute_cc(self): if not self.now.weekday(): self.update_money() cc = (self.details.cc // self.tasks.get_default("contribute_cc")) * self.tasks.get_default("contribute_cc") self.contribute_cc_to_country(cc, self.details.citizenship) next_time = _norm((self.now + timedelta(days=7 - self.now.weekday())).replace(hour=2, minute=0, second=0)) self.tasks["contribute_cc"] = next_time def task_renew_houses(self): end_times = self.renew_houses() if end_times: self.tasks["renew_houses"] = _norm(min(end_times.values()) - timedelta(hours=24)) else: self.logger.warning("No houses found!") for q in range(1, self.tasks.get_default("renew_houses", 0) + 1): self.write_log(f"Buying and activating q{q}...") if not self.buy_and_activate_house(q): break end_times = self.check_house_durability() if end_times: next_time = _norm(min(end_times.values()) - timedelta(hours=24)) else: next_time = _norm(self.now + timedelta(hours=6)) self.tasks["renew_houses"] = next_time def task_spin_wheel_of_fortune(self): percents_to_lose = self.tasks.get_default("spin_wheel_of_fortune") self.update_citizen_info() if self.wheel_of_fortune and percents_to_lose: if isinstance(percents_to_lose, bool): percents_to_lose = 10 else: percents_to_lose = int(percents_to_lose) max_spend = ((self.details.cc * (percents_to_lose / 100)) // 100) * 100 max_cost = 0 next_cost = max_cost + 100 if max_cost else 500 current_sum = 0 while current_sum < max_spend: max_cost = next_cost next_cost += 100 current_sum += max_cost self.spin_wheel_of_fortune(max_cost) self.tasks["spin_wheel_of_fortune"] = _norm(self.now.replace(hour=0, minute=0, second=0) + timedelta(days=1)) def _setup_player_tasks(self, config: Dict[str, Any]): now = self.now self.tasks.set_default("random_sleep", config.get("random_sleep")) for _action in ["work", "train", "ot"]: # "fight", "epic_hunt" if getattr(self.config, _action): self.enable_task(_action, now, priority=True) for _action in ["congress", "party_president"]: if config.get(_action, True): self.enable_task(_action) for _action in ["wam", "employees"]: if getattr(self.config, _action) and not self.restricted_ip: _hour = 14 if not isinstance(config[_action], bool): try: _hour = abs(int(config[_action])) % 24 except ValueError: _hour = 14 self.tasks.set_default(f"{_action}_hour", _hour) self.enable_task(_action, _norm(now.replace(hour=_hour, minute=0, second=0, microsecond=0))) if config.get("buy_gold"): self.enable_task("buy_gold", _norm(now.replace(hour=23, minute=57, second=0, microsecond=0)), True) if config.get("contribute_cc", 0): self.tasks.set_default("contribute_cc", int(config.get("contribute_cc", 0))) self.enable_task("contribute_cc", _norm(now.replace(hour=2, minute=0, second=0))) if config.get("renew_houses"): self.tasks.set_default("renew_houses", config.get("renew_houses")) self.enable_task("renew_houses") if config.get("spin_wheel_of_fortune", False): spin_value = config.get("spin_wheel_of_fortune") percent = int(10 if isinstance(spin_value, bool) else spin_value) self.tasks.set_default("spin_wheel_of_fortune", percent) self.enable_task("spin_wheel_of_fortune") class PlayerBackgroundTasks(BasePlayer): _preset_pay_table: Dict[int, int] _default_pay_table: Dict[int, int] _hits_to_pay: Dict[int, int] _bg_task_queue: List[threading.Thread] def __init__(self, *args, **kwargs): self._preset_pay_table = {} self._default_pay_table = {1: 400, 2: 500, 3: 600} self._bg_task_queue = [] self._hits_to_pay = defaultdict(int) super().__init__(*args, **kwargs) def setup_tasks(self, config): super().setup_tasks(config) self._setup_player_bg_tasks(config) def signal_quit(self, sig_num, frame): self.set_locks() if self._hits_to_pay: self.write_log("Paying for hits, before quitting") for player_id, amount in self._hits_to_pay.items(): if amount: self.donate_money(player_id, amount, 1) self._hits_to_pay[player_id] = 0 for thread in self._bg_task_queue: if thread.is_alive(): self.write_log(f"Waiting on thread '{thread.name:^32}'") thread.join() return super().signal_quit(sig_num, frame) def signal_reload(self, sig_num, frame): for thread in self._bg_task_queue: self.write_log(f"Thread '{thread.name:^32}', alive {thread.is_alive()}") if self._hits_to_pay: self.write_log("Clearing hit table...") for player_id, amount in self._hits_to_pay.items(): if amount: self.donate_money(player_id, amount, 1) self._hits_to_pay[player_id] = 0 else: self._hits_to_pay.clear() return super().signal_reload(sig_num, frame) def thread_name(self, name: str) -> str: return f"{self.name}__{threading.active_count() - 1}__{name}" def background_task_start_battles(self, wars): def update_war_info(): rj = self._get_military_campaigns_json_list().json() if rj.get("countries"): if rj.get("battles"): return {battle_data.get("id"): Battle(battle_data) for battle_data in rj.get("battles").values()} return {} wars: Dict[str, Dict[str, Union[bool, List[int]]]] finished_war_ids: Set[int] = {*[]} war_data: Dict[str, Dict[str, Union[bool, List[int]]]] = wars war_ids: Set[int] = {int(war_id) for war_id in war_data.keys()} next_attack_time = self.now next_attack_time = _norm(next_attack_time.replace(minute=next_attack_time.minute // 5 * 5, second=0)) while not self.stop_threads.is_set(): try: attacked = False all_battles = update_war_info() running_wars = {b.war_id for b in all_battles.values()} if all_battles: for war_id in war_ids - finished_war_ids - running_wars: war = war_data[str(war_id)] war_regions = set(war.get("regions")) auto_attack = war.get("auto_attack") try: start_h = war.get("attack_time")[0] end_h = war.get("attack_time")[1] except (TypeError, IndexError): start_h = 21 end_h = 3 if start_h <= end_h: time_check = start_h <= self.now.hour < end_h else: time_check = start_h <= self.now.hour or self.now.hour < end_h status = self.get_war_status(war_id) if status.get("ended", False): finished_war_ids.add(war_id) continue if not status.get("can_attack"): continue if auto_attack or time_check: for reg in war_regions: if attacked: break if reg in status.get("regions", {}).keys(): reg_name = status.get("regions", {}).get(reg) self._post_wars_attack_region(war_id, reg, reg_name) self._report_action("MILITARY_QUEUE_ATTACK", f"Battle for *{reg_name}* queued") break if attacked: break war_ids -= finished_war_ids if attacked: next_attack_time = _norm(next_attack_time + timedelta(hours=1)) else: next_attack_time = _norm(next_attack_time + timedelta(minutes=5)) self.stop_threads.wait(utils.get_sleep_seconds(next_attack_time)) except Exception as e: self.report_error(f"Task error: start_battles {e.args}") def background_task_report_game_token_price(self): while not self.stop_threads.is_set(): try: _next_time = _norm(self.now.replace(second=0, microsecond=0)) if _next_time.minute < 50: _next_time = _norm(_next_time.replace(minute=(_next_time.minute // 10 + 1) * 10)) else: _next_time = _norm(_next_time.replace(minute=0) + timedelta(hours=1)) offers = self.get_game_token_offers() httpx.post("https://erep.lv/market/gametoken/add/", json=dict(all_offers=True, **offers)) self.stop_threads.wait(utils.get_sleep_seconds(_next_time)) except Exception as e: self.report_error(f"Task error: game_token {e}") def background_task_report_org_accounts(self, org_list: List[int] = None): if org_list is None: org_list = [] watch_organisations = org_list while not self.stop_threads.is_set(): try: next_report_time = _norm(self.now.replace(minute=0, second=0, microsecond=0)) while not self.stop_threads.is_set(): for organisation_id in watch_organisations: to_report = {"organisation_id": organisation_id} account = self.fetch_organisation_account(organisation_id) if account.get("ok"): to_report.update(**account) httpx.post("https://erep.lv/org/account/add/", json=to_report) next_report_time = _norm(next_report_time + timedelta(hours=1)) to_sleep = (next_report_time - self.now).total_seconds() self.stop_threads.wait(to_sleep if to_sleep > 0 else 0) except Exception as e: self.report_error(f"Task error: report_org_accounts {e}") def background_task_birthday_journey(self): priority = [ "anniversary_decoration", "permanent_energy_house", "energy_house", "energy_booster", "house_pool_bonus", "ground_vehicle_blueprint", "air_vehicle_blueprint", "air_damage_booster_50", "air_damage_booster_20", "gold", "energy_bars", "winter_treat", "trump_bomb", "big_bomb", "stinger_missile", "damage_booster_100", "overtime_points", "speed_booster_2", "air_deploy_rank_points_booster_20", "air_deploy_rank_points_booster_10", "air_deploy_influence_booster_20", "air_deploy_influence_booster_10", "deploy_size_booster_200", "deploy_size_booster_100", "ground_deploy_rank_points_booster_20", "ground_deploy_rank_points_booster_10", "ground_deploy_influence_booster_20", "ground_deploy_influence_booster_10", "vehicle_discharge_document", "fuel", ] try: node_map: Dict[int, Set[int]] = defaultdict(set) while not self.stop_threads.is_set(): quest_data = self.get_anniversary_quest_data() # can_claim_extra = quest_data.get("rewards", {}).get("canCollectExtra") if not node_map: for pair in quest_data["neighbors"]: node_map[pair["nodeId"]].add(pair["neighborId"]) node_map[pair["neighborId"]].add(pair["nodeId"]) visited_nodes: List[Tuple[int, datetime, str]] = [] if not quest_data.get("status", {}).get("progress"): self.start_unlocking_map_quest_node(30219) quest_data = self.get_anniversary_quest_data() if quest_data["status"]["progress"]: for node in quest_data["status"]["progress"].values(): visited_nodes.append((node["nodeId"], parse(node["finishUnlockTime"]), node["nodeStatus"])) visited_nodes.sort(key=lambda o: o[1]) visited_nodes.reverse() current_queue = [] available_nodes = {*[]} for node_id, finish_datetime, status in visited_nodes: if status in ("claimed", "unclaimed"): available_nodes.update(node_map[node_id]) # elif status == "unclaimed": # available_nodes.update(node_map[node_id]) # collection_data = self.collect_map_quest_node(node_id).json() # if collection_data.get("error"): # self.logger.error(collection_data.get("error", {}).get("message")) # claimable_node = quest_data.get("cities").get(str(node_id)) # if quest_data.get("status").get("inventory").get("miles") < claimable_node.get("miles"): # self.travel_to_region(713) # self.travel_to_residence() # else: # if can_claim_extra: # self.collect_map_quest_node(node_id, True) elif status == "unlocking": current_queue.append(finish_datetime) if not current_queue: available_nodes -= {nid for nid, _, __ in visited_nodes} if available_nodes: for kind in priority: for node_id in available_nodes: if kind in [r["type"] for r in quest_data["cities"][str(node_id)]["rewards"]]: self.start_unlocking_map_quest_node(node_id) quest_data = self.get_anniversary_quest_data() current_queue.append( parse(quest_data["status"]["progress"][str(node_id)]["finishUnlockTime"]) ) if current_queue: break else: continue break current_queue.sort() if current_queue: next_finish = min(current_queue) else: self.report_error("Task Birthday Journey: Empty queue!", sentry=False) return to_sleep = utils.get_sleep_seconds(next_finish) self.write_log( f"Task: 'Birthday Journey' sleeping until (eRep): {next_finish} (sleeping for {to_sleep}s)" ) self.stop_threads.wait(to_sleep if to_sleep > 0 else 0) except Exception as e: self.report_error(f"Task error: Birthday Journey {e}") def _setup_player_bg_tasks(self, config: Dict[str, Any]): if config.get("state_update_repeater", True): t = threading.Thread(target=self.state_update_repeater, name=self.thread_name("state_update_repeater")) t.start() self._bg_task_queue.append(t) for bg_task in ["start_battles", "game_tokens", "org_fetcher", "birthday_journey"]: # "clear_bhs", if config.get(bg_task): try: _task_fn = getattr(self, f"background_task_{bg_task}") except AttributeError: self.report_error(f"Task '{bg_task}' has no task's function 'background_task_{bg_task}' defined!") raise NotImplementedError(f"Task 'background_task_{bg_task}' not implemented!") args = (config.get(bg_task),) if bg_task in ["start_battles", "clear_bhs", "org_fetcher"] else tuple() t = threading.Thread(target=_task_fn, args=args, name=self.thread_name(bg_task)) t.start() self._bg_task_queue.append(t) @property def as_dict(self): ret = super().as_dict ret.update(hits_payment_aggregation=self._hits_to_pay) return ret def get_war_status(self, war_id: int) -> Dict[str, Union[bool, Dict[int, str]]]: r = self._get_wars_show(war_id) html = r.text ret = {} reg_re = re.compile(rf'data-war-id="{war_id}" data-region-id="(\d+)" data-region-name="([- \w]+)"') if reg_re.findall(html): ret.update(regions={}, can_attack=True) for reg in reg_re.findall(html): ret["regions"].update({int(reg[0]): reg[1]}) elif re.search( r'Join', html, ): battle_id = re.search( r'Join', html, ).group(1) ret.update(can_attack=False, battle_id=int(battle_id)) elif re.search(r"This war is no longer active.", html): ret.update(can_attack=False, ended=True) else: ret.update(can_attack=False) return ret class Player(PlayerTasks, PlayerBackgroundTasks, BasePlayer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) signal.signal(signal.SIGINT, self.signal_quit) signal.signal(signal.SIGTERM, self.signal_quit) signal.signal(signal.SIGABRT, self.signal_quit) if sys.platform.startswith("win"): signal.signal(signal.SIGBREAK, self.signal_quit) else: signal.signal(signal.SIGUSR1, self.signal_reload) signal.signal(signal.SIGUSR2, self.signal_config_reload) signal.signal(signal.SIGHUP, self.signal_quit) pid = os.getpid() for action, _signal in [("UPDATE", "SIGUSR1"), ("RELOAD", "SIGUSR2"), ("QUIT", "SIGINT")]: self.logger.debug( f"To {action:^6}, run: 'kill -{getattr(signal, _signal):<2} {pid}' or 'kill -{_signal:<7} {pid}'" ) def setup_tasks(self, config: Dict[str, Any]): super().setup_tasks(config) def check_house_durability(self) -> Dict[int, datetime]: ret = {} for q, dt in super().check_house_durability().items(): if q <= self.tasks.get_default("renew_houses", 0): ret.update({q: dt}) return ret def to_json(self, indent: bool = False) -> str: return utils.json_dumps(self.as_dict, indent=False, sort_keys=True) def setup_citizen(self, config): super().setup_citizen(config) self.tasks.set_default("cheap_medals", config.get("cheap_medals", False))