# Source code for bimmer_connected.utils

"""General utils and base classes used in the library."""

import datetime
import functools
import inspect
import json
import logging
import pathlib
import time
import traceback
from enum import Enum
from typing import TYPE_CHECKING, Dict, List, Optional

from bimmer_connected.models import AnonymizedResponse

if TYPE_CHECKING:
    from typing import Callable, TypeVar

    from typing_extensions import ParamSpec

    _T = TypeVar("_T")
    _R = TypeVar("_R")
    _P = ParamSpec("_P")

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Keys that are filtered out of the dictionaries produced by `MyBMWJSONEncoder.default`.
JSON_IGNORED_KEYS = ["account", "_account", "vehicle", "_vehicle", "status", "remote_services"]

def get_class_property_names(obj: object):
    """Return the public data-descriptor names defined on the class of `obj`.

    The class (not the instance) is inspected with `inspect.getmembers`, keeping
    only members for which `inspect.isdatadescriptor` is true. Names starting
    with an underscore are left out. The result is a list of names in the order
    `inspect.getmembers` yields them (sorted by name).
    """
    descriptors = inspect.getmembers(type(obj), inspect.isdatadescriptor)
    return [name for name, _ in descriptors if not name.startswith("_")]
def parse_datetime(date_str: str) -> Optional[datetime.datetime]:
    """Convert a time string into a timezone-aware UTC datetime.

    The string is tried against a fixed list of ISO-8601-like formats (with or
    without fractional seconds, with a numeric offset or a literal ``Z``).

    Args:
        date_str: The timestamp to parse. Empty or ``None`` yields ``None``.

    Returns:
        The parsed moment converted to UTC, or ``None`` if the input is empty
        or matches none of the known formats (an error is logged in the latter
        case). Only the first six fields of the parsed struct (year through
        second) are used, so fractional seconds are discarded.
    """
    if not date_str:
        return None

    date_formats = ["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"]
    for date_format in date_formats:
        try:
            # Parse datetimes using `time.strptime` to allow running in some embedded python interpreters.
            time_struct = time.strptime(date_str, date_format)
            parsed = datetime.datetime(*(time_struct[0:6]))
            # A non-zero UTC offset means the wall-clock fields are local to that
            # offset; shift them so the value represents the same instant in UTC.
            if time_struct.tm_gmtoff:
                parsed = parsed - datetime.timedelta(seconds=time_struct.tm_gmtoff)
            return parsed.replace(tzinfo=datetime.timezone.utc)
        except ValueError:
            # Format did not match (or produced out-of-range fields); try the next one.
            continue

    _LOGGER.error("unable to parse '%s' using %s", date_str, date_formats)
    return None
class MyBMWJSONEncoder(json.JSONEncoder):
    """JSON Encoder that handles data classes, properties and additional data types."""

    def default(self, o):
        """Return a JSON-serializable stand-in for objects `json` cannot encode itself.

        * ``datetime``, ``date`` and ``time`` values become their ISO format string.
        * Non-enum objects with an instance ``__dict__`` become a dictionary of
          their attributes plus the values of their public class properties,
          with every key listed in ``JSON_IGNORED_KEYS`` removed.
        * Anything else (including enums) is converted with ``str``.
        """
        if isinstance(o, (datetime.datetime, datetime.date, datetime.time)):
            return o.isoformat()
        if not isinstance(o, Enum) and hasattr(o, "__dict__") and isinstance(o.__dict__, dict):
            # Work on a copy: updating `o.__dict__` in place would write the
            # property values into the instance as a side effect of serializing.
            retval: Dict = dict(o.__dict__)
            # Ignored properties are skipped up front so they are never evaluated.
            retval.update({p: getattr(o, p) for p in get_class_property_names(o) if p not in JSON_IGNORED_KEYS})
            return {k: v for k, v in retval.items() if k not in JSON_IGNORED_KEYS}
        return str(o)
def deprecated(replacement: Optional[str] = None) -> "Callable[[Callable[_P, _R]], Callable[_P, _R | None]]":
    """Mark a function or property as deprecated.

    Args:
        replacement: Optional name of the API callers should use instead; when
            given it is appended to the logged warning.

    Returns:
        A decorator. Each call of the decorated callable logs a warning naming
        the caller's file and line and the deprecated callable's qualified
        name, then forwards to the original and returns its result.
    """
    # Independent of the individual call, so build it once per decorator.
    replacement_text = f" Please change to '{replacement}'." if replacement else ""

    def decorator(func: "Callable[_P, _R]") -> "Callable[_P, _R | None]":
        # `wraps` keeps __name__, __doc__ etc. of the decorated callable intact.
        @functools.wraps(func)
        def _func_wrapper(*args: "_P.args", **kwargs: "_P.kwargs") -> "_R | None":
            # [-1] is this wrapper's own frame, [-2] is the code that called it.
            stack = traceback.extract_stack()[-2]
            _LOGGER.warning(
                "DeprecationWarning:%s:%s: '%s' is deprecated.%s",
                stack.filename,
                stack.lineno,
                func.__qualname__,
                replacement_text,
            )
            return func(*args, **kwargs)

        return _func_wrapper

    return decorator
def to_camel_case(input_str: str) -> str:
    """Convert SNAKE_CASE or snake_case to camelCase.

    The input is lower-cased first. Underscores are removed and cause the next
    alphanumeric character to be upper-cased; any other non-alphanumeric
    character is dropped without affecting capitalization.
    """
    pieces = []
    capitalize_next = False
    for char in input_str.lower():
        if char.isalnum():
            pieces.append(char.upper() if capitalize_next else char)
            capitalize_next = False
        elif char == "_":
            capitalize_next = True
    return "".join(pieces)
def log_response_store_to_file(response_store: List[AnonymizedResponse], logfile_path: pathlib.Path) -> None:
    """Write every stored response into its own file below `logfile_path`.

    Each entry is written to ``logfile_path / entry.filename`` as UTF-8 text.
    If the target has a ``.json`` suffix or the content is not a string, the
    content (or ``[]`` when empty) is dumped as indented, key-sorted JSON.
    Otherwise the string is written as-is, with ``"NO CONTENT"`` standing in
    for an empty string.
    """
    for entry in response_store:
        target = logfile_path / entry.filename
        payload = entry.content
        dump_as_json = target.suffix == ".json" or not isinstance(payload, str)
        with open(target, "w", encoding="UTF-8") as handle:
            if not dump_as_json:
                handle.write(payload or "NO CONTENT")
            else:
                json.dump(payload or [], handle, indent=4, sort_keys=True)