# Copyright 2013-2014 The Meson development team

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import enum
import os
import io
import sys
import time
import platform
import shlex
import subprocess
import shutil
import typing as T
from contextlib import contextmanager
from pathlib import Path

if T.TYPE_CHECKING:
    from ._typing import StringProtocol, SizedStringProtocol

    from .mparser import BaseNode

    TV_Loggable = T.Union[str, 'AnsiDecorator', StringProtocol]
    TV_LoggableList = T.List[TV_Loggable]

"""This is (mostly) a standalone module used to write logging
information about Meson runs. Some output goes to the screen,
some to the logging directory, and some goes to both."""

def is_windows() -> bool:
    """Return True when platform.system() reports 'windows' (compared in lowercase)."""
    return platform.system().lower() == 'windows'

def _windows_ansi() -> bool:
    """Try to switch the stdout console into virtual-terminal (ANSI) mode.

    Returns False if the console mode of the stdout handle cannot be read.
    Otherwise returns True when SetConsoleMode() succeeds or when the
    ANSICON environment variable is set to a non-empty value.
    """
    # windll only exists on windows, so mypy will get mad
    from ctypes import windll, byref  # type: ignore
    from ctypes.wintypes import DWORD

    kernel32 = windll.kernel32
    handle = kernel32.GetStdHandle(-11)
    current_mode = DWORD()
    if not kernel32.GetConsoleMode(handle, byref(current_mode)):
        return False
    # ENABLE_VIRTUAL_TERMINAL_PROCESSING == 0x4
    if kernel32.SetConsoleMode(handle, current_mode.value | 0x4):
        return True
    # Enabling VT processing failed (returned 0): fall back to the original
    # behavior of trusting ANSICON.
    return bool(os.environ.get('ANSICON'))

def colorize_console() -> bool:
    """Decide whether ANSI colour codes should be written to stdout.

    The answer is cached as a ``colorize_console`` attribute on the current
    ``sys.stdout`` object and reused on later calls. Any exception raised
    while probing the terminal results in False.
    """
    cached = getattr(sys.stdout, 'colorize_console', None)  # type: bool
    if cached is not None:
        return cached

    try:
        on_windows = is_windows()
        is_tty = os.isatty(sys.stdout.fileno())
        if on_windows:
            enabled = is_tty and _windows_ansi()
        else:
            enabled = is_tty and os.environ.get('TERM', 'dumb') != 'dumb'
    except Exception:
        enabled = False

    sys.stdout.colorize_console = enabled  # type: ignore[attr-defined]
    return enabled

def setup_console() -> None:
    """Drop the cached colorize decision on Windows so it is probed again.

    On Windows, a subprocess might call SetConsoleMode() on the console
    connected to stdout and turn off ANSI escape processing. Call this after
    running a subprocess to ensure we turn it on again.
    """
    if not is_windows():
        return
    try:
        del sys.stdout.colorize_console
    except AttributeError:
        # Nothing was cached yet.
        pass

# Directory passed to initialize(); None until then.
log_dir = None               # type: T.Optional[str]
# Open handle on the log file; set by initialize() and reset to None by shutdown().
log_file = None              # type: T.Optional[T.TextIO]
# Basename of the log file that initialize() opens inside the log directory.
log_fname = 'meson-log.txt'  # type: str
# Stack of names pushed by nested(); force_print() prefixes lines with the top entry.
log_depth = []               # type: T.List[str]
# Value subtracted from time.monotonic() by process_markup() to build a timestamp prefix.
log_timestamp_start = None   # type: T.Optional[float]
# When True, _log_error() raises MesonException after logging unless fatal=False.
log_fatal_warnings = False   # type: bool
# When True, force_print() returns without printing anything.
log_disable_stdout = False   # type: bool
# When True, _log() only forwards messages flagged is_error to the console.
log_errors_only = False      # type: bool
# True when a 'CI' variable is present in the environment; gates _debug_log_cmd().
_in_ci = 'CI' in os.environ  # type: bool
# Keys of the messages that log_once() has already emitted.
_logged_once = set()         # type: T.Set[T.Tuple[str, ...]]
# Incremented on every _log_error() call.
log_warnings_counter = 0     # type: int
# Pager subprocess created by start_pager(); None when no pager is running.
log_pager: T.Optional['subprocess.Popen'] = None

def disable() -> None:
    """Set ``log_disable_stdout`` so that force_print() stops printing."""
    global log_disable_stdout  # pylint: disable=global-statement
    log_disable_stdout = True

def enable() -> None:
    """Clear ``log_disable_stdout`` so that force_print() prints again."""
    global log_disable_stdout  # pylint: disable=global-statement
    log_disable_stdout = False

def set_quiet() -> None:
    """Set ``log_errors_only``: _log() then prints only messages flagged is_error."""
    global log_errors_only  # pylint: disable=global-statement
    log_errors_only = True

def set_verbose() -> None:
    """Clear ``log_errors_only`` so that _log() prints every message again."""
    global log_errors_only  # pylint: disable=global-statement
    log_errors_only = False

def initialize(logdir: str, fatal_warnings: bool = False) -> None:
    """Open the log file inside *logdir* and record the logging settings.

    :param logdir: Directory in which the file named by ``log_fname`` is
        opened for writing (UTF-8, truncating any existing content)
    :param fatal_warnings: Stored in ``log_fatal_warnings``
    """
    global log_dir, log_file, log_fatal_warnings  # pylint: disable=global-statement
    log_dir = logdir
    log_path = os.path.join(logdir, log_fname)
    log_file = open(log_path, 'w', encoding='utf-8')
    log_fatal_warnings = fatal_warnings

def set_timestamp_start(start: float) -> None:
    """Store *start* as the reference that process_markup() subtracts from time.monotonic()."""
    global log_timestamp_start  # pylint: disable=global-statement
    log_timestamp_start = start

def shutdown() -> T.Optional[str]:
    """Close the log file and stop the pager, whichever of them is active.

    :return: The path of the log file that was closed, or None when no log
        file was open
    """
    global log_file  # pylint: disable=global-statement
    path = None  # type: T.Optional[str]
    if log_file is not None:
        path = log_file.name
        # The global is cleared before close() is called, so it is already
        # unset even if close() raises.
        exception_around_goer = log_file
        log_file = None
        exception_around_goer.close()
    # Stop the pager unconditionally: returning early after closing the log
    # file would leave a running pager unflushed and never waited on.
    stop_pager()
    return path

class AnsiDecorator:
    """A piece of text paired with the ANSI escape code used to decorate it."""

    # Escape sequence appended after decorated text.
    plain_code = "\033[0m"

    def __init__(self, text: str, code: str, quoted: bool = False):
        self.text = text
        self.code = code
        self.quoted = quoted

    def get_text(self, with_codes: bool) -> str:
        """Render the text, optionally wrapped in escape codes and double quotes.

        :param with_codes: When true and a code is set, surround the text
            with the code and ``plain_code``
        :return: The rendered text; the quotes, if any, go outside the codes
        """
        if with_codes and self.code:
            rendered = self.code + self.text + AnsiDecorator.plain_code
        else:
            rendered = self.text
        return f'"{rendered}"' if self.quoted else rendered

    def __len__(self) -> int:
        # Length of the bare text only; codes and quotes are not counted.
        return len(self.text)

    def __str__(self) -> str:
        return self.get_text(colorize_console())

class AnsiText:
    """A sequence of sized, string-convertible pieces treated as one text."""

    def __init__(self, *args: 'SizedStringProtocol'):
        self.args = args

    def __len__(self) -> int:
        # Total of the lengths reported by each piece.
        return sum(map(len, self.args))

    def __str__(self) -> str:
        # Concatenation of str() of each piece, in order.
        return ''.join(map(str, self.args))


def bold(text: str, quoted: bool = False) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the bold code (``ESC[1m``)."""
    return AnsiDecorator(text, "\033[1m", quoted=quoted)

def italic(text: str, quoted: bool = False) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the italic code (``ESC[3m``)."""
    return AnsiDecorator(text, "\033[3m", quoted=quoted)

def plain(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator with an empty code, so no escapes are emitted."""
    return AnsiDecorator(text, "")

def red(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the bold red code (``ESC[1;31m``)."""
    return AnsiDecorator(text, "\033[1;31m")

def green(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the bold green code (``ESC[1;32m``)."""
    return AnsiDecorator(text, "\033[1;32m")

def yellow(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the bold yellow code (``ESC[1;33m``)."""
    return AnsiDecorator(text, "\033[1;33m")

def blue(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the bold blue code (``ESC[1;34m``)."""
    return AnsiDecorator(text, "\033[1;34m")

def cyan(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the bold cyan code (``ESC[1;36m``)."""
    return AnsiDecorator(text, "\033[1;36m")

def normal_red(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the non-bold red code (``ESC[31m``)."""
    return AnsiDecorator(text, "\033[31m")

def normal_green(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the non-bold green code (``ESC[32m``)."""
    return AnsiDecorator(text, "\033[32m")

def normal_yellow(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the non-bold yellow code (``ESC[33m``)."""
    return AnsiDecorator(text, "\033[33m")

def normal_blue(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the non-bold blue code (``ESC[34m``)."""
    return AnsiDecorator(text, "\033[34m")

def normal_cyan(text: str) -> AnsiDecorator:
    """Wrap *text* in an AnsiDecorator using the non-bold cyan code (``ESC[36m``)."""
    return AnsiDecorator(text, "\033[36m")

# This really should be AnsiDecorator or anything that implements
# __str__(), but that requires protocols from typing_extensions
def process_markup(args: T.Sequence[TV_Loggable], keep: bool) -> T.List[str]:
    """Convert loggable arguments into a list of plain strings.

    :param args: Items to render; None entries are skipped
    :param keep: Whether AnsiDecorator items keep their escape codes
    :return: The rendered items, preceded by an elapsed-time stamp when
        ``log_timestamp_start`` is set
    """
    def render(item: TV_Loggable) -> str:
        if isinstance(item, str):
            return item
        if isinstance(item, AnsiDecorator):
            return item.get_text(keep)
        return str(item)

    rendered = []  # type: T.List[str]
    if log_timestamp_start is not None:
        elapsed = time.monotonic() - log_timestamp_start
        rendered = ['[{:.3f}]'.format(elapsed)]
    rendered.extend(render(item) for item in args if item is not None)
    return rendered

def force_print(*args: str, nested: bool, sep: T.Optional[str] = None,
                end: T.Optional[str] = None) -> None:
    """Print *args* to the pager if one is running, otherwise to stdout.

    Does nothing while ``log_disable_stdout`` is set. When ``log_depth`` is
    non-empty every line is stripped of surrounding whitespace and, if
    *nested* is true, prefixed with the innermost depth name plus ``'| '``.

    :param args: Already rendered strings to print
    :param nested: Whether to add the nesting prefix
    :param sep: Separator forwarded to print()
    :param end: Line terminator forwarded to print()
    """
    if log_disable_stdout:
        return
    # Let print() apply sep/end once, then post-process the resulting text.
    iostr = io.StringIO()
    print(*args, sep=sep, end=end, file=iostr)

    raw = iostr.getvalue()
    if log_depth:
        prepend = log_depth[-1] + '| ' if nested else ''
        lines = []
        for l in raw.split('\n'):
            l = l.strip()
            # Blank lines stay blank instead of receiving a bare prefix.
            lines.append(prepend + l if l else '')
        raw = '\n'.join(lines)

    # _Something_ is going to get printed.
    # file=None makes print() fall back to sys.stdout.
    output = log_pager.stdin if log_pager else None
    try:
        print(raw, end='', file=output)
    except UnicodeEncodeError:
        # The destination cannot encode the text: retry with an ASCII-only
        # rendition, sent to the same destination so that the fallback does
        # not bypass a running pager.
        cleaned = raw.encode('ascii', 'replace').decode('ascii')
        print(cleaned, end='', file=output)

def debug(*args: TV_Loggable, sep: T.Optional[str] = None,
          end: T.Optional[str] = None) -> None:
    """Write a message to the log file only; nothing is sent to the console.

    The message is rendered without escape codes and dropped when no log
    file is open.
    """
    rendered = process_markup(args, False)
    if log_file is None:
        return
    print(*rendered, file=log_file, sep=sep, end=end)
    log_file.flush()

def _debug_log_cmd(cmd: str, args: T.List[str]) -> None:
    """Write a ``!meson_ci!/<cmd>`` line to the log file when running under CI.

    Does nothing unless ``_in_ci`` is true.
    """
    if _in_ci:
        # Quote all args, just in case
        quoted_args = ' '.join(f'"{x}"' for x in args)
        debug('!meson_ci!/{} {}'.format(cmd, quoted_args))

def cmd_ci_include(file: str) -> None:
    """Emit a ``ci_include`` command for *file* through _debug_log_cmd()."""
    _debug_log_cmd('ci_include', [file])


def log(*args: TV_Loggable, is_error: bool = False,
        once: bool = False, nested: bool = True,
        sep: T.Optional[str] = None,
        end: T.Optional[str] = None) -> None:
    """Log a message, through log_once() when *once* is true, else through _log().

    All other arguments are forwarded unchanged.
    """
    emit = log_once if once else _log
    emit(*args, is_error=is_error, nested=nested, sep=sep, end=end)


def _log(*args: TV_Loggable, is_error: bool = False,
         nested: bool = True, sep: T.Optional[str] = None,
         end: T.Optional[str] = None) -> None:
    """Write a message to the log file and, unless filtered out, to the console.

    The log file always receives the text without escape codes. The console
    copy keeps the codes when colorize_console() is true, and is skipped
    when ``log_errors_only`` is set and *is_error* is false.
    """
    plain_parts = process_markup(args, False)
    if log_file is not None:
        print(*plain_parts, file=log_file, sep=sep, end=end)
        log_file.flush()

    console_parts = plain_parts
    if colorize_console():
        console_parts = process_markup(args, True)

    if is_error or not log_errors_only:
        force_print(*console_parts, nested=nested, sep=sep, end=end)

def log_once(*args: TV_Loggable, is_error: bool = False,
             nested: bool = True, sep: T.Optional[str] = None,
             end: T.Optional[str] = None) -> None:
    """Log a message unless an equal one was already logged through this function.

    Messages are compared by their undecorated text: an AnsiDecorator counts
    as the string it wraps, and any other non-string as its str().
    """
    def undecorated(item: TV_Loggable) -> str:
        if isinstance(item, str):
            return item
        if isinstance(item, AnsiDecorator):
            return item.text
        return str(item)

    key = tuple(undecorated(item) for item in args)
    if key not in _logged_once:
        _logged_once.add(key)
        _log(*args, is_error=is_error, nested=nested, sep=sep, end=end)

# This isn't strictly correct. What we really want here is something like:
# class StringProtocol(typing_extensions.Protocol):
#
#      def __str__(self) -> str: ...
#
# This would more accurately embody what this function can handle, but we
# don't have that yet, so instead we'll do some casting to work around it
def get_error_location_string(fname: str, lineno: int) -> str:
    """Return the ``<fname>:<lineno>:`` prefix used in front of located messages."""
    return '{}:{}:'.format(fname, lineno)


class _Severity(enum.Enum):
    """Message categories that _log_error() maps to a coloured label."""

    NOTICE = enum.auto()
    WARNING = enum.auto()
    ERROR = enum.auto()
    DEPRECATION = enum.auto()


def _log_error(severity: _Severity, *rargs: TV_Loggable,
               once: bool = False, fatal: bool = True,
               location: T.Optional[BaseNode] = None,
               nested: bool = True, sep: T.Optional[str] = None,
               end: T.Optional[str] = None,
               is_error: bool = True) -> None:
    """Log a message prefixed with a severity label and an optional location.

    :param severity: Selects the label placed in front of the message
    :param rargs: The message parts
    :param once: Forwarded to log()
    :param fatal: When true and ``log_fatal_warnings`` is set, raise after logging
    :param location: Node whose ``filename`` and ``lineno`` are shown, with
        the file name made relative to the current working directory
    :param nested: Forwarded to log()
    :param sep: Forwarded to log()
    :param end: Forwarded to log()
    :param is_error: Forwarded to log()
    :raises MesonException: If fatal warnings are enabled and *fatal* is true
    """
    from .mesonlib import MesonException, relpath

    if severity is _Severity.NOTICE:
        tag = bold('NOTICE:')
    elif severity is _Severity.WARNING:
        tag = yellow('WARNING:')
    elif severity is _Severity.ERROR:
        tag = red('ERROR:')
    elif severity is _Severity.DEPRECATION:
        tag = red('DEPRECATION:')

    # rargs is a tuple, not a list; build a single list headed by the label.
    message = [tag, *rargs]  # type: TV_LoggableList

    if location is not None:
        relative_file = relpath(location.filename, os.getcwd())
        # The location goes in front of the label.
        message.insert(0, get_error_location_string(relative_file, location.lineno))

    log(*message, once=once, nested=nested, sep=sep, end=end, is_error=is_error)

    global log_warnings_counter  # pylint: disable=global-statement
    log_warnings_counter += 1

    if log_fatal_warnings and fatal:
        raise MesonException("Fatal warnings enabled, aborting")

def error(*args: TV_Loggable,
          once: bool = False, fatal: bool = True,
          location: T.Optional[BaseNode] = None,
          nested: bool = True, sep: T.Optional[str] = None,
          end: T.Optional[str] = None) -> None:
    """Log *args* through _log_error() with ERROR severity and is_error=True.

    All keyword arguments are forwarded unchanged to _log_error().
    """
    return _log_error(_Severity.ERROR, *args, once=once, fatal=fatal, location=location,
                      nested=nested, sep=sep, end=end, is_error=True)

def warning(*args: TV_Loggable,
            once: bool = False, fatal: bool = True,
            location: T.Optional[BaseNode] = None,
            nested: bool = True, sep: T.Optional[str] = None,
            end: T.Optional[str] = None) -> None:
    """Log *args* through _log_error() with WARNING severity and is_error=True.

    All keyword arguments are forwarded unchanged to _log_error().
    """
    return _log_error(_Severity.WARNING, *args, once=once, fatal=fatal, location=location,
                      nested=nested, sep=sep, end=end, is_error=True)

def deprecation(*args: TV_Loggable,
                once: bool = False, fatal: bool = True,
                location: T.Optional[BaseNode] = None,
                nested: bool = True, sep: T.Optional[str] = None,
                end: T.Optional[str] = None) -> None:
    """Log *args* through _log_error() with DEPRECATION severity and is_error=True.

    All keyword arguments are forwarded unchanged to _log_error().
    """
    return _log_error(_Severity.DEPRECATION, *args, once=once, fatal=fatal, location=location,
                      nested=nested, sep=sep, end=end, is_error=True)

def notice(*args: TV_Loggable,
           once: bool = False, fatal: bool = True,
           location: T.Optional[BaseNode] = None,
           nested: bool = True, sep: T.Optional[str] = None,
           end: T.Optional[str] = None) -> None:
    """Log *args* through _log_error() with NOTICE severity and is_error=False.

    All keyword arguments are forwarded unchanged to _log_error().
    """
    return _log_error(_Severity.NOTICE, *args, once=once, fatal=fatal, location=location,
                      nested=nested, sep=sep, end=end, is_error=False)

def get_relative_path(target: Path, current: Path) -> Path:
    """Express *target* relative to the directory *current*.

    Walks from *current* up through its parents until one of them is an
    ancestor of *target*, adding one ``..`` for every level climbed.

    :return: The relative path, or *target* unchanged if no common
        ancestor is found
    """
    for levels_up, ancestor in enumerate((current, *current.parents)):
        try:
            remainder = target.relative_to(ancestor)
        except ValueError:
            # Not an ancestor of target; climb one more level.
            continue
        return Path('.', *(['..'] * levels_up), remainder)

    # we failed, should not get here
    return target

def exception(e: Exception, prefix: T.Optional[AnsiDecorator] = None) -> None:
    """Log an exception as an error, even when console output is disabled.

    An empty log() call is made first. If the exception carries non-None
    ``file``, ``lineno`` and ``colno`` attributes, the message starts with
    ``<path>:<lineno>:<colno>:`` where the path is made relative to the
    current working directory.

    :param e: The exception to report
    :param prefix: Label placed before the message; defaults to a red ``ERROR:``
    """
    if prefix is None:
        prefix = red('ERROR:')
    log()
    args = []  # type: T.List[T.Union[AnsiDecorator, str]]
    if all(getattr(e, a, None) is not None for a in ['file', 'lineno', 'colno']):
        # Mypy doesn't follow hasattr, and it's pretty easy to visually inspect
        # that this is correct, so we'll just ignore it.
        path = get_relative_path(Path(e.file), Path(os.getcwd()))  # type: ignore
        args.append(f'{path}:{e.lineno}:{e.colno}:')  # type: ignore
    if prefix:
        args.append(prefix)
    args.append(str(e))

    # Temporarily re-enable console output so the error is always shown.
    restore = log_disable_stdout
    if restore:
        enable()
    try:
        log(*args, is_error=True)
    finally:
        # Put the disabled state back even if logging itself fails.
        if restore:
            disable()

# Format a list for logging purposes as a string. It separates
# all but the last item with commas, and the last with 'and'.
def format_list(input_list: T.List[str]) -> str:
    """Join items for display: commas between all but the last, then 'and'.

    An empty list yields an empty string and a single item is returned
    as is.
    """
    if not input_list:
        return ''
    *leading, final = input_list
    if not leading:
        return final
    return ' and '.join([', '.join(leading), final])

@contextmanager
def nested(name: str = '') -> T.Generator[None, None, None]:
    """Context manager that pushes *name* onto ``log_depth`` for the duration of the block.

    The entry is popped again on exit, including when the block raises.
    """
    log_depth.append(name)
    try:
        yield
    finally:
        log_depth.pop()

def start_pager() -> None:
    """Start a pager subprocess and store it in ``log_pager``.

    Nothing is started when colorize_console() is false or when no pager
    command can be determined. The command comes from the ``PAGER``
    environment variable if set; otherwise ``less`` is looked up on PATH
    and, on Windows, in the ``usr/bin`` directory next to a ``git``
    installation.

    :raises MesonException: If the pager named by ``PAGER`` fails to start
    """
    if not colorize_console():
        return
    pager_cmd = []
    if 'PAGER' in os.environ:
        pager_cmd = shlex.split(os.environ['PAGER'])
    else:
        less = shutil.which('less')
        if not less and is_windows():
            git = shutil.which('git')
            if git:
                path = Path(git).parents[1] / 'usr' / 'bin'
                less = shutil.which('less', path=str(path))
        if less:
            pager_cmd = [less]
    if not pager_cmd:
        return
    global log_pager # pylint: disable=global-statement
    assert log_pager is None
    try:
        # Set 'LESS' environment variable, rather than arguments in
        # pager_cmd, to also support the case where the user has 'PAGER'
        # set to 'less'. Arguments set are:
        # "R" : support color
        # "X" : do not clear the screen when leaving the pager
        # "F" : skip the pager if content fits into the screen
        env = os.environ.copy()
        if 'LESS' not in env:
            env['LESS'] = 'RXF'
        # Set "-c" for lv to support color
        if 'LV' not in env:
            env['LV'] = '-c'
        log_pager = subprocess.Popen(pager_cmd, stdin=subprocess.PIPE,
                                     text=True, encoding='utf-8', env=env)
    except Exception as e:
        # Ignore errors, unless it is a user defined pager.
        if 'PAGER' in os.environ:
            from .mesonlib import MesonException
            # Chain explicitly so the original failure is reported as the cause.
            raise MesonException(f'Failed to start pager: {str(e)}') from e

def stop_pager() -> None:
    """Flush and close the pager's stdin, wait for it to exit, then forget it.

    Does nothing when no pager is running. A BrokenPipeError while flushing
    or closing is ignored.
    """
    global log_pager # pylint: disable=global-statement
    pager = log_pager
    if not pager:
        return
    try:
        pager.stdin.flush()
        pager.stdin.close()
    except BrokenPipeError:
        pass
    pager.wait()
    log_pager = None


def code_line(text: str, line: str, colno: int) -> str:
    """Format a message, a line of code and a caret marking a column.

    :param text: A message to display before the line
    :param line: The line of code to be pointed to
    :param colno: The column number to point at
    :return: Three lines: the text, the code line, and a caret indented by
        *colno* spaces
    """
    caret_indent = " " * colno
    return '{}\n{}\n{}^'.format(text, line, caret_indent)
