From 582d1c6f0be332ad4cb9f421bea5c2be56a12408 Mon Sep 17 00:00:00 2001 From: Stefan Israelsson Tampe Date: Fri, 24 Aug 2018 22:23:23 +0200 Subject: socket.py --- modules/language/python/module/selectors.py | 400 +++++++++++++++++++--------- 1 file changed, 275 insertions(+), 125 deletions(-) (limited to 'modules/language/python/module/selectors.py') diff --git a/modules/language/python/module/selectors.py b/modules/language/python/module/selectors.py index 41d70ba..6afb52e 100644 --- a/modules/language/python/module/selectors.py +++ b/modules/language/python/module/selectors.py @@ -1,5 +1,4 @@ module(selectors) - """Selectors module. This module allows high-level and efficient I/O multiplexing, built upon the @@ -10,13 +9,14 @@ This module allows high-level and efficient I/O multiplexing, built upon the from abc import ABCMeta, abstractmethod from collections import namedtuple, Mapping import math -import select as selectraw +import select import sys # generic events, that must be mapped to implementation-specific ones EVENT_READ = (1 << 0) EVENT_WRITE = (1 << 1) + def _fileobj_to_fd(fileobj): """Return a file descriptor from a file object. @@ -44,6 +44,11 @@ def _fileobj_to_fd(fileobj): SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) +SelectorKey.__doc__ = """SelectorKey(fileobj, fd, events, data) + + Object used to associate a file object to its backing + file descriptor, selected event mask, and attached data. +""" class _SelectorMapping(Mapping): """Mapping of file objects to selector keys.""" @@ -64,6 +69,7 @@ class _SelectorMapping(Mapping): def __iter__(self): return iter(self._selector._fd_to_key) + class BaseSelector(metaclass=ABCMeta): """Selector abstract base class. @@ -189,6 +195,7 @@ class BaseSelector(metaclass=ABCMeta): def __exit__(self, *args): self.close() + class _BaseSelectorImpl(BaseSelector): """Base selector implementation.""" @@ -298,10 +305,10 @@ class SelectSelector(_BaseSelectorImpl): if sys.platform == 'win32': def _select(self, r, w, _, timeout=None): - r, w, x = selectraw.select(r, w, w, timeout) + r, w, x = select.select(r, w, w, timeout) return r, w + x, [] else: - _select = selectraw.select + _select = select.select def select(self, timeout=None): timeout = None if timeout is None else max(timeout, 0) @@ -324,132 +331,275 @@ class SelectSelector(_BaseSelectorImpl): ready.append((key, events & key.events)) return ready -class PollSelector(_BaseSelectorImpl): - """Poll-based selector.""" - - def __init__(self): - super().__init__() - self._poll = selectraw.poll() - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= selectraw.POLLIN - if events & EVENT_WRITE: - poll_events |= selectraw.POLLOUT - self._poll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super().unregister(fileobj) - self._poll.unregister(key.fd) - return key - - def select(self, timeout=None): - if timeout is None: - timeout = None - elif timeout <= 0: - timeout = 0 - else: - # poll() has a resolution of 1 millisecond, round away from - # zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1e3) - ready = [] - try: - fd_event_list = self._poll.poll(timeout) - except InterruptedError: +if hasattr(select, 'poll'): + + class PollSelector(_BaseSelectorImpl): + """Poll-based selector.""" + + def __init__(self): + super().__init__() + self._poll = select.poll() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._poll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._poll.unregister(key.fd) + return key + + def select(self, timeout=None): + if timeout is None: + timeout = None + elif timeout <= 0: + timeout = 0 + else: + # poll() has a resolution of 1 millisecond, round away from + # zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) + ready = [] + try: + fd_event_list = self._poll.poll(timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) return ready - for fd, event in fd_event_list: - events = 0 - if event & ~selectraw.POLLIN: - events |= EVENT_WRITE - if event & ~selectraw.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - -class EpollSelector(_BaseSelectorImpl): - """Epoll-based selector.""" - def __init__(self): - super().__init__() - self._epoll = selectraw.epoll() - - def fileno(self): - return self._epoll.fileno() - def register(self, fileobj, events, data=None): - key = super().register(fileobj, events, data) - epoll_events = 0 - if events & EVENT_READ: - epoll_events |= selectraw.EPOLLIN - if events & EVENT_WRITE: - epoll_events |= selectraw.EPOLLOUT - try: - self._epoll.register(key.fd, epoll_events) - except BaseException: - super().unregister(fileobj) - raise - return key - - def unregister(self, fileobj): - key = super().unregister(fileobj) - try: - self._epoll.unregister(key.fd) - except OSError: - # This can happen if the FD was closed since it - # was registered. - pass - return key - - def select(self, timeout=None): - if timeout is None: - timeout = -1 - elif timeout <= 0: - timeout = 0 - else: - # epoll_wait() has a resolution of 1 millisecond, round away - # from zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1e3) * 1e-3 - - # epoll_wait() expects `maxevents` to be greater than zero; - # we want to make sure that `select()` can be called when no - # FD is registered. - max_ev = max(len(self._fd_to_key), 1) - - ready = [] - try: - fd_event_list = self._epoll.poll(timeout, max_ev) - except InterruptedError: +if hasattr(select, 'epoll'): + + class EpollSelector(_BaseSelectorImpl): + """Epoll-based selector.""" + + def __init__(self): + super().__init__() + self._epoll = select.epoll() + + def fileno(self): + return self._epoll.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + epoll_events = 0 + if events & EVENT_READ: + epoll_events |= select.EPOLLIN + if events & EVENT_WRITE: + epoll_events |= select.EPOLLOUT + try: + self._epoll.register(key.fd, epoll_events) + except BaseException: + super().unregister(fileobj) + raise + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + try: + self._epoll.unregister(key.fd) + except OSError: + # This can happen if the FD was closed since it + # was registered. + pass + return key + + def select(self, timeout=None): + if timeout is None: + timeout = -1 + elif timeout <= 0: + timeout = 0 + else: + # epoll_wait() has a resolution of 1 millisecond, round away + # from zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) * 1e-3 + + # epoll_wait() expects `maxevents` to be greater than zero; + # we want to make sure that `select()` can be called when no + # FD is registered. + max_ev = max(len(self._fd_to_key), 1) + + ready = [] + try: + fd_event_list = self._epoll.poll(timeout, max_ev) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.EPOLLIN: + events |= EVENT_WRITE + if event & ~select.EPOLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) return ready - for fd, event in fd_event_list: - events = 0 - if event & ~selectraw.EPOLLIN: - events |= EVENT_WRITE - if event & ~selectraw.EPOLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - def close(self): - self._epoll.close() - super().close() - -DefaultSelector = EpollSelector + def close(self): + self._epoll.close() + super().close() + + +if hasattr(select, 'devpoll'): + + class DevpollSelector(_BaseSelectorImpl): + """Solaris /dev/poll selector.""" + + def __init__(self): + super().__init__() + self._devpoll = select.devpoll() + + def fileno(self): + return self._devpoll.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + poll_events = 0 + if events & EVENT_READ: + poll_events |= select.POLLIN + if events & EVENT_WRITE: + poll_events |= select.POLLOUT + self._devpoll.register(key.fd, poll_events) + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + self._devpoll.unregister(key.fd) + return key + + def select(self, timeout=None): + if timeout is None: + timeout = None + elif timeout <= 0: + timeout = 0 + else: + # devpoll() has a resolution of 1 millisecond, round away from + # zero to wait *at least* timeout seconds. + timeout = math.ceil(timeout * 1e3) + ready = [] + try: + fd_event_list = self._devpoll.poll(timeout) + except InterruptedError: + return ready + for fd, event in fd_event_list: + events = 0 + if event & ~select.POLLIN: + events |= EVENT_WRITE + if event & ~select.POLLOUT: + events |= EVENT_READ + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready -__all__ = [ 'BaseSelector' , - 'DefaultSelector' , - 'EpollSelector' , - 'PollSelector' , - 'SelectSelector' , - 'EventRead' , - 'EventWrite' ] + def close(self): + self._devpoll.close() + super().close() + + +if hasattr(select, 'kqueue'): + + class KqueueSelector(_BaseSelectorImpl): + """Kqueue-based selector.""" + + def __init__(self): + super().__init__() + self._kqueue = select.kqueue() + + def fileno(self): + return self._kqueue.fileno() + + def register(self, fileobj, events, data=None): + key = super().register(fileobj, events, data) + try: + if events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + if events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_ADD) + self._kqueue.control([kev], 0, 0) + except BaseException: + super().unregister(fileobj) + raise + return key + + def unregister(self, fileobj): + key = super().unregister(fileobj) + if key.events & EVENT_READ: + kev = select.kevent(key.fd, select.KQ_FILTER_READ, + select.KQ_EV_DELETE) + try: + self._kqueue.control([kev], 0, 0) + except OSError: + # This can happen if the FD was closed since it + # was registered. + pass + if key.events & EVENT_WRITE: + kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, + select.KQ_EV_DELETE) + try: + self._kqueue.control([kev], 0, 0) + except OSError: + # See comment above. + pass + return key + + def select(self, timeout=None): + timeout = None if timeout is None else max(timeout, 0) + max_ev = len(self._fd_to_key) + ready = [] + try: + kev_list = self._kqueue.control(None, max_ev, timeout) + except InterruptedError: + return ready + for kev in kev_list: + fd = kev.ident + flag = kev.filter + events = 0 + if flag == select.KQ_FILTER_READ: + events |= EVENT_READ + if flag == select.KQ_FILTER_WRITE: + events |= EVENT_WRITE + + key = self._key_from_fd(fd) + if key: + ready.append((key, events & key.events)) + return ready + def close(self): + self._kqueue.close() + super().close() + + +# Choose the best implementation, roughly: +# epoll|kqueue|devpoll > poll > select. +# select() also can't accept a FD > FD_SETSIZE (usually around 1024) +#if 'KqueueSelector' in globals(): +# DefaultSelector = KqueueSelector +if 'EpollSelector' in globals(): + DefaultSelector = EpollSelector +elif 'DevpollSelector' in globals(): + DefaultSelector = DevpollSelector +elif 'PollSelector' in globals(): + DefaultSelector = PollSelector +else: + DefaultSelector = SelectSelector -- cgit v1.2.3