2021-07-20 14:05:54 +03:00

300 lines
11 KiB
Python

import datetime
import os
import re
import sys
import time
import unicodedata
import warnings
from base64 import b64encode
from decimal import Decimal
from logging import Logger
from pathlib import Path
from typing import Any, Dict, List, Union
import pytz
import requests
from requests import Response
from erepublik import __version__, constants
try:
import simplejson as json
except ImportError:
import json
# Public names of this module, i.e. what a star-import of it exports.
__all__ = [
'VERSION', 'calculate_hit', 'date_from_eday', 'eday_from_date', 'deprecation', 'get_final_hit_dmg', 'write_file',
'get_air_hit_dmg_value', 'get_file', 'get_ground_hit_dmg_value', 'get_sleep_seconds', 'good_timedelta', 'slugify',
'interactive_sleep', 'json', 'localize_dt', 'localize_timestamp', 'normalize_html_json', 'now', 'silent_sleep',
'json_decode_object_hook', 'json_load', 'json_loads', 'json_dump', 'json_dumps', 'b64json', 'ErepublikJSONEncoder',
]
# Package version string, re-exported from the `erepublik` package's `__version__`.
VERSION: str = __version__
def now() -> datetime.datetime:
    """Return the current time in ``constants.erep_tz`` with microseconds zeroed.

    :return: timezone aware datetime truncated to whole seconds
    :rtype: datetime.datetime
    """
    current = datetime.datetime.now(constants.erep_tz)
    return current.replace(microsecond=0)
def localize_timestamp(timestamp: int) -> datetime.datetime:
    """Convert a POSIX timestamp to a datetime expressed in ``constants.erep_tz``.

    :param timestamp: seconds since the epoch
    :type timestamp: int
    :return: timezone aware datetime for that instant
    :rtype: datetime.datetime
    """
    game_tz = constants.erep_tz
    return datetime.datetime.fromtimestamp(timestamp, game_tz)
def localize_dt(dt: Union[datetime.date, datetime.datetime]) -> datetime.datetime:
    """Attach ``constants.erep_tz`` to a datetime, or to midnight of a plain date.

    :param dt: datetime to localize, or a date (interpreted as 00:00:00 of that day)
    :type dt: Union[datetime.date, datetime.datetime]
    :return: result of ``constants.erep_tz.localize`` for the given moment
    :rtype: datetime.datetime
    :raises TypeError: if dt is neither a datetime.datetime nor a datetime.date
    """
    # datetime.datetime is a subclass of datetime.date, so it has to be tested first.
    if isinstance(dt, datetime.datetime):
        return constants.erep_tz.localize(dt)
    if isinstance(dt, datetime.date):
        midnight = datetime.datetime.combine(dt, datetime.time(0, 0, 0))
        return constants.erep_tz.localize(midnight)
    raise TypeError(f"Argument dt must be an instance of datetime.datetime or datetime.date not {type(dt)}")
def good_timedelta(dt: datetime.datetime, td: datetime.timedelta) -> datetime.datetime:
    """Add a timedelta to an aware datetime and re-normalize the result.

    The sum is passed through ``constants.erep_tz.normalize`` so that the
    timezone offset is corrected when the addition crosses a DST switch.

    :param dt: Timezone aware datetime object
    :type dt: datetime.datetime
    :param td: timedelta object to add
    :type td: datetime.timedelta
    :return: normalized ``dt + td``
    :rtype: datetime.datetime
    """
    shifted = dt + td
    return constants.erep_tz.normalize(shifted)
def eday_from_date(date: Union[datetime.date, datetime.datetime] = None) -> int:
    """Return the number of whole days between 2007-11-20 and the given date.

    :param date: date or datetime to convert; when None, the value of now() is used
    :type date: Union[datetime.date, datetime.datetime]
    :return: day count, where 2007-11-20 is day 0
    :rtype: int
    """
    day_zero = datetime.datetime(2007, 11, 20, 0, 0, 0)
    moment = now() if date is None else date
    if isinstance(moment, datetime.date):
        # combine() keeps only the date part, giving a naive midnight that can
        # be subtracted from the naive day_zero.
        moment = datetime.datetime.combine(moment, datetime.time(0, 0, 0))
    elapsed = moment - day_zero
    return elapsed.days
def date_from_eday(eday: int) -> datetime.date:
    """Return the moment that lies ``eday`` days after localized 2007-11-20 midnight.

    :param eday: number of days to add to day 0 (2007-11-20)
    :type eday: int
    :return: localize_dt(2007-11-20) shifted by ``eday`` days
    """
    day_zero = localize_dt(datetime.date(2007, 11, 20))
    offset = datetime.timedelta(days=eday)
    return day_zero + offset
def get_sleep_seconds(time_until: datetime.datetime) -> int:
    """Return how many whole seconds remain from now() until ``time_until``.

    :param time_until: timezone aware target datetime
    :type time_until: datetime.datetime
    :return: remaining seconds, or 0 when the target is not in the future
    :rtype: int
    """
    remaining = time_until - now()
    whole_seconds = int(remaining.total_seconds())
    return max(whole_seconds, 0)
def interactive_sleep(sleep_seconds: int):
    """Sleep for ``sleep_seconds`` while printing the remaining time to stdout.

    The wait is split into chunks.  For each step of 1800, 300 and 60 seconds
    (tried in that order) the first one for which ``(remaining - 1) // step``
    is non-zero decides the chunk: the remainder of ``remaining`` modulo that
    step, or a full step when the remainder is zero.  If no step applies the
    chunk is one second.  Before every chunk the countdown line is rewritten
    in place, and a final carriage return is written when the loop ends.

    :param sleep_seconds: total number of seconds to sleep
    :type sleep_seconds: int
    """
    remaining = sleep_seconds
    while remaining > 0:
        for step in (1800, 300, 60):
            if (remaining - 1) // step:
                chunk = remaining % step or step
                break
        else:
            chunk = 1
        sys.stdout.write(f"\rSleeping for {remaining:4} more seconds")
        sys.stdout.flush()
        time.sleep(chunk)
        remaining -= chunk
    sys.stdout.write("\r")
# Plain alias of time.sleep: sleeps without the countdown output that interactive_sleep writes.
silent_sleep = time.sleep
def get_file(filepath: str) -> str:
    """Return a path derived from ``filepath`` that can be used for a new file.

    * Path does not exist: its parent directories are created and the path is
      returned unchanged.
    * Path is a directory: ``new_file.txt`` inside that directory is returned.
    * Path is an existing file: a numeric version suffix is appended
      (``name.2``), or incremented when the file already ends in one
      (``name.2`` -> ``name.3``), and bumped further until the candidate does
      not exist.

    :param filepath: desired file path
    :type filepath: str
    :return: path as described above
    :rtype: str
    """
    target = Path(filepath)
    if not target.exists():
        os.makedirs(target.parent, exist_ok=True)
        return str(target)
    if target.is_dir():
        return str(target / 'new_file.txt')

    try:
        # An integer suffix is treated as an existing version number.
        counter = int(target.suffix[1:]) + 1
    except ValueError:
        # No numeric suffix: keep the full name and start at version 2.
        stem, counter = target.name, 2
    else:
        stem = target.stem

    candidate = target.parent / f"{stem}.{counter}"
    while candidate.exists():
        counter += 1
        candidate = target.parent / f"{stem}.{counter}"
    return str(candidate)
def write_file(filename: str, content: str) -> int:
    """Append ``content`` (UTF-8 encoded) to the path chosen by get_file().

    :param filename: desired file path; get_file() decides the actual target
    :type filename: str
    :param content: text to write
    :type content: str
    :return: value returned by the file object's write() call
    :rtype: int
    """
    target = get_file(filename)
    with open(target, 'ab') as handle:
        return handle.write(content.encode("utf-8"))
def normalize_html_json(js: str) -> str:
    """Apply a fixed sequence of regex rewrites to a JavaScript-like object literal.

    In order:

    1. a space followed by a single-quoted string becomes the same text in
       double quotes (the leading space is dropped),
    2. ``dd:dd:dd`` digit groups lose their colons,
    3. a bare word followed by ``:`` and preceded by ``{``, whitespace or ``,``
       is wrapped in double quotes,
    4. a comma (plus optional whitespace) directly before ``}`` is removed.

    :param js: text to rewrite
    :type js: str
    :return: rewritten text
    :rtype: str
    """
    def _double_quote(match):
        return f'"{match.group(1)}"'

    rewrites = (
        (r' \'(.*?)\'', _double_quote),
        (r'(\d\d):(\d\d):(\d\d)', r'\1\2\3'),
        (r'([{\s,])(\w+)(:)(?!"})', r'\1"\2"\3'),
        (r',\s*}', '}'),
    )
    # Order matters: each rewrite runs on the output of the previous one.
    for pattern, replacement in rewrites:
        js = re.sub(pattern, replacement, js)
    return js
def slugify(value, allow_unicode=False) -> str:
    """
    Build a slug from ``value`` (adapted from Django 2.2.1 django.utils.text.slugify).

    The value is converted to str and normalized (NFKC when 'allow_unicode' is
    True, otherwise NFKD with non-ASCII characters dropped).  Characters that
    aren't word characters, whitespace, or hyphens are replaced with
    underscores; the result is stripped, lowercased, and every run of
    whitespace/hyphens is collapsed into a single hyphen.
    """
    text = str(value)
    if not allow_unicode:
        ascii_bytes = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore')
        text = ascii_bytes.decode('ascii')
    else:
        text = unicodedata.normalize('NFKC', text)
    cleaned = re.sub(r'[^\w\s-]', '_', text)
    cleaned = cleaned.strip().lower()
    return re.sub(r'[-\s]+', '-', cleaned)
def calculate_hit(strength: float, rang: int, tp: bool, elite: bool, ne: bool, booster: int = 0,
                  weapon: int = 200, is_deploy: bool = False) -> Decimal:
    """Compute the damage of a single hit.

    The base damage is ``10 * (1 + strength / 400) * (1 + rang / 5) * (1 + weapon / 100)``
    (strength rounded to 3 decimals first); get_final_hit_dmg() then applies
    the tp / elite / ne / booster modifiers.

    :param strength: strength value
    :param rang: rank number
    :param tp: forwarded to get_final_hit_dmg
    :param elite: forwarded to get_final_hit_dmg
    :param ne: forwarded to get_final_hit_dmg
    :param booster: forwarded to get_final_hit_dmg
    :param weapon: weapon power
    :param is_deploy: round the result to 3 decimals instead of 0
    :return: rounded damage
    :rtype: Decimal
    """
    if is_deploy:
        precision = 3
    else:
        precision = 0
    strength_factor = 1 + Decimal(str(round(strength, 3))) / 400
    rank_factor = 1 + Decimal(str(rang / 5))
    weapon_factor = 1 + Decimal(str(weapon / 100))
    raw_damage = 10 * strength_factor * rank_factor * weapon_factor
    final_damage = get_final_hit_dmg(raw_damage, rang, tp=tp, elite=elite, ne=ne, booster=booster)
    return Decimal(round(final_damage, precision))
def get_ground_hit_dmg_value(citizen_id: int, natural_enemy: bool = False, true_patriot: bool = False,
                             booster: int = 0, weapon_power: int = 200) -> Decimal:
    """Fetch a citizen's profile JSON from erepublik.com and compute their ground hit damage.

    Ground rank number and strength are read from the profile; the citizen
    counts as elite when their level is above 100.  A natural enemy hit is
    always calculated with the true patriot flag set.

    :param citizen_id: id used in the citizen-profile-json URL
    :param natural_enemy: passed to calculate_hit as ``ne``; also forces ``tp``
    :param true_patriot: passed to calculate_hit as ``tp``
    :param booster: passed to calculate_hit
    :param weapon_power: passed to calculate_hit as ``weapon``
    :return: result of calculate_hit
    :rtype: Decimal
    """
    profile = requests.get(f'https://www.erepublik.com/en/main/citizen-profile-json/{citizen_id}').json()
    ground = profile['military']['militaryData']['ground']
    rank_number = ground['rankNumber']
    strength = ground['strength']
    is_elite = profile['citizenAttributes']['level'] > 100
    patriot = True if natural_enemy else true_patriot
    return calculate_hit(strength, rank_number, patriot, is_elite, natural_enemy, booster, weapon_power)
def get_air_hit_dmg_value(citizen_id: int, natural_enemy: bool = False, true_patriot: bool = False, booster: int = 0,
                          weapon_power: int = 0) -> Decimal:
    """Fetch a citizen's profile JSON from erepublik.com and compute their air hit damage.

    The aircraft rank number is read from the profile and strength is passed
    to calculate_hit as 0; the citizen counts as elite when their level is
    above 100.

    :param citizen_id: id used in the citizen-profile-json URL
    :param natural_enemy: passed to calculate_hit as ``ne``
    :param true_patriot: passed to calculate_hit as ``tp``
    :param booster: passed to calculate_hit
    :param weapon_power: passed to calculate_hit as ``weapon``
    :return: result of calculate_hit
    :rtype: Decimal
    """
    profile = requests.get(f'https://www.erepublik.com/en/main/citizen-profile-json/{citizen_id}').json()
    rank_number = profile['military']['militaryData']['aircraft']['rankNumber']
    is_elite = profile['citizenAttributes']['level'] > 100
    return calculate_hit(0, rank_number, true_patriot, is_elite, natural_enemy, booster, weapon_power)
def get_final_hit_dmg(base_dmg: Union[Decimal, float, str], rang: int,
                      tp: bool = False, elite: bool = False, ne: bool = False, booster: int = 0) -> Decimal:
    """Apply the hit modifiers to a base damage value.

    Applied in order:

    * ``elite``: +10 %
    * ``tp`` with ``rang >= 70``: + ``(rang - 69) * 10`` %
    * ``booster``: + ``booster`` %
    * ``ne``: +10 %

    :param base_dmg: base damage; converted through ``str`` to Decimal
    :type base_dmg: Union[Decimal, float, str]
    :param rang: rank number, used for the ``tp`` bonus
    :type rang: int
    :param tp: apply the rank dependent bonus (only from rank 70 upwards)
    :type tp: bool
    :param elite: apply the 10 % elite bonus
    :type elite: bool
    :param ne: apply the 10 % ``ne`` bonus
    :type ne: bool
    :param booster: booster percentage
    :type booster: int
    :return: modified damage
    :rtype: Decimal
    """
    dmg = Decimal(str(base_dmg))

    if elite:
        dmg = dmg * 11 / 10

    if tp and rang >= 70:
        # Divide in Decimal: Decimal((rang - 69) / 10) would be built from a
        # binary float and leak its representation error (0.1 -> 0.1000000000000000055...).
        dmg = dmg * (1 + Decimal(rang - 69) / 10)

    dmg = dmg * (100 + booster) / 100

    if ne:
        dmg = dmg * 11 / 10

    return Decimal(dmg)
def deprecation(message):
    """Emit ``message`` as a DeprecationWarning attributed to the caller of the calling function.

    :param message: warning text
    """
    # stacklevel=2 makes the warning point at whoever called the function
    # that invoked deprecation(), not at this helper.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
def json_decode_object_hook(
    o: Union[Dict[str, Any], List[Any], int, float, str]
) -> Union[Dict[str, Any], List[Any], int, float, str, datetime.date, datetime.datetime, datetime.timedelta]:
    """ Convert ErepublikJSONEncoder datetime, date and timedelta dicts back to their python objects

    Dicts carrying a ``__type__`` key of ``datetime``, ``date`` or ``timedelta`` are
    converted; everything else (including non-dict values) is returned unchanged.

    :param o: decoded JSON value, normally the dict passed to a json ``object_hook``
    :return: Union[Dict[str, Any], List[Any], int, float, str, datetime.date, datetime.datetime, datetime.timedelta]
    """
    # The signature accepts lists and scalars too; they have no .get(), so pass them through.
    if not isinstance(o, dict):
        return o

    _type = o.get('__type__')
    if _type == 'datetime':
        dt = datetime.datetime.strptime(f"{o['date']} {o['time']}", '%Y-%m-%d %H:%M:%S')
        if o.get('tzinfo'):
            # The timezone is stored by name; re-attach it to the naive value.
            dt = pytz.timezone(o['tzinfo']).localize(dt)
        return dt
    if _type == 'date':
        return datetime.datetime.strptime(f"{o['date']}", '%Y-%m-%d').date()
    if _type == 'timedelta':
        return datetime.timedelta(seconds=o['total_seconds'])
    return o
def json_load(f, **kwargs):
    """Deserialize JSON from the file-like object ``f`` using the module's ``json`` backend.

    :param f: object to read from, passed to ``json.load``
    :param kwargs: forwarded unchanged to ``json.load``
    :return: decoded object
    """
    # NOTE: json_decode_object_hook is intentionally not installed here.
    decoded = json.load(f, **kwargs)
    return decoded
def json_loads(s: str, **kwargs):
    """Deserialize the JSON document ``s`` using the module's ``json`` backend.

    :param s: JSON text
    :type s: str
    :param kwargs: forwarded unchanged to ``json.loads``
    :return: decoded object
    """
    # NOTE: json_decode_object_hook is intentionally not installed here.
    decoded = json.loads(s, **kwargs)
    return decoded
def json_dump(obj, fp, *args, **kwargs):
    """Serialize ``obj`` as JSON into ``fp``, defaulting the encoder to ErepublikJSONEncoder.

    :param obj: object to serialize
    :param fp: file-like object to write to
    :param args: forwarded to ``json.dump``
    :param kwargs: forwarded to ``json.dump``; a missing or falsy ``cls`` is
        replaced with ErepublikJSONEncoder
    :return: whatever ``json.dump`` returns
    """
    kwargs['cls'] = kwargs.get('cls') or ErepublikJSONEncoder
    return json.dump(obj, fp, *args, **kwargs)
def json_dumps(obj, *args, **kwargs):
    """Serialize ``obj`` to a JSON string, defaulting the encoder to ErepublikJSONEncoder.

    :param obj: object to serialize
    :param args: forwarded to ``json.dumps``
    :param kwargs: forwarded to ``json.dumps``; a missing or falsy ``cls`` is
        replaced with ErepublikJSONEncoder
    :return: JSON text
    """
    kwargs['cls'] = kwargs.get('cls') or ErepublikJSONEncoder
    return json.dumps(obj, *args, **kwargs)
def b64json(obj: Union[Dict[str, Union[int, List[str]]], List[str]]):
    """Encode ``obj`` as base64 of its compact JSON representation.

    * ``int`` and ``str`` values are returned unchanged.
    * A ``list`` is dumped to compact JSON and base64 encoded.
    * A ``dict`` has every value processed by b64json first, then the resulting
      dict is dumped to compact JSON and base64 encoded.

    The passed dict is not modified; encoded values are collected in a new dict.

    :param obj: value to encode
    :return: base64 text for lists and dicts, the value itself for int and str
    :raises ErepublikException: for any other type
    """
    if isinstance(obj, (int, str)):
        return obj
    if isinstance(obj, dict):
        # Build a fresh dict instead of overwriting the caller's values in place.
        obj = {key: b64json(value) for key, value in obj.items()}
    elif not isinstance(obj, list):
        from .classes import ErepublikException
        raise ErepublikException(f'Unhandled object type! obj is {type(obj)}')
    return b64encode(json.dumps(obj, separators=(',', ':')).encode('utf-8')).decode('utf-8')
class ErepublikJSONEncoder(json.JSONEncoder):
    """JSON encoder with fallbacks for values the stock encoder cannot serialize."""

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        The checks run in this order and the first match wins: Decimal (float
        with 2 decimals), datetime, date and timedelta (tagged ``__type__``
        dicts), requests Response (headers, url, text, status code), objects
        with an ``as_dict`` attribute (the attribute value is returned as-is),
        set (list), Citizen (``to_json()``), Logger (``str``), and objects with
        a ``__dict__``.  Anything else goes to the base class.  Any exception
        raised along the way is swallowed and its message is returned as the
        serialized value.
        """
        try:
            from erepublik.citizen import Citizen

            if isinstance(o, Decimal):
                return float(f"{o:.02f}")
            # datetime must be tested before date because it is a subclass of date.
            if isinstance(o, datetime.datetime):
                return {
                    '__type__': 'datetime',
                    'date': o.strftime("%Y-%m-%d"),
                    'time': o.strftime("%H:%M:%S"),
                    'tzinfo': str(o.tzinfo) if o.tzinfo else None,
                }
            if isinstance(o, datetime.date):
                return {'__type__': 'date', 'date': o.strftime("%Y-%m-%d")}
            if isinstance(o, datetime.timedelta):
                return {
                    '__type__': 'timedelta',
                    'days': o.days,
                    'seconds': o.seconds,
                    'microseconds': o.microseconds,
                    'total_seconds': o.total_seconds(),
                }
            if isinstance(o, Response):
                return {
                    'headers': dict(o.__dict__['headers']),
                    'url': o.url,
                    'text': o.text,
                    'status_code': o.status_code,
                }
            if hasattr(o, 'as_dict'):
                return o.as_dict
            if isinstance(o, set):
                return list(o)
            if isinstance(o, Citizen):
                return o.to_json()
            if isinstance(o, Logger):
                return str(o)
            if hasattr(o, '__dict__'):
                return o.__dict__
            return super().default(o)
        except Exception as e:  # noqa
            return str(e)