#!/usr/bin/env python3
"""Client for managing Sieve scripts remotely using the ManageSieve protocol"""
#
# Copyright 2023 and 2024 Odin Kroeger
#
# This file is part of SieveManager.
#
# SieveManager is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# SieveManager is distributed in the hope that it will be useful,
# but WITHOUT ALL WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SieveManager. If not, see <https://www.gnu.org/licenses/>.
#
#
# Modules
#
from __future__ import annotations
from abc import ABC, abstractmethod
from base64 import b64decode, b64encode
from collections import UserDict, defaultdict, deque
from collections.abc import Iterable
from contextlib import suppress
from dataclasses import dataclass
from errno import (ECONNABORTED, EEXIST, EINPROGRESS, EISCONN,
ENOENT, ENOTCONN, ETIMEDOUT)
from fcntl import LOCK_SH, LOCK_NB
from getopt import GetoptError, getopt
from inspect import Parameter
from os import O_CREAT, O_EXCL, O_WRONLY, O_TRUNC, SEEK_END, SEEK_SET
from os import path
from signal import SIG_DFL, SIG_IGN, SIGHUP, SIGINT, SIGTERM
from tempfile import SpooledTemporaryFile, TemporaryDirectory
from typing import (Any, BinaryIO, Callable, ClassVar, Final, IO, Iterator,
NoReturn, Optional, Union, Sequence, TextIO, TypeVar)
import code
import contextlib
import dataclasses
import datetime
import enum
import fcntl
import fnmatch
import getpass
import hashlib
import hmac
import inspect
import io
import ipaddress
import itertools
import locale
import logging
import math
import netrc
import os
import pwd
import random
import re
import readline
import rlcompleter
import secrets
import select
import shlex
import shutil
import signal
import socket
import subprocess # nosec B404
import ssl
import string
import stringprep
import sys
import threading
import types
import unicodedata
import urllib.error
import urllib.parse
import urllib.request
try:
from cryptography import x509
from cryptography.hazmat.primitives.hashes import SHA1
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import (AuthorityInformationAccess,
ExtensionNotFound, ocsp)
from cryptography.x509.oid import AuthorityInformationAccessOID
from cryptography.x509.ocsp import OCSPResponseStatus, OCSPCertStatus
HAVE_CRYPTOGRAPHY: Final = True # type: ignore
except ImportError:
HAVE_CRYPTOGRAPHY: Final = False # type: ignore
try:
import dns.rdatatype
import dns.resolver
from dns.exception import DNSException
HAVE_DNSPYTHON = True # type: ignore
except ImportError:
HAVE_DNSPYTHON = False # type: ignore
if sys.version_info < (3, 9):
sys.exit('SieveManager requires Python 3.9 or later')
#
# Metadata
#
__version__ = '0.7.4.7'
__author__ = 'Odin Kroeger'
__copyright__ = '2023 and 2024 Odin Kroeger'
__license__ = 'GPLv3+'
__all__ = [
# ABCs
'AbstractAuth',
'AbstractSASLAdapter',
# ManageSieve
'SieveManager',
'Atom',
'Line',
'Word',
'Capabilities',
'Response',
'URL',
# SASL
'BaseAuth',
'BasePwdAuth',
'BaseScramAuth',
'BaseScramPlusAuth',
'CramMD5Auth',
'ExternalAuth',
'LoginAuth',
'PlainAuth',
'ExternalAuth',
'ScramSHA1Auth',
'ScramSHA1PlusAuth',
'ScramSHA224Auth',
'ScramSHA224PlusAuth',
'ScramSHA256Auth',
'ScramSHA256PlusAuth',
'ScramSHA384Auth',
'ScramSHA384PlusAuth',
'ScramSHA512Auth',
'ScramSHA512PlusAuth',
'ScramSHA3_512Auth',
'ScramSHA3_512PlusAuth',
'SASLPrep',
# Errors
'Error',
'ProtocolError',
'SecurityError',
'CapabilityError',
'ConfigError',
'DataError',
'OperationError',
'UsageError',
'AppError',
'AppConfigError',
'AppConnectionError',
'AppOperationError',
'AppSecurityError',
'OCSPError',
'OCSPDataError',
'OCSPOperationError',
'SASLError',
'SASLCapabilityError',
'SASLProtocolError',
'SASLSecurityError',
'SieveError',
'SieveCapabilityError',
'SieveConnectionError',
'SieveOperationError',
'SieveProtocolError',
'TLSError',
'TLSCapabilityError',
'TLSSecurityError'
]
#
# Globals
#
ABOUT: Final[str] = f'SieveManager {__version__}\nCopyright {__copyright__}'
"""About message."""
DEBUG: bool = False
"""Print stack traces even for expected error types?"""
EDITOR: Final[list[str]] = shlex.split(os.getenv('EDITOR', 'ed'), posix=True)
""":envvar:`EDITOR` or :command:`ed` if :envvar:`EDITOR` is unset."""
ENCODING: Final[str] = locale.getpreferredencoding(do_setlocale=False)
"""Encoding."""
HOME: Final[str] = os.getenv('HOME', pwd.getpwuid(os.getuid()).pw_dir)
"""Home directory."""
PAGER: Final[list[str]] = shlex.split(os.getenv('PAGER', 'more'), posix=True)
""":envvar:`PAGER` or :command:`more` if :envvar:`PAGER` is unset."""
VISUAL: Final[list[str]] = shlex.split(os.getenv('VISUAL', 'vi'), posix=True)
""":envvar:`VISUAL` or :command:`vi` if :envvar:`VISUAL` is unset."""
XDG_CONFIG_HOME: Final[str] = os.getenv('XDG_CONFIG_HOME', f'{HOME}/.config')
"""X Desktop group base configuration directory."""
CONFIGFILES: Final[tuple[str, ...]] = (
'/etc/sieve/config',
'/etc/sieve.cf',
f'{XDG_CONFIG_HOME}/sieve/config',
f'{HOME}/.sieve/config',
f'{HOME}/.sieve.cf'
)
"""Default configuration files."""
#
# Types
#
[docs]
class Atom(str):
"""ManageSieve keyword (e.g., ``LISTSCRIPTS``, ``OK``)."""
# pylint: disable=eq-without-hash
def __eq__(self, other) -> bool:
return self.casefold() == other.casefold()
def __ne__(self, other) -> bool:
return self.casefold() != other.casefold()
AuthMech = type['AbstractAuth']
"""Alias for subclasses of :class:`AbstractAuth`."""
[docs]
class AuthState(enum.IntEnum):
"""State of the authentication process."""
PREAUTH = enum.auto()
""""AUTHENTICATE" has *not* been issued."""
SENT = enum.auto()
"""Data sent, ready to receive."""
RECEIVED = enum.auto()
"""Data received, ready to send."""
DONE = enum.auto()
"""Authentication concluded."""
Line = list['Word']
""":class:`List <list>` of :class:`Word`-s."""
[docs]
class LogLevel(enum.IntEnum):
"""Logging levels supported by :class:`SieveConfig`."""
AUTH = logging.DEBUG // 2
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
[docs]
def fromdelta(self, delta: int) -> 'LogLevel':
"""Get a :class:`LogLevel` from a `delta`.
For example:
>>> LogLevel.INFO.fromdelta(-1)
<LogLevel.WARNING: 30>
>>> LogLevel.INFO.fromdelta(0)
<LogLevel.INFO: 20>
>>> LogLevel.INFO.fromdelta(1)
<LogLevel.DEBUG: 10>
>>> # Out-of-bounds deltas do not raise an error
>>> LogLevel.INFO.fromdelta(-math.inf)
<LogLevel.ERROR: 40>
>>> LogLevel.INFO.fromdelta(math.inf)
<LogLevel.AUTH: 5>
"""
levels = list(self.__class__)
index = levels.index(self) - delta
return levels[min(max(index, 0), len(levels) - 1)]
[docs]
class SASLPrep(enum.IntEnum):
"""Controls which strings are prepared for authentication.
.. seealso::
:rfc:`3454`
Preparation of Internationalized Strings
:rfc:`4013`
Stringprep Profile for User Names and Passwords
:rfc:`4422` (sec. 4)
SASL protocol requirements
"""
NONE = 0
USERNAMES = 1
PASSWORDS = 2
ALL = 3
[docs]
class ShellCmd(enum.IntEnum):
"""Shell actions that may overwrite or remove files."""
NONE = 0
CP = 1
GET = 2
MV = 4
PUT = 8
RM = 16
ALL = 31
[docs]
class ShellPattern(str):
""":class:`BaseShell` pattern."""
def __add__(self, other: Union['ShellPattern', str]) -> 'ShellPattern':
return self.__class__(super().__add__(other))
def __radd__(self, other: Union['ShellPattern', str]) -> 'ShellPattern':
return self.__class__(other.__add__(self))
ShellWord = Union[ShellPattern, str]
"""Alias for :class:`ShellPattern` and `str`."""
Word = Union[Atom, None, int, str, Line]
"""Alias for :class:`Atom`, ``None``, ``int``, ``str``, and :class:`Line`."""
T = TypeVar('T')
"""Type variable."""
#
# Abstract base classes
#
AbstractAuthT = TypeVar('AbstractAuthT', bound='AbstractAuth')
"""Type variable for :class:`AbstractAuth`."""
[docs]
class AbstractAuth(ABC):
"""Abstract base class for SASL mechanisms.
The ManageSieve "AUTHENTICATE" command performs a `Simple Authentication
and Security Layer`_ (SASL) protocol exchange. SASL is a framework that
comprises different authentication mechanisms ("SASL mechanisms").
:meth:`SieveManager.authenticate` does *not* implement any such mechanism
itself, but delegates the SASL protocol exchange to classes that do.
Such classes must subclass this class *and* have a :attr:`name`
attribute that indicates the mechanism they implemented.
.. tip::
Do *not* subclass :class:`AbstractAuth` directly.
Subclass :class:`BaseAuth` instead.
.. seealso::
:class:`AbstractSASLAdapter`
Abstract base class for sending and receiving SASL messages.
:rfc:`3454`
Preparation of Internationalized Strings
:rfc:`4013`
Stringprep Profile for User Names and Passwords
:rfc:`4422`
Simple Authentication and Security Layer (SASL)
:rfc:`5804` (sec. 2.1)
ManageSieve "AUTHENTICATE" command
"""
[docs]
@abstractmethod
def __init__(self, conn: 'AbstractSASLAdapter',
authcid: str, authzid: str = '',
prepare: SASLPrep = SASLPrep.ALL):
"""Prepare authentication.
`authcid` and `authzid` are prepared according to :rfc:`3454` and
:rfc:`4013` if the specification of the SASL mechanism mandates or
recommends string preparation and ``prepare & SASLPrep.USERNAMES``
evaluates as true.
Arguments:
conn: Connection over which to authenticate.
authcid: Authentication ID (user to login as).
authzid: Authorisation ID (user whose rights to acquire).
prepare: Which credentials to prepare.
Raises:
ValueError: Bad characters in username.
"""
[docs]
@abstractmethod
def __call__(self) -> Optional[Any]:
"""Authenticate as :attr:`authcid`.
:attr:`authcid` is authorised as :attr:`authzid`
if :attr:`authzid` is set (proxy authentication).
Returns:
Data returned by the server, if any.
Raises:
ConnectionError: Server has closed the connection.
OperationError: Authentication failed.
SASLCapabilityError: Some feature is not supported.
SASLProtocolError: Server violated the SASL protocol.
SASLSecurityError: Server verification failed.
TLSCapabilityError: Channel-binding is not supported.
"""
[docs]
@classmethod
def getmechs(cls: type[AbstractAuthT], sort: bool = True,
obsolete: bool = False) -> list[type[AbstractAuthT]]:
"""Get authentication classes that subclass this class.
Arguments:
sort: Sort mechanisms by :attr:`order`?
obsolete: Return obsolete mechanisms?
"""
mechs: list[type[AbstractAuthT]] = []
for subcls in cls.__subclasses__():
# pylint: disable=bad-indentation
if (not subcls.__abstractmethods__
and (obsolete or not subcls.obsolete)):
mechs.append(subcls)
mechs.extend(subcls.getmechs(sort=False, obsolete=obsolete))
if sort:
mechs.sort(key=lambda s: s.order)
return mechs
name: ClassVar[str]
"""Mechanism name."""
authcid: str
"""Authentication ID (user to login as)."""
authzid: str = ''
"""Authorisation ID (user whose rights to acquire)."""
obsolete: bool = False
"""Is this mechanism obsolete?"""
order: int = 0
"""Mechanism precedence."""
[docs]
class AbstractSASLAdapter(ABC):
"""Abstract base class for sending and receiving SASL messages.
Messages that comprise an SASL protocol exchange ("SASL messages")
must be translated to the underlying protocol. This class defines the
types of messages that may occur in an SASL protocol exchange.
Classes that translate between SASL and the underlying protocol
must subclass this class.
.. seealso::
:class:`AbstractAuth`
Abstract base class for SASL mechanisms.
:rfc:`4422`
Simple Authentication and Security Layer (SASL)
"""
[docs]
@abstractmethod
def abort(self):
"""Abort authentication.
Raises:
ProtocolError: Protocol violation.
"""
[docs]
@abstractmethod
def begin(self, name: str, data: Optional[bytes] = None):
"""Begin authentication.
Arguments:
name: SASL mechanism name.
data: Optional client-first message.
Raises:
ConnectionError: Connection was closed.
ProtocolError: Protocol violation.
"""
[docs]
@abstractmethod
def end(self):
"""Conclude authentication.
Raises:
ConnectionError: Connection was closed.
OperationError: Authentication failed.
ProtocolError: Protocol violation.
"""
[docs]
@abstractmethod
def send(self, data: bytes):
"""Encode and send an SASL message.
Raises:
ConnectionError: Connection was closed.
ProtocolError: Protocol violation.
"""
[docs]
@abstractmethod
def receive(self) -> bytes:
"""Receive and decode an SASL message.
Raises:
ConnectionError: Connection was closed.
OperationError: Authentication failed.
ProtocolError: Protocol violation.
"""
@property
@abstractmethod
def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
"""Underlying socket."""
@sock.setter
@abstractmethod
def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
pass
#
# ACAP
#
[docs]
class BaseACAPConn(ABC):
"""Base class for ACAP parsers/serialisers.
The ManageSieve uses the syntax and data types of the Application Control
Access Protocol (ACAP). This class provides a :meth:`parser <receiveline>`
for converting ACAP lines into Python objects and a :meth:`serialiser
<sendline>` for converting Python objects into ACAP lines.
.. seealso::
:rfc:`2244` (secs. 2.2 and 2.6)
ACAP commands, responses, and data formats.
:rfc:`5804` (secs. 1.2 and 4)
ManageSieve syntax.
"""
# pylint: disable=missing-raises-doc
[docs]
def receiveline(self) -> Line:
"""Receive a line and parse it.
================== =================
ACAP type Python type
================== =================
Atom :class:`Atom`
Literal :class:`str`
Nil ``None``
Number :class:`int`
Parenthesised List :class:`list`
String :class:`str`
================== =================
For example:
>>> mgr.sendline(Atom('listscripts'))
>>> mgr.receiveline()
['foo.sieve', 'ACTIVE']
>>> mgr.receiveline()
['bar.sieve']
>>> mgr.receiveline()
['baz.sieve']
>>> mgr.receiveline()
['OK', 'Listscripts completed.']
Raises:
ValueError: Line is malformed.
"""
assert self.file
words: Line = []
stack: list[Line] = []
ptr: Line = words
while line := self.file.readline().decode('utf8'):
size: int = -1
for token in self._lexpattern.finditer(line):
key = token.lastgroup
value = token.group(key) # type: ignore[arg-type]
if key == 'leftparen':
parens: list[Word] = []
stack.append(ptr)
ptr.append(parens)
ptr = parens
elif key == 'rightparen':
try:
ptr = stack.pop()
except IndexError as err:
raise ValueError('unexpected parenthesis') from err
elif key == 'atom':
if value.casefold() == 'nil':
ptr.append(None)
else:
ptr.append(Atom(value))
elif key == 'number':
ptr.append(int(value))
elif key == 'string':
ptr.append(value)
elif key == 'literal':
size = int(value)
literal = self.file.read(size).decode('utf8')
ptr.append(literal)
elif key == 'garbage':
raise ValueError('unrecognised data')
else:
# NOTREACHED
raise AppSoftwareError('unknown data type')
if size == -1:
break
if stack:
raise ValueError('unbalanced parantheses')
return words
[docs]
def sendline(self, *objs: Union[IO[Any], 'Word'], whole: bool = True):
"""Convert `objs` to ACAP types and send them.
================== ======================================
Python type ACAP type
================== ======================================
:class:`Atom` Atom
:class:`typing.IO` Literal
``None`` Nil
:class:`bytes` Literal or String [#literal]_
:class:`list` Parenthesised List
:class:`int` Number [#nums]_
:class:`str` Literal or String [#literal]_ [#utf8]_
================== ======================================
For example:
>>> mgr.sendline(Atom('havespace'), 'script.sieve', 12345)
>>> mgr.receiveline()
['OK', 'Putscript would succeed.']
Pipeline commands:
>>> mgr.isalive(check=True)
>>> with open('foo.sieve') as script:
>>> mgr.sendline(Atom('putscript', script, 'foo.sieve'))
>>> mgr.sendline(Atom('logout'))
>>> for _ in range(2):
>>> mgr.collect(check=True)
Arguments:
objs: Objects to serialise.
whole: Conclude data with CRLF?
Raises:
ValueError: Number is not within the range [0, 4,294,967,295].
TypeError: Object cannot be represented as ACAP data type.
.. [#literal] Depending on content.
.. [#nums] Numbers must be within the range [0, 4,294,967,295]
.. [#utf8] Strings are encoded in UTF-8 and normalised to form C.
"""
assert self.file
write = self.file.write
normalize = unicodedata.normalize
isstr = self._isstr
def encode(s: str) -> bytes:
return normalize('NFC', s).encode('utf8')
def writestr(b: bytes):
write(b'"%s"' % b if isstr(b) else b'{%d+}\r\n%s' % (len(b), b))
for i, obj in enumerate(objs):
if i > 0:
write(b' ')
if obj is None:
write(b'NIL')
elif isinstance(obj, Atom):
write(encode(obj))
elif isinstance(obj, int):
if not 0 <= obj < 4_294_967_296:
raise ValueError(f'{obj}: not in [0, 4,294,967,295]')
write(encode(str(obj)))
elif isinstance(obj, bytes):
writestr(obj)
elif isinstance(obj, str):
writestr(encode(obj))
elif isinstance(obj, (IO, io.IOBase, SpooledTemporaryFile)):
write(b'{%d+}\r\n' % getfilesize(obj))
while block := obj.read(io.DEFAULT_BUFFER_SIZE):
write(encode(block) if isinstance(block, str) else block)
elif isinstance(obj, Iterable): # type: ignore
write(b'(')
self.sendline(*obj, whole=False)
write(b')')
else:
raise TypeError(type(obj).__name__ + ': not an ACAP type')
if whole:
write(b'\r\n')
self.file.flush()
@property
@abstractmethod
def file(self) -> Optional[Union[IO, io.BufferedRWPair, LogIOWrapper]]:
"""File-like access to the underlying socket."""
@file.setter
@abstractmethod
def file(self, file: Optional[Union[IO, io.BufferedRWPair, LogIOWrapper]]):
pass
_lexpattern: re.Pattern = re.compile('|'.join((
r'\b(?P<atom>[a-z][^(){}\s\\]*)\b',
r'\b(?P<number>\d+)\b',
r'"(?P<string>[^\0\r\n"]*)"',
r'{(?P<literal>\d+)\+?}',
r'(?P<leftparen>\()',
r'(?P<rightparen>\))',
r'(?P<garbage>\S+)'
)), flags=re.IGNORECASE)
"""Lexer pattern for :meth:`re.Pattern.finditer`."""
# Strings may be 1024 octets long, but I limit the length to 1020 octets
# to allow for errors (two octets for quotations marks, one for the
# terminating null byte, one for other off-by-one errors).
_isstr: Callable[..., Optional[re.Match]] = \
re.compile(br'[^\0\r\n"]{0,1020}').fullmatch
"""Check whether bytes can be represented as ACAP string."""
#
# ManageSieve
#
[docs]
class SieveConn(BaseACAPConn):
"""Low-level connection to a ManageSieve server.
For example:
>>> conn = SieveConn('imap.foo.example')
>>> conn.authenticate('user', 'password')
>>> with open('script.sieve', 'br') as file:
>>> conn.execute('putscript', file.name, file)
>>> conn.execute('logout')
.. warning::
:class:`SieveConn` is not thread-safe.
.. seealso::
:rfc:`2244`
Application Configuration Access Protocol
:rfc:`2782`
DNS SRV
:rfc:`5804`
ManageSieve
"""
[docs]
def __init__(self, *args, **kwargs):
"""Create a :class:`SieveConn` object.
If `args` or `kwargs` are given, they are passed to :meth:`open`.
Otherwise, no connection is established.
For example:
>>> with SieveConn('imap.host.example') as conn:
>>> conn.authenticate('user', 'password')
>>> ...
>>> with SieveConn() as conn:
>>> conn.open('imap.host.example')
>>> conn.authenticate('user', 'password')
>>> ...
Arguments:
args: Positional arguments for :meth:`open`.
kwargs: Keyword arguments for :meth:`open`.
Raises:
AppConnectionError: :attr:`sock` has died.
SieveCapabilityError: "STARTTLS" not supported.
SieveProtocolError: Server violated the ManageSieve protocol.
TLSSecurityError: Server certificate has been revoked.
"""
if args or kwargs:
self.open(*args, **kwargs)
def __del__(self):
"""Shut the connection down."""
with suppress(OSError):
self.shutdown()
[docs]
def authenticate(self, login: str, *auth, owner: str = '',
sasl: Union[AuthMech, Iterable[AuthMech]] = (),
logauth: bool = False, **kwauth):
"""Authenticate as `login`.
How the user is authenticated depends on the type of SASL_ mechanisms
given in `sasl` (e.g., password-based or the "EXTERNAL" mechanism).
If no mechanisms are given, authentication is attempted with every
supported non-obsolete password-based mechanism, starting with those
with better security properties and progressing to those with worse
security properties.
Unrecognised arguments are passed on to SASL mechanism constructors.
Password-based mechanisms require a password:
>>> mgr.authenticate('user', 'password')
By contrast, the "EXTERNAL" mechanism takes no arguments:
>>> mgr.authenticate('user', sasl=ExternalAuth)
If an `owner` is given, the scripts of that `owner` are managed,
instead of those owned by `login`. This requires elevated privileges.
Arguments:
login: User to login as (authentication ID).
owner: User whose scripts to manage (authorisation ID).
sasl: SASL mechanisms (default: :meth:`BasePwdAuth.getmechs`).
logauth: Log authentication exchange?
auth: Positional arguments for SASL mechanism constructors.
kwauth: Keyword arguments for SASL mechanism constructors.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SASLCapabilityError: Authentication mechanisms exhausted.
SASLProtocolError: Server violated the SASL protocol.
SASLSecurityError: Server could not be verified.
SieveConnectionError: Server has closed the connection.
SieveOperationError: Authentication failed.
SieveProtocolError: Server violated the ManageSieve protocol.
ValueError: Bad characters in credentials.
.. note::
If an `owner` is given, but the selected authentication mechanism
does not support proxy authentication, an error is logged to the
console and authentication is attempted with the next mechanism.
"""
kwauth['authzid'] = owner
logger = self.logger
self._auth = auth
self._kwauth = kwauth
self._logauth = logauth
self._sasl = (BasePwdAuth.getmechs() if not sasl else
sasl if isinstance(sasl, Iterable) else
(sasl,))
def authenticate():
# pylint: disable=consider-using-with
if not self.lock.acquire(blocking=False):
raise AppOperationError(os.strerror(EINPROGRESS))
if isinstance(self.file, LogIOWrapper) and not logauth:
self.file.quiet = True
try:
for mech in self._sasl:
assert self.capabilities
if mech.name.casefold() in self.capabilities.sasl:
conn = SieveSASLAdapter(self)
try:
run = mech(conn, login, *auth, **kwauth)
if caps := run():
self.capabilities = caps
except SASLCapabilityError as err:
logger.error(err)
continue
except SieveOperationError as err:
# TRANSITION-NEEDED need not be treated specially.
if err.matches('AUTH-TOO-WEAK',
'ENCRYPT-NEEDED',
'TRANSITION-NEEDED'):
logger.error(err)
continue
raise
if authcid := run.authcid:
self.login = authcid
logger.info('Authenticated as %s using %s',
authcid, mech.name.upper())
else:
# NOTREACHED
raise AppSoftwareError('forgot authentication ID')
if authzid := run.authzid:
self.owner = authzid
logger.info('Authorised as %s', authzid)
return
raise SASLCapabilityError('SASL mechanisms exhausted')
finally:
if isinstance(self.file, LogIOWrapper):
self.file.quiet = False
self.lock.release()
# NOTREACHED
self.isalive(check=True)
self._withfollow(authenticate)
[docs]
def close(self):
"""Close the client side of the connection.
.. warning::
Call only when the server has closed the connection.
"""
if self.file is not None:
self.file.close()
self.file = None
if self.poll is not None:
assert self.sock
self.poll.unregister(self.sock)
self.poll = None
if self.sock is not None:
self.sock.close()
self.sock = None
self._auth = ()
self._kwauth = {}
self.capabilities = None
self.host = None
self.port = None
self.login = ''
self.owner = ''
[docs]
def collect(self, check: bool = False) -> tuple['Response', list[Line]]:
"""Collect the server's response to the last command.
For example:
>>> conn.sendline(Atom('listscripts'))
>>> conn.collect()
(Response(response=Atom('OK'), code=(), message=None),
[['foo.sieve', 'ACTIVE'], ['bar.sieve'], ['baz.sieve']])
Arguments:
check: Raise an error if the response is not "OK"?
Raises:
AppConnectionError: :attr:`sock` has died.
SieveConnectionError: Server said "BYE". [#collect-check]_
SieveOperationError: Server said "NO". [#collect-check]_
SieveProtocolError: Server violated the ManageSieve protocol.
.. [#collect-check] Only raised if `check` is `True`.
"""
lines: list[Line] = []
while True:
try:
line = self.receiveline()
except ValueError as err:
raise SieveProtocolError(str(err)) from err
if line and isinstance(line[0], Atom):
res = Response.fromline(line)
if check and res.response != 'OK':
raise res.toerror()
self.warning = res.message if res.matches('WARNINGS') else None
return res, lines
lines.append(line)
# NOTREACHED
[docs]
def execute(self, command: str, *args: Union[IO, Word]) \
-> tuple['Response', list[Line]]:
"""Execute `command` and return the server's response.
For example:
>>> conn.execute('listscripts')
(Response(response=Atom('OK'), code=(), message=None),
[['foo.sieve', 'ACTIVE'], ['bar.sieve'], ['baz.sieve']])
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server said "BYE".
SieveOperationError: Server said "NO".
SieveProtocolError: Server violated the ManageSieve protocol.
.. note::
Referrals are followed automatically.
"""
def execute() -> tuple['Response', list[Line]]:
# pylint: disable=consider-using-with
if not self.lock.acquire(blocking=False):
raise AppOperationError(os.strerror(EINPROGRESS))
try:
self.isalive(check=True)
self.sendline(Atom(command.upper()), *args)
res, data = self.collect()
finally:
self.lock.release()
if res.response != 'OK':
raise res.toerror()
return res, data
assert command
return self._withfollow(execute)
[docs]
def geturl(self) -> Optional['URL']:
"""URL of the current connection.
For example:
>>> with SieveManager('imap.foo.example') as mgr:
>>> mgr.authenticate('user', 'password')
>>> mgr.geturl()
'sieve://user@imap.foo.example'
.. note::
Only changes to the connection state effected by :meth:`open`,
:meth:`close`, :meth:`shutdown`, :meth:`authenticate`,
:meth:`unauthenticate`, and referrals are tracked.
"""
if self.host:
return URL(hostname=self.host,
port=self.port if self.port != 4190 else None,
username=self.login,
owner=self.owner)
return None
[docs]
def isalive(self, check: bool = False) -> bool:
"""Check whether :attr:`sock` is alive.
Arguments:
check: Raise an error if :attr:`sock` has died.
Raises:
AppConnectionError: :attr:`sock` has died. [#isalive-check]_
.. [#isalive-check] Only raised if `check` is `True`.
"""
assert self.poll
(_, events), = self.poll.poll()
try:
if events & select.POLLERR:
raise AppConnectionError(ENOTCONN, 'socket error')
if events & select.POLLHUP:
raise AppConnectionError(ETIMEDOUT, os.strerror(ETIMEDOUT))
if events & select.POLLNVAL:
raise AppConnectionError(ENOTCONN, os.strerror(ENOTCONN))
except AppConnectionError:
if check:
raise
return True
# pylint: disable=missing-raises-doc, redefined-outer-name
[docs]
def open(self, host: str, port: int = 4190,
source: tuple[str, int] = ('', 0),
timeout: Optional[float] = socket.getdefaulttimeout(),
tls: bool = True, ocsp: bool = True):
"""Connect to `host` at `port`.
Arguments:
host: Server name or address.
port: Server port.
source: Source address and port.
timeout: Timeout in seconds.
tls: Secure the connection?
ocsp: Check whether the server certificate was revoked?
Raises:
ConnectionError: Connection failed.
SieveCapabilityError: "STARTTLS" not supported.
SieveProtocolError: Server violated the ManageSieve protocol.
TLSSecurityError: Server certificate has been revoked.
"""
# pylint: disable=consider-using-with
if not self.lock.acquire(blocking=False):
raise AppOperationError(os.strerror(EINPROGRESS))
try:
self._connect(host, port, source, timeout)
_, lines = self.collect(check=True)
self._source = source
self.capabilities = Capabilities.fromlines(lines)
self.logger.info('Connected to %s:%d', host, port)
finally:
self.lock.release()
if tls:
self._starttls(ocsp=ocsp)
[docs]
def shutdown(self):
"""Shut the connection down.
.. note::
Use only when :meth:`logging out <logout>` would be unsafe.
"""
if self.sock is not None:
self.sock.shutdown(socket.SHUT_RDWR)
self.logger.info('Shut connection down')
self.close()
def _connect(self, host: str, port: int = 4190,
source: tuple[str, int] = ('', 0),
timeout: Optional[float] = socket.getdefaulttimeout()):
"""Connect to `host` at `port`.
Arguments:
host: Server address.
port: Server port.
source: Source address and port.
timeout: Timeout in seconds.
Raises:
ConnectionError: Connection failed.
"""
def connect(host: str):
self.sock = socket.create_connection(
(host, port),
timeout=timeout, source_address=source
)
if self.sock or self.file:
raise AppConnectionError(EISCONN, os.strerror(EISCONN))
if isinetaddr(host):
connect(host)
else:
try:
# This is the algorithm specified by RFC 2782, NOT the one
# specified by RFC 5804, sec. 1.8, which is wrong.
records = list(resolvesrv(f'_sieve._tcp.{host}.'))
for rec in records:
try:
connect(rec.host)
except OSError:
if rec == records[-1]:
raise
except DNSError:
connect(host)
if not self.sock:
raise AppConnectionError(ECONNABORTED, os.strerror(ECONNABORTED))
file = self.sock.makefile('rwb') # type: ignore[attr-defined]
self.file = LogIOWrapper.wrap(file, self.logger)
self.poll = select.poll()
self.poll.register(self.sock)
self.host = host
self.port = port
def _follow(self, url: str):
"""Close the connection, :meth:`open <open>` `url`, and reauthenticate.
Raises:
AppConnectionError: :attr:`sock` has died.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
try:
ref = URL.fromstr(url)
except ValueError as err:
raise SieveProtocolError(err) from err
self.logger.info('Referred to %s', url)
(oargs, okwargs), (auargs, aukwargs) = self._getstate()
self.close()
self.open(ref.hostname, ref.port or 4190, *oargs[2:], **okwargs)
self.authenticate(*auargs, **aukwargs)
def _getstate(self) -> Iterator[tuple[list, dict[str, Any]]]:
"""Get arguments to re-establish the current connection.
For example:
>>> mgr.geturl()
sieve://user@imap.foo.example
>>> (oargs, okwargs), (auth, kwauth) = mgr._getstate()
>>> mgr.shutdown()
>>> mgr.geturl()
>>> mgr.open(*oargs, **okwargs)
>>> mgr.authenticate(*auth, **kwauth)
>>> mgr.geturl()
sieve://user@imap.foo.example
"""
for meth in (self.open, self.authenticate):
args = []
kwargs = {}
signature = inspect.signature(meth) # type: ignore[arg-type]
for name, param in list(signature.parameters.items()):
try:
value = getattr(self, name)
except AttributeError:
value = getattr(self, f'_{name}')
if param.kind in (Parameter.POSITIONAL_ONLY,
Parameter.POSITIONAL_OR_KEYWORD):
args.append(value)
elif param.kind == Parameter.VAR_POSITIONAL:
args.extend(value)
elif param.kind == Parameter.KEYWORD_ONLY:
kwargs[name] = value
elif param.kind == Parameter.VAR_KEYWORD:
kwargs.update(value)
yield (args, kwargs)
# pylint: disable=redefined-outer-name
def _starttls(self, ocsp: bool = True):
"""Start TLS encryption.
Arguments:
ocsp: Check whether the server certificate was revoked?
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveCapabilityError: "STARTTLS" not supported.
TLSSecurityError: Server certificate has been revoked.
.. note::
Called automatically by :meth:`open` unless `tls` is `False`.
"""
assert self.sock
assert self.capabilities
if not self.capabilities.starttls:
raise SieveCapabilityError('STARTTLS: not supported')
self.execute('STARTTLS')
# pylint: disable=consider-using-with
if not self.lock.acquire(blocking=False):
raise AppOperationError(os.strerror(EINPROGRESS))
try:
host = self.host
self.sock = self.sslcontext.wrap_socket(
self.sock, # type: ignore[arg-type]
server_hostname=host
)
file = self.sock.makefile('rwb')
self.file = LogIOWrapper.wrap(file, self.logger) # type: ignore
_, lines = self.collect(check=True)
self.capabilities = Capabilities.fromlines(lines)
self.ocsp = ocsp
proto = self.sock.version()
cipher = self.sock.cipher()
if proto and cipher:
self.logger.info('Started %s using %s', proto, cipher[0])
if ocsp:
if HAVE_CRYPTOGRAPHY:
der = self.sock.getpeercert(binary_form=True)
if not der:
raise TLSSoftwareError('no peer certificate')
cert = x509.load_der_x509_certificate(der)
if certrevoked(cert, logger=self.logger):
raise TLSSecurityError(f'{host}: certificate revoked')
else:
self.logger.error('Module "cryptography" not found')
finally:
self.lock.release()
def _withfollow(self, func: Callable[..., T], *args, **kwargs) -> T:
"""Call `func` and follow referrals.
For example:
>>> mgr._withfollow(mgr.execute, 'listscripts')
Arguments:
func: Function to call.
args: Positional arguments for `func`.
kwargs: Keyword arguments for `func`.
Returns:
The return value of `func`.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server said "BYE".
SieveOperationError: Server said "NO".
SieveProtocolError: Server violated the ManageSieve protocol.
.. seealso::
:rfc:`5804` (sec. 1.3)
ManageSieve "REFERRAL" response codes
"""
while True:
try:
return func(*args, **kwargs)
except SieveConnectionError as err:
if err.matches('REFERRAL'):
try:
url = err.code[1]
except IndexError as exc:
raise SieveProtocolError('unexpected data') from exc
if isinstance(url, Atom) or not isinstance(url, str):
# pylint: disable=raise-missing-from
raise SieveProtocolError('expected string')
self._follow(url)
continue
raise
# NOTREACHED
def _withreopen(self, func: Callable[..., T], *args, **kwargs) -> T:
"""Call `func` and retry if the connection is closed.
For example:
>>> mgr._withreopen(mgr.execute, 'listscripts')
Arguments:
func: Function to call.
args: Positional arguments for `func`.
kwargs: Keyword arguments for `func`.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server said "BYE".
SieveOperationError: Server said "NO".
SieveProtocolError: Server violated the ManageSieve protocol.
Returns:
The return value of `func`.
"""
(oargs, okwargs), (auth, kwauth) = self._getstate()
try:
return func(*args, **kwargs)
except (ConnectionError, TimeoutError, socket.herror, socket.gaierror):
self.close()
self.open(*oargs, **okwargs)
self.authenticate(*auth, **kwauth)
return func(*args, **kwargs)
@property
def timeout(self) -> Optional[float]:
"""Connection timeout in seconds.
Set timeout to 500 ms:
>>> mgr.timeout = 0.5
.. note::
The timeout can only be set while a connection is open.
"""
if self.sock:
return self.sock.gettimeout()
return None
@timeout.setter
def timeout(self, secs: Optional[float]):
if self.sock:
self.sock.settimeout(secs)
@property
def tls(self) -> Optional[str]: # type: ignore[override]
"""TLS version."""
if isinstance(self.sock, ssl.SSLSocket):
return self.sock.version()
return None
capabilities: Optional[Capabilities] = None
"""Server capabilities."""
file: Optional[Union[IO, io.BufferedRWPair, LogIOWrapper]] = None
"""File-like access to :attr:`sock`."""
host: Optional[str] = None
"""Remote address."""
lock: threading.Lock = threading.Lock()
"""Operation lock."""
logger: logging.Logger = logging.getLogger(__name__)
"""Logger to use.
Messages are logged with the following priorities:
====================== =====================================
Priority Used for
====================== =====================================
:const:`logging.ERROR` Non-fatal errors
:const:`logging.INFO` State changes
:const:`logging.DEBUG` Data sent to/received from the server
====================== =====================================
Suppress logging:
>>> from logging import getLogger
>>> getLogger('sievemgr').setLevel(logging.CRITICAL)
Use a custom logger:
>>> from logging import getLogger
>>> mgr.logger = getLogger('foo').addHandler(logging.NullHandler())
Print data send to/received from the server to standard error:
>>> from logging import getLogger
>>> getLogger('sievemgr').setLevel(logging.DEBUG)
>>> mgr.listscripts()
C: LISTSCRIPTS
S: "foo.sieve" ACTIVE
S: "bar.sieve"
S: "baz.sieve"
S: OK "Listscripts completed"
(Response(response=Atom('OK'), code=(), message=None),
[('foo.sieve', True), ('bar.sieve', False), ('baz.sieve', False)])
"""
login: str = ''
"""Login name (authentication ID)."""
ocsp: bool
"""Check whether the server certificate was revoked?"""
owner: str = ''
"""User whose scripts are managed (authorisation ID)."""
poll: Optional[select.poll] = None
"""Polling object for :attr:`sock`."""
port: Optional[int] = None
"""Remote port."""
sock: Optional[socket.SocketType] = None
"""Underlying socket."""
sslcontext: ssl.SSLContext = ssl.create_default_context()
"""Settings for negotiating Transport Layer Security (TLS).
Disable workarounds for broken X.509 certificates:
>>> with SieveManager() as mgr:
>>> mgr.sslcontext.verify_flags |= ssl.VERIFY_X509_STRICT
>>> mgr.open('imap.foo.example')
>>> ...
Load client certificate/key pair:
>>> with SieveManager() as mgr:
>>> mgr.sslcontext.load_cert_chain(cert='cert.pem')
>>> mgr.open('imap.foo.example')
>>> ...
Use a custom certificate authority:
>>> with SieveManager() as mgr:
>>> mgr.sslcontext.load_verify_locations(cafile='ca.pem')
>>> mgr.open('imap.foo.example')
>>> ...
"""
warning: Optional[str] = None
"""Warning issued in response to the last "CHECKSCRIPT" or "PUTSCRIPT".
For example:
>>> with open('script.sieve', 'br') as file:
>>> mgr.execute('putscript', file, 'script.sieve')
(Response(response=Atom('OK'), code=('warnings,'),
message='line 7: may need to be frobnicated'), [])
>>> mgr.warning
'line 7: may need to be frobnicated'
.. note::
Only set by :meth:`collect`, :meth:`execute`,
:meth:`checkscript`, and :meth:`putscript`.
.. seealso::
:rfc:`5804` (sec. 1.3)
ManageSieve "WARNINGS" response code.
"""
_auth: tuple = ()
"""Positional arguments for SASL mechanism constructors."""
_kwauth: dict[str, Any] = {}
"""Keyword arguments for SASL mechanism constructors."""
_logauth: bool
"""Log the authentication exchange?"""
_sasl: Iterable[AuthMech] = ()
"""SASL mechanisms."""
_source: tuple[str, int] = ('', 0)
"""Source address and port."""
[docs]
class SieveManager(SieveConn, contextlib.AbstractContextManager):
"""Connection to a ManageSieve server.
For example:
>>> with SieveManager('imap.foo.example') as mgr:
>>> mgr.authenticate('user', 'password')
>>> with open('sieve.script', 'br') as script:
>>> mgr.putscript(script, 'sieve.script')
>>> mgr.setactive('sieve.script')
.. warning::
:class:`SieveManager` is not thread-safe.
"""
# pylint: disable=redefined-outer-name
[docs]
def __init__(self, *args, backup: int = 0,
memory: int = 524_288, **kwargs):
"""Create a :class:`SieveManager` object.
If `args` or `kwargs` are given, they are passed to :meth:`open`.
Otherwise, no connection is established.
Arguments:
backup: How many backups to keep by default.
memory: See `max_size` in :class:`tempfile.SpooledTemporaryFile`.
args: Positional arguments for :meth:`open`.
kwargs: Keyword arguments for :meth:`open`.
Raises:
AppConnectionError: :attr:`sock` has died.
SieveCapabilityError: "STARTTLS" not supported.
SieveProtocolError: Server violated the ManageSieve protocol.
TLSSecurityError: Server certificate has been revoked.
"""
super().__init__(*args, **kwargs)
self.backup: int = backup
self.memory: int = memory
def __exit__(self, _exctype, excvalue, _traceback):
"""Exit the context and close the connection appropriately."""
if isinstance(excvalue, (ConnectionError, TimeoutError)):
self.close()
elif isinstance(excvalue, ProtocolError):
self.shutdown()
else:
try:
self.logout()
except AppOperationError:
try:
self.shutdown()
except OSError:
self.close()
[docs]
def backupscript(self, script: str, keep: int = 1):
"""Make an Emacs-style backup of `script`.
`keep` = 0
Do nothing.
`keep` = 1
:file:`script` is backed up as :file:`script~`.
`keep` > 1
:file:`script` is backed up as :file:`script.~{n}~`.
`n` starts with 1 and increments with each backup.
Old backups are deleted if there are more than `keep` backups.
For example:
>>> mgr.listscripts()
[('script.sieve', True)]
>>> mgr.backupscript('script.sieve', keep=0)
>>> mgr.listscripts()
[('script.sieve', True)]
>>> mgr.listscripts()
[('script.sieve', True)]
>>> mgr.backupscript('script.sieve', keep=1)
>>> mgr.listscripts()
[('script.sieve', True), ('script.sieve~', False)]
>>> mgr.listscripts()
[('script.sieve', True)]
>>> mgr.backupscript('script.sieve', keep=2)
>>> mgr.listscripts()
[('script.sieve', True), ('script.sieve.~1~', False)]
>>> mgr.backupscript('script.sieve', keep=2)
>>> mgr.listscripts()
[('script.sieve', True),
('script.sieve.~1~', False),
('script.sieve.~2~', False)]
>>> mgr.backupscript('script.sieve', keep=2)
>>> mgr.listscripts()
[('script.sieve', True),
('script.sieve.~2~', False),
('script.sieve.~3~', False)]
Arguments:
script: Script name.
keep: How many backups to keep.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
def getfiles() -> Iterator[str]:
for script, _ in self.listscripts():
yield script
def copy(src: str, targ: str):
self.copyscript(src, targ, backup=0)
backup(script, keep, getfiles, copy, self.deletescript)
[docs]
def checkscript(self, script: Union[str, IO]):
"""Check whether `script` is valid.
Syntax errors trigger a :exc:`SieveOperationError`.
Semantic errors are reported in :attr:`warning`.
For example:
>>> checkscript('foo')
Traceback (most recent call last):
[...]
SieveOperationError: line 1: error: expected end of command ';'
error: parse failed.
>>> checkscript('# foo')
>>>
Arguments:
script: Script (*not* script name).
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveCapabilityError: "CHECKSCRIPT" not supported.
SieveConnectionError: Server has closed the connection.
SieveOperationError: `Script` contains syntax errors.
SieveProtocolError: Server violated the ManageSieve protocol.
.. important::
Sieve scripts must be encoded in UTF-8.
"""
assert self.capabilities
if not self.capabilities.version:
raise SieveCapabilityError('CHECKSCRIPT: not supported')
self.execute('CHECKSCRIPT', script)
[docs]
def copyscript(self, source: str, target: str,
backup: Optional[int] = None):
"""Download `source` and re-upload it as `target`.
Arguments:
source: Source name.
target: Target name.
backup: How many backups to keep (default: :attr:`backup`).
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
with SpooledTemporaryFile(max_size=self.memory, mode='bw+') as temp:
temp.write(self.getscript(source).encode('utf8'))
temp.seek(0)
self.putscript(temp, target, backup=backup)
[docs]
def deletescript(self, script: str):
"""Delete `script`.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
self._scripts = None
self.validname(script, check=True)
self.execute('DELETESCRIPT', script)
self.logger.info('Removed %s', script)
[docs]
def editscripts(self, command: list[str], scripts: list[str], *args,
catch: Optional[Callable[[Exception, str], bool]] = None,
check: bool = True, create: bool = True,
**kwargs) -> subprocess.CompletedProcess:
"""Download `scripts`, edit them with `command`, and re-upload them.
The `scripts` are appended to the `command`, which is then passed
to :func:`subprocess.run`. Scripts that have been changed are then
re-uploaded to the server. If the server has closed the connection
in the meantime, the connection is re-established automatically.
If :meth:`putscript` raises an error and `catch` has been given,
then the error and the name of the offending script are passed to
`catch`, which should return `True` if the `command` should be
re-invoked for that script and `False` otherwise. Either way,
the error will be suppressed.
For example:
>>> mgr.editscripts(['vi'], ['foo.sieve'])
>>> cp = mgr.editscripts(['cmp'], ['a.sieve', 'b.sieve'], check=False)
>>> if cp.returncode != 0:
>>> print('a.sieve and b.sieve differ')
Arguments:
command: Command to run.
scripts: Scripts to edit.
catch: Error handler.
check: See :func:`subprocess.run`.
create: Create scripts that do not exist?
args: Positional arguments for :func:`subprocess.run`.
kwargs: Keywords arguments for :func:`subprocess.run`.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveOperationError: At least one script contains a syntax error.
[#editscripts-catch]_
SieveProtocolError: Server violated the ManageSieve protocol.
ValueError: Script name contains path separator.
.. [#editscripts-catch] Only raised if `catch` has *not* been given.
"""
for script in scripts:
self.validname(script, check=True)
if path.sep in script:
raise ValueError(f'{script}: contains {path.sep}')
with TemporaryDirectory() as tmpdir:
fnames = []
ctimes = []
for script in scripts:
fname = path.join(tmpdir, script)
with open(fname, 'w', encoding='utf8') as file:
try:
file.write(self.getscript(script))
except SieveOperationError as err:
if not create:
raise
if not err.code:
if self.scriptexists(script):
raise
elif not err.matches('NONEXISTENT'):
raise
fnames.append(fname)
ctimes.append(os.stat(fname).st_ctime)
while True:
cp = subprocess.run(command + fnames, *args,
check=check, **kwargs)
retry: list[tuple[str, str, float]] = []
for script, fname, ctime in zip(scripts, fnames, ctimes):
if os.stat(fname).st_ctime > ctime:
with open(fname, 'rb') as file:
try:
self._withreopen(self.putscript, file, script)
except SieveOperationError as err:
if catch is None:
raise
if catch(err, script):
retry.append((script, fname, ctime))
if not retry:
return cp
scripts, fnames, ctimes = map(list, zip(*retry))
# NOTREACHED
[docs]
def getactive(self) -> Optional[str]:
"""Get the name of the active script.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
for name, active in self.listscripts():
if active:
return name
return None
[docs]
def getscript(self, script: str) -> str:
"""Download `script`.
For example:
>>> with open('foo.sieve', 'w', encoding='utf8') as file:
>>> file.write(mgr.getscript('foo.sieve'))
Arguments:
script: Script name.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
self.validname(script, check=True)
try:
# pylint: disable=unbalanced-tuple-unpacking
_, ((content,),) = self.execute('GETSCRIPT', script)
except ValueError as err:
raise SieveProtocolError('unexpected data') from err
if isinstance(content, Atom):
raise SieveProtocolError('unexpected atom')
if isinstance(content, str):
return content
raise SieveProtocolError('unexpected ' + type(content).__name__)
[docs]
def havespace(self, script: str, size: int):
"""Check whether there is enough space for `script`.
Arguments:
script: Script name.
size: Script size in bytes.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveOperationError: There is *not* enough space.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
self.validname(script, check=True)
self.execute('HAVESPACE', script, size)
[docs]
def listscripts(self, cached: bool = False) -> list[tuple[str, bool]]:
"""List scripts and whether they are the active script.
For example:
>>> mgr.listscripts()
[('foo.sieve', False), ('bar.sieve', True)]
>>> scripts = [script for script, _ in mgr.listscripts()]
Arguments:
cached: Return cached response? [#cached]_
Returns:
A list of script name/status tuples.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
.. [#cached] The cache is cleared after :meth:`copyscript`,
:meth:`deletescript`, :meth:`putscript`,
:meth:`renamescript`, :meth:`setactive`, and
:meth:`unsetactive`.
"""
if not cached or self._scripts is None:
self._scripts = []
for line in self.execute('LISTSCRIPTS')[1]:
try:
name = line[0]
except IndexError as err:
raise SieveProtocolError('no data') from err
if not isinstance(name, str):
raise SieveProtocolError('expected string')
try:
status = line[1]
if not isinstance(status, str):
raise SieveProtocolError('expected string')
active = status.casefold() == 'active'
except IndexError:
active = False
self._scripts.append((name, active))
return self._scripts
[docs]
def logout(self):
"""Log out.
.. note::
:meth:`logout` should be called to close the connection
unless :class:`SieveManager` is used as a context manager.
.. warning::
Logging out is unsafe after a :exc:`ProtocolError`.
Use :meth:`shutdown` instead.
"""
if self.sock is not None:
try:
self.execute('LOGOUT')
self.logger.info('Logged out')
except (OperationError, ProtocolError):
with suppress(OSError):
self.shutdown()
except ConnectionError:
pass
self.close()
[docs]
def noop(self, tag: Optional[str] = None) -> Optional[str]:
"""Request a no-op.
For example:
>>> mgr.noop('foo')
'foo'
Arguments:
tag: String for the server to echo back.
Returns:
Server echo.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveCapabilityError: "NOOP" not supported.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
assert self.capabilities
if not self.capabilities.version:
raise SieveCapabilityError('NOOP: not supported')
args = () if tag is None else (tag,)
res, _ = self.execute('NOOP', *args)
try:
data = res.code[1]
except IndexError:
return None
if isinstance(data, Atom) or not isinstance(data, str):
raise SieveProtocolError('expected string')
return data
[docs]
def putscript(self, source: Union[str, IO], target: str,
backup: Optional[int] = None):
"""Upload `source` to the server as `target`.
The server should reject syntactically invalid scripts.
It may issue a :attr:`warning` for semantically invalid scripts,
but should accept them nonetheless. Updates are atomic.
For example:
>>> mgr.putscript('# empty', 'foo.sieve')
>>> with open('foo.sieve', 'br') as file:
>>> mgr.putscript(file, 'foo.sieve')
Arguments:
source: Script (*not* script name).
target: Script name.
backup: How many backups to keep (default: :attr:`backup`).
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveOperationError: `Script` contains syntax errors.
SieveProtocolError: Server violated the ManageSieve protocol.
.. important::
Sieve scripts must be encoded in UTF-8.
"""
self.validname(target, check=True)
try:
keep = self.backup if backup is None else backup
self.backupscript(target, keep=keep)
except SieveOperationError as err:
if not err.code:
for script, _ in self.listscripts():
if script == target:
raise
elif not err.matches('NONEXISTENT'):
raise
self._scripts = None
self.execute('PUTSCRIPT', target, source)
self.logger.info('Uploaded %s', target)
[docs]
def renamescript(self, source: str, target: str, emulate: bool = True):
"""Rename `source` to `target`.
Some servers do not the support the "RENAMESCRIPT" command.
On such servers, renaming is emulated by downloading `source`,
re-uploading it as `target`, marking `target` as the active script
if `source` is the active script, and then deleting `source`.
For example:
>>> mgr.renamescript('foo.sieve', 'bar.sieve', emulate=False)
Arguments:
source: Script name.
target: Script name.
emulate: Emulate "RENAMESCRIPT" if the server does not support it?
Raises:
SieveCapabilityError: "RENAMESCRIPT" not supported. [#emulate]_
SieveOperationError: `source` does not exist or `target` exists.
.. [#emulate] Only raised if `emulate` is `False`.
"""
assert self.capabilities
self.validname(source, check=True)
self.validname(target, check=True)
if self.capabilities.version:
self._scripts = None
self.execute('RENAMESCRIPT', source, target)
self.logger.info('Renamed %s to %s', source, target)
elif emulate:
sourceactive: Optional[bool] = None
for script, active in self.listscripts():
if script == source:
sourceactive = active
if script == target:
raise SieveOperationError(
code=(Atom('alreadyexists'),),
message=f'{target}: {os.strerror(EEXIST)}'
)
if sourceactive is None:
raise SieveOperationError(
code=(Atom('nonexistent'),),
message=f'{source}: {os.strerror(ENOENT)}'
)
self.copyscript(source, target, backup=0)
if sourceactive:
self.setactive(target)
self.deletescript(source)
else:
raise SieveCapabilityError('RENAMESCRIPT: not supported')
[docs]
def scriptexists(self, script: str, cached: bool = False) -> bool:
"""Check if `script` exists.
Arguments:
script: Script name.
cached: Return cached response? [#cached]_
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
self.validname(script, check=True)
return any(s == script for s, _ in self.listscripts(cached=cached))
[docs]
def setactive(self, script: str):
"""Mark `script` as the active script.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
self.validname(script, check=True)
self._scripts = None
self.execute('SETACTIVE', script)
self.logger.info('Activated %s', script)
[docs]
def unauthenticate(self):
"""Unauthenticate.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveCapabilityError: "UNAUTHENTICATE" not supported.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
assert self.capabilities
if not self.capabilities.unauthenticate:
raise SieveCapabilityError('UNAUTHENTICATE: not supported')
self.execute('UNAUTHENTICATE')
self.login = ''
self.owner = ''
self.logger.info('Un-authenticated')
[docs]
def unsetactive(self):
"""Deactivate the active script.
Raises:
AppConnectionError: :attr:`sock` has died.
AppOperationError: Another operation is already in progress.
SieveConnectionError: Server has closed the connection.
SieveProtocolError: Server violated the ManageSieve protocol.
"""
self._scripts = None
self.execute('SETACTIVE', '')
self.logger.info('Deactivated active script')
[docs]
@classmethod
def validname(cls, script: str, check: bool = False) -> bool:
"""Check whether `script` is a valid script name.
Arguments:
script: Script name
check: Raise an error if `script` is not a valid script name?
Raises:
ValueError: `script` is *not* valid. [#validname-check]_
.. [#validname-check] Only raised if `check` is `True`.
"""
if cls._isname(script):
return True
if check:
raise ValueError(escapectrl(script) + ': bad name')
return False
backup: int = 0
"""How many backups to keep."""
_isname: Callable[..., Optional[re.Match]] = re.compile(
'[^\u0000-\u001f\u0080-\u009f\u2028\u2029]+'
).fullmatch
"""Check whether a string is a valid script name.
.. seealso::
:rfc:`5198` (sec. 2)
Definition of unicode format for network interchange.
:rfc:`5804` (sec. 1.6)
ManageSieve script names.
"""
_scripts: Optional[list[tuple[str, bool]]] = None
"""Scripts returned by the last :meth:`listscripts`."""
[docs]
class SieveSASLAdapter(AbstractSASLAdapter):
"""Adapter to send SASL messages over a :class:`SieveConn`."""
[docs]
def __init__(self, connection: 'SieveConn'):
"""Initialise the adapter."""
self.conn = connection
[docs]
def abort(self):
self.send(b'*')
self.end()
[docs]
def begin(self, name: str, data: Optional[bytes] = None):
assert self.conn
args: list[Any] = [Atom('AUTHENTICATE'), name.upper()]
if data is not None:
args.append(b64encode(data))
self.conn.sendline(*args) # type: ignore[arg-type]
[docs]
def end(self):
assert self.conn
try:
res = Response.fromline(self.conn.receiveline())
except ValueError as err:
raise SieveProtocolError(str(err)) from err
if res.response != 'OK':
raise res.toerror()
[docs]
def send(self, data: bytes):
assert self.conn
conn = self.conn
conn.sendline(b64encode(data)) # type: ignore[arg-type]
[docs]
def receive(self) -> bytes:
assert self.conn
try:
line = self.conn.receiveline()
except ValueError as err:
raise SieveProtocolError(str(err)) from err
if isinstance(line[0], Atom):
res = Response.fromline(line)
raise res.toerror()
try:
word, = line
except ValueError as err:
raise SieveProtocolError('unexpected data') from err
if isinstance(word, Atom) or not isinstance(word, str):
raise SieveProtocolError('expected string')
return b64decode(word)
@property
def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
assert self.conn
assert self.conn.sock
return self.conn.sock
@sock.setter
def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
assert self.conn
self.conn.sock = sock
conn: Optional[SieveConn] = None
"""Underlying connection."""
CapabilitiesT = TypeVar('CapabilitiesT', bound='Capabilities')
""":class:`Capabilities` type variable."""
[docs]
@dataclass
class Capabilities():
"""Server capabilities."""
[docs]
@classmethod
def fromlines(cls: type[CapabilitiesT],
lines: Iterable[Line]) -> CapabilitiesT:
"""Create a :class:`Capabilities` object from a server response."""
def getvalue(words: Line) -> str:
try:
value, = words[1:]
except ValueError as err:
raise SieveProtocolError('expected word') from err
if not isinstance(value, str):
raise SieveProtocolError('expected string')
return value
obj = cls()
for words in lines:
try:
key = words[0].casefold() # type: ignore[union-attr]
except IndexError as err:
raise SieveProtocolError('expected word') from err
except AttributeError as err:
raise SieveProtocolError('expected string') from err
if key in ('implementation', 'language', 'owner', 'version'):
setattr(obj, key, getvalue(words))
elif key in ('notify', 'sasl', 'sieve'):
setattr(obj, key, tuple(getvalue(words).casefold().split()))
elif key == 'maxredirects':
setattr(obj, key, int(getvalue(words)))
elif key in ('starttls', 'unauthenticate'):
setattr(obj, key, True)
else:
try:
obj.notunderstood[key] = words[1]
except IndexError:
obj.notunderstood[key] = True
return obj
implementation: Optional[str] = None
"""Server application."""
sieve: tuple[str, ...] = ()
"""Supported Sieve modules."""
language: Optional[str] = None
"""Language."""
maxredirects: Optional[int] = None
"""Maximum number of redirect operations permitted in a script."""
notify: tuple[str, ...] = ()
"""URI schema parts for supported notification methods."""
owner: str = ''
"""Canonical name of the user whose scripts are managed."""
sasl: tuple[str, ...] = ()
"""Supported authentication methods."""
starttls: bool = False
"""Is "STARTTLS" available?"""
unauthenticate: bool = False
"""Is "UNAUTHENTICATE" available?"""
version: Optional[str] = None
"""ManageSieve protocol version."""
notunderstood: dict = dataclasses.field(default_factory=dict)
"""Capabilities not understood by SieveManager."""
ResponseT = TypeVar('ResponseT', bound='Response')
"""Type variable for :class:`Response`."""
[docs]
@dataclass(frozen=True)
class Response():
"""Server response to a command.
.. seealso::
:rfc:`5804` (secs. 1.2, 1.3, 4, 6.4, and passim)
ManageSieve responses
"""
[docs]
@classmethod
def fromline(cls: type[ResponseT], line: Line) -> ResponseT:
"""Create a :class:`Response` object from a :class:`Line`."""
# pylint: disable=redefined-outer-name
code = cls.code
message = cls.message
response = None
for i, word in enumerate(line):
if isinstance(word, Atom):
if i == 0:
response = word
continue
elif isinstance(word, str):
if 1 <= i <= 2:
message = word
continue
elif isinstance(word, Sequence):
if i == 1:
code = tuple(word)
continue
raise SieveProtocolError('malformed response')
if response is None:
raise SieveProtocolError('expected atom')
return cls(response=response, code=code, message=message)
[docs]
def __str__(self) -> str:
""":attr:`message` or, if no message was returned, a stub message."""
return self.message if self.message else f'server says {self.response}'
[docs]
def matches(self, *categories: str) -> bool:
"""Check if :attr:`code` matches any of the given `categories`.
Returns `False` if :attr:`code` is empty.
Matching is case-insensitive.
For example:
>>> with open('script.sieve') as script:
>>> try:
>>> mgr.putscript(script, script.name)
>>> except SieveOperationError as err:
>>> if err.matches('QUOTA'):
>>> print('over quota')
Print more informative messages:
>>> with open('script.sieve') as script:
>>> try:
>>> mgr.putscript(script, script.name)
>>> except SieveOperationError as err:
>>> if err.matches('QUOTA/MAXSCRIPTS'):
>>> print('too many scripts')
>>> elif err.matches('QUOTA/MAXSIZE'):
>>> print(f'{script.name} is too large')
>>> elif err.matches('QUOTA'):
>>> print('over quota')
"""
try:
rescode = self.code[0]
except IndexError:
return False
assert isinstance(rescode, str)
for cat in categories:
pattern = re.escape(cat.removesuffix('/')) + r'(/|$)'
if re.match(pattern, rescode, flags=re.IGNORECASE):
return True
return False
[docs]
def toerror(self) -> 'SieveError':
"""Convert a :class:`Response` into an error."""
cls = (SieveConnectionError if self.response == 'BYE' else
SieveOperationError)
return cls(self.response, self.code, self.message)
response: Atom
"""'OK', 'NO', or 'BYE'.
======== ===========================
Response Meaning
======== ===========================
'OK' Success
'NO' Failure
'BYE' Connection closed by server
======== ===========================
"""
code: tuple[Word, ...] = ()
"""Response code.
ManageSieve response codes are lists of categories, separated by
slashes ("/"), where each category is the super-category of the
next (e.g., "quota/maxsize").
Some response codes carry data (e.g., ``TAG "SYNC-123"``).
See :rfc:`5804` (sec. 1.3) for a list of response codes.
.. warning::
Servers need *not* return response codes.
"""
message: Optional[str] = None
"""Human-readable message.
.. warning::
Servers need *not* return a message.
"""
[docs]
@dataclass(frozen=True)
class SRV():
"""DNS SRV record.
.. seealso::
:rfc:`2782`
DNS SRV
"""
priority: int
weight: int
host: str
port: int
URLT = TypeVar('URLT', bound='URL')
"""Type variable for :class:`URL`."""
[docs]
@dataclass(frozen=True)
class URL():
"""Sieve URL.
.. seealso::
:rfc:`5804` (sec. 3)
Sieve URL Scheme
"""
[docs]
@classmethod
def fromstr(cls: type[URLT], url: str) -> URLT:
"""Create a :class:`URL` object from a URL string.
For example:
>>> URL.fromstr('sieve://user@imap.foo.example')
URL(hostname='imap.foo.example', scheme='sieve',
username='user', password=None, port=None,
owner=None, scriptname=None)
Raises:
ValueError: Not a valid Sieve URL.
"""
if not re.match(r'([a-z][a-z0-9+.-]*:)?//', url):
url = 'sieve://' + url
parts = urllib.parse.urlsplit(url)
if parts.query or parts.fragment:
raise ValueError(f'{url}: not a Sieve URL')
if not parts.hostname:
raise ValueError(f'{url}: no host')
if not (isinetaddr(parts.hostname) or ishostname(parts.hostname)):
raise ValueError(f'{parts.hostname}: neither address nor hostname')
try:
owner, scriptname = parts.path.split('/', maxsplit=2)[1:]
except ValueError:
owner, scriptname = parts.path[1:], None
return cls(
scheme=parts.scheme,
username=parts.username,
password=parts.password,
hostname=parts.hostname,
port=parts.port,
owner=owner if owner else None,
scriptname=scriptname if scriptname else None
)
[docs]
def __str__(self):
"""Get a string representation of the URL."""
url = ''
if self.scheme:
url += f'{self.scheme}://'
else:
url += 'sieve://'
if self.username:
url += self.username
if self.password is not None:
url += f':{self.password}'
url += '@'
if self.hostname:
url += self.hostname
else:
url += 'localhost'
if self.port is not None:
url += f':{self.port}'
if self.owner:
url += f'/{self.owner}'
if self.scriptname:
url += f'/{self.scriptname}'
return url
hostname: str
scheme: str = 'sieve'
username: Optional[str] = None
password: Optional[str] = None
port: Optional[int] = None
owner: Optional[str] = None
scriptname: Optional[str] = None
#
# Authentication
#
[docs]
class BaseAuth(AbstractAuth, ABC):
"""Base class for authentication mechanisms.
:class:`BaseAuth` provides methods to prepare strings
according to :rfc:`3454` and :rfc:`4013` and a layer over
the underlying :class:`AbstractSASLAdapter` object that
calls :meth:`AbstractSASLAdapter.begin` and
:meth:`AbstractSASLAdapter.end` transparently.
Credentials must be prepared in :meth:`__init__`. Subclasses
should pass `connection`, `authcid`, `authzid`, and `prepare`
to :code:`super().__init__` and use :meth:`prepare` to prepare
the remaining credentials. For example:
.. literalinclude:: ../sievemgr.py
:pyobject: BasePwdAuth.__init__
:dedent: 4
The SASL exchange must be implemented in :meth:`exchange`.
Subclasses should use :meth:`send` and :meth:`receive` to
exchange SASL messages. For example:
.. literalinclude:: ../sievemgr.py
:pyobject: PlainAuth.exchange
:dedent: 4
"""
[docs]
@staticmethod
# pylint: disable=redefined-outer-name (string)
def prepare(string: str) -> str:
"""Prepare `string` according to :rfc:`3454` and :rfc:`4013`.
Returns:
Prepared `string`.
Raises:
ValueError: `String` is malformed.
"""
if any(rlcat := list(map(stringprep.in_table_d1, string))):
if not (rlcat[0] and rlcat[-1]):
raise ValueError(f'{string}: malformed RandLCat string')
if any(map(stringprep.in_table_d2, string)):
raise ValueError(f'{string}: mixes RandLCat and LCat')
prep = ''
for i, char in enumerate(string, start=1):
if stringprep.in_table_b1(char):
pass
elif stringprep.in_table_c12(char):
prep += ' '
elif stringprep.in_table_c21_c22(char):
raise ValueError(f'{string}:{i}: control character')
elif stringprep.in_table_c3(char):
raise ValueError(f'{string}:{i}: private use character')
elif stringprep.in_table_c4(char):
raise ValueError(f'{string}:{i}: non-character code point')
elif stringprep.in_table_c5(char):
raise ValueError(f'{string}:{i}: surrogate code point')
elif stringprep.in_table_c6(char):
raise ValueError(f'{string}:{i}: not plain text')
elif stringprep.in_table_c7(char):
raise ValueError(f'{string}:{i}: not canonical')
elif stringprep.in_table_c8(char):
raise ValueError(f'{string}:{i}: changes display/deprecated')
elif stringprep.in_table_c9(char):
raise ValueError(f'{string}:{i}: tagging character')
elif stringprep.in_table_a1(char):
raise ValueError(f'{string}:{i}: unassigned code point')
else:
prep += char
return unicodedata.ucd_3_2_0.normalize('NFKC', prep)
[docs]
def __init__(self, adapter: AbstractSASLAdapter,
authcid: str, authzid: str = '',
prepare: SASLPrep = SASLPrep.ALL):
"""Prepare authentication.
`authcid` and `authzid` are prepared according to :rfc:`3454` and
:rfc:`4013` if ``prepare & SASLPrep.USERNAMES`` evaluates as true.
Arguments:
conn: Connection over which to authenticate.
authcid: Authentication ID (user to login as).
authzid: Authorisation ID (user whose rights to acquire).
prepare: Which credentials to prepare.
Raises:
ValueError: Bad characters in username.
"""
prepare &= SASLPrep.USERNAMES # type: ignore[assignment]
self.adapter = adapter
self.authcid = self.prepare(authcid) if prepare else authcid
self.authzid = self.prepare(authzid) if prepare else authzid
# pylint: disable=useless-return
[docs]
def __call__(self) -> Optional[Any]:
"""Authenticate as :attr:`authcid`.
:attr:`authcid` is authorised as :attr:`authzid`
if :attr:`authzid` is set (proxy authentication).
Returns:
Data returned by the server, if any.
Raises:
ConnectionError: Server has closed the connection.
OperationError: Authentication failed.
SASLCapabilityError: Some feature is not supported.
SASLProtocolError: Server violated the SASL protocol.
SASLSecurityError: Server verification failed.
TLSCapabilityError: Channel-binding is not supported.
.. note::
Calls :meth:`exchange` and :meth:`end`.
"""
self.exchange()
if self.state == AuthState.RECEIVED:
self.send(b'')
self.end()
return None
[docs]
def abort(self):
"""Abort authentication.
Raises:
ProtocolError: Protocol violation.
"""
self.adapter.abort()
self.state = AuthState.DONE
[docs]
def begin(self, data: Optional[bytes] = None):
"""Begin authentication.
Arguments:
data: Optional client-first message.
Raises:
ConnectionError: Connection was closed.
ProtocolError: Protocol violation.
"""
if self.state == AuthState.PREAUTH:
self.adapter.begin(self.name, data)
self.state = AuthState.SENT
else:
raise SASLProtocolError(f'SASL state {self.state}: unexpected')
[docs]
def end(self):
"""Conclude authentication.
Raises:
ConnectionError: Connection was closed.
OperationError: Authentication failed.
ProtocolError: Protocol violation.
"""
if self.state == AuthState.SENT:
self.adapter.end()
else:
raise SASLProtocolError(f'SASL state {self.state}: unexpected')
[docs]
@abstractmethod
def exchange(self):
"""Exchange SASL messages."""
[docs]
def send(self, data: bytes):
"""Encode and send an SASL message.
Raises:
ConnectionError: Connection was closed.
ProtocolError: Protocol violation.
.. note::
Calls :meth:`begin` if needed.
"""
if self.state == AuthState.PREAUTH:
self.begin(data)
elif self.state == AuthState.RECEIVED:
self.adapter.send(data)
self.state = AuthState.SENT
else:
raise SASLProtocolError(f'SASL state {self.state}: unexpected')
# pylint: disable=missing-raises-doc
[docs]
def receive(self) -> bytes:
"""Receive and decode an SASL message.
Raises:
ConnectionError: Connection was closed.
OperationError: Authentication failed.
ProtocolError: Protocol violation.
.. note::
Calls :meth:`begin` if needed.
"""
if self.state == AuthState.PREAUTH:
self.begin()
if self.state == AuthState.SENT:
try:
data = self.adapter.receive()
self.state = AuthState.RECEIVED
except SieveOperationError as err:
if err.response != 'OK':
raise
if not err.matches('SASL'):
# pylint: disable=raise-missing-from
raise SASLProtocolError('expected data')
try:
word = err.code[1]
except ValueError as valuerr:
raise SASLProtocolError('unexpected data') from valuerr
if isinstance(word, Atom) or not isinstance(word, str):
# pylint: disable=raise-missing-from
raise SASLProtocolError('expected string')
data = b64decode(word)
self.state = AuthState.DONE
return data
raise SASLProtocolError(f'SASL state {self.state}: unexpected')
@property
def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
"""Underlying socket."""
assert self.adapter
return self.adapter.sock
@sock.setter
def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
assert self.adapter
self.adapter.sock = sock
adapter: AbstractSASLAdapter
"""Underlying SASL adapter."""
state: AuthState = AuthState.PREAUTH
"""Current authentication state."""
[docs]
class BasePwdAuth(BaseAuth, ABC):
"""Base class for password-based authentication mechanisms.
Prepares credentials, so that subclasses need only
implement :meth:`exchange`. For example:
.. literalinclude:: ../sievemgr.py
:pyobject: PlainAuth
"""
[docs]
def __init__(self, connection: AbstractSASLAdapter,
authcid: str, password: str, authzid: str = '',
prepare: SASLPrep = SASLPrep.ALL):
"""Prepare authentication.
`authcid`, `password`, and `authzid` are prepared according to
:rfc:`3454` and :rfc:`4013` if ``prepare & SASLPrep.USERNAMES``
and/or ``prepare & SASLPrep.PASSWORDS`` are non-zero.
Arguments:
conn: Connection over which to authenticate.
authcid: Authentication ID (user to login as).
password: Password.
authzid: Authorisation ID (user whose rights to acquire).
prepare: Which credentials to prepare.
Raises:
ValueError: Bad characters in username or password.
"""
super().__init__(connection, authcid, authzid, prepare)
prepare &= SASLPrep.PASSWORDS # type: ignore[assignment]
self.password = self.prepare(password) if prepare else password
password: str
"""Password."""
[docs]
class BaseScramAuth(BasePwdAuth, ABC):
"""Base class for SCRAM authentication mechanisms.
Implements :meth:`exchange`, so that subclasses need only define a digest.
For example:
.. literalinclude:: ../sievemgr.py
:pyobject: ScramSHA1Auth
.. seealso::
:rfc:`5802`
Salted Challenge Response Authentication Mechanism (SCRAM).
:rfc:`7677`
SCRAM-SHA-256 and SCRAM-SHA-256-PLUS.
https://datatracker.ietf.org/doc/html/draft-melnikov-scram-bis
Updated recommendations for implementing SCRAM.
https://datatracker.ietf.org/doc/html/draft-melnikov-scram-sha-512-03
SCRAM-SHA-512 and SCRAM-SHA-512-PLUS.
https://datatracker.ietf.org/doc/html/draft-melnikov-scram-sha3-512-03
SCRAM-SHA3-512 and SCRAM-SHA3-512-PLUS.
https://csb.stevekerrison.com/post/2022-01-channel-binding
Discussion of TLS channel binding.
https://csb.stevekerrison.com/post/2022-05-scram-detail
Discussion of SCRAM.
"""
[docs]
def exchange(self):
# Compare to
# * https://github.com/stevekerrison/auth-examples
# * https://github.com/horazont/aiosasl
def todict(msg: bytes) -> dict[bytes, bytes]:
return dict([a.split(b'=', maxsplit=1) for a in msg.split(b',')])
def escape(b: bytes) -> bytes:
return b.replace(b'=', b'=3D').replace(b',', b'=2C')
# Parameters
authcid = self.authcid.encode('utf8')
authzid = self.authzid.encode('utf8')
password = self.password.encode('utf8')
chan_bind_type = self.cbtype.encode('utf8')
chan_bind_data = self.cbdata
c_nonce_len = self.noncelen
digest = self.digest
# Send client-first message
chan_bind_attr = b'p=%s' % chan_bind_type if chan_bind_type else b'n'
c_first_prefix = chan_bind_attr + b',' + escape(authzid)
c_nonce = b64encode(secrets.token_bytes(c_nonce_len))
c_first_bare = b'n=%s,r=%s' % (escape(authcid), c_nonce)
c_first = c_first_prefix + b',' + c_first_bare
self.send(c_first)
# Receive server-first message
try:
s_first = self.receive()
except SieveOperationError as err:
# pylint: disable=bad-exception-cause (???)
raise SASLCapabilityError(f'{self.name}: {err}') from err
s_first_dict = todict(s_first)
iters = int(s_first_dict[b'i'])
s_nonce = s_first_dict[b'r']
salt = b64decode(s_first_dict[b's'])
# Send client-final message
salted_pwd = hashlib.pbkdf2_hmac(digest, password, salt, iters)
c_key = hmac.digest(salted_pwd, b'Client Key', digest)
stored_key = hashlib.new(digest, c_key).digest()
chan_bind = b64encode(c_first_prefix + b',' + chan_bind_data)
c_final_prefix = b'c=%s,r=%s' % (chan_bind, s_nonce)
auth_message = b','.join((c_first_bare, s_first, c_final_prefix))
c_signature = hmac.digest(stored_key, auth_message, digest)
c_proof = b64encode(bytes(a ^ b for a, b in zip(c_key, c_signature)))
c_final = c_final_prefix + b',p=%s' % c_proof
self.send(c_final)
# Receive server-final message
s_final = self.receive()
s_key = hmac.digest(salted_pwd, b'Server Key', digest)
s_signature = hmac.digest(s_key, auth_message, digest)
s_final_dict = todict(s_final)
if s_signature != b64decode(s_final_dict[b'v']):
host = self.sock.getpeername()[0]
raise SASLSecurityError(f'{host}: verification failed')
@property
@abstractmethod
def digest(self) -> str:
"""Digest name as used by :mod:`hashlib` and :mod:`hmac`."""
cbtype: str = ''
"""TLS channel-binding type."""
cbdata: bytes = b''
"""TLS channel-binding data."""
noncelen: int = 18
"""Client nonce length in bytes."""
[docs]
class BaseScramPlusAuth(BaseScramAuth, ABC):
"""Base class for SCRAM mechanisms with channel binding.
For example:
.. literalinclude:: ../sievemgr.py
:pyobject: ScramSHA1PlusAuth
"""
# Channel-binding is, for the most part, implemented in BaseScramAuth.
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not isinstance(self.sock, ssl.SSLSocket):
raise SASLProtocolError('non-TLS channel cannot be bound')
for cbtype in ('tls-exporter', 'tls-unique', 'tls-server-endpoint'):
if cbtype in ssl.CHANNEL_BINDING_TYPES:
if cbdata := self.sock.get_channel_binding(cbtype):
self.cbtype = cbtype
self.cbdata = cbdata
break
else:
raise TLSCapabilityError('no supported channel-binding type')
[docs]
class AuthzUnsupportedMixin():
"""Mixin for SASL mechanisms that do not support authorisation.
For example:
.. literalinclude: ../sievemgr.py
:pyobject: LoginAuth
"""
[docs]
def __init__(self, *args, **kwargs):
"""Prepare authentication.
Raises:
SASLCapabilityError: :attr:`authzid` is set."""
assert isinstance(self, BaseAuth)
super().__init__(*args, **kwargs)
if self.authzid:
raise SASLCapabilityError(f'{self.name}: no authorisation')
[docs]
class CramMD5Auth(AuthzUnsupportedMixin, BasePwdAuth):
"""CRAM-MD5 authentication.
.. seealso::
:rfc:`2195` (sec. 2)
Definition of CRAM-MD5.
"""
[docs]
def exchange(self):
challenge = self.receive()
password = self.password.encode('utf8')
digest = hmac.new(password, challenge, hashlib.md5)
data = ' '.join((self.authcid, digest.hexdigest()))
self.send(data.encode('utf8'))
obsolete = True
name = 'CRAM-MD5'
[docs]
class ExternalAuth(BaseAuth):
"""EXTERNAL authentication.
.. seealso::
:rfc:`4422` (App. A)
Definition of the EXTERNAL mechanism.
"""
[docs]
def __call__(self):
"""Authenticate."""
args = (self.authzid.encode('utf8'),) if self.authzid else ()
self.begin(*args)
self.receive()
self.send(b'')
self.end()
[docs]
def exchange(self):
"""No-op."""
name = 'EXTERNAL'
[docs]
class LoginAuth(AuthzUnsupportedMixin, BasePwdAuth):
"""LOGIN authentication.
.. seealso::
https://datatracker.ietf.org/doc/draft-murchison-sasl-login
Definition of the LOGIN mechanism.
"""
[docs]
def __init__(self, *args, **kwargs):
"""Prepare authentication.
Arguments:
conn: Connection over which to authenticate.
authcid: Authentication ID (user to login as).
password: Password.
authzid: Authorisation ID (user whose rights to acquire).
prepare: Which credentials to prepare.
Raises:
ValueError: Password contains CR, LF, or NUL.
"""
super().__init__(*args, **kwargs)
if {self.password} & {'\r', '\n', '\0'}:
raise ValueError('password: contains CR, LF, or NUL')
[docs]
def exchange(self):
self.receive()
self.send(self.authcid.encode('utf8'))
self.receive()
self.send(self.password.encode('utf8'))
obsolete = True
name = 'LOGIN'
[docs]
class PlainAuth(BasePwdAuth):
"""PLAIN authentication.
.. seealso::
:rfc:`4616`
PLAIN authentication mechanism.
"""
[docs]
def exchange(self):
data = '\0'.join((self.authzid, self.authcid, self.password))
self.send(data.encode('utf8'))
name = 'PLAIN'
[docs]
class ScramSHA1Auth(BaseScramAuth):
"""SCRAM-SHA-1 authentication."""
@property
def digest(self) -> str:
return 'sha1'
name = 'SCRAM-SHA-1'
order = -10
[docs]
class ScramSHA1PlusAuth(BaseScramPlusAuth, ScramSHA1Auth):
"""SCRAM-SHA-1-PLUS authentication."""
name = 'SCRAM-SHA-1-PLUS'
order = -1000
[docs]
class ScramSHA224Auth(BaseScramAuth):
"""SCRAM-SHA-224 authentication."""
@property
def digest(self) -> str:
return 'sha224'
name = 'SCRAM-SHA-224'
order = -20
[docs]
class ScramSHA224PlusAuth(BaseScramPlusAuth, ScramSHA224Auth):
"""SCRAM-SHA-224-PLUS authentication."""
name = 'SCRAM-SHA-224-PLUS'
order = -2000
[docs]
class ScramSHA256Auth(BaseScramAuth):
"""SCRAM-SHA-256 authentication."""
@property
def digest(self) -> str:
return 'sha256'
name = 'SCRAM-SHA-256'
order = -30
[docs]
class ScramSHA256PlusAuth(BaseScramPlusAuth, ScramSHA256Auth):
"""SCRAM-SHA-256-PLUS authentication."""
name = 'SCRAM-SHA-256-PLUS'
order = -3000
[docs]
class ScramSHA384Auth(BaseScramAuth):
"""SCRAM-SHA-384 authentication."""
@property
def digest(self) -> str:
return 'sha384'
name = 'SCRAM-SHA-384'
order = -40
[docs]
class ScramSHA384PlusAuth(BaseScramPlusAuth, ScramSHA384Auth):
"""SCRAM-SHA-384-PLUS authentication."""
name = 'SCRAM-SHA-384-PLUS'
order = -4000
[docs]
class ScramSHA512Auth(BaseScramAuth):
"""SCRAM-SHA-512 authentication."""
@property
def digest(self):
return 'sha512'
name = 'SCRAM-SHA-512'
order = -50
[docs]
class ScramSHA512PlusAuth(BaseScramPlusAuth, ScramSHA512Auth):
"""SCRAM-SHA-512-PLUS authentication."""
name = 'SCRAM-SHA-512-PLUS'
order = -5000
# pylint: disable=invalid-name
[docs]
class ScramSHA3_512Auth(BaseScramAuth):
"""SCRAM-SHA-512 authentication."""
@property
def digest(self):
return 'sha3_512'
name = 'SCRAM-SHA3-512'
order = -60
# pylint: disable=invalid-name
[docs]
class ScramSHA3_512PlusAuth(BaseScramPlusAuth, ScramSHA3_512Auth):
"""SCRAM-SHA-512-PLUS authentication."""
name = 'SCRAM-SHA3-512-PLUS'
order = -6000
#
# SieveManager Shell
#
[docs]
class BaseShell():
"""Base class for interactive shells.
:class:`BaseShell` is similar-ish to :class:`cmd.Cmd`. However, lines read
from standard input are :meth:`expanded <expand>` before being passed to
methods that implement commands. :class:`BaseShell` also provides an
extensible :meth:`completion system <complete>` as well as a built-in
:meth:`help system <do_help>`. :attr:`aliases` can be extended, too.
Define a :samp:`do_{command}` method to add `command`.
For example:
>>> class Calculator(BaseShell):
>>> def do_add(self, n, m):
>>> \"\"\"add n m - add n and m\"\"\"
>>> print(n + m)
>>>
>>> def do_sum(self, *numbers):
>>> \"\"\"sum [n ...] - \"\"\"
>>> print sum(numbers)
>>>
>>> aliases = {
>>> '+': 'sum'
>>> }
>>>
>>> calc = Calculator()
>>> calc.executeline('add 0 1')
1
>>> calc.executeline('sum 0 1 2')
3
>>> calc.executeline('+ 0 1 2 3')
6
>>> calc.executeline('add 0')
usage: add n m
>>> calc.executeline('help add')
add n m - add n and m
"""
# Shell behaviour
[docs]
def __init__(self):
"""Initialise a :class:`BaseShell` object."""
self.commands = tuple(self.getcommands())
[docs]
@staticmethod
def columnise(words: Sequence[str], file: TextIO = sys.stdout,
width: int = shutil.get_terminal_size().columns):
"""Print `words` in columns to `file`.
Arguments:
words: Words.
file: Output file.
width: Terminal width in chars.
"""
if (nwords := len(words)) > 0:
colwidth = max(map(len, words)) + 1
maxncols = max((width + 1) // colwidth, 1)
nrows = math.ceil(nwords / maxncols)
rows: tuple[list[str], ...] = tuple([] for _ in range(nrows))
for i, word in enumerate(words):
rows[i % nrows].append(word)
for row in rows:
line = ''.join([col.ljust(colwidth) for col in row])
print(line.rstrip(), file=file)
[docs]
def complete(self, text: str, n: int) -> Optional[str]:
"""Completion function for :func:`readline.set_completer`.
Tab-completion for command names (e.g., "exit") is built in. To
enable tab-completion for the arguments of some `command`, define
a method :samp:`complete_{command}`, which takes the index of the
argument that should be completed and the given `text` and returns
a sequence of :class:`str`-:class:`bool` pairs, where the string
is a completion and the boolean indicates whether a space should
be appended to that completion.
For example:
>>> class Foo(BaseShell)
>>> def do_cmd(self, arg1, arg2):
>>> pass
>>>
>>> def complete_cmd(self, argidx, text):
>>> if argidx == 1:
>>> return [(s, True) for s in ('foo', 'bar', 'baz')]
>>> if argidx == 2:
>>> return [('quux/', False)]
>>> return []
>>>
>>> foo = Foo()
>>> foo.complete('', 0)
'cmd '
>>> foo.complete('', 1)
None
>>> foo.complete('cmd ', 0)
'foo '
>>> foo.complete('cmd ', 1)
'bar '
>>> foo.complete('cmd ', 2)
'baz '
>>> foo.complete('cmd ', 3)
None
>>> foo.complete('cmd b', 0)
'bar '
>>> foo.complete('cmd b', 1)
'baz '
>>> foo.complete('cmd b', 2)
None
>>> foo.complete('cmd bar ', 0)
'quux/'
>>> foo.complete('cmd bar ', 1)
None
Arguments:
text: Possibly partial word to be completed.
n: Index of the completion to return.
Returns:
Either the `n`-th completion for `text` or
``None`` if there is no such completion.
.. admonition:: Side-effects
* Logging of messages with a priority lower than
:data:`logging.ERROR` is suppressed during tab-completion.
* A BEL is printed to the controlling terminal if
no completion for `text` is found.
"""
if n == 0:
logger = self.logger
loglevel = logger.level
if loglevel < logging.ERROR:
logger.setLevel(logging.ERROR)
try:
if args := self.getargs():
command = self.aliases.get(args[0], args[0])
try:
complete = getattr(self, f'complete_{command}')
except AttributeError:
complete = None
self._completions = []
self._completions = sorted(
shlex.quote(word) + (' ' if space else '')
for word, space in complete(len(args), text)
if fnmatch.fnmatchcase(word, text + '*')
) if complete else []
else:
self._completions = sorted(c + ' ' for c in self.commands
if c.startswith(text))
if text and not self._completions:
bell()
finally:
logger.setLevel(loglevel)
try:
return self._completions[n]
except IndexError:
return None
[docs]
@staticmethod
def confirm(prompt: str, default: ConfirmEnum = ConfirmEnum.NO,
multi: bool = False, attempts: int = 3) -> ConfirmEnum:
"""Prompt the user for confirmation.
Arguments:
prompt: Prompt.
default: Default.
multi: Give choices "all" and "none"?
attempts: How often to try before raising a :exc:`ValueError`.
Raises:
ValueError: Unrecognised answer.
"""
assert attempts > 0
with TermIO() as tty:
for _ in range(attempts):
tty.write(prompt + ' ')
answer = tty.readline().strip().casefold()
if answer == '':
return default
if answer in ('y', 'yes'):
return ConfirmEnum.YES
if answer in ('n', 'no'):
return ConfirmEnum.NO
if multi:
if answer == 'all':
return ConfirmEnum.ALL
if answer == 'none':
return ConfirmEnum.NONE
tty.write('Enter "yes", "no", "all", or "none"\n')
else:
tty.write('Enter "yes" or "no"\n')
raise ValueError('too many retries')
# NOTREACHED
[docs]
def enter(self) -> int:
"""Start reading commands from standard input.
Reading stops at the end-of-file marker or when
a command raises :exc:`StopIteration`.
.. admonition:: Side-effects
Entering the shell binds :kbd:`Tab` to :func:`readline.complete`.
"""
hasatty = self.hasatty()
if hasatty:
oldcompleter = readline.get_completer()
olddelims = readline.get_completer_delims()
readline.set_auto_history(True)
readline.set_completer(self.complete)
readline.set_completer_delims(' ')
readline.parse_and_bind('tab: complete')
if readline.__doc__ and 'libedit' in readline.__doc__:
readline.parse_and_bind('python:bind ^I rl_complete')
try:
while True:
try:
prompt = self.getprompt() if hasatty else None
line = input(prompt).strip()
except EOFError:
break
try:
self.retval = self.executeline(line)
except StopIteration:
break
finally:
if hasatty:
readline.set_completer(oldcompleter)
readline.set_completer_delims(olddelims)
return self.retval
[docs]
def execute(self, command: str, *args: str) -> int:
"""Execute `command`.
For example:
>>> shell.execute('ls', 'foo', 'bar')
0
Arguments:
command: Command name.
args: Arguments to the command.
Returns:
Return value.
Raises:
ShellUsageError: Command not found or arguments invalid.
"""
try:
method = getattr(self, f'do_{command}')
except AttributeError as err:
raise ShellUsageError(f'{command}: no such command') from err
try:
inspect.signature(method).bind(*args)
except TypeError as err:
raise ShellUsageError(self.getusage(method)) from err
self.retval = 0 if (retval := method(*args)) is None else retval
return self.retval
[docs]
def executeline(self, line: str) -> int:
""":meth:`Split <split>` `line` and :meth:`execute` it.
For example:
>>> shell.executeline('ls foo bar')
0
Returns:
Return value.
Raises:
ShellUsageError: Command not found or arguments invalid.
"""
for alias, command in self.aliases.items():
prefixlen = len(alias)
if (stripped := line.strip()).startswith(alias):
args = self.expand(stripped[prefixlen:])
break
else:
try:
command, *args = self.expand(line)
except ValueError:
return 0
return self.execute(command, *args)
[docs]
def executescript(self, script: TextIO) -> int:
"""Split `script` into lines and :meth:`execute <excuteline>` them.
For example:
>>> with open('scriptfile') as scriptfile:
>>> shell.executescript(scriptfile)
0
Returns:
Return value.
Raises:
ShellUsageError: Command not found or arguments invalid.
"""
for line in script:
self.retval = self.executeline(line)
return self.retval
[docs]
def expand(self, line: str) -> list[str]:
"""Expand the words that comprise `line`.
Similar to :manpage:`wordexp(3)`. Patterns are expanded using
:func:`fnmatch.fnmatchcase`, with filenames being provided by
the :meth:`completion system <complete>`.
For example:
>>> class Foo(BaseShell)
>>> def complete_cmd(self, _, text):
>>> return [(s, True) for s in ('foo', 'bar', 'baz')]
>>>
>>> foo = Foo()
>>> foo.expand('cmd *')
['cmd', 'foo', 'bar', 'baz']
"""
fnmatchcase = fnmatch.fnmatchcase
expanded: list[str] = []
unescpattern: re.Pattern = re.compile(r'\\([*?\[\]])')
words = self.split(line)
try:
gettokens = getattr(self, f'complete_{words[0]}')
except (IndexError, AttributeError):
gettokens = None
for i, word in enumerate(words):
if gettokens and isinstance(word, ShellPattern):
matches = sorted(token for token, _ in gettokens(i, '')
if (not token.startswith('.')
and fnmatchcase(token, word)))
if not matches:
raise ValueError(f'{word}: no matches')
expanded.extend(matches)
else:
expanded.append(unescpattern.sub(r'\1', word))
return expanded
[docs]
@classmethod
def getargs(cls) -> list[ShellWord]:
""":meth:`Split <split>` line before current completion scope."""
begin = readline.get_begidx()
buffer = readline.get_line_buffer()
return cls.split(buffer[:begin])
[docs]
@classmethod
def getcommands(cls) -> Iterator[str]:
"""Get the shell commands provided by `cls`.
For example:
>>> class Foo(BaseShell)
>>> def do_cmd(self, arg1, arg2):
>>> pass
>>>
>>> tuple(Foo.getcommands())
('cmd', 'exit', 'help')
>>> foo = Foo()
>>> foo.commands
('cmd', 'exit', 'help')
"""
for attr in dir(cls):
with suppress(ValueError):
prefix, name = attr.split('_', maxsplit=1)
if prefix == 'do' and name:
yield name
[docs]
def getprompt(self) -> str:
"""Get a shell prompt."""
return '> '
[docs]
@staticmethod
def getusage(func: Callable) -> Optional[str]:
"""Derive a usage message from `func`'s docstring.
:func:`getusage` assumes that the docstring has the form
:samp:`{command} {args} - {description}`. The usage message
is either the text up to the last dash ("-") or, if the
docstring does not contain a dash, the whole docstring.
Leading and trailing whitespace is stripped.
For example:
>>> def frobnicate(foo, bar):
>>> \"\"\"frobnicate foo bar - frobnicate foo with bar\"\"\"
>>> ...
>>>
>>> BaseShell.getusage(frobnicate)
'frobnicate foo bar'
"""
if (doc := func.__doc__) and (syn := doc.rsplit('-', 1)[0].strip()):
return 'usage: ' + syn
return None
[docs]
@staticmethod
def hasatty() -> bool:
"""Is standard input a terminal?"""
return os.isatty(sys.stdin.fileno())
[docs]
@staticmethod
def split(line: str) -> list['ShellWord']:
"""Split `line` into words as a POSIX-compliant shell would."""
buffer: ShellWord = ''
escape: bool = False
quotes: list[str] = []
tokens: list[ShellWord] = []
unquotepattern = re.compile(r'\[(.)\]')
def addtoken(token: ShellWord):
if not isinstance(token, ShellPattern):
token, _ = unquotepattern.subn(r'\1', token)
tokens.append(token)
for i, char in enumerate(line):
if escape:
buffer += f'[{char}]' if char in '*?[' else char
escape = False
elif quotes:
if char in '\'"':
if char == quotes[-1]:
del quotes[-1]
else:
quotes.append(char)
if quotes:
buffer += char
elif char == '\\':
escape = True
else:
buffer += f'[{char}]' if char in '*?[' else char
elif char == '\\':
escape = True
elif char in '\'"':
quotes.append(char)
elif char.isspace():
if i > 0 and not line[i - 1].isspace():
addtoken(buffer)
buffer = ''
elif char == '#':
break
else:
if char in '*?[':
buffer = ShellPattern(buffer)
buffer += char
if buffer:
addtoken(buffer)
return tokens
# Basic commands
[docs]
def do_exit(self):
"""exit - exit the shell"""
raise StopIteration()
[docs]
def do_help(self, name: Optional[str] = None):
"""help [command] - list commands/show help for command"""
if name:
try:
print(getattr(self, f'do_{name}').__doc__)
except AttributeError:
self.logger.error('%s: Unknown command', name)
else:
self.columnise(self.commands)
# Completers
[docs]
def complete_help(self, *_) -> tuple[tuple[str, bool], ...]:
"""Completer for help."""
return tuple((c, True) for c in self.commands)
# Attributes
aliases: dict[str, str] = {
'!': 'sh',
'?': 'help'
}
"""Mapping of aliases to :attr:`commands`."""
commands: tuple[str, ...]
"""Shell commands. Populated by :meth:`__init__`."""
logger: logging.Logger = logging.getLogger(__name__)
"""Logger.
Messages are logged with the following priorities:
====================== ======================================
Priority Used for
====================== ======================================
:const:`logging.INFO` Help message when the shell is entered
====================== ======================================
"""
retval: int = 0
"""Return value of the most recently completed command."""
_completions: list[str] = []
"""Most recent completions."""
# pylint: disable=too-many-public-methods
[docs]
class SieveShell(BaseShell):
"""Shell around a `SieveManager` connection."""
[docs]
def __init__(self, manager: 'SieveManager', clobber: bool = True,
confirm: ShellCmd = ShellCmd.ALL):
"""Initialise a :class:`SieveShell` object.
Arguments:
manager: Connection to a ManageSieve server.
clobber: Overwrite files?
confirm: Shell commands that require confirmation.
"""
super().__init__()
self.clobber = clobber
self.reqconfirm = confirm
self.manager = manager
# Methods
[docs]
def getprompt(self, *args, **kwargs):
prompt = super().getprompt(*args, **kwargs)
mgr = self.manager
if mgr and (url := mgr.geturl()) is not None:
prompt = ('' if mgr.tls else '(insecure) ') + str(url) + prompt
return prompt
[docs]
def enter(self) -> int:
# pylint: disable=redefined-outer-name
error = self.logger.error
info = self.logger.info
if self.hasatty():
info('Enter "? [command]" for help and "exit" to exit')
while True:
try:
return super().enter()
except (ConnectionError, DNSError, ProtocolError, SecurityError):
raise
# pylint: disable=broad-exception-caught
except Exception as err:
if not self.hasatty():
raise
if isinstance(err, (GetoptError, UsageError)):
error('%s', err)
self.retval = 2
elif isinstance(err, subprocess.CalledProcessError):
error('%s exited with status %d',
err.cmd[0], err.returncode)
self.retval = 1
elif isinstance(err, (FileNotFoundError, FileExistsError)):
error('%s: %s', err.filename, os.strerror(err.errno))
self.retval = 1
# pylint: disable=no-member
elif isinstance(err, OSError) and err.errno:
error('%s', os.strerror(err.errno))
self.retval = 1
elif isinstance(err, (Error, OSError, ValueError)):
for line in str(err).splitlines():
error('%s', line)
self.retval = 1
else:
raise
# NOTREACHED
[docs]
def execute(self, *args, **kwargs) -> int:
retval = super().execute(*args, **kwargs)
if warning := self.manager.warning:
for line in warning.splitlines():
self.logger.warning('%s', escapectrl(line))
return retval
[docs]
def editscripts(self, editor: list[str], *args: str):
"""Edit scripts with the given `editor`."""
mgr = self.manager
def retry(err: Exception, script: str) -> bool:
print(str(err).rstrip('\r\n'), file=sys.stderr)
return bool(self.confirm(f'Re-edit {script}?',
default=ConfirmEnum.YES))
(opts, scripts) = getopt(list(args), 'a')
for opt, _ in opts:
if opt == '-a':
scripts = [active] if (active := mgr.getactive()) else []
if scripts:
mgr.editscripts(editor, list(scripts), catch=retry)
else:
self.logger.error('No scripts given')
# Shell commands
[docs]
@staticmethod
def do_about():
"""about - show information about SieveManager"""
print(ABOUT.strip())
[docs]
def do_activate(self, script: str):
"""activate script - mark script as active"""
self.manager.setactive(script)
# pylint: disable=redefined-loop-name
[docs]
def do_caps(self):
"""caps - show server capabilities"""
print('---')
if (caps := self.manager.capabilities) is not None:
for key, value in caps.__dict__.items():
if not value:
continue
if key in ('implementation', 'language',
'maxredirects', 'owner', 'version'):
if isinstance(value, str):
value = yamlescape(value)
print(f'{key}: {value}')
elif key in ('notify', 'sasl', 'sieve'):
items = [yamlescape(i) if isinstance(i, str) else str(i)
for i in value]
# pylint: disable=consider-using-f-string
print('{}: [{}]'.format(key, ', '.join(items)))
elif key in ('starttls', 'unauthenticate'):
print(f'{key}: yes')
for key, value in caps.notunderstood.items():
if value is not None:
if isinstance(value, str):
value = yamlescape(value)
print(f'{key}: {value}')
print('...')
[docs]
def do_cat(self, *scripts: str):
"""cat [script ...] - concatenate scripts on standard output"""
for script in scripts:
sys.stdout.write(self.manager.getscript(script))
[docs]
def do_cd(self, localdir: str = HOME):
"""cd [localdir] - change local directory"""
os.chdir(localdir)
self.logger.info('Changed directory to %s', localdir)
# pylint: disable=redefined-loop-name
[docs]
def do_cert(self):
"""cert - show the server's TLS certificate."""
indent = ' ' * 4
mgr = self.manager
if not isinstance(mgr.sock, ssl.SSLSocket):
raise ShellUsageError('not a secure connection')
cert = mgr.sock.getpeercert()
assert cert
value: Any
print('---')
for key, value in sorted(cert.items()):
if key in ('OCSP', 'caIssuers', 'crlDistributionPoints'):
print(f'{key}:')
for item in sorted(value):
if isinstance(item, str):
item = yamlescape(item)
print(f'{indent}- {item}')
elif key in ('issuer', 'subject'):
print(f'{key}:')
for pairs in sorted(value):
print(f'{indent}- ', end='')
for i, (k, v) in enumerate(pairs):
if isinstance(v, str):
v = yamlescape(v)
if i == 0:
print(f'{k}: {v}')
else:
print(f'{indent} {k}: {v}\n')
elif key == 'subjectAltName':
print(f'{key}:')
for k, v in sorted(value):
v = yamlescape(v)
print(f'{indent}- {k}: {v}')
elif isinstance(value, str):
value = yamlescape(value)
print(f'{key}: {value}')
elif isinstance(value, int):
print(f'{key}: {value}')
print('...')
[docs]
def do_check(self, localscript: str):
"""check localscript - check whether localscript is valid"""
with open(localscript, 'rb', encoding='utf8') as file:
# checkscript performs a blocking send/sendline,
# but holding a lock should be okay in an interactive app.
fcntl.flock(file.fileno(), LOCK_SH | LOCK_NB)
self.manager.checkscript(file)
self.logger.info('%s is valid', localscript)
[docs]
def do_cmp(self, *args: str) -> int:
"""cmp [-s] script1 [...] scriptN - compare scripts"""
silent = False
opts, scripts = getopt(list(args), 's')
for opt, _ in opts:
if opt == '-s':
silent = True
if len(scripts) < 2:
message = self.getusage(self.do_cmp)
assert message
raise ShellUsageError(message)
prefix = ', '.join(scripts)
contents = map(self.manager.getscript, scripts)
iters = map(str.splitlines, contents)
for i, lines in enumerate(itertools.zip_longest(*iters), start=1):
for j, chars in enumerate(itertools.zip_longest(*lines), start=1):
char1 = chars[0]
for char2 in chars[1:]:
if char1 != char2:
if not silent:
print(f'{prefix}: line {i}, column {j} differs')
return 1
if not silent:
print(f'{prefix}: equal')
return 0
[docs]
def do_cp(self, *args: str):
"""cp [-f|-i] source target - re-upload source as target"""
clobber = self.clobber
confirm = bool(self.reqconfirm & ShellCmd.CP)
opts, scripts = getopt(list(args), 'Cfi')
for opt, _ in opts:
if opt == '-f':
clobber = True
confirm = False
elif opt == '-i':
clobber = True
confirm = True
try:
source, target = scripts
except ValueError as err:
message = self.getusage(self.do_cp)
assert message
raise ShellUsageError(message) from err
if self.manager.scriptexists(target):
if not clobber:
raise FileExistsError(EEXIST, os.strerror(EEXIST), target)
if confirm and not self.confirm(f'Overwrite {target}?'):
return
self.manager.copyscript(source, target)
[docs]
def do_deactivate(self):
"""deactivate - deactivate the active script"""
self.manager.unsetactive()
[docs]
def do_diff(self, *args: str) -> int:
"""diff <options> script1 script2 - show how scripts differ"""
pairs, scripts = getopt(list(args), 'C:U:bcu')
opts = tuple(filter(bool, itertools.chain(*pairs)))
if len(scripts) != 2:
message = self.getusage(self.do_diff)
assert message
raise ShellUsageError(message)
cp = self.manager.editscripts(['diff', *opts], scripts, check=False)
return cp.returncode
[docs]
def do_echo(self, *args: str):
"""echo word [...] - print words to standard output."""
print(*args)
[docs]
def do_ed(self, *args: str):
"""ed [-a] script [...] - edit scripts with a line editor"""
self.editscripts(EDITOR, *args)
[docs]
def do_get(self, *args: str):
"""get [-a] [-f|-i] [-o file] [script ...] - download script"""
mgr = self.manager
opts, sources = getopt(list(args), 'afio:')
output = ''
clobber = self.clobber
confirm = bool(self.reqconfirm & ShellCmd.GET)
multi = len(sources) > 1
for opt, arg in opts:
if opt == '-a':
sources = [active] if (active := mgr.getactive()) else []
elif opt == '-f':
clobber = True
confirm = False
elif opt == '-i':
clobber = True
confirm = True
elif opt == '-o':
if multi:
raise ShellUsageError('-o: too many sources')
output = arg
answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
keep = mgr.backup
for src in sources:
# Try not to make a backup if the source doesn't exist.
# Writing to a temporary file would create a worse race condition.
if keep > 0:
if not mgr.scriptexists(src):
raise FileNotFoundError(ENOENT, os.strerror(ENOENT), src)
targ = output if output else src
flags = O_CREAT | O_EXCL | O_WRONLY | O_TRUNC
if path.exists(targ):
if not clobber:
raise FileExistsError(EEXIST, os.strerror(EEXIST), targ)
if answer not in (ConfirmEnum.ALL, ConfirmEnum.NONE):
answer = self.confirm(f'Overwrite {targ}?', multi=multi)
if answer:
def getfiles() -> Iterator[str]:
# pylint: disable=cell-var-from-loop
return readdir(path.dirname(targ), path.isfile)
backup(targ, keep, getfiles, shutil.copy, os.remove)
flags &= ~O_EXCL
else:
continue
fd = os.open(targ, flags, mode=0o644)
# getscript performs a blocking read,
# but holding a lock should be okay in an interactive app.
fcntl.flock(fd, LOCK_SH | LOCK_NB)
with os.fdopen(fd, 'w', encoding='utf8') as file:
file.write(mgr.getscript(src))
self.logger.info('Downloaded %s as %s', src, targ)
[docs]
def do_ls(self, *args: str):
"""ls [-1al] [script ...] - list scripts"""
active = False
long = False
one = not self.hasatty()
(opts, fnames) = getopt(list(args), '1al')
for opt, _ in opts:
if opt == '-1':
one = True
elif opt == '-a':
active = True
elif opt == '-l':
long = True
if active:
if script := self.manager.getactive():
print(script)
else:
self.logger.warning('no active script')
else:
scripts = self.manager.listscripts()
if fnames:
existing = {fname for fname, _ in scripts}
for name in set(fnames) - existing:
raise FileNotFoundError(ENOENT, os.strerror(ENOENT), name)
scripts = [s for s in scripts if s[0] in fnames]
scripts.sort()
if long:
for fname, active in scripts:
print('a' if active else '-', fname)
print('e')
elif one:
for script, _ in scripts:
print(script)
else:
words = [f + ('*' if a else '') for f, a in scripts]
self.columnise(words)
[docs]
def do_more(self, *args: str):
"""more <options> script [...] - display scripts page-by-page."""
mgr = self.manager
pairs, scripts = getopt(list(args), 'aceis')
opts = list(filter(bool, itertools.chain(*pairs)))
if '-a' in opts:
active = mgr.getactive()
if active is None:
self.logger.warning('no active script')
return
scripts = [active]
opts.remove('-a')
elif not scripts:
message = self.getusage(self.do_more)
assert message
raise ShellUsageError(message)
mgr.editscripts(PAGER + opts, list(scripts))
[docs]
def do_mv(self, *args: str):
"""mv [-f|-i] source target - rename source to target"""
clobber = self.clobber
confirm = bool(self.reqconfirm & ShellCmd.MV)
mgr = self.manager
opts, scripts = getopt(list(args), 'Cfi')
for opt, _ in opts:
if opt == '-f':
clobber = True
confirm = False
elif opt == '-i':
clobber = True
confirm = True
try:
source, target = scripts
except ValueError as err:
message = self.getusage(self.do_mv)
assert message
raise ShellUsageError(message) from err
if mgr.scriptexists(target):
if not clobber:
raise FileExistsError(EEXIST, os.strerror(EEXIST), target)
if confirm and not self.confirm(f'Overwrite {target}?'):
return
mgr.backupscript(target)
mgr.deletescript(target)
mgr.renamescript(source, target, emulate=True)
[docs]
def do_put(self, *args: str):
"""put [-f|-i] [-a] [-o name] [localscript ...] - upload scripts"""
active: Optional[str] = None
clobber: bool = self.clobber
confirm: bool = bool(self.reqconfirm & ShellCmd.PUT)
mgr: SieveManager = self.manager
output: str = ''
activate: bool = False
opts, sources = getopt(list(args), 'Cafio:')
multi = len(sources) > 1
for opt, arg in opts:
if opt == '-a':
if multi:
raise ShellUsageError('-a: only one script can be active')
active = mgr.getactive()
activate = True
if active:
output = active
elif opt == '-f':
clobber = True
confirm = False
elif opt == '-i':
clobber = True
confirm = True
elif opt == '-o':
if multi:
raise ShellUsageError('-o: too many sources')
output = arg
answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
for src in sources:
targ = output if output else src
if mgr.scriptexists(targ):
if not clobber:
raise FileExistsError(EEXIST, os.strerror(EEXIST), targ)
if answer not in (ConfirmEnum.ALL, ConfirmEnum.NONE):
answer = self.confirm(f'Overwrite {targ}?', multi=multi)
if not answer:
continue
with open(src, encoding='utf8') as file:
# checkscript performs a blocking send/sendline,
# but holding a lock should be okay in an interactive app.
fcntl.flock(file.fileno(), LOCK_SH | LOCK_NB)
mgr.putscript(file, targ)
if activate and targ != active:
mgr.setactive(targ)
[docs]
def do_python(self):
"""python - enter Python read-evaluate-print loop"""
hasatty = self.hasatty()
wrapper = ObjWrapper(self.manager)
if hasatty:
oldcompleter = readline.get_completer()
readline.set_completer(rlcompleter.Completer(wrapper).complete)
with suppress(AttributeError):
readline.clear_history()
readline.set_auto_history(True)
try:
with suppress(SystemExit):
banner = (f'Python {sys.version}\n'
'Enter "help()" for help and "exit()" to exit')
code.interact(local=wrapper, banner=banner, exitmsg='')
finally:
if hasatty:
readline.set_completer(oldcompleter)
with suppress(AttributeError):
readline.clear_history()
[docs]
def do_rm(self, *args: str):
"""rm [-f|-i] [script ...] - remove script"""
confirm = bool(self.reqconfirm & ShellCmd.RM)
opts, scripts = getopt(list(args), 'fi')
for opt, _ in opts:
if opt == '-f':
confirm = False
elif opt == '-i':
confirm = True
answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
multi = len(scripts) > 1
for script in scripts:
if answer is not ConfirmEnum.ALL:
answer = self.confirm(f'Remove {script}?', multi=multi)
if answer is ConfirmEnum.NONE:
self.logger.info('Stopped')
break
if answer:
self.manager.deletescript(script)
[docs]
def do_sh(self, *args: str):
"""sh [command] [argument ...] - run system command or system shell"""
if not args:
args = (pwd.getpwuid(os.getuid()).pw_shell,)
subprocess.run(args, check=True)
[docs]
def do_su(self, user: str):
"""su user - manage scripts of user."""
mgr = self.manager
# pylint: disable=protected-access
_, (args, kwargs) = mgr._getstate()
kwargs['owner'] = user
if mgr.login:
mgr.unauthenticate()
# Some ManageSieve servers reject the first
# "AUTHENTICATE" after an "UNAUTHENTICATE".
for i in range(2):
try:
mgr.authenticate(*args, **kwargs)
except SieveOperationError:
if i:
raise
continue
break
[docs]
def do_vi(self, *args: str):
"""vi [-a] script [...] - edit scripts with a visual editor"""
self.editscripts(VISUAL, *args)
[docs]
def do_xargs(self, command: str, *args: str):
"""xargs cmd [arg ...] - call cmd with arguments from standard input"""
try:
func = getattr(self, f'do_{command}')
except AttributeError as err:
raise ShellUsageError(f'{command}: no such command') from err
lines = []
with suppress(EOFError):
while line := sys.stdin.readline().rstrip('\n'):
lines.append(line)
return func(*args, *lines)
# Globbing and tab-completion
[docs]
@staticmethod
def complete_dirs(_: int, text: str) -> list[tuple[str, bool]]:
"""Complete local directory names."""
return [(d + '/', False)
for d in readdir(path.dirname(text), path.isdir)]
[docs]
@staticmethod
def complete_files(_: int, text: str) -> list[tuple[str, bool]]:
"""Complete local filenames."""
return [(f + '/', False) if path.isdir(f) else (f, True)
for f in readdir(path.dirname(text))]
[docs]
def complete_scripts(self, *_) -> list[tuple[str, bool]]:
"""Complete script names."""
return [(s, True) for s, _ in self.manager.listscripts(cached=True)]
complete_activate = complete_scripts
"""Completer for activate."""
complete_cat = complete_scripts
"""Completer for cat."""
complete_cd = complete_dirs
"""Completer for cd."""
complete_cmp = complete_scripts
"""Completer for cmp."""
complete_cp = complete_scripts
"""Completer for cp."""
complete_check = complete_files
"""Completer for check."""
complete_diff = complete_scripts
"""Completer for diff."""
complete_ed = complete_scripts
"""Completer for ed."""
complete_get = complete_scripts
"""Completer for get."""
complete_ls = complete_scripts
"""Completer for ls."""
complete_more = complete_scripts
"""Completer for more."""
complete_mv = complete_scripts
"""Completer for mv."""
complete_put = complete_files
"""Completer for put."""
complete_rm = complete_scripts
"""Completer for rm."""
complete_vi = complete_scripts
"""Completer for vi."""
# Properties
clobber: bool
"""Overwrite files?"""
reqconfirm: ShellCmd
"""Commands that require confirmation."""
manager: SieveManager
"""Connection to a ManageSieve server."""
[docs]
class ObjWrapper(dict):
"""Object wrapper for use with :func:`code.interact`.
Arguments:
obj: Object to wrap.
"""
[docs]
def __init__(self, obj: Any):
"""Initialise a proxy."""
pairs: dict[str, Any] = {}
for key, value in globals().items():
pairs[key] = value
for cls in obj.__class__.__mro__:
pairs |= cls.__dict__
for name in dir(obj):
pairs[name] = getattr(obj, name)
for name in dir(self):
pairs[name] = getattr(self, name)
super().__init__(pairs)
[docs]
@staticmethod
def exit():
"""Exit the Python read-evaluate-print loop."""
raise SystemExit()
# Needed, or else `help` ignores the wrapper.
[docs]
@staticmethod
def help(*args, **kwargs):
"""Show help."""
help(*args, **kwargs)
#
# Configuration
#
BaseConfigT = TypeVar('BaseConfigT', bound='BaseConfig')
"""Type variable for :class:`BaseConfig`."""
[docs]
class BaseConfig(UserDict):
"""Base class for configurations."""
def __or__(self: BaseConfigT, other) -> BaseConfigT:
obj = self.__class__()
obj.__ior__(self)
obj.__ior__(other)
return obj
def __ior__(self: BaseConfigT, other) -> BaseConfigT:
super().__ior__(other)
with suppress(AttributeError):
for key, value in other._sections.items():
UserDict.__ior__(self._sections[key], value)
return self
[docs]
def loadfile(self, fname: str):
"""Read configuration variables from `fname`.
Raises:
AppConfigError: Syntax error.
"""
ptr = self
cwd = os.getcwd()
with open(fname) as file:
if basedir := path.dirname(fname):
os.chdir(basedir)
try:
for i, line in enumerate(file, start=1):
if (pair := line.strip()) and not pair.startswith('#'):
try:
key, value = pair.split(maxsplit=1)
if key == self._section:
if value not in self._sections:
self._sections[value] = self.__class__()
ptr = self._sections[value]
else:
ptr.set(key, value)
except (AttributeError, TypeError, ValueError) as err:
message = f'{fname}:{i}: {err}'
raise AppConfigError(message) from err
finally:
os.chdir(cwd)
[docs]
def parse(self, expr: str):
"""Split `expr` into a name and a value and set the variable.
`Expr` is split at the first equals sign ("=").
:samp:`{var}` is equivalent to :samp:`{var}=yes`.
:samp:`no{var}` is equivalent to :samp:`{var}=no`.
"""
value: Union[bool, str]
try:
name, value = expr.split('=', maxsplit=1)
except ValueError:
if expr.startswith('no'):
name, value = expr[2:], False
else:
name, value = expr, True
if value == '':
raise ValueError(f'{name}: empty')
self.set(name, value)
[docs]
def set(self, name: str, value):
"""Set the configuration variable `name` to `value`.
raises:
AttributeError: Bad variable.
"""
if name.startswith('_'):
raise AttributeError(f'{name}: private variable')
try:
attr = getattr(self, name)
except AttributeError as err:
raise AttributeError(f'{name}: no such variable') from err
if callable(attr):
raise AttributeError(f'{name}: not a variable')
try:
setattr(self, name, value)
except AttributeError as err:
raise AttributeError(f'{name}: read-only variable') from err
except (TypeError, ValueError) as err:
raise err.__class__(f'{name}: {err}')
@property
def sections(self: BaseConfigT) -> dict[str, BaseConfigT]:
"""Sections in the loaded configuration files."""
return self._sections
_section: ClassVar[str]
"""Name of the statement that starts a section."""
_sections: dict[str, Any] = {}
"""Sections in the loaded configuration files."""
[docs]
class BaseVar(ABC):
"""Base class for :class:`BaseConfig` attributes.
For example:
>>> @dataclasses.dataclass
>>> class FooConfig(BaseConfig):
>>> foo = BoolVar(default=False)
>>> bar = NumVar(cls=int, default=0)
>>>
>>> foo = FooConfig(bar=1)
>>> foo.foo
False
>>> foo.bar
1
>>> foo.foo = 'yes'
>>> foo.foo
True
>>> foo.bar = '2'
>>> foo.bar
2
"""
[docs]
def __init__(self, default: Any = None):
"""Initialise a configuration variable."""
self.default = default
def __get__(self, obj: BaseConfig, _: type) -> Any:
try:
return obj[self.name]
except KeyError:
return self.default
def __set__(self, obj: BaseConfig, value: Any):
if value is None:
with suppress(KeyError):
del obj[self.name]
else:
obj[self.name] = value
def __set_name__(self, _: object, name: str):
self.name = name
name: str
"""Variable name."""
default: Any
"""Default value."""
[docs]
class ExpandingVarMixin():
"""Mixin for variables that do word expansion."""
[docs]
def expand(self, obj: BaseConfig, value: str) -> str:
"""Expand '~' and configuration variables."""
assert isinstance(self, BaseVar)
template = string.Template(value)
# template.get_identifiers is only available in Python >= v3.11.
varnames: set[str] = set()
# pylint: disable=consider-using-f-string
for pattern in (r'\$(%s)' % template.idpattern,
r'\$\{(%s)\}' % template.idpattern):
for match in re.finditer(pattern, value, flags=re.IGNORECASE):
varnames.add(match.group(1))
variables = {}
for name in varnames:
try:
var = getattr(obj, name)
except AttributeError as err:
raise ValueError(f'${name}: no such variable') from err
if var is None:
raise ValueError(f'${name}: not set')
if not isinstance(var, (int, str)):
raise ValueError(f'${name}: not a scalar')
variables[name] = var
return path.expanduser(template.substitute(variables))
[docs]
class ListVarMixin():
"""Mixin for lists."""
splititems: Callable = re.compile(r'\s*,\s*').split
"""Split a comma-separated list into items."""
[docs]
class BoolVar(BaseVar):
"""Convert "yes" and "no" to :class:`bool`."""
def __set__(self, obj: BaseConfig, value: Union[bool, str]):
if value in (True, 'yes'):
super().__set__(obj, True)
elif value in (False, 'no'):
super().__set__(obj, False)
else:
raise ValueError(f'{value}: not a boolean')
[docs]
class CmdVar(BaseVar, ExpandingVarMixin):
"""Split up value into a list using :func:`shlex.split`."""
def __set__(self, obj: BaseConfig, value: str):
super().__set__(obj, shlex.split(value, posix=True))
def __get__(self, obj: BaseConfig, objtype: type) -> Optional[list[str]]:
return (None if (value := super().__get__(obj, objtype)) is None else
[self.expand(obj, word) for word in value])
[docs]
class EnumVar(BaseVar):
"""Convert comma-separated values to an :class:`enum.Enum`."""
[docs]
def __init__(self, *args, cls: type[enum.Enum], **kwargs):
"""Initialise the variable.
Arguments:
name: Variable name.
cls: Enumeration type.
default: Default value.
"""
assert issubclass(cls, enum.Enum)
super().__init__(*args, **kwargs)
self.cls = cls
def __set__(self, obj: BaseConfig, value: Union[enum.Enum, str]):
if isinstance(value, enum.Enum):
super().__set__(obj, value)
elif isinstance(value, str): # type: ignore
for member in self.cls:
if member.name.casefold() == value.casefold():
super().__set__(obj, member)
break
else:
raise ValueError(f'{value}: no such item')
else:
raise TypeError(f'{type(value)}: not an enumeration')
[docs]
class FilenameVar(BaseVar):
"""Expand ``~user`` and make filenames absolute."""
def __set__(self, obj: BaseConfig, value: Optional[str]):
if value is None:
super().__set__(obj, None)
if isinstance(value, str):
super().__set__(obj, path.abspath(path.expanduser(value)))
raise TypeError('{value}: not a str')
[docs]
class FlagVar(BaseVar, ListVarMixin):
"""Convert comma-separated values to an :class:`int`."""
[docs]
def __init__(self, *args, cls: type[enum.IntEnum], **kwargs):
assert issubclass(cls, enum.IntEnum)
super().__init__(*args, **kwargs)
self.cls = cls
def __set__(self, obj: BaseConfig, value: Union[str, int, enum.IntEnum]):
if isinstance(value, (int, enum.IntFlag)):
super().__set__(obj, value)
elif isinstance(value, str): # type: ignore
flag = 0
for name in self.splititems(value):
for member in self.cls:
if name.casefold() == member.name.casefold():
flag |= member.value
break
else:
raise ValueError(f'{name}: no such item')
super().__set__(obj, flag)
else:
raise TypeError(f'{value}: neither an int nor a str')
cls: type[enum.IntEnum]
"""Enumeration type"""
[docs]
class HostVar(BaseVar):
"""Check whether value is a valid hostname."""
def __set__(self, obj: BaseConfig, value: Optional[str]):
if isinstance(value, str):
if not (isinetaddr(value) or ishostname(value)):
raise ValueError(f'{value}: neither hostname nor address')
super().__set__(obj, value)
elif value is None:
super().__set__(obj, value)
else:
raise TypeError(f'{value}: not a string')
[docs]
class NumVar(BaseVar):
"""Convert value to a number of type :attr:`cls`."""
[docs]
def __init__(self, *args, cls: type = int,
minval: Optional[Union[float, int]] = None,
maxval: Optional[Union[float, int]] = None,
**kwargs):
"""Initialise the variable.
Arguments:
name: Variable name.
cls: Number type.
minval: Smallest permissible value.
maxval: Greatest permissible value.
default: Default value.
"""
super().__init__(*args, **kwargs)
self.cls = cls
self.minval = minval
self.maxval = maxval
def __set__(self, obj: BaseConfig, value: Union[int, float, str]):
try:
num = self.cls(value)
except ValueError as err:
raise ValueError(f'{value}: not a number') from err
if self.minval is not None and num < self.minval:
raise ValueError(f'{value} < {self.minval}')
if self.maxval is not None and num > self.maxval:
raise ValueError(f'{value} > {self.maxval}')
super().__set__(obj, num)
cls: type
"""Number type."""
minval: Optional[Union[float, int]]
"""Minimum value."""
maxval: Optional[Union[float, int]]
"""Maximum value."""
[docs]
class SASLMechVar(BaseVar, ListVarMixin):
"""Convert SASL mechanism names to :class:`BaseAuth` subclasses."""
def __set__(self, obj: BaseConfig,
value: Union[Iterable[type[BaseAuth]], str]):
if isinstance(value, str):
classes = AbstractAuth.getmechs(obsolete=True)
mechs = []
for name in self.splititems(value.casefold()):
matches = []
for cls in classes:
if fnmatch.fnmatchcase(cls.name.casefold(), name):
if cls in mechs:
raise ValueError(f'{cls.name}: duplicate')
matches.append(cls)
if not matches:
raise ValueError(f'{name}: no matches')
mechs.extend(matches)
elif isinstance(value, Sequence):
mechs = value # type: ignore[assignment]
else:
raise TypeError(f'{value}: not an SASL mechanism')
super().__set__(obj, mechs)
[docs]
class UniqueVar(BaseVar):
"""Variable the value of which must be unique."""
def __set__(self, obj: BaseConfig, value: str):
values = self.__class__.values
if value in values:
raise ValueError(f'{value}: already in use')
super().__set__(obj, value)
values.add(value)
values: ClassVar[set] = set()
SieveConfigT = TypeVar('SieveConfigT', bound='SieveConfig')
[docs]
class SieveConfig(BaseConfig):
"""Configuration for the SieveManager command-line client."""
[docs]
@classmethod
def fromfiles(cls: type[SieveConfigT], *fnames: str) -> SieveConfigT:
"""Create a new configuration from `fnames`.
Arguments:
fnames: Filenames (default: :data:`CONFIGFILES`)
Raises:
FileNotFoundError: A given file could not be found.
"""
obj = cls()
for fname in (fnames if fnames else CONFIGFILES):
try:
obj.loadfile(fname)
except FileNotFoundError:
if fnames:
raise
return obj
[docs]
def __init__(self, *args, **kwargs):
"""Create a new configuration.
Arguments:
args: Positional arguments used to initialise the back-end.
kwargs: Keyword arguments used as initial configuration values.
"""
super().__init__(*args)
for key, value in kwargs.items():
setattr(self, key, value)
[docs]
def getmanager(self, **variables) -> SieveManager:
"""Open a :class:`SieveManager` connection with this configuration.
Arguments:
variables: Configuration variables.
Raises:
ShellOperationError: Authentication failed.
netrc.NetrcParseError: :file:`.netrc` could not be parsed.
"""
conf = self | self.__class__(**variables)
mgr = SieveManager(backup=conf.backups, memory=conf.memory)
# Helper to obtain passwords and passphrases
def getpass_(passmgr: Optional[list[str]], prompt: str) -> str:
if passmgr and (pass_ := readoutput(*passmgr, logger=mgr.logger)):
return pass_
return askpass(prompt)
# Logging level
mgr.logger.setLevel(conf.verbosity)
# TLS
sslcontext = mgr.sslcontext
if (cadir := conf.cadir) or (cafile := conf.cafile):
sslcontext.load_verify_locations(cafile, cadir)
if cert := conf.cert:
def getpassphrase():
return getpass_(conf.getpassphrase, 'Certificate passphrase: ')
sslcontext.load_cert_chain(cert, conf.key, getpassphrase)
if conf.x509strict:
sslcontext.verify_flags |= ssl.VERIFY_X509_STRICT
# Connect
mgr.open(conf.host, port=conf.port, timeout=conf.timeout,
tls=conf.tls, ocsp=conf.ocsp)
# Authenticate
logauth = conf.verbosity <= LogLevel.AUTH
if (sasl := [s for s in conf.saslmechs if ExternalAuth in s.__mro__]):
with suppress(SASLCapabilityError):
mgr.authenticate(conf.login, owner=conf.owner,
prepare=conf.saslprep, sasl=sasl,
logauth=logauth)
return mgr
if (sasl := [s for s in conf.saslmechs if BasePwdAuth in s.__mro__]):
password = (conf.password if conf.password else
getpass_(conf.getpassword, 'Password: '))
with suppress(SASLCapabilityError):
mgr.authenticate(conf.login, password, owner=conf.owner,
prepare=conf.saslprep, sasl=sasl,
logauth=logauth)
return mgr
raise ShellOperationError('SASL mechanisms exhausted')
[docs]
def getshell(self, manager: SieveManager, **variables):
"""Get a configured :class:`SieveShell` that wraps `manager`."""
conf = self | self.__class__(**variables)
return SieveShell(manager, clobber=conf.clobber, confirm=conf.confirm)
[docs]
def loadfile(self, fname: str):
"""Read configuration from `fname`.
Raises:
AppConfigError: Syntax error.
AppSecurityError: Permissions are insecure.
"""
super().loadfile(fname)
mode = os.stat(fname).st_mode
if mode & 0o22:
raise AppSecurityError(f'{fname}: is group- or world-writable')
if mode & 0o44:
for account in (self, *self._sections.values()):
if 'password' in account:
message = f'{fname}: is group- or world-readable'
raise AppSecurityError(message)
[docs]
def loadaccount(self, host: str = 'localhost',
login: Optional[str] = None):
"""Load the section for `login` on `host`."""
self |= self.__class__(host=host, login=login)
hosts = readnetrc(self.netrc)
# Host
for name, section in self.sections.items():
if section.alias == self.host:
try:
self.host = section['host']
except KeyError:
self.host = name.rsplit('@', maxsplit=1)[-1]
break
with suppress(KeyError):
self |= self.sections[self.host]
# Login
if not self.login:
try:
self.login = hosts[self.host][0]
except KeyError:
self.login = getpass.getuser()
with suppress(KeyError):
self |= self.sections[f'{self.login}@{self.host}']
# Password
if not self.password:
with suppress(KeyError):
self.password = hosts[self.host][2]
alias: UniqueVar = UniqueVar()
"""Alias for a host."""
backups = NumVar(cls=int, default=0, minval=0)
"""How many backups to keep."""
cadir = FilenameVar()
"""Custom CA directory."""
cafile = FilenameVar()
"""Custom CA file."""
clobber = BoolVar(default=True)
"""Overwrite files?"""
confirm = FlagVar(default=ShellCmd.ALL, cls=ShellCmd)
"""Which shell commands have to be confirmed?"""
cert = FilenameVar()
"""Client TLS certificate."""
getpassphrase = CmdVar()
"""Command that prints the passphrase for the TLS key."""
getpassword = CmdVar()
"""Command that prints a password."""
host = HostVar(default='localhost')
"""Host to connect to by default."""
key = FilenameVar()
"""Client TLS key."""
login = BaseVar()
"""User to login as (authentication ID)."""
memory = NumVar(default=524_288, minval=0)
"""How much memory to use for temporary data."""
netrc: Optional[str] = os.getenv('NETRC')
"""Filename of the .netrc file."""
ocsp = BoolVar(default=True)
"""Check whether server certificate was revoked?"""
owner = BaseVar(default='')
"""User whose scripts to manage (authorisation ID)."""
password = BaseVar()
"""Password to login with."""
port = NumVar(default=4190, minval=0, maxval=65535)
"""Port to connect to by default."""
saslmechs = SASLMechVar(default=BasePwdAuth.getmechs())
"""How to authenticate."""
saslprep = FlagVar(default=SASLPrep.ALL, cls=SASLPrep)
"""Which credentials to prepare."""
timeout = NumVar(default=socket.getdefaulttimeout(), cls=float, minval=0)
"""Network timeout."""
tls = BoolVar(default=True)
"""Use TLS?"""
verbosity = EnumVar(default=LogLevel.INFO, cls=LogLevel)
"""Logging level."""
x509strict = BoolVar(default=True)
"""Be strict when verifying TLS certificates?"""
_section = 'account'
#
# Terminal I/O
#
[docs]
class TermIO(io.TextIOWrapper):
"""I/O for the controlling terminal."""
[docs]
def __init__(self, *args, **kwargs):
"""Open the controlling terminal."""
super().__init__(io.FileIO('/dev/tty', 'r+'), *args, **kwargs)
#
# Logging
#
LogIOWrapperT = TypeVar('LogIOWrapperT', bound='LogIOWrapper')
"""Type variable for :class:`LogIOWrapper`."""
[docs]
class LogIOWrapper():
"""Logger for file-like objects."""
[docs]
@classmethod
def wrap(cls: type[LogIOWrapperT],
file: Union[BinaryIO, io.BufferedRWPair],
logger: logging.Logger = logging.getLogger(__name__),
level: int = logging.DEBUG,
formats: tuple[str, str] = ('S: %s', 'C: %s'),
encoding: str = 'utf8') \
-> Union[BinaryIO, io.BufferedRWPair, LogIOWrapperT]:
"""Wrap a file in a :class:`LogIOWrapper` if logging is enabled.
Takes the same arguments as :meth:`__init__`.
Returns:
The file or a :class:`LogIOWrapper` that wraps the file.
"""
if logger.isEnabledFor(level):
return cls(file, encoding, level, logger, formats)
return file
[docs]
def __init__(self, file: Union[BinaryIO, io.BufferedRWPair],
encoding: str = 'utf8', level: int = logging.DEBUG,
logger: logging.Logger = logging.getLogger(__name__),
formats: tuple[str, str] = ('S: %s', 'C: %s')):
"""Log I/O to `file`.
Arguments:
file: File-like object opened in binary mode.
encoding: `file`'s encoding.
level: Logging priority.
logger: Logger.
formats: Message formats; '%s' is replaced with I/O.
"""
splitlines = re.compile(rb'\r?\n').split
buffers = (bytearray(), bytearray())
def extv(buf: bytearray, vec: Iterable[Iterable[int]]) -> None:
for elem in vec:
buf.extend(elem)
def getdecorator(
buf: bytearray, fmt: str, arg=None,
ext: Callable[[bytearray, Iterable], None] = bytearray.extend
) -> Callable[[Callable[..., T]], Callable[..., T]]:
def decorator(func: Callable[..., T]) -> Callable[..., T]:
def wrapper(*args, **kwargs) -> T:
retval = func(*args, **kwargs)
if not self.quiet:
data = retval if arg is None else args[arg]
ext(buf, data) # type: ignore[reportArgumentType]
ptr: Union[bytes, bytearray] = buf
while True:
try:
line, ptr = splitlines(ptr, maxsplit=1)
except ValueError:
break
self.log(line, fmt)
buf[:] = ptr
return retval
return wrapper
return decorator
logread = getdecorator(buffers[0], formats[0])
logreadinto = getdecorator(buffers[0], formats[0], arg=0)
logreadv = getdecorator(buffers[0], formats[0], ext=extv)
logwrite = getdecorator(buffers[1], formats[1], arg=0)
logwritev = getdecorator(buffers[1], formats[1], arg=0, ext=extv)
self.read = logread(file.read)
self.readline = logread(file.readline)
self.readlines = logreadv(file.readlines)
self.write = logwrite(file.write)
self.writelines = logwritev(file.writelines)
if isinstance(file, io.RawIOBase):
self.readall = logread(file.readall)
self.readinto = logreadinto(file.readinto) # type: ignore
if isinstance(file, io.BufferedIOBase):
self.read1 = logread(file.read1)
self.readinto1 = logread(file.readinto1)
self.readinto = logreadinto(file.readinto)
self.buffers = buffers
self.encoding = encoding
self.formats = formats
self.file = file
self.level = level
self.logger = logger
def __del__(self):
file = self.file
if not file.closed:
file.flush()
file.close()
if not self.quiet:
for buf, fmt in zip(self.buffers, self.formats):
if buf:
self.log(buf, fmt)
def __getattr__(self, name):
return getattr(self.file, name)
def __iter__(self):
return self
def __next__(self):
if line := self.readline():
return line
raise StopIteration()
[docs]
def log(self, line: Union[bytearray, bytes], fmt: str):
"""Log `line` with `fmt`."""
decoded = escapectrl(line.rstrip(b'\r\n').decode(self.encoding))
self.logger.log(self.level, fmt, decoded)
buffers: tuple[bytearray, bytearray]
"""Logging buffers."""
encoding: str
""":attr:`file`'s encoding."""
file: Union[BinaryIO, io.BufferedRWPair]
"""Underlying file-like object."""
formats: tuple[str, str]
"""Logging formats."""
level: int
"""Logging level."""
logger: logging.Logger
"""Logger."""
quiet: bool = False
"""Log I/O?"""
read: Callable[..., bytes]
"""Read from :attr:`file`."""
readinto: Callable[..., int]
"""Read from :attr:`file` into a buffer."""
readline: Callable[..., bytes]
"""Read a line from :attr:`file`."""
readlines: Callable[..., list[bytes]]
"""Read all lines from :attr:`file`."""
write: Callable[..., int]
"""Write to :attr:`file`."""
writelines: Callable[..., None]
"""Write lines to :attr:`file`."""
#
# Signal handling
#
SignalHandlingFunc = Callable[[int, Union[types.FrameType, None]], Any]
"""Alias for signal handling functions."""
SignalHandler = Union[SignalHandlingFunc, int, None]
"""Alias for signal handlers."""
[docs]
@dataclass(frozen=True)
class SignalCaught(Exception):
"""Signal was caught."""
[docs]
@classmethod
def throw(cls, signo: int, frame: Optional[types.FrameType]):
"""Raise a :exc:`SignalCaught` exception."""
raise cls(signo, frame)
[docs]
@classmethod
def register(cls, signals: Iterable[int]) -> tuple[SignalHandler, ...]:
"""Register :meth:`throw` as handler for the given `signals`.
Arguments:
signals: Signals to register :meth:`throw` as handler for.
Returns:
Old signal handlers.
"""
return tuple(signal.signal(s, cls.throw) for s in signals)
[docs]
@classmethod
def catch(cls, *signals: int) -> \
Callable[[Callable[..., T]], Callable[..., T]]:
"""Decorator that :meth:`handles <handle>` `signals`.
If one of the given `signals` is caught, :exc:`SignalCaught` is raised.
If that exception is not caught, the process group is terminated,
the signal handler reset, and the signal re-raised.
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
# pylint: disable=inconsistent-return-statements
def wrapper(*args, **kwargs) -> T: # type: ignore[return]
handlers = cls.register(signals)
try:
return func(*args, **kwargs)
except cls as exc:
logging.critical(exc)
signal.signal(SIGTERM, SIG_IGN)
os.killpg(os.getpgrp(), SIGTERM)
signal.signal(exc.signo, SIG_DFL)
signal.raise_signal(exc.signo)
finally:
for signo, handler in zip(signals, handlers):
signal.signal(signo, handler)
# NOTREACHED
for name in dir(func):
with suppress(AttributeError, TypeError, ValueError):
setattr(wrapper, name, getattr(func, name))
return wrapper
return decorator
def __str__(self):
desc = signal.strsignal(self.signo)
return desc.split(':')[0] if desc else f'caught signal {self.signo}'
signo: int
"""Signal number."""
frame: Optional[types.FrameType] = None
"""Stack frame."""
#
# Errors
#
# Error types
[docs]
class Error(Exception):
"""Base class for errors."""
[docs]
class CapabilityError(Error):
"""Base class for capability errors."""
[docs]
class ConfigError(Error):
"""Base class for configuration errors."""
[docs]
class DataError(Error):
"""Base class for data errors."""
[docs]
class OperationError(Error):
"""Base class for operation errors."""
[docs]
class ProtocolError(Error):
"""Base class for protocol errors.
.. danger::
Continuing after a :exc:`ProtocolError` may cause undefined behaviour.
"""
[docs]
class SecurityError(Error):
"""Base class for security errors.
.. danger::
Continuing after a :exc:`SecurityError` compromises the connection.
"""
[docs]
class SoftwareError(Error):
"""Base class for software errors."""
[docs]
class UsageError(Error):
"""Base class for usage errors."""
# Client errors
[docs]
class AppError(Error):
"""Base class for application errors."""
[docs]
class AppConfigError(AppError, ConfigError):
"""Applicaiton configuration error."""
[docs]
class AppConnectionError(AppError, ConnectionError):
"""Client-side connection error."""
[docs]
class AppOperationError(AppError, OperationError):
"""Client-side operation error."""
[docs]
class AppSecurityError(AppError, SecurityError):
"""Client security error."""
[docs]
class AppSoftwareError(AppError, SoftwareError):
"""Client software error (aka a bug)."""
# DNS errors
[docs]
class DNSError(Error):
"""Base class for DNS errors."""
[docs]
class DNSDataError(Error):
"""DNS data error."""
[docs]
class DNSOperationError(DNSError, OperationError):
"""DNS operation error."""
[docs]
class DNSSoftwareError(DNSError, SoftwareError):
"""DNS software error."""
# HTTP errors
[docs]
class HTTPError(Error):
"""Base class for HTTP errors."""
[docs]
class HTTPOperationError(HTTPError, OperationError):
"""HTTP operation error."""
[docs]
class HTTPUsageError(HTTPError, ProtocolError):
"""HTTP usage error."""
# OCSP errors
[docs]
class OCSPError(Error):
"""Base class for OCSP errors."""
[docs]
class OCSPDataError(OCSPError, DataError):
"""OCSP data error."""
[docs]
class OCSPOperationError(OCSPError, OperationError):
"""OCSP operation error."""
# SASL errors
[docs]
class SASLError(Error):
"""Base class for SASL errors."""
[docs]
class SASLCapabilityError(Error):
"""SASL capability error."""
[docs]
class SASLProtocolError(SASLError, ProtocolError):
"""Server violated the SASL protocol."""
[docs]
class SASLSecurityError(SASLError, SecurityError):
"""SASL security error."""
# Shell errors
[docs]
class ShellError(Error):
"""Base class for shell errors."""
[docs]
class ShellOperationError(ShellError, OperationError):
"""Shell operation error."""
[docs]
class ShellUsageError(ShellError, UsageError):
"""Shell usage error."""
# ManageSieve errors
[docs]
class SieveError(Error):
"""Base class for ManageSieve errors."""
[docs]
class SieveCapabilityError(SieveError, CapabilityError):
"""Capability not supported by the server."""
[docs]
class SieveConnectionError(Response, SieveError, ConnectionError):
"""Server said "BYE"."""
# pylint: disable=redefined-outer-name
[docs]
def __init__(self, response: Atom = Atom('BYE'),
code: tuple[Word, ...] = (),
message: Optional[str] = None):
super().__init__(response=response, code=code, message=message)
[docs]
class SieveOperationError(Response, SieveError, OperationError):
"""Server said "NO"."""
# pylint: disable=redefined-outer-name
[docs]
def __init__(self, response: Atom = Atom('NO'),
code: tuple[Word, ...] = (),
message: Optional[str] = None):
super().__init__(response=response, code=code, message=message)
[docs]
class SieveProtocolError(SieveError, ProtocolError):
"""Server violated the ManageSieve protocol error."""
# TLS errors
[docs]
class TLSError(Error):
"""Base class for TLS errors."""
[docs]
class TLSCapabilityError(TLSError, CapabilityError):
"""TLS capability error."""
[docs]
class TLSSecurityError(TLSError, OperationError):
"""TLS security error."""
[docs]
class TLSSoftwareError(TLSError, SoftwareError):
"""TLS software error."""
#
# Helpers
#
[docs]
def askpass(prompt: str) -> str:
"""Prompt for a password on the controlling terminal."""
with TermIO() as tty:
return getpass.getpass(prompt, stream=tty)
[docs]
def backup(file: str, keep: int, getfiles: Callable[[], Iterable[str]],
copy: Callable[[str, str], Any], remove: Callable[[str], Any]):
"""Make an Emacs-style backup of `file`.
`keep` = 1
:file:`file` is backed up as :file:`file~`.
`keep` > 1
:file:`file` is backed up as :file:`file.~{n}~`, where
`n` starts with 1 and increments with each backup.
Arguments:
file: File to back up.
keep: How many copies to keep.
copy: Function that copies the file.
getfiles: Function that returns a list of files.
remove: Function that removes a file.
Raises:
ValueError: `keep` is < 0.
"""
if keep < 0:
raise ValueError('keep: must be >= 0')
if keep == 0:
return
if keep == 1:
copy(file, file + '~')
else:
backupexpr = re.escape(file) + r'\.~(\d+)~'
matchbackup = re.compile(backupexpr).fullmatch
backups = sorted((int(match.group(1)), file) for file in getfiles()
if (match := matchbackup(file)))
for _, bak in backups[:-(keep - 1)]:
remove(bak)
counter = backups[-1][0] + 1 if backups else 1
copy(file, f'{file}.~{counter}~')
[docs]
def bell():
"""Print a BEL to the controlling terminal, if there is one."""
try:
with TermIO() as tty:
print('\a', end='', file=tty)
except FileNotFoundError:
pass
[docs]
def certrevoked(cert, logger: logging.Logger = logging.getLogger(__name__)) \
-> bool:
"""Check if `cert` has been revoked.
Raises:
OCSPDataError: `cert` contains no authority information.
OCSPOperationError: no authoritative response.
.. seealso::
:rfc:`5019`
Lightweight OCSP Profile
:rfc:`6960`
Online Certificate Status Protocol (OCSP)
"""
try:
issuers, responders = getcertauthinfo(cert)
except ExtensionNotFound as err:
raise OCSPDataError('no authority information') from err
for caurl in issuers:
try:
der = httpget(caurl)
except (urllib.error.URLError, HTTPError) as err:
logger.error(err)
continue
ca = x509.load_der_x509_certificate(der)
builder = ocsp.OCSPRequestBuilder()
# SHA1 is mandated by RFC 5019.
req = builder.add_certificate(cert, ca, SHA1()).build() # nosec B303
# pylint: disable=redefined-outer-name
path = b64encode(req.public_bytes(Encoding.DER)).decode('ascii')
for responder in responders:
statusurl = urllib.parse.urljoin(responder, path)
try:
res = ocsp.load_der_ocsp_response(httpget(statusurl))
except HTTPError as err:
logger.error(err)
continue
if res.response_status == OCSPResponseStatus.SUCCESSFUL:
try:
now = datetime.datetime.now(tz=datetime.UTC) # novermin
end = res.next_update_utc # type: ignore
if now < res.this_update_utc: # type: ignore
continue
if end is not None and now >= end:
continue
except AttributeError:
now = datetime.datetime.now()
if now < res.this_update:
continue
if (end := res.next_update) is not None and now >= end:
continue
if res.certificate_status == OCSPCertStatus.REVOKED:
return True
if res.certificate_status == OCSPCertStatus.GOOD:
return False
raise OCSPOperationError('no authoritative OCSP response')
[docs]
def escapectrl(chars: str) -> str:
"""Escape control characters."""
categories = map(unicodedata.category, chars)
escaped = [fr'\u{ord(char):04x}' if cat.startswith('C') else char
for char, cat in zip(chars, categories)]
return ''.join(escaped)
[docs]
def httpget(url: str) -> bytes:
"""Download a file from `url` using HTTP.
Raises:
HTTPUsageError: `url` is not an HTTP URL.
HTTPOperationError: "GET" failed.
"""
while True:
if not url.startswith('http://'):
raise HTTPUsageError(f'{url}: not an HTTP URL')
with urllib.request.urlopen(url) as res: # nosec B310
if res.status == 200:
return res.read()
if res.status in (301, 302, 303, 307, 308):
if url := res.getheader('Location'):
continue
raise HTTPOperationError(f'GET {url}: {res.reason}')
# NOTREACHED
[docs]
def getcertauthinfo(cert) -> tuple[list[str], list[str]]:
"""Get information about the authority that issued `cert`.
Returns:
CA issuer URLs and OCSP responder base URLs.
"""
exts = cert.extensions.get_extension_for_class(AuthorityInformationAccess)
issuers = []
responders = []
for field in exts.value:
oid = field.access_method
if oid == AuthorityInformationAccessOID.CA_ISSUERS:
issuers.append(field.access_location.value)
elif oid == AuthorityInformationAccessOID.OCSP:
responders.append(field.access_location.value)
return issuers, responders
[docs]
def getfilesize(file: IO) -> int:
"""Get the size of file-like object relative to the current position."""
try:
pos = file.tell()
except io.UnsupportedOperation:
pos = 0
try:
size = os.fstat(file.fileno()).st_size - pos
except io.UnsupportedOperation:
size = file.seek(0, SEEK_END) - pos
file.seek(pos, SEEK_SET)
return size
[docs]
def isdnsname(name: str) -> bool:
"""Check whether `name` is a valid DNS name.
.. seealso::
:rfc:`1035` (sec. 2.3.1)
Domain names - Preferred name syntax
:rfc:`2181` (sec. 11)
Clarifications to the DNS Specification - Name syntax
"""
return (all(0 < len(x) <= 63 for x in name.removesuffix('.').split('.'))
and len(name) <= 253)
[docs]
def ishostname(name: str) -> bool:
"""Check whether `name` is a valid hostname.
.. seealso::
:rfc:`921`
Domain Name System Implementation Schedule
:rfc:`952`
Internet host table specification
:rfc:`1123` (sec. 2.1)
Host Names and Numbers
"""
return (bool(re.fullmatch(r'((?!-)[a-z0-9-]+(?<!-)\.?)+', name, re.I))
and isdnsname(name))
[docs]
def isinetaddr(addr: str) -> bool:
"""Check whether `addr` is an internet address."""
try:
ipaddress.ip_address(addr)
except ValueError:
return False
return True
[docs]
def nwise(iterable: Iterable, n: Any) -> Iterator[tuple]:
"""Iterate over n-tuples."""
iterator = iter(iterable)
ntuple = deque(itertools.islice(iterator, n - 1), maxlen=n)
for x in iterator:
ntuple.append(x)
yield tuple(ntuple)
[docs]
def randomise(elems: Sequence[T], weights: Iterable[int]) -> list[T]:
"""Randomise the order of `elems`."""
nelems = len(elems)
weights = list(weights)
indices = list(range(len(elems)))
randomised = []
for _ in range(nelems):
i, = random.choices(indices, weights, k=1) # noqa DUO102 # nosec B311
randomised.append(elems[i])
weights[i] = 0
return randomised
[docs]
def readdir(dirname: str, predicate: Optional[Callable[[str], bool]] = None) \
-> Iterator[str]:
"""Get every filename in `dirname` that matches `predicate`."""
for dirent in os.listdir(dirname if dirname else '.'):
fname = path.join(dirname, dirent)
if predicate is None or predicate(fname):
yield fname
[docs]
def readnetrc(fname: Optional[str]) -> dict[str, tuple[str, str, str]]:
"""Read a .netrc file.
Arguments:
fname: Filename (default: :file:`~/.netrc`)
Returns:
Mapping from hosts to login-account-password 3-tuples.
Raises:
FileNotFoundError: `fname` was given but not found.
netrc.NetrcParseError: Syntax error.
"""
try:
if fname:
return netrc.netrc(fname).hosts
with suppress(FileNotFoundError):
return netrc.netrc().hosts
except netrc.NetrcParseError as err:
if sys.version_info < (3, 10):
logging.error(err)
else:
raise
return {}
[docs]
def readoutput(*command: str, encoding: str = ENCODING,
logger: logging.Logger = logging.getLogger(__name__)) -> str:
"""Decode and return the output of `command`.
Returns:
Decoded output or, if `command` exited with a non-zero status,
the empty string.
Raises:
subprocess.CalledProcessError: `command` exited with a status >= 127.
"""
logger.debug('exec: %s', ' '.join(command))
try:
cp = subprocess.run(command, capture_output=True, check=True)
except subprocess.CalledProcessError as err:
if err.returncode >= 127:
raise
logger.debug('%s exited with status %d', command[0], err.returncode)
return ''
return cp.stdout.rstrip().decode(encoding)
# pylint: disable=redundant-returns-doc
[docs]
def resolvesrv(host: str) -> Iterator[SRV]:
"""Resolve a DNS SRV record.
Arguments:
host: Hostname (e.g., :samp:`_sieve._tcp.imap.foo.example`)
Returns:
An iterator over `SRV` records sorted by their priority
and randomised according to their weight.
Raises:
DNSDataError: `host` is not a valid DNS name.
DNSOperationError: Lookup error.
DNSSoftwareError: dnspython_ is not available.
.. note::
Requires dnspython_.
"""
if not HAVE_DNSPYTHON:
raise DNSSoftwareError('dnspython unavailable')
if not isdnsname(host):
raise DNSDataError('hostname or label is too long')
try:
answer = dns.resolver.resolve(host, 'SRV')
hosts: dict[str, list] = defaultdict(list)
byprio: dict[int, list[SRV]] = defaultdict(list)
for rec in answer.response.additional:
name = rec.name.to_text()
addrs = [i.address for i in rec.items
if i.rdtype == dns.rdatatype.A]
hosts[name].extend(addrs)
for rec in answer: # type: ignore
name = rec.target.to_text() # type: ignore
priority = rec.priority # type: ignore
weight = rec.weight # type: ignore
port = rec.port # type: ignore
if addrs := hosts.get(name): # type: ignore
srvs = [SRV(priority, weight, a, port) for a in addrs]
byprio[priority].extend(srvs)
else:
srv = SRV(priority, weight, name.rstrip('.'), port)
byprio[priority].append(srv)
for prio in sorted(byprio.keys()):
srvs = byprio[prio]
weights = [s.weight for s in srvs]
if sum(weights) > 0:
yield from randomise(srvs, weights)
else:
yield from srvs
except DNSException as err:
raise DNSOperationError(str(err)) from err
[docs]
def yamlescape(data: str):
"""Strip and quote `data` for use as YAML scalar if needed."""
indicators = ('-', '?', ':', ',', '[', ']', '{', '}', '#', '&',
'*', '!', '|', '>', "'", '"', '%', '@', '`')
data = data.strip()
if not data:
return '""'
if data.casefold() == 'no':
return '"' + data + '"'
if data[0] in indicators or ': ' in data or ' #' in data:
return '"' + data.replace('\\', '\\\\').replace('"', '\\\"') + '"'
return data
#
# Main
#
# pylint: disable=too-many-branches, too-many-statements
[docs]
@SignalCaught.catch(SIGHUP, SIGINT, SIGTERM)
def main() -> NoReturn:
"""sievemgr - manage remote Sieve scripts
Usage: sievemgr [server] [command] [argument ...]
sievemgr -e expression [...] [server]
sievemgr -s file [server]
Options:
-C Do not overwrite existing files.
-c file Read configuration from file.
-d Enable debugging mode.
-e expression Execute expression on the server.
-f Overwrite and remove files without confirmation.
-i Confirm removing or overwriting files.
-o key=value Set configuration key to value.
-q Be quieter.
-s file Execute expressions read from file.
-v Be more verbose.
-e, -o, -q, and -v can be given multiple times.
See sievemgr(1) for the complete list.
Report bugs to: <https://github.com/odkr/sievemgr/issues>
Home page: <https://odkr.codeberg.page/sievemgr>
"""
progname = os.path.basename(sys.argv[0])
logging.basicConfig(format=f'{progname}: %(message)s')
# Options
try:
opts, args = getopt(sys.argv[1:], 'CN:Vc:de:fhio:qs:v',
['help', 'version'])
except GetoptError as err:
error('%s', err, status=2)
optconf = SieveConfig()
debug = False
exprs: list[str] = []
configfiles: list[str] = []
script: Optional[TextIO] = None
volume = 0
for opt, arg in opts:
try:
if opt in ('-h', '--help'):
showhelp(main)
elif opt in ('-V', '--version'):
showversion()
elif opt == '-C':
optconf.clobber = False
elif opt == '-N':
optconf.netrc = arg
elif opt == '-c':
configfiles.append(arg)
elif opt == '-d':
optconf.verbosity = LogLevel.DEBUG
elif opt == '-e':
exprs.append(arg)
elif opt == '-f':
optconf.confirm = ShellCmd.NONE
elif opt == '-i':
optconf.confirm = ShellCmd.ALL
elif opt == '-o':
optconf.parse(arg)
elif opt == '-q':
volume -= 1
elif opt == '-s':
# pylint: disable=consider-using-with
script = open(arg)
elif opt == '-v':
volume += 1
except (AttributeError, TypeError, ValueError) as err:
error('option %s: %s', opt, err, status=2)
# Arguments
url = URL.fromstr(args.pop(0)) if args else None
command = args.pop(0) if args else ''
if script:
if command:
error('-s cannot be used together with a command', status=2)
if exprs:
error('-e and -s cannot be combined', status=2)
# Configuration
try:
conf = SieveConfig().fromfiles(*configfiles) | optconf
except FileNotFoundError as err:
error('%s: %s', err.filename, os.strerror(err.errno))
except (ConfigError, SecurityError) as err:
error('%s', err)
conf.loadaccount(host=url.hostname if url else conf.host,
login=url.username if url else None)
conf |= optconf
if url:
if url.username:
conf.login = url.username
if url.password:
conf.password = url.password
if url.owner:
conf.owner = url.owner
# Logging
conf.verbosity = conf.verbosity.fromdelta(volume)
logger = logging.getLogger()
logger.setLevel(conf.verbosity)
# Infos
for line in ABOUT.strip().splitlines():
logging.info('%s', line)
# Shell
try:
with conf.getmanager() as mgr:
shell = conf.getshell(mgr)
if exprs:
for expr in exprs:
shell.executeline(expr)
elif script:
shell.executescript(script)
elif command:
shell.execute(command, *args)
else:
shell.enter()
except (socket.herror, socket.gaierror, ConnectionError) as err:
error('%s', err.args[1])
except (MemoryError, ssl.SSLError) as err:
error('%s', err)
except OSError as err:
error('%s', os.strerror(err.errno) if err.errno else err)
except subprocess.CalledProcessError as err:
error('%s exited with status %d', err.cmd[0], err.returncode)
except Error as err:
if debug:
raise
error('%s', err)
sys.exit(shell.retval)
[docs]
def error(*args, status: int = 1, **kwargs) -> NoReturn:
"""Log an err and :func:`exit <sys.exit>` with `status`.
Arguments:
args: Positional arguments for :func:`logging.error`.
status: Exit status.
kwargs: Keyword arguments for :func:`logging.error`.
"""
logging.error(*args, **kwargs)
sys.exit(status)
[docs]
def showhelp(func: Callable) -> NoReturn:
"""Print the docstring of `func` and :func:`exit <sys.exit>`."""
assert func.__doc__
lines = func.__doc__.splitlines()
indented = re.compile(r'\s+').match
prefix = os.path.commonprefix(list(filter(indented, lines)))
for line in lines[:-1]:
print(line.removeprefix(prefix))
sys.exit()
[docs]
def showversion() -> NoReturn:
"""Print :attr:`ABOUT` and :func:`exit <sys.exit>`."""
print(ABOUT.strip())
sys.exit()
#
# Boilerplate
#
logging.getLogger(__name__).addHandler(logging.NullHandler())
if __name__ == '__main__':
main()