336 lines
12 KiB
Python
336 lines
12 KiB
Python
import datetime
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import unicodedata
|
|
import warnings
|
|
from base64 import b64encode
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Union
|
|
|
|
import pytz
|
|
import requests
|
|
from requests import Response
|
|
|
|
from . import __version__, constants
|
|
|
|
try:
|
|
import simplejson as json
|
|
except ImportError:
|
|
import json
|
|
|
|
# Names exported by ``from <this module> import *``.
__all__ = [
    'VERSION', 'calculate_hit', 'date_from_eday', 'eday_from_date', 'deprecation', 'get_final_hit_dmg', 'write_file',
    'get_air_hit_dmg_value', 'get_file', 'get_ground_hit_dmg_value', 'get_sleep_seconds', 'good_timedelta', 'slugify',
    'interactive_sleep', 'json', 'localize_dt', 'localize_timestamp', 'normalize_html_json', 'now', 'silent_sleep',
    'json_decode_object_hook', 'json_load', 'json_loads', 'json_dump', 'json_dumps', 'b64json', 'ErepublikJSONEncoder',
]

# Package version string, re-exported from the package's ``__version__``.
VERSION: str = __version__
|
|
|
|
|
|
def now() -> datetime.datetime:
    """Return the current time in ``constants.erep_tz`` with microseconds zeroed.

    :return: timezone aware datetime truncated to whole seconds
    :rtype: datetime.datetime
    """
    current = datetime.datetime.now(constants.erep_tz)
    return current.replace(microsecond=0)
|
|
|
|
|
|
def localize_timestamp(timestamp: int) -> datetime.datetime:
    """Convert a POSIX timestamp into a datetime expressed in ``constants.erep_tz``.

    :param timestamp: seconds since the Unix epoch
    :type timestamp: int
    :return: timezone aware datetime for that instant
    :rtype: datetime.datetime
    """
    zone = constants.erep_tz
    return datetime.datetime.fromtimestamp(timestamp, zone)
|
|
|
|
|
|
def localize_dt(dt: Union[datetime.date, datetime.datetime]) -> datetime.datetime:
    """Attach ``constants.erep_tz`` to a date or datetime via the timezone's ``localize``.

    :param dt: datetime to localize, or a date (treated as 00:00:00 of that day)
    :type dt: Union[datetime.date, datetime.datetime]
    :return: the value returned by ``constants.erep_tz.localize``
    :rtype: datetime.datetime
    :raises TypeError: if ``dt`` is neither a ``datetime.datetime`` nor a ``datetime.date``
    """
    # datetime.datetime subclasses datetime.date, so it has to be tested first.
    if isinstance(dt, datetime.datetime):
        return constants.erep_tz.localize(dt)
    if isinstance(dt, datetime.date):
        midnight = datetime.datetime.combine(dt, datetime.time(0, 0, 0))
        return constants.erep_tz.localize(midnight)
    raise TypeError(f"Argument dt must be an instance of datetime.datetime or datetime.date not {type(dt)}")
|
|
|
|
|
|
def good_timedelta(dt: datetime.datetime, td: datetime.timedelta) -> datetime.datetime:
    """Add a timedelta to an aware datetime and re-normalize the result.

    The sum is passed through ``constants.erep_tz.normalize`` so that the
    timezone information is corrected when the addition jumps over a DST switch.

    :param dt: Timezone aware datetime object
    :type dt: datetime.datetime
    :param td: timedelta object to add
    :type td: datetime.timedelta
    :return: datetime object with correct timezone when jumped over DST
    :rtype: datetime.datetime
    """
    shifted = dt + td
    return constants.erep_tz.normalize(shifted)
|
|
|
|
|
|
def eday_from_date(date: Union[datetime.date, datetime.datetime] = None) -> int:
    """Return the number of whole days between 2007-11-20 and the given date.

    :param date: date or datetime to convert; ``now()`` is used when omitted
    :type date: Union[datetime.date, datetime.datetime]
    :return: day count relative to 2007-11-20 (negative for earlier dates)
    :rtype: int
    """
    epoch = datetime.datetime(2007, 11, 20, 0, 0, 0)
    moment = now() if date is None else date
    # Dates and datetimes alike are reduced to naive midnight of their calendar
    # day; combine() ignores the time and tzinfo of a datetime argument.
    if isinstance(moment, datetime.date):
        moment = datetime.datetime.combine(moment, datetime.time(0, 0, 0))
    return (moment - epoch).days
|
|
|
|
|
|
def date_from_eday(eday: int) -> datetime.date:
    """Return midnight of the day that lies ``eday`` days after 2007-11-20.

    The day arithmetic is done on a naive datetime and the result is localized
    afterwards. Adding a timedelta to an already localized pytz datetime keeps
    the UTC offset of the start date, which would label days that fall into
    DST with the standard-time offset.

    :param eday: number of days since 2007-11-20
    :type eday: int
    :return: localized datetime (a ``datetime.date`` subclass) for that day
    :rtype: datetime.datetime
    """
    naive_day = datetime.datetime(2007, 11, 20) + datetime.timedelta(days=eday)
    return localize_dt(naive_day)
|
|
|
|
|
|
def get_sleep_seconds(time_until: datetime.datetime) -> int:
    """Return the whole seconds left from ``now()`` until ``time_until``.

    :param time_until: timezone aware datetime to wait for
    :type time_until: datetime.datetime
    :return: seconds remaining, or 0 when ``time_until`` is not in the future
    :rtype: int
    """
    remaining = time_until - now()
    return max(int(remaining.total_seconds()), 0)
|
|
|
|
|
|
def interactive_sleep(sleep_seconds: int):
    """Sleep for ``sleep_seconds`` while printing the remaining time to stdout.

    The wait is split into chunks: while more than 30 minutes remain it sleeps
    up to the next multiple of 30 minutes, then likewise in 5 minute and
    1 minute steps, and finally second by second. Before every chunk the
    status line is rewritten in place using a carriage return.

    :param sleep_seconds: total number of seconds to sleep
    :type sleep_seconds: int
    """
    remaining = sleep_seconds
    while remaining > 0:
        for step in (1800, 300, 60):
            if (remaining - 1) // step:
                # Sleep down to the next multiple of ``step`` (a full step if already on one).
                chunk = remaining % step or step
                break
        else:
            chunk = 1
        sys.stdout.write(f"\rSleeping for {remaining:4} more seconds")
        sys.stdout.flush()
        time.sleep(chunk)
        remaining -= chunk
    sys.stdout.write("\r")
|
|
|
|
|
|
# Plain alias of ``time.sleep`` (no progress output, unlike ``interactive_sleep``).
silent_sleep = time.sleep
|
|
|
|
|
|
# def _write_log(msg, timestamp: bool = True, should_print: bool = False):
|
|
# erep_time_now = now()
|
|
# txt = f"[{erep_time_now.strftime('%F %T')}] {msg}" if timestamp else msg
|
|
# if not os.path.isdir('log'):
|
|
# os.mkdir('log')
|
|
# with open(f'log/{erep_time_now.strftime("%F")}.log', 'a', encoding='utf-8') as f:
|
|
# f.write(f'{txt}\n')
|
|
# if should_print:
|
|
# print(txt)
|
|
#
|
|
#
|
|
# def write_interactive_log(*args, **kwargs):
|
|
# kwargs.pop('should_print', None)
|
|
# _write_log(should_print=True, *args, **kwargs)
|
|
#
|
|
#
|
|
# def write_silent_log(*args, **kwargs):
|
|
# kwargs.pop('should_print', None)
|
|
# _write_log(should_print=False, *args, **kwargs)
|
|
|
|
|
|
def get_file(filepath: str) -> str:
    """Return a path derived from ``filepath`` that can be used for a new file.

    * Path does not exist: its parent directories are created and the path is
      returned unchanged.
    * Path is a directory: ``new_file.txt`` inside that directory is returned.
    * Path is an existing file: a numeric version suffix is appended (or, when
      the file already ends in ``.<number>``, incremented) until a name is
      found that does not exist yet.

    :param filepath: desired file path
    :type filepath: str
    :return: path to use for writing
    :rtype: str
    """
    target = Path(filepath)
    if not target.exists():
        os.makedirs(target.parent, exist_ok=True)
        return str(target)
    if target.is_dir():
        return str(target / 'new_file.txt')

    try:
        # "name.3" -> continue numbering from 4 on top of "name".
        counter = int(target.suffix[1:]) + 1
        stem = target.stem
    except ValueError:
        # No numeric suffix: keep the full name and start with version 2.
        stem = target.name
        counter = 2

    candidate = target.parent / f"{stem}.{counter}"
    while candidate.exists():
        counter += 1
        candidate = target.parent / f"{stem}.{counter}"
    return str(candidate)
|
|
|
|
|
|
def write_file(filename: str, content: str) -> int:
    """Write ``content`` UTF-8 encoded to the path chosen by ``get_file(filename)``.

    :param filename: desired file name, resolved through ``get_file``
    :type filename: str
    :param content: text to write
    :type content: str
    :return: number of bytes written
    :rtype: int
    """
    target = get_file(filename)
    with open(target, 'ab') as handle:
        written = handle.write(content.encode("utf-8"))
    return written
|
|
|
|
|
|
def normalize_html_json(js: str) -> str:
    """Rewrite a JavaScript-like object literal towards JSON using regex substitutions.

    Applied in order:

    1. a space followed by single-quoted text becomes the text in double quotes
    2. colons are removed from ``dd:dd:dd`` digit groups
    3. bare word keys preceded by ``{``, whitespace or ``,`` are double-quoted
    4. a trailing comma before ``}`` is dropped

    :param js: source text
    :type js: str
    :return: rewritten text
    :rtype: str
    """
    rewrites = (
        (r' \'(.*?)\'', lambda a: f'"{a.group(1)}"'),
        (r'(\d\d):(\d\d):(\d\d)', r'\1\2\3'),
        (r'([{\s,])(\w+)(:)(?!"})', r'\1"\2"\3'),
        (r',\s*}', '}'),
    )
    for pattern, replacement in rewrites:
        js = re.sub(pattern, replacement, js)
    return js
|
|
|
|
|
|
def slugify(value, allow_unicode=False) -> str:
    """
    Function adapted from Django2.2.1 django.utils.text.slugify

    Convert to ASCII if 'allow_unicode' is False. Replace every character that
    isn't alphanumeric, an underscore, whitespace or a hyphen with an
    underscore. Strip leading and trailing whitespace, convert to lowercase
    and collapse runs of whitespace and hyphens into a single hyphen.
    """
    text = str(value)
    if allow_unicode:
        text = unicodedata.normalize('NFKC', text)
    else:
        decomposed = unicodedata.normalize('NFKD', text)
        text = decomposed.encode('ascii', 'ignore').decode('ascii')
    text = re.sub(r'[^\w\s-]', '_', text)
    text = text.strip().lower()
    return re.sub(r'[-\s]+', '-', text)
|
|
|
|
|
|
def calculate_hit(strength: float, rang: int, tp: bool, elite: bool, ne: bool, booster: int = 0,
                  weapon: int = 200, is_deploy: bool = False) -> Decimal:
    """Compute the damage of a single hit.

    The base damage is ``10 * (1 + strength / 400) * (1 + rang / 5) * (1 + weapon / 100)``
    and is then passed through ``get_final_hit_dmg`` for the bonus multipliers.

    :param strength: strength value (rounded to 3 decimals before use)
    :param rang: rank number
    :param tp: forwarded to ``get_final_hit_dmg`` as ``tp``
    :param elite: forwarded to ``get_final_hit_dmg`` as ``elite``
    :param ne: forwarded to ``get_final_hit_dmg`` as ``ne``
    :param booster: booster percentage forwarded to ``get_final_hit_dmg``
    :param weapon: weapon power
    :param is_deploy: round the result to 3 decimals instead of 0
    :return: rounded damage
    :rtype: Decimal
    """
    strength_factor = 1 + Decimal(str(round(strength, 3))) / 400
    rank_factor = 1 + Decimal(str(rang / 5))
    weapon_factor = 1 + Decimal(str(weapon / 100))
    raw_damage = 10 * strength_factor * rank_factor * weapon_factor

    final_damage = get_final_hit_dmg(raw_damage, rang, tp=tp, elite=elite, ne=ne, booster=booster)
    precision = 3 if is_deploy else 0
    return Decimal(round(final_damage, precision))
|
|
|
|
|
|
def get_ground_hit_dmg_value(citizen_id: int, natural_enemy: bool = False, true_patriot: bool = False,
                             booster: int = 0, weapon_power: int = 200) -> Decimal:
    """Fetch a citizen's public profile and compute their ground hit damage.

    Rank and strength are read from the profile's ground military data; the
    citizen counts as elite when their level is above 100. A natural enemy
    bonus always implies the true patriot bonus.

    :param citizen_id: id of the citizen whose profile is requested
    :param natural_enemy: apply the natural enemy bonus
    :param true_patriot: apply the true patriot bonus
    :param booster: booster percentage
    :param weapon_power: weapon power
    :return: damage as computed by ``calculate_hit``
    :rtype: Decimal
    """
    profile = requests.get(f'https://www.erepublik.com/en/main/citizen-profile-json/{citizen_id}').json()
    ground = profile['military']['militaryData']['ground']
    rang = ground['rankNumber']
    strength = ground['strength']
    elite = profile['citizenAttributes']['level'] > 100
    patriot = True if natural_enemy else true_patriot

    return calculate_hit(strength, rang, patriot, elite, natural_enemy, booster, weapon_power)
|
|
|
|
|
|
def get_air_hit_dmg_value(citizen_id: int, natural_enemy: bool = False, true_patriot: bool = False, booster: int = 0,
                          weapon_power: int = 0) -> Decimal:
    """Fetch a citizen's public profile and compute their air hit damage.

    The rank is read from the profile's aircraft military data; strength is
    always passed as 0. The citizen counts as elite when their level is
    above 100.

    :param citizen_id: id of the citizen whose profile is requested
    :param natural_enemy: apply the natural enemy bonus
    :param true_patriot: apply the true patriot bonus
    :param booster: booster percentage
    :param weapon_power: weapon power
    :return: damage as computed by ``calculate_hit``
    :rtype: Decimal
    """
    profile = requests.get(f'https://www.erepublik.com/en/main/citizen-profile-json/{citizen_id}').json()
    air_rank = profile['military']['militaryData']['aircraft']['rankNumber']
    is_elite = profile['citizenAttributes']['level'] > 100
    return calculate_hit(0, air_rank, true_patriot, is_elite, natural_enemy, booster, weapon_power)
|
|
|
|
|
|
def get_final_hit_dmg(base_dmg: Union[Decimal, float, str], rang: int,
                      tp: bool = False, elite: bool = False, ne: bool = False, booster: int = 0) -> Decimal:
    """Apply the bonus multipliers to a base damage value.

    Multipliers, applied in this order:

    * ``elite``: +10%
    * ``tp`` with ``rang >= 70``: ``+ (rang - 69) * 10%``
    * ``booster``: ``+ booster`` percent
    * ``ne``: +10%

    :param base_dmg: damage before bonuses; converted through ``str`` to avoid float noise
    :param rang: rank number, only used for the ``tp`` bonus
    :param tp: apply the rank based bonus
    :param elite: apply the elite bonus
    :param ne: apply the natural enemy bonus
    :param booster: booster percentage
    :return: damage after all bonuses
    :rtype: Decimal
    """
    dmg = Decimal(str(base_dmg))

    if elite:
        dmg = dmg * 11 / 10
    if tp and rang >= 70:
        # Divide in Decimal space: Decimal(<float>) would carry the binary
        # representation error of e.g. 0.1 into the result.
        dmg = dmg * (1 + Decimal(rang - 69) / 10)
    dmg = dmg * (100 + booster) / 100
    if ne:
        dmg = dmg * 11 / 10
    return dmg
|
|
|
|
|
|
def deprecation(message):
    """Emit a ``DeprecationWarning`` attributed to the caller of the calling function.

    :param message: warning text
    """
    # stacklevel=2 makes the warning point at whoever called the deprecated function.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
|
|
|
|
|
|
# def wait_for_lock(function):
|
|
# def wrapper(instance, *args, **kwargs):
|
|
# if not instance.concurrency_available.wait(600):
|
|
# e = 'Concurrency not freed in 10min!'
|
|
# instance.write_log(e)
|
|
# if instance.debug:
|
|
# instance.report_error(e)
|
|
# return None
|
|
# else:
|
|
# instance.concurrency_available.clear()
|
|
# try:
|
|
# ret = function(instance, *args, **kwargs)
|
|
# except Exception as e:
|
|
# instance.concurrency_available.set()
|
|
# raise e
|
|
# instance.concurrency_available.set()
|
|
# return ret
|
|
#
|
|
# return wrapper
|
|
|
|
|
|
def json_decode_object_hook(
    o: Union[Dict[str, Any], List[Any], int, float, str]
) -> Union[Dict[str, Any], List[Any], int, float, str, datetime.date, datetime.datetime, datetime.timedelta]:
    """ Convert dicts tagged with ``__type__`` back into datetime, date or timedelta objects

    Handles the shapes produced by ErepublikJSONEncoder for those three types;
    any other mapping (including unknown ``__type__`` values) is returned as is.

    :param o: decoded JSON object
    :return: Union[Dict[str, Any], List[Any], int, float, str, datetime.date, datetime.datetime, datetime.timedelta]
    """
    kind = o.get('__type__')
    if not kind:
        return o

    if kind == 'datetime':
        parsed = datetime.datetime.strptime(f"{o['date']} {o['time']}", '%Y-%m-%d %H:%M:%S')
        zone_name = o.get('tzinfo')
        if zone_name:
            parsed = pytz.timezone(zone_name).localize(parsed)
        return parsed
    if kind == 'date':
        return datetime.datetime.strptime(f"{o['date']}", '%Y-%m-%d').date()
    if kind == 'timedelta':
        return datetime.timedelta(seconds=o['total_seconds'])
    return o
|
|
|
|
|
|
def json_load(f, **kwargs):
    """Deserialize JSON from file object ``f`` with ``json_decode_object_hook`` applied.

    Any ``object_hook`` supplied by the caller is replaced.

    :param f: file-like object to read from
    :param kwargs: extra keyword arguments for ``json.load``
    :return: decoded object
    """
    kwargs['object_hook'] = json_decode_object_hook
    return json.load(f, **kwargs)
|
|
|
|
|
|
def json_loads(s: str, **kwargs):
    """Deserialize JSON text ``s`` with ``json_decode_object_hook`` applied.

    Any ``object_hook`` supplied by the caller is replaced.

    :param s: JSON document
    :type s: str
    :param kwargs: extra keyword arguments for ``json.loads``
    :return: decoded object
    """
    kwargs['object_hook'] = json_decode_object_hook
    return json.loads(s, **kwargs)
|
|
|
|
|
|
def json_dump(obj, fp, *args, **kwargs):
    """Serialize ``obj`` as JSON into ``fp``, defaulting the encoder to ``ErepublikJSONEncoder``.

    :param obj: object to serialize
    :param fp: file-like object to write to
    :param args: extra positional arguments for ``json.dump``
    :param kwargs: extra keyword arguments for ``json.dump``; a missing or falsy
        ``cls`` is replaced by ``ErepublikJSONEncoder``
    :return: whatever ``json.dump`` returns
    """
    kwargs['cls'] = kwargs.get('cls') or ErepublikJSONEncoder
    return json.dump(obj, fp, *args, **kwargs)
|
|
|
|
|
|
def json_dumps(obj, *args, **kwargs):
    """Serialize ``obj`` to a JSON string, defaulting the encoder to ``ErepublikJSONEncoder``.

    :param obj: object to serialize
    :param args: extra positional arguments for ``json.dumps``
    :param kwargs: extra keyword arguments for ``json.dumps``; a missing or falsy
        ``cls`` is replaced by ``ErepublikJSONEncoder``
    :return: JSON document
    """
    kwargs['cls'] = kwargs.get('cls') or ErepublikJSONEncoder
    return json.dumps(obj, *args, **kwargs)
|
|
|
|
|
|
def b64json(obj: Union[Dict[str, Union[int, List[str]]], List[str]]):
    """Encode ``obj`` as base64 of its JSON representation, recursing into dict values.

    * list: returned as the base64 text of its JSON dump
    * int / str: returned unchanged
    * dict: every value is processed with ``b64json`` first, then the resulting
      dict is JSON dumped and base64 encoded

    The dict passed in is not modified; the encoded values are collected in a
    new dict.

    :param obj: object to encode
    :return: base64 text (or ``obj`` itself for int / str)
    :raises ErepublikException: for any other type
    """
    if isinstance(obj, list):
        return b64encode(json.dumps(obj).encode('utf-8')).decode('utf-8')
    if isinstance(obj, (int, str)):
        return obj
    if not isinstance(obj, dict):
        # Imported lazily, as in the original code.
        from .classes import ErepublikException
        raise ErepublikException(f'Unhandled object type! obj is {type(obj)}')

    encoded = {key: b64json(value) for key, value in obj.items()}
    return b64encode(json.dumps(encoded).encode('utf-8')).decode('utf-8')
|
|
|
|
|
|
class ErepublikJSONEncoder(json.JSONEncoder):
    """JSON encoder with support for the extra types used by this package.

    Checked in this order: ``Decimal`` (float rounded to 2 decimals),
    ``datetime``, ``date`` and ``timedelta`` (dicts tagged with ``__type__``),
    ``requests.Response``, objects with an ``as_dict`` attribute, ``set`` and
    ``Citizen``. Anything the base encoder also rejects is serialized as a
    fixed placeholder string instead of raising.
    """

    def default(self, o):
        from erepublik.citizen import Citizen

        if isinstance(o, Decimal):
            return float(f"{o:.02f}")
        # datetime must be tested before date because it is a subclass of date.
        if isinstance(o, datetime.datetime):
            return {
                '__type__': 'datetime',
                'date': o.strftime("%Y-%m-%d"),
                'time': o.strftime("%H:%M:%S"),
                'tzinfo': str(o.tzinfo) if o.tzinfo else None,
            }
        if isinstance(o, datetime.date):
            return {'__type__': 'date', 'date': o.strftime("%Y-%m-%d")}
        if isinstance(o, datetime.timedelta):
            return {
                '__type__': 'timedelta',
                'days': o.days,
                'seconds': o.seconds,
                'microseconds': o.microseconds,
                'total_seconds': o.total_seconds(),
            }
        if isinstance(o, Response):
            return {
                'headers': dict(o.__dict__['headers']),
                'url': o.url,
                'text': o.text,
                'status_code': o.status_code,
            }
        if hasattr(o, 'as_dict'):
            return o.as_dict
        if isinstance(o, set):
            return list(o)
        if isinstance(o, Citizen):
            return o.to_json()

        try:
            return super().default(o)
        except Exception:  # noqa
            return 'Object is not JSON serializable'
|