mongona

mongona
-- --
正在获取天气

python 实现一个自定义上下文管理器

python 实现一个自定义上下文管理器

1、什么是上下文管理器?

      上下文管理器(context manager)是 Python 2.5 开始支持的一种语法,用于规定某个对象的使用范围。一旦进入或者离开该使用范围,就会有特殊操作被调用(比如为对象分配或者释放内存)。它的语法形式是 with ... as ...,使用关键字 with 和 as。上下文管理器是指在一段代码执行之前先执行一段代码作预处理工作,执行之后再执行一段代码做清理工作。比如打开文件进行读写,读写完之后需要将文件关闭;又比如在数据库操作中,操作之前需要连接数据库,操作之后需要关闭数据库连接。在上下文管理协议中,有两个方法 __enter__ 和 __exit__,分别实现上述两个功能。

 links: 

 https://www.geeksforgeeks.org/context-manager-in-python/   

 http://book.pythontips.com/en/latest/context_managers.html

https://www.python.org/dev/peps/pep-0343/  官方文档

2、实现一个自定义上下文

     任何定义了 __enter__() 和 __exit__() 方法的对象都可以用作上下文管理器。

话不多说,上代码 :  demo1

# -*- coding: utf-8 -*-
"""
@author: sato
@file: context.py
@date:  2019/9/12 18:25
"""
import os
import subprocess
import datetime


class CustomFile(object):
    """File wrapper that can be used in a ``with`` statement.

    Entering the context opens ``file_name`` in mode ``file_model`` and hands
    the file object to the ``as`` target; leaving the context closes it.
    A timestamped message is printed before the open and before the close.
    """

    def __init__(self, file_name, file_model, *args, **kwargs):
        # Extra positional/keyword arguments are accepted but not used.
        self.file_model = file_model
        self.file_name = file_name

    def __enter__(self, *args, **kwargs):
        """Print a notice, open the file and return the open file object."""
        self.my_print("before open file, do something!")
        handle = open(self.file_name, self.file_model)
        self.f = handle
        return handle

    def __exit__(self, *args, **kwargs):
        """Print the exit arguments and a notice, then close the file.

        The ``with`` statement passes three positional arguments
        (exc_type, exc_val, exc_tb); all three are None when the body
        finished without an exception.  This method returns None (falsy),
        so an exception raised in the body is not suppressed.
        """
        for message in (args, "before close , do something!"):
            self.my_print(message)
        self.f.close()

    @classmethod
    def my_print(cls, *args):
        """Print ``args`` prefixed with the current local time."""
        stamp = '当前时间:{}'.format(datetime.datetime.now())
        print(stamp, *args)


# Path of the sample file read by the demo below.
temp_file = './temp_file'
if not os.path.exists(temp_file):
    # Create the sample file with plain file I/O instead of spawning a shell
    # for `echo ... >>`: same content ("hello world" plus a newline), but no
    # dependency on a POSIX shell and no command string to build.
    with open(temp_file, 'a') as _seed_file:
        _seed_file.write('hello world\n')

# Demo 1: the class-based context manager.  The messages printed by
# __enter__/__exit__ appear between 'start' and 'end'.
CustomFile.my_print('start')
with CustomFile(temp_file, 'r') as file:
    data = file.read()
    CustomFile.my_print(data)
CustomFile.my_print('end')


输出:

/Users/sato/.virtualenvs/review/bin/python3.6 /Users/sato/Desktop/review/context.py
当前时间:2019-09-12 19:08:04.933938 start
当前时间:2019-09-12 19:08:04.933993 before open file, do something!
当前时间:2019-09-12 19:08:04.934072 hello world

当前时间:2019-09-12 19:08:04.934091 (None, None, None)
当前时间:2019-09-12 19:08:04.934103 before close , do something!
当前时间:2019-09-12 19:08:04.934123 end

Process finished with exit code 0

 

 

demo2:

from contextlib import contextmanager

# Path of the sample file read by the demo below.
temp_file = './temp_file'
if not os.path.exists(temp_file):
    # Create the sample file with plain file I/O instead of spawning a shell
    # for `echo ... >>`: same content ("hello world" plus a newline), but no
    # dependency on a POSIX shell and no command string to build.
    with open(temp_file, 'a') as _seed_file:
        _seed_file.write('hello world\n')


def demo2():
    """Return a generator-based context-manager factory for files.

    The returned callable takes ``(_temp_file, model)``; using its result in
    a ``with`` statement opens the file, binds the file object to the ``as``
    target and closes the file when the block is left.
    """
    import contextlib

    def custom_file(_temp_file, model):
        # With contextlib.contextmanager, the code before ``yield`` runs as
        # __enter__, the yielded value is bound to the ``as`` target, and the
        # code after ``yield`` runs as __exit__.  Leaving the inner ``with``
        # closes the file, whether or not the caller's block raised.
        with open(_temp_file, model) as handle:
            yield handle

    return contextlib.contextmanager(custom_file)


# Why the double call: demo2() only *returns* the decorated factory function
# `custom_file`; calling that factory with (path, mode) is what creates the
# context manager object that the `with` statement enters.
with demo2()(temp_file,  'r') as file:
    data = file.read()
    CustomFile.my_print(data)

装饰器contextmanager 源码:

def contextmanager(func):
    """@contextmanager decorator.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    This makes this:

        with some_generator(<arguments>) as <variable>:
            <body>

    equivalent to this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>

    """
    # wraps() copies func's metadata (name, docstring, ...) onto helper.
    @wraps(func)
    def helper(*args, **kwds):
        # Each call builds a new manager object around func and the call
        # arguments; the manager's __init__ is what calls func(*args, **kwds).
        return _GeneratorContextManager(func, args, kwds)
    return helper

 

_GeneratorContextManager 源码:

class _GeneratorContextManager(ContextDecorator, AbstractContextManager):
    """Helper for @contextmanager decorator."""

    def __init__(self, func, args, kwds):  # func is the decorated generator function
        # Calling the generator function only creates the generator object;
        # its body does not start running until next() is called in __enter__.
        self.gen = func(*args, **kwds)
        # Kept so that _recreate_cm() can build a fresh instance later.
        self.func, self.args, self.kwds = func, args, kwds
        # Issue 19330: ensure context manager instances have good docstrings
        doc = getattr(func, "__doc__", None)
        if doc is None:
            doc = type(self).__doc__
        self.__doc__ = doc
        # Unfortunately, this still doesn't provide good help output when
        # inspecting the created context manager instances, since pydoc
        # currently bypasses the instance docstring and shows the docstring
        # for the class instead.
        # See http://bugs.python.org/issue19404 for more details.

    def _recreate_cm(self):
        # _GCM instances are one-shot context managers, so the
        # CM must be recreated each time a decorated function is
        # called
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):  # this class implements __enter__ as well
        """Run the generator up to its first yield and return the yielded value.

        Raises RuntimeError if the generator finishes without yielding.
        """
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):  # this class implements __exit__ as well
        """Resume the generator after its yield so its cleanup code runs.

        Returns False when the with-body's exception (if any) should
        propagate and True when the generator swallowed it; raises
        RuntimeError if the generator yields a second time.
        """
        if type is None:
            # The with-body finished normally: the generator must now run to
            # completion (StopIteration); a second yield is an error.
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = type()
            try:
                # Re-raise the with-body's exception inside the generator, at
                # the point of its yield.
                self.gen.throw(type, value, traceback)
            except StopIteration as exc:
                # Suppress StopIteration *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27122)
                if exc is value:
                    return False
                # Likewise, avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479).
                if type is StopIteration and exc.__cause__ is value:
                    return False
                raise
            except:
                # only re-raise if it's *not* the exception that was
                # passed to throw(), because __exit__() must not raise
                # an exception unless __exit__() itself failed.  But throw()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the throw() protocol
                # and the __exit__() protocol.
                #
                if sys.exc_info()[1] is value:
                    return False
                raise
            # throw() returned normally, i.e. the generator yielded again.
            raise RuntimeError("generator didn't stop after throw()")

如果我重写装饰器和 context 类,代码如下:

# -*- coding: utf-8 -*-
"""
@author: sato
@file: context.py
@date:  2019/9/12 18:25
"""
import os
import subprocess
import datetime
import sys
from contextlib import ContextDecorator, AbstractContextManager
from functools import wraps

# Path of the sample file read by the demos below.
temp_file = './temp_file'
if not os.path.exists(temp_file):
    # Create the sample file with plain file I/O instead of spawning a shell
    # for `echo ... >>`: same content ("hello world" plus a newline), but no
    # dependency on a POSIX shell and no command string to build.
    with open(temp_file, 'a') as _seed_file:
        _seed_file.write('hello world\n')


class CustomFile(object):
    """File wrapper that can be used in a ``with`` statement.

    Entering the context opens ``file_name`` in mode ``file_model`` and hands
    the file object to the ``as`` target; leaving the context closes it.
    A timestamped message is printed before the open and before the close.
    """

    def __init__(self, file_name, file_model, *args, **kwargs):
        # Extra positional/keyword arguments are accepted but not used.
        self.file_model = file_model
        self.file_name = file_name

    def __enter__(self, *args, **kwargs):
        """Print a notice, open the file and return the open file object."""
        self.my_print("before open file, do something!")
        handle = open(self.file_name, self.file_model)
        self.f = handle
        return handle

    def __exit__(self, *args, **kwargs):
        """Print the exit arguments and a notice, then close the file.

        The ``with`` statement passes three positional arguments
        (exc_type, exc_val, exc_tb); all three are None when the body
        finished without an exception.  This method returns None (falsy),
        so an exception raised in the body is not suppressed.
        """
        for message in (args, "before close , do something!"):
            self.my_print(message)
        self.f.close()

    @classmethod
    def my_print(cls, *args):
        """Print ``args`` prefixed with the current local time."""
        stamp = '当前时间:{}'.format(datetime.datetime.now())
        print(stamp, *args)


def demo1(_temp_file):
    """Read ``_temp_file`` through the class-based ``CustomFile`` manager.

    Prints 'start', the file's content and 'end' via ``CustomFile.my_print``;
    the manager's own enter/exit messages appear in between.
    """
    log = CustomFile.my_print
    log('start')
    with CustomFile(_temp_file, 'r') as handle:
        content = handle.read()
        log(content)
    log('end')


def demo2():
    """Return a generator-based context-manager factory for files.

    The returned callable takes ``(_temp_file, model)``; using its result in
    a ``with`` statement opens the file, binds the file object to the ``as``
    target and closes the file when the block is left.
    """
    import contextlib

    def custom_file(_temp_file, model):
        # With contextlib.contextmanager, the code before ``yield`` runs as
        # __enter__, the yielded value is bound to the ``as`` target, and the
        # code after ``yield`` runs as __exit__.  Leaving the inner ``with``
        # closes the file, whether or not the caller's block raised.
        with open(_temp_file, model) as handle:
            yield handle

    return contextlib.contextmanager(custom_file)


class GeneratorContextManager(ContextDecorator, AbstractContextManager):
    """Context manager driven by a generator, used by ``my_contextmanager``.

    A copy of contextlib's private helper with two extra timestamped
    messages, printed through ``CustomFile.my_print`` when the context is
    entered and when it is left.
    """

    def __init__(self, func, args, kwds):
        """Create the generator by calling ``func(*args, **kwds)``.

        Calling a generator function does not run its body; the body starts
        on the first ``next()`` in ``__enter__``.
        """
        self.gen = func(*args, **kwds)
        # Kept so that _recreate_cm() can build a fresh instance later.
        self.func, self.args, self.kwds = func, args, kwds
        # Issue 19330: ensure context manager instances have good docstrings
        doc = getattr(func, "__doc__", None)
        if doc is None:
            doc = type(self).__doc__
        self.__doc__ = doc
        # Unfortunately, this still doesn't provide good help output when
        # inspecting the created context manager instances, since pydoc
        # currently bypasses the instance docstring and shows the docstring
        # for the class instead.
        # See http://bugs.python.org/issue19404 for more details.

    def _recreate_cm(self):
        # Instances are one-shot context managers, so the CM must be
        # recreated each time a decorated function is called.
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):
        """Print the 'before open' message and run the generator to its yield.

        Returns the yielded value (bound to the ``as`` target).
        Raises RuntimeError if the generator finishes without yielding.
        """
        CustomFile.my_print('before open do something')
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):
        """Print the 'before close' message and resume the generator.

        Returns False when the with-body's exception (if any) should
        propagate and True when the generator swallowed it.  Raises
        RuntimeError if the generator yields a second time, and re-raises
        any *different* exception that the generator itself raised.
        """
        CustomFile.my_print('before close do something')
        if type is None:
            # The with-body finished normally: the generator must now run to
            # completion (StopIteration); a second yield is an error.
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = type()
            try:
                # Re-raise the with-body's exception inside the generator, at
                # the point of its yield.
                self.gen.throw(type, value, traceback)
            except StopIteration as exc:
                # Suppress StopIteration *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27122)
                if exc is value:
                    return False
                # Likewise, avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479).
                if type is StopIteration and exc.__cause__ is value:
                    return False
                raise
            except BaseException as exc:
                # Deliberately catches everything (explicit form of a bare
                # ``except:``).  Only re-raise if it's *not* the exception
                # that was passed to throw(), because __exit__() must not
                # raise an exception unless __exit__() itself failed.  But
                # throw() has to raise the exception to signal propagation,
                # so this fixes the impedance mismatch between the throw()
                # protocol and the __exit__() protocol.
                if exc is value:
                    return False
                raise
            # throw() returned normally, i.e. the generator yielded again.
            raise RuntimeError("generator didn't stop after throw()")


def my_contextmanager(func):
    """Turn a generator function into a context-manager factory.

    Hand-written counterpart of ``contextlib.contextmanager``.  Usage:

        @my_contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    after which

        with some_generator(<arguments>) as <variable>:
            <body>

    behaves like

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>

    Each call of the decorated function returns a new
    ``GeneratorContextManager`` built from ``func`` and the call arguments.
    """
    def build_manager(*args, **kwds):
        return GeneratorContextManager(func, args, kwds)

    # wraps() copies func's metadata (name, docstring, ...) onto the factory.
    return wraps(func)(build_manager)


@my_contextmanager
def custom_file(_temp_file, model):
    """Open ``_temp_file`` in mode ``model`` and yield the open file object.

    The code before the ``yield`` runs when the ``with`` block is entered;
    the ``finally`` clause runs when it is left, so the file is closed even
    if the block raised.
    """
    # Setup part: executed on entering the context.
    f = open(_temp_file, model)
    try:
        yield f
    finally:
        # Cleanup part: executed on leaving the context.
        f.close()
    # The former trailing ``return custom_file`` was a leftover copied from
    # demo2(); a generator's return value is ignored by the context manager.


# Drive the hand-written decorator: custom_file(...) builds a
# GeneratorContextManager, whose __enter__/__exit__ print the
# 'before open ...' / 'before close ...' messages around the block.
CustomFile.my_print('start')
with custom_file(temp_file, 'r') as f:
    data = f.read()
    CustomFile.my_print(data)
CustomFile.my_print('end')

输出:

/Users/sato/.virtualenvs/review/bin/python3.6 /Users/sato/Desktop/review/context.py
当前时间:2019-09-12 19:37:17.099099 start
当前时间:2019-09-12 19:37:17.099160 before open do something
当前时间:2019-09-12 19:37:17.099236 hello world

当前时间:2019-09-12 19:37:17.099254 before close do something
当前时间:2019-09-12 19:37:17.099279 end

Process finished with exit code 0

可以看到执行顺序同样是: 先enter 后exit

如果继续看 AbstractContextManager 会发现 :

class AbstractContextManager(abc.ABC):

    """An abstract base class for context managers."""

    def __enter__(self):
        """Return `self` upon entering the runtime context."""
        return self

    # Abstract: concrete subclasses have to provide their own __exit__.
    @abc.abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Raise any exception triggered within the runtime context."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        # Only the ABC itself answers the check; for subclasses of it the
        # hook returns NotImplemented so the default mechanism applies.
        if cls is AbstractContextManager:
            # Presumably reports whether C defines both named methods --
            # _check_methods is not shown here; confirm in _collections_abc.
            return _collections_abc._check_methods(C, "__enter__", "__exit__")
        return NotImplemented
AbstractContextManager 同样有 __enter__ 和 __exit__ 方法

这就说明:

This PEP adds a new statement "with" to the Python language to make it possible to factor out standard uses of try/finally statements.

In this PEP, context managers provide __enter__() and __exit__() methods that are invoked on entry to and exit from the body of the with statement.

https://www.python.org/dev/peps/pep-0343/#specification-the-with-statement

2
0
富强,民主,文明,和谐,自由,平等,公正,法治,爱国,敬业,诚信,友善。
打赏二维码