File: //proc/thread-self/root/usr/local/CyberCP/lib64/python3.10/site-packages/tornado/test/log_test.py
#
# Copyright 2012 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import glob
import logging
import os
import re
import subprocess
import sys
import tempfile
import unittest
import warnings
from tornado.escape import utf8
from tornado.log import LogFormatter, define_logging_options, enable_pretty_logging
from tornado.options import OptionParser
from tornado.util import basestring_type
@contextlib.contextmanager
def ignore_bytes_warning():
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=BytesWarning)
        yield
class LogFormatterTest(unittest.TestCase):
    # Matches the output of a single logging call (which may be multiple lines
    # if a traceback was included, so we use the DOTALL option)
    LINE_RE = re.compile(
        b"(?s)\x01\\[E [0-9]{6} [0-9]{2}:[0-9]{2}:[0-9]{2} log_test:[0-9]+\\]\x02 (.*)"
    )
    def setUp(self):
        self.formatter = LogFormatter(color=False)
        # Fake color support.  We can't guarantee anything about the $TERM
        # variable when the tests are run, so just patch in some values
        # for testing.  (testing with color off fails to expose some potential
        # encoding issues from the control characters)
        self.formatter._colors = {logging.ERROR: "\u0001"}
        self.formatter._normal = "\u0002"
        # construct a Logger directly to bypass getLogger's caching
        self.logger = logging.Logger("LogFormatterTest")
        self.logger.propagate = False
        self.tempdir = tempfile.mkdtemp()
        self.filename = os.path.join(self.tempdir, "log.out")
        self.handler = self.make_handler(self.filename)
        self.handler.setFormatter(self.formatter)
        self.logger.addHandler(self.handler)
    def tearDown(self):
        self.handler.close()
        os.unlink(self.filename)
        os.rmdir(self.tempdir)
    def make_handler(self, filename):
        return logging.FileHandler(filename, encoding="utf-8")
    def get_output(self):
        with open(self.filename, "rb") as f:
            line = f.read().strip()
            m = LogFormatterTest.LINE_RE.match(line)
            if m:
                return m.group(1)
            else:
                raise Exception("output didn't match regex: %r" % line)
    def test_basic_logging(self):
        self.logger.error("foo")
        self.assertEqual(self.get_output(), b"foo")
    def test_bytes_logging(self):
        with ignore_bytes_warning():
            # This will be "\xe9" on python 2 or "b'\xe9'" on python 3
            self.logger.error(b"\xe9")
            self.assertEqual(self.get_output(), utf8(repr(b"\xe9")))
    def test_utf8_logging(self):
        with ignore_bytes_warning():
            self.logger.error("\u00e9".encode("utf8"))
        if issubclass(bytes, basestring_type):
            # on python 2, utf8 byte strings (and by extension ascii byte
            # strings) are passed through as-is.
            self.assertEqual(self.get_output(), utf8("\u00e9"))
        else:
            # on python 3, byte strings always get repr'd even if
            # they're ascii-only, so this degenerates into another
            # copy of test_bytes_logging.
            self.assertEqual(self.get_output(), utf8(repr(utf8("\u00e9"))))
    def test_bytes_exception_logging(self):
        try:
            raise Exception(b"\xe9")
        except Exception:
            self.logger.exception("caught exception")
        # This will be "Exception: \xe9" on python 2 or
        # "Exception: b'\xe9'" on python 3.
        output = self.get_output()
        self.assertRegex(output, rb"Exception.*\\xe9")
        # The traceback contains newlines, which should not have been escaped.
        self.assertNotIn(rb"\n", output)
    def test_unicode_logging(self):
        self.logger.error("\u00e9")
        self.assertEqual(self.get_output(), utf8("\u00e9"))
class EnablePrettyLoggingTest(unittest.TestCase):
    def setUp(self):
        super().setUp()
        self.options = OptionParser()
        define_logging_options(self.options)
        self.logger = logging.Logger("tornado.test.log_test.EnablePrettyLoggingTest")
        self.logger.propagate = False
    def test_log_file(self):
        tmpdir = tempfile.mkdtemp()
        try:
            self.options.log_file_prefix = tmpdir + "/test_log"
            enable_pretty_logging(options=self.options, logger=self.logger)
            self.assertEqual(1, len(self.logger.handlers))
            self.logger.error("hello")
            self.logger.handlers[0].flush()
            filenames = glob.glob(tmpdir + "/test_log*")
            self.assertEqual(1, len(filenames))
            with open(filenames[0], encoding="utf-8") as f:
                self.assertRegex(f.read(), r"^\[E [^]]*\] hello$")
        finally:
            for handler in self.logger.handlers:
                handler.flush()
                handler.close()
            for filename in glob.glob(tmpdir + "/test_log*"):
                os.unlink(filename)
            os.rmdir(tmpdir)
    def test_log_file_with_timed_rotating(self):
        tmpdir = tempfile.mkdtemp()
        try:
            self.options.log_file_prefix = tmpdir + "/test_log"
            self.options.log_rotate_mode = "time"
            enable_pretty_logging(options=self.options, logger=self.logger)
            self.logger.error("hello")
            self.logger.handlers[0].flush()
            filenames = glob.glob(tmpdir + "/test_log*")
            self.assertEqual(1, len(filenames))
            with open(filenames[0], encoding="utf-8") as f:
                self.assertRegex(f.read(), r"^\[E [^]]*\] hello$")
        finally:
            for handler in self.logger.handlers:
                handler.flush()
                handler.close()
            for filename in glob.glob(tmpdir + "/test_log*"):
                os.unlink(filename)
            os.rmdir(tmpdir)
    def test_wrong_rotate_mode_value(self):
        try:
            self.options.log_file_prefix = "some_path"
            self.options.log_rotate_mode = "wrong_mode"
            self.assertRaises(
                ValueError,
                enable_pretty_logging,
                options=self.options,
                logger=self.logger,
            )
        finally:
            for handler in self.logger.handlers:
                handler.flush()
                handler.close()
class LoggingOptionTest(unittest.TestCase):
    """Test the ability to enable and disable Tornado's logging hooks."""
    def logs_present(self, statement, args=None):
        # Each test may manipulate and/or parse the options and then logs
        # a line at the 'info' level.  This level is ignored in the
        # logging module by default, but Tornado turns it on by default
        # so it is the easiest way to tell whether tornado's logging hooks
        # ran.
        IMPORT = "from tornado.options import options, parse_command_line"
        LOG_INFO = 'import logging; logging.info("hello")'
        program = ";".join([IMPORT, statement, LOG_INFO])
        proc = subprocess.Popen(
            [sys.executable, "-c", program] + (args or []),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        stdout, stderr = proc.communicate()
        self.assertEqual(proc.returncode, 0, "process failed: %r" % stdout)
        return b"hello" in stdout
    def test_default(self):
        self.assertFalse(self.logs_present("pass"))
    def test_tornado_default(self):
        self.assertTrue(self.logs_present("parse_command_line()"))
    def test_disable_command_line(self):
        self.assertFalse(self.logs_present("parse_command_line()", ["--logging=none"]))
    def test_disable_command_line_case_insensitive(self):
        self.assertFalse(self.logs_present("parse_command_line()", ["--logging=None"]))
    def test_disable_code_string(self):
        self.assertFalse(
            self.logs_present('options.logging = "none"; parse_command_line()')
        )
    def test_disable_code_none(self):
        self.assertFalse(
            self.logs_present("options.logging = None; parse_command_line()")
        )
    def test_disable_override(self):
        # command line trumps code defaults
        self.assertTrue(
            self.logs_present(
                "options.logging = None; parse_command_line()", ["--logging=info"]
            )
        )