Coverage for sievemgr.py: 44%
2693 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 00:39 +0100
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-12 00:39 +0100
1#!/usr/bin/env python3
2"""Client for managing Sieve scripts remotely using the ManageSieve protocol"""
4#
5# Copyright 2023 and 2024 Odin Kroeger
6#
7# This file is part of SieveManager.
8#
9# SieveManager is free software: you can redistribute it and/or
10# modify it under the terms of the GNU General Public License as
11# published by the Free Software Foundation, either version 3 of
12# the License, or (at your option) any later version.
13#
14# SieveManager is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with SieveManager. If not, see <https://www.gnu.org/licenses/>.
21#
24#
25# Modules
26#
28from __future__ import annotations
29from abc import ABC, abstractmethod
30from base64 import b64decode, b64encode
31from collections import UserDict, defaultdict, deque
32from collections.abc import Iterable
33from contextlib import AbstractContextManager, suppress
34from dataclasses import dataclass
35from errno import (ECONNABORTED, EEXIST, EINPROGRESS, EISCONN,
36 ENOENT, ENOTCONN, ETIMEDOUT)
37from fcntl import LOCK_SH, LOCK_NB
38from getopt import GetoptError, getopt
39from inspect import Parameter
40from os import O_CREAT, O_EXCL, O_WRONLY, O_TRUNC, SEEK_END, SEEK_SET
41from os import path
42from signal import SIG_DFL, SIG_IGN, SIGHUP, SIGINT, SIGTERM
43from tempfile import SpooledTemporaryFile, TemporaryDirectory
44from typing import (Any, BinaryIO, Callable, ClassVar, Final, IO, Iterator,
45 NoReturn, Optional, Union, Sequence, TextIO, TypeVar)
47import code
48import dataclasses
49import datetime
50import enum
51import fcntl
52import fnmatch
53import getpass
54import hashlib
55import hmac
56import inspect
57import io
58import ipaddress
59import itertools
60import locale
61import logging
62import math
63import netrc
64import os
65import pwd
66import random
67import re
68import readline
69import rlcompleter
70import secrets
71import select
72import shlex
73import shutil
74import signal
75import socket
76import subprocess # nosec B404
77import ssl
78import string
79import stringprep
80import sys
81import threading
82import types
83import unicodedata
84import urllib.error
85import urllib.parse
86import urllib.request
87import warnings
89try:
90 from cryptography import x509
91 from cryptography.hazmat.primitives.hashes import SHA1
92 from cryptography.hazmat.primitives.serialization import Encoding
93 from cryptography.x509 import (AuthorityInformationAccess,
94 ExtensionNotFound, ocsp)
95 from cryptography.x509.oid import AuthorityInformationAccessOID
96 from cryptography.x509.ocsp import OCSPResponseStatus, OCSPCertStatus
97 from cryptography.utils import CryptographyDeprecationWarning
99 HAVE_CRYPTOGRAPHY: Final = True # type: ignore
100except ImportError:
101 HAVE_CRYPTOGRAPHY: Final = False # type: ignore
103try:
104 import dns.rdatatype
105 import dns.resolver
106 from dns.exception import DNSException
108 HAVE_DNSPYTHON = True # type: ignore
109except ImportError:
110 HAVE_DNSPYTHON = False # type: ignore
112if sys.version_info < (3, 9):
113 sys.exit('SieveManager requires Python 3.9 or later')
116#
117# Metadata
118#
__version__ = '0.7.4.7+devel'
__author__ = 'Odin Kroeger'
__copyright__ = '2023 and 2024 Odin Kroeger'
__license__ = 'GPLv3+'

# Public API. Every name is listed exactly once: 'BaseAuth' lives under
# "ABCs" and 'ExternalAuth' under "SASL" (both used to be listed twice).
__all__ = [
    # ABCs
    'BaseAuth',
    'BaseSASLAdapter',

    # ManageSieve
    'SieveManager',
    'Atom',
    'Line',
    'Word',
    'Capabilities',
    'Response',
    'URL',

    # SASL
    'BasePwdAuth',
    'BaseScramAuth',
    'BaseScramPlusAuth',
    'CramMD5Auth',
    'ExternalAuth',
    'LoginAuth',
    'PlainAuth',
    'ScramSHA1Auth',
    'ScramSHA1PlusAuth',
    'ScramSHA224Auth',
    'ScramSHA224PlusAuth',
    'ScramSHA256Auth',
    'ScramSHA256PlusAuth',
    'ScramSHA384Auth',
    'ScramSHA384PlusAuth',
    'ScramSHA512Auth',
    'ScramSHA512PlusAuth',
    'ScramSHA3_512Auth',
    'ScramSHA3_512PlusAuth',
    'SASLPrep',

    # Errors
    'Error',
    'ProtocolError',
    'SecurityError',
    'CapabilityError',
    'ConfigError',
    'DataError',
    'OperationError',
    'UsageError',
    'ClientError',
    'ClientConfigError',
    'ClientConnectionError',
    'ClientOperationError',
    'ClientSecurityError',
    'OCSPError',
    'OCSPDataError',
    'OCSPOperationError',
    'SASLError',
    'SASLCapabilityError',
    'SASLProtocolError',
    'SASLSecurityError',
    'SieveError',
    'SieveCapabilityError',
    'SieveConnectionError',
    'SieveOperationError',
    'SieveProtocolError',
    'TLSError',
    'TLSCapabilityError',
    'TLSSecurityError'
]
194#
195# Globals
196#
# Two-line banner: program name and version, then the copyright notice.
_ABOUT: Final[str] = f'SieveManager {__version__}\nCopyright {__copyright__}'
"""About message."""
DEBUG: bool = False
"""Print stack traces even for expected error types?"""

EDITOR: list[str] = shlex.split(os.getenv('EDITOR', 'ed'), posix=True)
""":envvar:`EDITOR` or :command:`ed` if :envvar:`EDITOR` is unset."""

ENCODING: str = locale.getpreferredencoding(do_setlocale=False)
"""Encoding."""

# Consult the password database only if HOME is unset. pwd.getpwuid()
# raises KeyError for UIDs that have no passwd entry, and evaluating it
# eagerly as the default of os.getenv() made the import fail in that
# case even though HOME was set.
HOME: str = (os.environ['HOME'] if 'HOME' in os.environ
             else pwd.getpwuid(os.getuid()).pw_dir)
"""Home directory."""

PAGER: list[str] = shlex.split(os.getenv('PAGER', 'more'), posix=True)
""":envvar:`PAGER` or :command:`more` if :envvar:`PAGER` is unset."""

VISUAL: list[str] = shlex.split(os.getenv('VISUAL', 'vi'), posix=True)
""":envvar:`VISUAL` or :command:`vi` if :envvar:`VISUAL` is unset."""

XDG_CONFIG_HOME: Final[str] = os.getenv('XDG_CONFIG_HOME', f'{HOME}/.config')
"""X Desktop group base configuration directory."""

CONFIGFILES: Final[tuple[str, ...]] = (
    '/etc/sieve/config',
    '/etc/sieve.cf',
    f'{XDG_CONFIG_HOME}/sieve/config',
    f'{HOME}/.sieve/config',
    f'{HOME}/.sieve.cf'
)
"""Default configuration files."""
232#
233# Types
234#
class Atom(str):
    """ManageSieve keyword (e.g., ``LISTSCRIPTS``, ``OK``).

    Atoms compare case-insensitively with other strings. Comparing an
    atom with a non-string (e.g., ``None`` or an ``int``, both of which
    may occur next to atoms in a parsed line) is not an error; such
    objects are simply unequal.

    Because :meth:`__eq__` is overridden without :meth:`__hash__`,
    atoms are unhashable.
    """

    # pylint: disable=eq-without-hash
    def __eq__(self, other) -> bool:
        # Defer to Python's default handling for non-strings instead of
        # raising AttributeError on the missing ``casefold`` method.
        if not isinstance(other, str):
            return NotImplemented
        return self.casefold() == other.casefold()

    def __ne__(self, other) -> bool:
        if not isinstance(other, str):
            return NotImplemented
        return self.casefold() != other.casefold()
# Names the mechanism *class* itself, not an instance of it.
AuthMech = type['BaseAuth']
"""Alias for subclasses of :class:`BaseAuth`."""
class AuthState(enum.IntEnum):
    """Phases that an authentication exchange passes through."""

    PREAUTH = 1
    """No "AUTHENTICATE" command has been issued yet."""

    SENT = 2
    """A message has been sent; the next step is to receive."""

    RECEIVED = 3
    """A message has been received; the next step is to send."""

    DONE = 4
    """The exchange has been concluded."""
class ConfirmEnum(enum.IntEnum):
    """Possible results of :meth:`BaseShell.confirm`.

    Only the affirmative answers (``YES`` and ``ALL``) are truthy.
    """

    NO = 0
    YES = 1
    ALL = 2
    NONE = 3

    def __bool__(self) -> bool:
        answers = type(self)
        return self is answers.YES or self is answers.ALL
# 'Word' is a forward reference; since Word admits Line, lines can nest.
Line = list['Word']
""":class:`List <list>` of :class:`Word`-s."""
class LogLevel(enum.IntEnum):
    """Logging levels supported by :class:`SieveConfig`.

    Members are defined from most to least verbose.
    """

    AUTH = logging.DEBUG // 2
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    WARNING = logging.WARNING
    ERROR = logging.ERROR

    def fromdelta(self, delta: int) -> 'LogLevel':
        """Get the :class:`LogLevel` that is `delta` steps more verbose.

        A negative `delta` yields a less verbose level. Deltas that
        would lead past either end are clamped to that end.

        For example:

        >>> LogLevel.INFO.fromdelta(-1)
        <LogLevel.WARNING: 30>
        >>> LogLevel.INFO.fromdelta(0)
        <LogLevel.INFO: 20>
        >>> LogLevel.INFO.fromdelta(1)
        <LogLevel.DEBUG: 10>
        >>> # Out-of-bounds deltas do not raise an error
        >>> LogLevel.INFO.fromdelta(-math.inf)
        <LogLevel.ERROR: 40>
        >>> LogLevel.INFO.fromdelta(math.inf)
        <LogLevel.AUTH: 5>
        """
        members = list(type(self))
        last = len(members) - 1
        position = members.index(self) - delta
        # Clamp into [0, last]; this also turns infinities into ints.
        if position < 0:
            position = 0
        elif position > last:
            position = last
        return members[position]
class SASLPrep(enum.IntEnum):
    """Selects which credentials are prepared for authentication.

    The values are bit masks, so they can be tested with ``&``.

    .. seealso::
        :rfc:`3454`
            Preparation of Internationalized Strings
        :rfc:`4013`
            Stringprep Profile for User Names and Passwords
        :rfc:`4422` (sec. 4)
            SASL protocol requirements
    """

    NONE = 0
    USERNAMES = 1
    PASSWORDS = 2
    ALL = USERNAMES | PASSWORDS
class ShellCmd(enum.IntEnum):
    """Shell actions that may overwrite or remove files.

    Each action occupies its own bit; ``ALL`` has every bit set.
    """

    NONE = 0
    CP = 1 << 0
    GET = 1 << 1
    MV = 1 << 2
    PUT = 1 << 3
    RM = 1 << 4
    ALL = CP | GET | MV | PUT | RM
class ShellPattern(str):
    """Glob pattern as used by :class:`BaseShell`."""

    def expand(self, tokens: Iterable[str]) -> Iterator[str]:
        """Lazily yield those `tokens` that match this pattern.

        Matching is case-sensitive. Tokens that start with a dot
        are never matched.
        """
        yield from (
            candidate for candidate in tokens
            if not candidate.startswith('.')
            and fnmatch.fnmatchcase(candidate, self)
        )
# ShellPattern is itself a str subclass; the alias only makes explicit
# that a shell word may be a pattern.
ShellWord = Union[ShellPattern, str]
"""Alias for :class:`ShellPattern` and `str`."""


# Python counterparts of the ACAP data types; Line makes the alias
# recursive (parenthesised lists may contain lists).
Word = Union[Atom, None, int, str, Line]
"""Alias for :class:`Atom`, ``None``, ``int``, ``str``, and :class:`Line`."""


# Unconstrained, general-purpose type variable.
T = TypeVar('T')
"""Type variable."""
366#
367# Abstract base classes
368#
# Bound to BaseAuth so that BaseAuth.getmechs(), when called on a
# subclass, is typed as returning a list of classes of that subclass.
BaseAuthT = TypeVar('BaseAuthT', bound='BaseAuth')
"""Type variable for :class:`BaseAuth`."""
class BaseAuth(ABC):
    """Base class for authentication mechanisms.

    The ManageSieve "AUTHENTICATE" command performs a `Simple Authentication
    and Security Layer`_ (SASL) protocol exchange. SASL is a framework and
    comprises different authentication mechanisms ("SASL mechanisms").

    :meth:`SieveManager.authenticate` does *not* implement any such
    mechanism, but delegates the SASL protocol exchange to classes that do.
    Such classes must subclass :class:`BaseAuth` *and* have a :attr:`name`
    attribute that indicates the mechanism they implement.

    :class:`BaseAuth` provides methods to prepare strings according
    to :rfc:`3454` and :rfc:`4013` and a layer over :class:`BaseSASLAdapter`
    objects that calls :meth:`BaseSASLAdapter.begin` and
    :meth:`BaseSASLAdapter.end` transparently.

    Credentials must be prepared in :meth:`__init__`. Subclasses
    should pass `adapter`, `authcid`, `authzid`, and `prepare` to
    :code:`super().__init__` and use :meth:`prepare` to prepare the
    remaining credentials. For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: BasePwdAuth.__init__
        :dedent: 4

    The SASL exchange must be implemented in :meth:`exchange`.
    Subclasses should use :meth:`send` and :meth:`receive` to
    exchange SASL messages. For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: PlainAuth.exchange
        :dedent: 4
    """

    @staticmethod
    # pylint: disable=redefined-outer-name (string)
    def prepare(string: str) -> str:
        """Prepare `string` according to :rfc:`3454` and :rfc:`4013`.

        Returns:
            Prepared `string`.

        Raises:
            ValueError: `String` is malformed.
        """
        # Bidirectional check: if the string contains any RandALCat
        # character, it must start and end with one and must not
        # contain any LCat character.
        if any(rlcat := list(map(stringprep.in_table_d1, string))):
            if not (rlcat[0] and rlcat[-1]):
                raise ValueError(f'{string}: Malformed RandLCat string')
            if any(map(stringprep.in_table_d2, string)):
                raise ValueError(f'{string}: Mixes RandLCat and LCat')
        # Collect the mapped characters and join them once at the end.
        chars: list[str] = []
        for i, char in enumerate(string, start=1):
            if stringprep.in_table_b1(char):
                # Table B.1: mapped to nothing.
                pass
            elif stringprep.in_table_c12(char):
                # Table C.1.2: non-ASCII spaces are mapped to a space.
                chars.append(' ')
            elif stringprep.in_table_c21_c22(char):
                raise ValueError(f'{string}:{i}: Control character')
            elif stringprep.in_table_c3(char):
                raise ValueError(f'{string}:{i}: Private use character')
            elif stringprep.in_table_c4(char):
                raise ValueError(f'{string}:{i}: Non-character code point')
            elif stringprep.in_table_c5(char):
                raise ValueError(f'{string}:{i}: Surrogate code point')
            elif stringprep.in_table_c6(char):
                raise ValueError(f'{string}:{i}: Not plain text')
            elif stringprep.in_table_c7(char):
                raise ValueError(f'{string}:{i}: Not canonical')
            elif stringprep.in_table_c8(char):
                raise ValueError(f'{string}:{i}: Changes display/deprecated')
            elif stringprep.in_table_c9(char):
                raise ValueError(f'{string}:{i}: Tagging character')
            elif stringprep.in_table_a1(char):
                raise ValueError(f'{string}:{i}: Unassigned code point')
            else:
                chars.append(char)
        return unicodedata.ucd_3_2_0.normalize('NFKC', ''.join(chars))

    def __init__(self, adapter: BaseSASLAdapter,
                 authcid: str, authzid: str = '',
                 prepare: SASLPrep = SASLPrep.ALL):
        """Prepare authentication.

        `authcid` and `authzid` are prepared according to :rfc:`3454` and
        :rfc:`4013` if ``prepare & SASLPrep.USERNAMES`` evaluates to true.

        Arguments:
            adapter: SASL adapter over which to authenticate.
            authcid: Authentication ID (user to login as).
            authzid: Authorization ID (user whose rights to acquire).
            prepare: Which credentials to prepare.

        Raises:
            ValueError: Bad characters in username.
        """
        # Only the USERNAMES bit matters here. The local name `prepare`
        # shadows the method of the same name, hence `self.prepare` below.
        prepare &= SASLPrep.USERNAMES   # type: ignore[assignment]
        self.adapter = adapter
        self.authcid = self.prepare(authcid) if prepare else authcid
        self.authzid = self.prepare(authzid) if prepare else authzid

    # pylint: disable=useless-return
    def __call__(self) -> Optional[Any]:
        """Authenticate as :attr:`authcid`.

        :attr:`authcid` is authorized as :attr:`authzid`
        if :attr:`authzid` is set (proxy authentication).

        Returns:
            Data returned by the server, if any.

        Raises:
            ConnectionError: Server has closed the connection.
            OperationError: Authentication failed.
            SASLCapabilityError: Some feature is not supported.
            SASLProtocolError: Server violated the SASL protocol.
            SASLSecurityError: Server verification failed.
            TLSCapabilityError: Channel-binding is not supported.

        .. note::
            Calls :meth:`exchange` and :meth:`end`.
        """
        self.exchange()
        # If the server had the last word, answer with an empty message
        # so that the exchange is in the state that `end` expects.
        if self.state == AuthState.RECEIVED:
            self.send(b'')
        self.end()
        return None

    def abort(self):
        """Abort authentication.

        Raises:
            ProtocolError: Protocol violation.
        """
        self.adapter.abort()
        self.state = AuthState.DONE

    def begin(self, data: Optional[bytes] = None):
        """Begin authentication.

        Arguments:
            data: Optional client-first message.

        Raises:
            ConnectionError: Connection was closed.
            ProtocolError: Protocol violation.
        """
        if self.state == AuthState.PREAUTH:
            self.adapter.begin(self.name, data)
            self.state = AuthState.SENT
        else:
            raise SASLProtocolError(f'SASL state {self.state}: Unexpected')

    def end(self):
        """Conclude authentication.

        Raises:
            ConnectionError: Connection was closed.
            OperationError: Authentication failed.
            ProtocolError: Protocol violation.
        """
        if self.state == AuthState.SENT:
            self.adapter.end()
        else:
            raise SASLProtocolError(f'SASL state {self.state}: Unexpected')

    @abstractmethod
    def exchange(self):
        """Exchange SASL messages."""

    def send(self, data: bytes):
        """Encode and send an SASL message.

        Raises:
            ConnectionError: Connection was closed.
            ProtocolError: Protocol violation.

        .. note::
            Calls :meth:`begin` if needed.
        """
        if self.state == AuthState.PREAUTH:
            # The first message is sent along with "AUTHENTICATE".
            self.begin(data)
        elif self.state == AuthState.RECEIVED:
            self.adapter.send(data)
            self.state = AuthState.SENT
        else:
            raise SASLProtocolError(f'SASL state {self.state}: Unexpected')

    @classmethod
    def getmechs(cls: type[BaseAuthT], sort: bool = True,
                 obsolete: bool = False) -> list[type[BaseAuthT]]:
        """Get authentication classes that subclass this class.

        Subclasses are searched recursively; abstract classes are
        traversed but not returned.

        Arguments:
            sort: Sort mechanisms by :attr:`order`?
            obsolete: Return obsolete mechanisms?
        """
        mechs: list[type[BaseAuthT]] = []
        for subcls in cls.__subclasses__():
            # pylint: disable=bad-indentation
            if (not subcls.__abstractmethods__
                    and (obsolete or not subcls.obsolete)):
                mechs.append(subcls)
            # Sort only once, at the top level of the recursion.
            mechs.extend(subcls.getmechs(sort=False, obsolete=obsolete))
        if sort:
            mechs.sort(key=lambda s: s.order)
        return mechs

    # pylint: disable=missing-raises-doc
    def receive(self) -> bytes:
        """Receive and decode an SASL message.

        Raises:
            ConnectionError: Connection was closed.
            OperationError: Authentication failed.
            ProtocolError: Protocol violation.

        .. note::
            Calls :meth:`begin` if needed.
        """
        if self.state == AuthState.PREAUTH:
            self.begin()
        if self.state == AuthState.SENT:
            try:
                data = self.adapter.receive()
                self.state = AuthState.RECEIVED
            except SieveOperationError as err:
                # An "OK" with a "SASL" response code concludes the
                # exchange and carries the final server message.
                if err.response != 'OK':
                    raise
                if not err.matches('SASL'):
                    # pylint: disable=raise-missing-from
                    raise SASLProtocolError('Expected data')
                try:
                    word = err.code[1]
                except (IndexError, ValueError) as codeerr:
                    # Indexing past the end of the response code raises
                    # IndexError, which used to escape uncaught.
                    raise SASLProtocolError('Unexpected data') from codeerr
                if isinstance(word, Atom) or not isinstance(word, str):
                    # pylint: disable=raise-missing-from
                    raise SASLProtocolError('Expected string')
                data = b64decode(word)
                self.state = AuthState.DONE
            return data
        raise SASLProtocolError(f'SASL state {self.state}: Unexpected')

    @property
    def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
        """Underlying socket."""
        assert self.adapter
        return self.adapter.sock

    @sock.setter
    def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
        assert self.adapter
        self.adapter.sock = sock

    adapter: BaseSASLAdapter
    """Underlying SASL adapter."""

    authcid: str
    """Authentication ID (user to login as)."""

    authzid: str = ''
    """Authorization ID (user whose rights to acquire)."""

    name: ClassVar[str]
    """Mechanism name."""

    obsolete: bool = False
    """Is this mechanism obsolete?"""

    order: int = 0
    """Mechanism precedence."""

    state: AuthState = AuthState.PREAUTH
    """Current authentication state."""
class BaseSASLAdapter(ABC):
    """Interface between SASL mechanisms and the underlying protocol.

    An SASL exchange consists of a fixed set of message types (begin,
    send, receive, end, abort). How these are put on the wire depends
    on the protocol that carries them. Classes that translate between
    SASL and such a protocol must subclass this class and implement
    every method as well as the :attr:`sock` property.

    .. seealso::
        :class:`BaseAuth`
            Abstract base class for SASL mechanisms.
        :rfc:`4422`
            Simple Authentication and Security Layer (SASL)
    """

    @abstractmethod
    def abort(self):
        """Cancel an authentication that is in progress.

        Raises:
            ProtocolError: Protocol violation.
        """

    @abstractmethod
    def begin(self, name: str, data: Optional[bytes] = None):
        """Start authenticating with the mechanism `name`.

        Arguments:
            name: SASL mechanism name.
            data: Optional client-first message.

        Raises:
            ConnectionError: Connection was closed.
            ProtocolError: Protocol violation.
        """

    @abstractmethod
    def end(self):
        """Finish the authentication.

        Raises:
            ConnectionError: Connection was closed.
            OperationError: Authentication failed.
            ProtocolError: Protocol violation.
        """

    @abstractmethod
    def send(self, data: bytes):
        """Encode `data` and send it as an SASL message.

        Raises:
            ConnectionError: Connection was closed.
            ProtocolError: Protocol violation.
        """

    @abstractmethod
    def receive(self) -> bytes:
        """Receive an SASL message and return its decoded payload.

        Raises:
            ConnectionError: Connection was closed.
            OperationError: Authentication failed.
            ProtocolError: Protocol violation.
        """

    @property
    @abstractmethod
    def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
        """Underlying socket."""

    @sock.setter
    @abstractmethod
    def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
        """Replace the underlying socket."""
727#
728# ACAP
729#
class BaseACAPConn(ABC):
    """Base class for ACAP parsers/serializers.

    The ManageSieve protocol uses the syntax and data types of the
    Application Control Access Protocol (ACAP). This class provides
    a :meth:`parser <receiveline>` for converting ACAP lines into
    Python objects and a :meth:`serializer <sendline>` for converting
    Python objects into ACAP lines.

    .. seealso::
        :rfc:`2244` (secs. 2.2 and 2.6)
            ACAP commands, responses, and data formats.
        :rfc:`5804` (secs. 1.2 and 4)
            ManageSieve syntax.
    """

    # pylint: disable=missing-raises-doc
    def receiveline(self) -> Line:
        """Receive a line and parse it.

        ================== =================
        ACAP type          Python type
        ================== =================
        Atom               :class:`Atom`
        Literal            :class:`str`
        Nil                ``None``
        Number             :class:`int`
        Parenthesised List :class:`list`
        String             :class:`str`
        ================== =================

        For example:

        >>> mgr.sendline(Atom('listscripts'))
        >>> mgr.receiveline()
        ['foo.sieve', 'ACTIVE']
        >>> mgr.receiveline()
        ['bar.sieve']
        >>> mgr.receiveline()
        ['baz.sieve']
        >>> mgr.receiveline()
        ['OK', 'Listscripts completed.']

        Raises:
            ValueError: Line is malformed.
        """
        assert self.file
        words: Line = []
        # `ptr` is the list that tokens are currently appended to;
        # `stack` holds the enclosing lists of open parentheses.
        stack: list[Line] = []
        ptr: Line = words
        # NOTE(review): if readline() returns b'' (which looks like
        # end-of-file), the loop ends and an empty list is returned,
        # the same result as for an empty line. Confirm that callers
        # can tell a closed connection apart.
        while line := self.file.readline().decode('utf8'):
            size: int = -1
            for token in self._lex(line):
                key = token.lastgroup
                value = token.group(key)    # type: ignore[arg-type]
                if key == 'leftparen':
                    parens: list[Word] = []
                    stack.append(ptr)
                    ptr.append(parens)
                    ptr = parens
                elif key == 'rightparen':
                    try:
                        ptr = stack.pop()
                    except IndexError as err:
                        raise ValueError('Unexpected parenthesis') from err
                elif key == 'atom':
                    if value.casefold() == 'nil':
                        ptr.append(None)
                    else:
                        ptr.append(Atom(value))
                elif key == 'number':
                    ptr.append(int(value))
                elif key == 'string':
                    ptr.append(value)
                elif key == 'literal':
                    # The literal's octets follow the line that
                    # announced its size.
                    size = int(value)
                    literal = self.file.read(size).decode('utf8')
                    ptr.append(literal)
                elif key == 'garbage':
                    raise ValueError('Unrecognized data')
                else:
                    # NOTREACHED
                    raise ClientSoftwareError('Unknown data type')
            # A line that announced a literal continues after it.
            if size == -1:
                break
        if stack:
            raise ValueError('Unbalanced parantheses')
        return words

    def sendline(self, *objs: Union[IO[Any], 'Word'], whole: bool = True):
        """Convert `objs` to ACAP types and send them.

        ================== ======================================
        Python type        ACAP type
        ================== ======================================
        :class:`Atom`      Atom
        :class:`typing.IO` Literal
        ``None``           Nil
        :class:`bytes`     Literal or String [#literal]_
        :class:`list`      Parenthesised List
        :class:`int`       Number [#nums]_
        :class:`str`       Literal or String [#literal]_ [#utf8]_
        ================== ======================================

        For example:

        >>> mgr.sendline(Atom('havespace'), 'script.sieve', 12345)
        >>> mgr.receiveline()
        ['OK', 'Putscript would succeed.']

        The low-level interface can be used to pipeline commands:

        >>> mgr.isconn(check=True)
        >>> with open('foo.sieve') as script:
        >>>     mgr.sendline(Atom('putscript'), 'foo.sieve', script)
        >>> mgr.sendline(Atom('logout'))
        >>> for _ in range(2):
        >>>     mgr.collect(check=True)

        Arguments:
            objs: Objects to serialize.
            whole: Conclude data with CRLF?

        Raises:
            ValueError: Number is not within the range [0, 4,294,967,295].
            TypeError: Object cannot be represented as ACAP data type.

        .. [#literal] Depending on content. Data that is too long or
                      contains NUL, CR, LF, a double quote, or a
                      backslash is sent as literal.
        .. [#nums] Numbers must be within the range [0, 4,294,967,295]
        .. [#utf8] Strings are encoded in UTF-8 and normalised to form C.
        """
        assert self.file

        write = self.file.write
        normalize = unicodedata.normalize
        isstr = self._isstr

        def encode(s: str) -> bytes:
            return normalize('NFC', s).encode('utf8')

        def writestr(b: bytes):
            # Quoted string if possible, literal otherwise.
            write(b'"%s"' % b if isstr(b) else b'{%d+}\r\n%s' % (len(b), b))

        for i, obj in enumerate(objs):
            if i > 0:
                write(b' ')
            if obj is None:
                write(b'NIL')
            elif isinstance(obj, Atom):
                # Atom subclasses str and so must be checked before str.
                write(encode(obj))
            elif isinstance(obj, int):
                if not 0 <= obj < 4_294_967_296:
                    raise ValueError(f'{obj}: Not in [0, 4,294,967,295]')
                write(encode(str(obj)))
            elif isinstance(obj, bytes):
                writestr(obj)
            elif isinstance(obj, str):
                writestr(encode(obj))
            elif isinstance(obj, (IO, io.IOBase, SpooledTemporaryFile)):
                # NOTE(review): the announced size comes from
                # getfilesize(), but text blocks are re-encoded below;
                # confirm that both always agree for text-mode files.
                write(b'{%d+}\r\n' % getfilesize(obj))
                while block := obj.read(io.DEFAULT_BUFFER_SIZE):
                    write(encode(block) if isinstance(block, str) else block)
            elif isinstance(obj, Iterable):     # type: ignore
                write(b'(')
                self.sendline(*obj, whole=False)
                write(b')')
            else:
                raise TypeError(type(obj).__name__ + ': Not an ACAP type')

        if whole:
            write(b'\r\n')
            self.file.flush()

    @property
    @abstractmethod
    def file(self) -> Optional[Union[IO, io.BufferedRWPair, LogIOWrapper]]:
        """File-like access to the underlying socket."""

    @file.setter
    @abstractmethod
    def file(self, file: Optional[Union[IO, io.BufferedRWPair, LogIOWrapper]]):
        pass

    _lex: ClassVar[Callable] = re.compile('|'.join((
        r'\b(?P<atom>[a-z][^(){}\s\\]*)\b',
        r'\b(?P<number>\d+)\b',
        r'"(?P<string>[^\0\r\n"]*)"',
        r'{(?P<literal>\d+)\+?}',
        r'(?P<leftparen>\()',
        r'(?P<rightparen>\))',
        r'(?P<garbage>\S+)'
    )), flags=re.IGNORECASE).finditer
    """ACAP Lexer."""

    # Strings may be 1024 octets long, but I limit the length to 1020 octets
    # to allow for errors (two octets for quotations marks, one for the
    # terminating null byte, one for other off-by-one errors).
    #
    # Backslashes are excluded, too: within quoted strings a backslash
    # starts an escape sequence, and sendline() does not escape, so data
    # that contains one has to be sent as literal.
    _isstr: Callable[..., Optional[re.Match]] = \
        re.compile(br'[^\0\r\n"\\]{0,1020}').fullmatch
    """Check whether bytes can be represented as ACAP string."""
933#
934# ManageSieve
935#
937class SieveConn(BaseACAPConn):
938 """Low-level connection to a ManageSieve server.
940 For example:
942 >>> conn = SieveConn('imap.foo.example')
943 >>> conn.authenticate('user', 'password')
944 >>> with open('script.sieve', 'br') as file:
945 >>> conn.execute('putscript', file.name, file)
946 >>> conn.execute('logout')
948 .. warning::
949 :class:`SieveConn` is not thread-safe.
951 .. seealso::
952 :rfc:`2244`
953 Application Configuration Access Protocol
954 :rfc:`2782`
955 DNS SRV
956 :rfc:`5804`
957 ManageSieve
958 """
def __init__(self, *args, **kwargs):
    """Create a :class:`SieveConn` object.

    If any arguments are given, they are handed to :meth:`open` and
    a connection is established right away. Without arguments, the
    object starts out unconnected.

    For example:

    >>> with SieveConn('imap.host.example') as conn:
    >>>     conn.authenticate('user', 'password')
    >>>     ...

    >>> with SieveConn() as conn:
    >>>     conn.open('imap.host.example')
    >>>     conn.authenticate('user', 'password')
    >>>     ...

    Arguments:
        args: Positional arguments for :meth:`open`.
        kwargs: Keyword arguments for :meth:`open`.

    Raises:
        ClientConnectionError: Socket error.
        SieveCapabilityError: "STARTTLS" not supported.
        SieveProtocolError: Server violated the ManageSieve protocol.
        TLSSecurityError: Server certificate has been revoked.
    """
    if not (args or kwargs):
        return
    self.open(*args, **kwargs)
def __del__(self):
    """Shut the connection down, ignoring operating system errors."""
    try:
        self.shutdown()
    except OSError:
        pass
def authenticate(self, login: str, *auth, owner: str = '',
                 sasl: Union[AuthMech, Iterable[AuthMech]] = (),
                 logauth: bool = False, **kwauth):
    """Authenticate as `login`.

    How the user is authenticated depends on the type of SASL_ mechanisms
    given in `sasl` (e.g., password-based or external).

    If no mechanisms are given, authentication is attempted with every
    non-obsolete password-based mechanism that is supported, starting with
    those with better security properties and progressing to those with
    worse security properties.

    Unrecognized arguments are passed on to SASL mechanism constructors.
    Password-based mechanisms require a password:

    >>> mgr.authenticate('user', 'password')

    By contrast, the "EXTERNAL" mechanism takes no arguments:

    >>> mgr.authenticate('user', sasl=ExternalAuth)

    If an `owner` is given, the scripts of that `owner` are managed
    instead of those owned by `login`. This requires elevated privileges.

    Arguments:
        login: User to login as (authentication ID).
        owner: User whose scripts to manage (authorization ID).
        sasl: SASL mechanisms (default: :meth:`BasePwdAuth.getmechs`).
        logauth: Log authentication exchange?
        auth: Positional arguments for SASL mechanism constructors.
        kwauth: Keyword arguments for SASL mechanism constructors.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SASLCapabilityError: Authentication mechanisms exhausted.
        SASLProtocolError: Server violated the SASL protocol.
        SASLSecurityError: Server could not be verified.
        SieveConnectionError: Server has closed the connection.
        SieveOperationError: Authentication failed.
        SieveProtocolError: Server violated the ManageSieve protocol.
        ValueError: Bad characters in credentials.

    .. note::
        If an `owner` is given, but the selected authentication mechanism
        does not support proxy authentication, an error is logged to the
        console and authentication is attempted with the next mechanism.
    """
    # The owner is handed to the mechanism constructor as `authzid`.
    kwauth['authzid'] = owner
    logger = self.logger
    # Keep the arguments on the instance.
    # NOTE(review): presumably so that authentication can be repeated
    # after a referral or reconnect; confirm against the callers.
    self._auth = auth
    self._kwauth = kwauth
    self._logauth = logauth
    # Normalise `sasl` to an iterable: default mechanisms if empty,
    # a one-element tuple if a single mechanism class was given.
    # NOTE(review): a one-shot iterator would be exhausted after the
    # first pass over `self._sasl`; confirm callers pass sequences.
    self._sasl = (BasePwdAuth.getmechs() if not sasl else
                  sasl if isinstance(sasl, Iterable) else
                  (sasl,))

    def authenticate():
        # pylint: disable=consider-using-with
        # Non-blocking: fail immediately if another operation holds
        # the lock instead of waiting for it.
        if not self.lock.acquire(blocking=False):
            raise ClientOperationError(os.strerror(EINPROGRESS))
        # Silence the I/O log during the exchange unless the caller
        # asked for the exchange to be logged.
        if isinstance(self.file, LogIOWrapper) and not logauth:
            self.file.quiet = True
        try:
            for mech in self._sasl:
                assert self.capabilities
                # Skip mechanisms that the server does not advertise.
                if mech.name.casefold() in self.capabilities.sasl:
                    conn = SieveSASLAdapter(self)
                    try:
                        run = mech(conn, login, *auth, **kwauth)
                        # A truthy result replaces the capabilities.
                        if caps := run():
                            self.capabilities = caps
                    except SASLCapabilityError as err:
                        # Log and try the next mechanism.
                        logger.error(err)
                        continue
                    except SieveOperationError as err:
                        # TRANSITION-NEEDED need not be treated specially.
                        if err.matches('AUTH-TOO-WEAK',
                                       'ENCRYPT-NEEDED',
                                       'TRANSITION-NEEDED'):
                            logger.error(err)
                            continue
                        raise
                    # Success: record who is logged in and stop.
                    self.login = run.authcid
                    logger.info('Authenticated as %s using %s',
                                run.authcid, mech.name.upper())
                    if authzid := run.authzid:
                        self.owner = authzid
                        logger.info('Authorized as %s', authzid)
                    return
            # No mechanism was both advertised and successful.
            raise SASLCapabilityError('SASL mechanisms exhausted')
        finally:
            # Always re-enable logging and release the lock.
            if isinstance(self.file, LogIOWrapper):
                self.file.quiet = False
            self.lock.release()
        # NOTREACHED

    self.isconn(check=True)
    self._withfollow(authenticate)
def close(self):
    """Close the client side of the connection.

    The file object is closed first, then the socket is unregistered
    from the poll object, and finally the socket itself is closed.
    Every step is attempted and the connection state is reset even
    if an earlier step raises (e.g., because closing the file fails
    on a connection that the server has already torn down). Errors
    are not suppressed.

    .. warning::
        Call only when the server has closed the connection.
    """
    try:
        if self.file is not None:
            self.file.close()
    finally:
        self.file = None
        try:
            if self.poll is not None:
                assert self.sock
                self.poll.unregister(self.sock)
        finally:
            self.poll = None
            try:
                if self.sock is not None:
                    self.sock.close()
            finally:
                self.sock = None
                # Forget everything that was tied to the connection.
                self._auth = ()
                self._kwauth = {}
                self.capabilities = None
                self.host = None
                self.port = None
                self.login = ''
                self.owner = ''
def collect(self, check: bool = False) -> tuple['Response', list[Line]]:
    """Collect the server's response to the last command.

    Lines are read until one starts with an atom. That line is the
    response proper; all lines before it are returned as data.

    For example:

    >>> conn.sendline(Atom('listscripts'))
    >>> conn.collect()
    (Response(response=Atom('OK'), code=(), message=None),
     [['foo.sieve', 'ACTIVE'], ['bar.sieve'], ['baz.sieve']])

    Arguments:
        check: Raise an error if the response is not "OK"?

    Raises:
        ClientConnectionError: Socket error.
        SieveConnectionError: Server said "BYE". [#collect-check]_
        SieveOperationError: Server said "NO". [#collect-check]_
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. [#collect-check] Only raised if `check` is `True`.
    """
    data: list[Line] = []
    # NOTE(review): an empty line counts as data, too. If receiveline()
    # keeps returning empty lists once the connection is closed, this
    # loop would not terminate; confirm that a closed connection raises.
    while True:
        try:
            line = self.receiveline()
        except ValueError as err:
            raise SieveProtocolError(str(err)) from err
        if not (line and isinstance(line[0], Atom)):
            data.append(line)
            continue
        res = Response.fromline(line)
        if check and res.response != 'OK':
            raise res.toerror()
        # Remember the warning of this response, or clear a stale one.
        if res.matches('WARNINGS'):
            self.warning = res.message
        else:
            self.warning = None
        return res, data
    # NOTREACHED
def execute(self, command: str, *args: Union[IO, Word]) \
        -> tuple['Response', list[Line]]:
    """Send `command` with `args` and return the server's response.

    The command name is upper-cased and sent as an atom. The operation
    lock is held while the command is sent and the response is read.

    For example:

    >>> conn.execute('listscripts')
    (Response(response=Atom('OK'), code=(), message=None),
     [['foo.sieve', 'ACTIVE'], ['bar.sieve'], ['baz.sieve']])

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server said "BYE".
        SieveOperationError: Server said "NO".
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. note::
        Referrals are followed automatically.
    """
    assert command

    def attempt() -> tuple['Response', list[Line]]:
        # pylint: disable=consider-using-with
        # The lock is not waited for; a busy connection is an error.
        acquired = self.lock.acquire(blocking=False)
        if not acquired:
            raise ClientOperationError(os.strerror(EINPROGRESS))
        try:
            self.isconn(check=True)
            self.sendline(Atom(command.upper()), *args)
            res, data = self.collect()
        finally:
            self.lock.release()
        # Checked only after the lock has been released.
        if res.response != 'OK':
            raise res.toerror()
        return res, data

    return self._withfollow(attempt)
def geturl(self) -> Optional['URL']:
    """Get the URL of the current connection, if there is one.

    The URL is assembled from :attr:`host`, :attr:`port`, :attr:`login`,
    and :attr:`owner`; the port is omitted if it is 4190.

    For example:

    >>> with SieveManager('imap.foo.example') as mgr:
    >>>     mgr.authenticate('user', 'password')
    >>>     mgr.geturl()
    'sieve://user@imap.foo.example'

    Returns:
        The URL or `None` if :attr:`host` is not set.

    .. note::
        Only changes to the connection state effected by :meth:`open`,
        :meth:`close`, :meth:`shutdown`, :meth:`authenticate`,
        :meth:`unauthenticate`, and referrals are tracked.
    """
    if not self.host:
        return None
    port = None if self.port == 4190 else self.port
    return URL(hostname=self.host, port=port,
               username=self.login, owner=self.owner)
def isconn(self, check: bool = False) -> bool:
    """Check whether :attr:`sock` is connected.

    Polls :attr:`poll` once and inspects the reported events.
    `POLLERR`, `POLLHUP`, and `POLLNVAL` indicate that the socket
    is not usable.

    Arguments:
        check: Raise an error if :attr:`sock` is *not* connected.

    Returns:
        `True` if the socket is connected, `False` otherwise.

    Raises:
        ClientConnectionError: Socket error. [#isconn-check]_

    .. [#isconn-check] Only raised if `check` is `True`.
    """
    assert self.poll
    (_, events), = self.poll.poll()
    # The order of the tests determines which error is reported
    # if more than one flag is set.
    if events & select.POLLERR:
        error = (ENOTCONN, 'Socket error')
    elif events & select.POLLHUP:
        error = (ETIMEDOUT, os.strerror(ETIMEDOUT))
    elif events & select.POLLNVAL:
        error = (ENOTCONN, os.strerror(ENOTCONN))
    else:
        return True
    if check:
        raise ClientConnectionError(*error)
    # Report the failure instead of falling through to "connected".
    return False
# pylint: disable=missing-raises-doc, redefined-outer-name
def open(self, host: str, port: int = 4190,
         source: tuple[str, int] = ('', 0),
         timeout: Optional[float] = socket.getdefaulttimeout(),
         tls: bool = True, ocsp: bool = True):
    """Connect to `host` at `port` and read the server's greeting.

    The operation lock is held while the connection is established
    and the greeting is read. TLS is negotiated afterwards.

    Arguments:
        host: Server name or address.
        port: Server port.
        source: Source address and port.
        timeout: Timeout in seconds. The default is the value that
            :func:`socket.getdefaulttimeout` returned when this
            method was defined.
        tls: Secure the connection?
        ocsp: Check whether the server certificate was revoked?

    Raises:
        ClientOperationError: Another operation is already in progress.
        ConnectionError: Connection failed.
        SieveCapabilityError: "STARTTLS" not supported.
        SieveProtocolError: Server violated the ManageSieve protocol.
        TLSSecurityError: Server certificate has been revoked.
    """
    # pylint: disable=consider-using-with
    acquired = self.lock.acquire(blocking=False)
    if not acquired:
        raise ClientOperationError(os.strerror(EINPROGRESS))
    try:
        self._connect(host, port, source, timeout)
        # The greeting lists the server's capabilities.
        _, greeting = self.collect(check=True)
        self._source = source
        self.capabilities = Capabilities.fromlines(greeting)
        self.logger.info('Connected to %s:%d', host, port)
    finally:
        self.lock.release()
    if not tls:
        return
    self._starttls(ocsp=ocsp)
def shutdown(self):
    """Shut the socket down for reading and writing, then close.

    If there is no socket, the connection is only :meth:`closed <close>`.

    .. note::
        Use only when :meth:`logging out <logout>` would be unsafe.
    """
    sock = self.sock
    if sock is not None:
        sock.shutdown(socket.SHUT_RDWR)
        self.logger.info('Shut connection down')
    self.close()
def _connect(self, host: str, port: int = 4190,
             source: tuple[str, int] = ('', 0),
             timeout: Optional[float] = socket.getdefaulttimeout()):
    """Connect to `host` at `port`.

    If `host` is an internet address, it is connected to directly.
    Otherwise, the SRV records for ``_sieve._tcp.<host>.`` are looked
    up and tried in the order given until a connection succeeds;
    if the lookup fails, `host` itself is connected to.

    On success, :attr:`sock`, :attr:`file`, :attr:`poll`, :attr:`host`,
    and :attr:`port` are set.

    Arguments:
        host: Server address.
        port: Server port.
        source: Source address and port.
        timeout: Timeout in seconds.

    Raises:
        ConnectionError: Connection failed.
    """
    def connect(host: str):
        self.sock = socket.create_connection(
            (host, port),
            timeout=timeout, source_address=source
        )

    if self.sock or self.file:
        raise ClientConnectionError(EISCONN, os.strerror(EISCONN))
    if isinetaddr(host):
        connect(host)
    else:
        try:
            # This is the algorithm specified by RFC 2782, NOT the one
            # specified by RFC 5804, sec. 1.8, which is wrong.
            # NOTE(review): only the host of each record is used, the
            # connection is always made to `port` -- confirm intended.
            records = list(resolvesrv(f'_sieve._tcp.{host}.'))
            last = len(records) - 1
            for idx, rec in enumerate(records):
                try:
                    connect(rec.host)
                except OSError:
                    # Only the failure of the last candidate is fatal.
                    if idx == last:
                        raise
                else:
                    # Connected; trying the remaining records would
                    # replace, and thereby leak, this socket.
                    break
        except DNSError:
            connect(host)
    if not self.sock:
        raise ClientConnectionError(ECONNABORTED,
                                    os.strerror(ECONNABORTED))
    file = self.sock.makefile('rwb')    # type: ignore[attr-defined]
    self.file = LogIOWrapper.wrap(file, self.logger)
    self.poll = select.poll()
    self.poll.register(self.sock)
    self.host = host
    self.port = port
def _follow(self, url: str):
    """Follow a referral to `url`.

    The arguments needed to re-establish the connection are saved,
    the connection is closed, and a connection to the host (and port,
    4190 by default) given in `url` is opened with the saved settings.
    Authentication is then repeated with the saved credentials.

    Raises:
        ClientConnectionError: Socket error.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    try:
        target = URL.fromstr(url)
    except ValueError as err:
        raise SieveProtocolError(err) from err
    self.logger.info('Referred to %s', url)
    # The state must be saved before it is reset by close().
    openstate, authstate = self._getstate()
    openargs, openkwargs = openstate
    authargs, authkwargs = authstate
    self.close()
    # Host and port come from the referral, the rest is carried over.
    self.open(target.hostname, target.port or 4190,
              *openargs[2:], **openkwargs)
    self.authenticate(*authargs, **authkwargs)
def _getstate(self) -> Iterator[tuple[list, dict[str, Any]]]:
    """Get the arguments needed to re-establish the current connection.

    Yields one pair of positional and keyword arguments for
    :meth:`open` and then one for :meth:`authenticate`. The value of
    each parameter is read from the attribute of the same name or,
    if there is no such attribute, from the attribute of that name
    prefixed with an underscore.

    For example:

    >>> mgr.geturl()
    sieve://user@imap.foo.example
    >>> (oargs, okwargs), (auth, kwauth) = mgr._getstate()
    >>> mgr.shutdown()
    >>> mgr.geturl()
    >>> mgr.open(*oargs, **okwargs)
    >>> mgr.authenticate(*auth, **kwauth)
    >>> mgr.geturl()
    sieve://user@imap.foo.example
    """
    positional = (Parameter.POSITIONAL_ONLY,
                  Parameter.POSITIONAL_OR_KEYWORD)
    for method in (self.open, self.authenticate):
        posargs: list = []
        kwargs: dict[str, Any] = {}
        params = inspect.signature(method).parameters  # type: ignore
        for name, param in params.items():
            try:
                value = getattr(self, name)
            except AttributeError:
                # Fall back to the non-public attribute.
                value = getattr(self, f'_{name}')
            kind = param.kind
            if kind in positional:
                posargs.append(value)
            elif kind == Parameter.VAR_POSITIONAL:
                posargs.extend(value)
            elif kind == Parameter.KEYWORD_ONLY:
                kwargs[name] = value
            elif kind == Parameter.VAR_KEYWORD:
                kwargs.update(value)
        yield (posargs, kwargs)
# pylint: disable=redefined-outer-name
def _starttls(self, ocsp: bool = True):
    """Negotiate TLS on the current connection.

    Issues "STARTTLS", wraps :attr:`sock` using :attr:`sslcontext`,
    re-reads the server's capabilities, and, if `ocsp` is `True` and
    the "cryptography" module is available, checks whether the
    server's certificate has been revoked.

    Arguments:
        ocsp: Check whether the server certificate was revoked?

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveCapabilityError: "STARTTLS" not supported.
        TLSSecurityError: Server certificate has been revoked.

    .. note::
        Called automatically by :meth:`open` unless `tls` is `False`.
    """
    assert self.sock
    assert self.capabilities
    if not self.capabilities.starttls:
        raise SieveCapabilityError('STARTTLS: not supported')
    self.execute('STARTTLS')
    # pylint: disable=consider-using-with
    acquired = self.lock.acquire(blocking=False)
    if not acquired:
        raise ClientOperationError(os.strerror(EINPROGRESS))
    try:
        host = self.host
        self.sock = self.sslcontext.wrap_socket(
            self.sock,              # type: ignore[arg-type]
            server_hostname=host
        )
        # The old file object refers to the unencrypted socket.
        tlsfile = self.sock.makefile('rwb')
        self.file = LogIOWrapper.wrap(tlsfile, self.logger)  # type: ignore
        # Capabilities are sent again once TLS has been negotiated.
        _, lines = self.collect(check=True)
        self.capabilities = Capabilities.fromlines(lines)
        self.ocsp = ocsp
        proto = self.sock.version()
        cipher = self.sock.cipher()
        if proto and cipher:
            self.logger.info('Started %s using %s', proto, cipher[0])
        if not ocsp:
            return
        if not HAVE_CRYPTOGRAPHY:
            self.logger.error('Module "cryptography" not found')
            return
        der = self.sock.getpeercert(binary_form=True)
        if not der:
            raise TLSSoftwareError('No peer certificate')
        cert = x509.load_der_x509_certificate(der)
        if certrevoked(cert, logger=self.logger):
            raise TLSSecurityError(f'{host}: Certificate revoked')
    finally:
        self.lock.release()
def _withfollow(self, func: Callable[..., T], *args, **kwargs) -> T:
    """Call `func`, following referrals for as long as they are issued.

    If `func` raises a :exc:`SieveConnectionError` that carries a
    "REFERRAL" response code, the referral is followed and `func`
    is called again. Other errors are passed on.

    For example:

    >>> mgr._withfollow(mgr.execute, 'listscripts')

    Arguments:
        func: Function to call.
        args: Positional arguments for `func`.
        kwargs: Keyword arguments for `func`.

    Returns:
        The return value of `func`.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server said "BYE".
        SieveOperationError: Server said "NO".
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. seealso::
        :rfc:`5804` (sec. 1.3)
            ManageSieve "REFERRAL" response codes
    """
    while True:
        try:
            return func(*args, **kwargs)
        except SieveConnectionError as err:
            if not err.matches('REFERRAL'):
                raise
            # The URL is the second element of the response code.
            try:
                url = err.code[1]
            except IndexError as exc:
                raise SieveProtocolError('Unexpected data') from exc
            if isinstance(url, Atom) or not isinstance(url, str):
                # pylint: disable=raise-missing-from
                raise SieveProtocolError('Expected string')
            self._follow(url)
    # NOTREACHED
def _withreopen(self, func: Callable[..., T], *args, **kwargs) -> T:
    """Call `func`; reconnect and retry once if the connection fails.

    The connection state is saved before `func` is called. If `func`
    raises a connection, timeout, or name resolution error, the
    connection is closed, re-opened, and re-authenticated using the
    saved state, and `func` is called a second time. Errors raised
    by the second call are passed on.

    For example:

    >>> mgr._withreopen(mgr.execute, 'listscripts')

    Arguments:
        func: Function to call.
        args: Positional arguments for `func`.
        kwargs: Keyword arguments for `func`.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server said "BYE".
        SieveOperationError: Server said "NO".
        SieveProtocolError: Server violated the ManageSieve protocol.

    Returns:
        The return value of `func`.
    """
    openstate, authstate = self._getstate()
    openargs, openkwargs = openstate
    authargs, authkwargs = authstate
    retryable = (ConnectionError, TimeoutError,
                 socket.herror, socket.gaierror)
    try:
        return func(*args, **kwargs)
    except retryable:
        self.close()
        self.open(*openargs, **openkwargs)
        self.authenticate(*authargs, **authkwargs)
        return func(*args, **kwargs)
@property
def timeout(self) -> Optional[float]:
    """Timeout of :attr:`sock` in seconds.

    `None` if there is no socket. Assigning a value while there is
    no socket has no effect.

    Set timeout to 500 ms:

    >>> mgr.timeout = 0.5

    .. note::
        The timeout can only be set while a connection is open.
    """
    return self.sock.gettimeout() if self.sock else None

@timeout.setter
def timeout(self, secs: Optional[float]):
    if not self.sock:
        return
    self.sock.settimeout(secs)
@property
def tls(self) -> Optional[str]:     # type: ignore[override]
    """TLS version, or `None` if :attr:`sock` is not a TLS socket."""
    sock = self.sock
    return sock.version() if isinstance(sock, ssl.SSLSocket) else None
capabilities: Optional[Capabilities] = None
"""Server capabilities."""

file: Optional[Union[IO, io.BufferedRWPair, LogIOWrapper]] = None
"""File-like access to :attr:`sock`."""

host: Optional[str] = None
"""Remote address."""

# NOTE(review): class-level default, shared by all instances
# that do not assign their own lock -- confirm this is intended.
lock: threading.Lock = threading.Lock()
"""Operation lock."""

logger: logging.Logger = logging.getLogger(__name__)
"""Logger to use.

Messages are logged with the following priorities:

======================  =====================================
Priority                Used for
======================  =====================================
:const:`logging.ERROR`  Non-fatal errors
:const:`logging.INFO`   State changes
:const:`logging.DEBUG`  Data sent to/received from the server
======================  =====================================

Suppress logging:

>>> from logging import getLogger
>>> getLogger('sievemgr').setLevel(logging.CRITICAL)

Use a custom logger:

>>> from logging import getLogger
>>> logger = getLogger('foo')
>>> logger.addHandler(logging.NullHandler())
>>> mgr.logger = logger

Print data sent to/received from the server to standard error:

>>> from logging import getLogger
>>> getLogger('sievemgr').setLevel(logging.DEBUG)
>>> mgr.listscripts()
C: LISTSCRIPTS
S: "foo.sieve" ACTIVE
S: "bar.sieve"
S: "baz.sieve"
S: OK "Listscripts completed"
[('foo.sieve', True), ('bar.sieve', False), ('baz.sieve', False)]
"""

login: str = ''
"""Login name (authentication ID)."""

ocsp: bool
"""Check whether the server certificate was revoked?"""

owner: str = ''
"""User whose scripts are managed (authorization ID)."""

poll: Optional[select.poll] = None
"""Polling object for :attr:`sock`."""

port: Optional[int] = None
"""Remote port."""

sock: Optional[socket.SocketType] = None
"""Underlying socket."""

# NOTE(review): class-level default, shared by all instances that
# do not assign their own context -- confirm this is intended.
sslcontext: ssl.SSLContext = ssl.create_default_context()
"""Settings for negotiating Transport Layer Security (TLS).

Disable workarounds for broken X.509 certificates:

>>> with SieveManager() as mgr:
>>>     mgr.sslcontext.verify_flags |= ssl.VERIFY_X509_STRICT
>>>     mgr.open('imap.foo.example')
>>>     ...

Load client certificate/key pair:

>>> with SieveManager() as mgr:
>>>     mgr.sslcontext.load_cert_chain(certfile='cert.pem')
>>>     mgr.open('imap.foo.example')
>>>     ...

Use a custom certificate authority:

>>> with SieveManager() as mgr:
>>>     mgr.sslcontext.load_verify_locations(cafile='ca.pem')
>>>     mgr.open('imap.foo.example')
>>>     ...
"""

warning: Optional[str] = None
"""Warning issued in response to the last "CHECKSCRIPT" or "PUTSCRIPT".

For example:

>>> with open('script.sieve', 'br') as file:
>>>     mgr.execute('putscript', 'script.sieve', file)
(Response(response=Atom('OK'), code=('warnings',),
 message='line 7: may need to be frobnicated'), [])
>>> mgr.warning
'line 7: may need to be frobnicated'

.. note::
    Only set by :meth:`collect`, :meth:`execute`,
    :meth:`checkscript`, and :meth:`putscript`.

.. seealso::
    :rfc:`5804` (sec. 1.3)
        ManageSieve "WARNINGS" response code.
"""

_auth: tuple = ()
"""Positional arguments for SASL mechanism constructors."""

# NOTE(review): mutable class-level default; safe only as long
# as it is replaced rather than modified in place.
_kwauth: dict[str, Any] = {}
"""Keyword arguments for SASL mechanism constructors."""

_logauth: bool
"""Log the authentication exchange?"""

_sasl: Iterable[AuthMech] = ()
"""SASL mechanisms."""

_source: tuple[str, int] = ('', 0)
"""Source address and port."""
1667class SieveManager(SieveConn, AbstractContextManager):
1668 """Connection to a ManageSieve server.
1670 For example:
1672 >>> with SieveManager('imap.foo.example') as mgr:
1673 >>> mgr.authenticate('user', 'password')
1674 >>> with open('sieve.script', 'br') as script:
1675 >>> mgr.putscript(script, 'sieve.script')
1676 >>> mgr.setactive('sieve.script')
1678 .. warning::
1679 :class:`SieveManager` is not thread-safe.
1680 """
# pylint: disable=redefined-outer-name
def __init__(self, *args, backup: int = 0,
             memory: int = 524_288, **kwargs):
    """Initialize a :class:`SieveManager` object.

    `args` and `kwargs` are handed to the parent class and are
    passed to :meth:`open` if given. Otherwise, no connection
    is established.

    Arguments:
        backup: How many backups to keep by default.
        memory: See `max_size` in :class:`tempfile.SpooledTemporaryFile`.
        args: Positional arguments for :meth:`open`.
        kwargs: Keyword arguments for :meth:`open`.

    Raises:
        ClientConnectionError: Socket error.
        SieveCapabilityError: "STARTTLS" not supported.
        SieveProtocolError: Server violated the ManageSieve protocol.
        TLSSecurityError: Server certificate has been revoked.
    """
    # The parent is initialized first; the settings
    # below are stored only if that succeeds.
    super().__init__(*args, **kwargs)
    self.memory: int = memory
    self.backup: int = backup
def __exit__(self, _exctype, excvalue, _traceback):
    """Leave the context, ending the connection as the error permits.

    After a connection error or a timeout the connection is only
    closed, after a protocol error it is shut down, and in all
    other cases a logout is attempted.
    """
    if isinstance(excvalue, (ConnectionError, TimeoutError)):
        self.close()
        return
    if isinstance(excvalue, ProtocolError):
        self.shutdown()
        return
    try:
        self.logout()
    except ClientOperationError:
        # Logging out was not possible; fall back to shutting
        # down and, failing that, to closing the connection.
        try:
            self.shutdown()
        except OSError:
            self.close()
def backupscript(self, script: str, keep: int = 1):
    """Make an Emacs-style backup of `script`.

    `keep` = 0
        Do nothing.

    `keep` = 1
        :file:`script` is backed up as :file:`script~`.

    `keep` > 1
        :file:`script` is backed up as :file:`script.~{n}~`.
        `n` starts with 1 and increments with each backup.
        Old backups are deleted if there are more than `keep` backups.

    For example:

    >>> mgr.listscripts()
    [('script.sieve', True)]
    >>> mgr.backupscript('script.sieve', keep=0)
    >>> mgr.listscripts()
    [('script.sieve', True)]

    >>> mgr.listscripts()
    [('script.sieve', True)]
    >>> mgr.backupscript('script.sieve', keep=1)
    >>> mgr.listscripts()
    [('script.sieve', True), ('script.sieve~', False)]

    >>> mgr.listscripts()
    [('script.sieve', True)]
    >>> mgr.backupscript('script.sieve', keep=2)
    >>> mgr.listscripts()
    [('script.sieve', True), ('script.sieve.~1~', False)]
    >>> mgr.backupscript('script.sieve', keep=2)
    >>> mgr.listscripts()
    [('script.sieve', True),
     ('script.sieve.~1~', False),
     ('script.sieve.~2~', False)]
    >>> mgr.backupscript('script.sieve', keep=2)
    >>> mgr.listscripts()
    [('script.sieve', True),
     ('script.sieve.~2~', False),
     ('script.sieve.~3~', False)]

    Arguments:
        script: Script name.
        keep: How many backups to keep.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    def getfiles() -> Iterator[str]:
        # Names only; whether a script is active is irrelevant here.
        yield from (name for name, _ in self.listscripts())

    def copy(src: str, targ: str):
        # Backups must not trigger backups of their own.
        self.copyscript(src, targ, backup=0)

    backup(script, keep, getfiles, copy, self.deletescript)
def checkscript(self, script: Union[str, IO]):
    """Have the server check whether `script` is valid.

    Syntax errors trigger a :exc:`SieveOperationError`.
    Semantic errors are reported in :attr:`warning`.

    For example:

    >>> mgr.checkscript('foo')
    Traceback (most recent call last):
        [...]
    SieveOperationError: line 1: error: expected end of command ';'
    error: parse failed.

    >>> mgr.checkscript('# foo')
    >>>

    Arguments:
        script: Script (*not* script name).

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveCapabilityError: "CHECKSCRIPT" not supported.
        SieveConnectionError: Server has closed the connection.
        SieveOperationError: `Script` contains syntax errors.
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. important::
        Sieve scripts must be encoded in UTF-8.
    """
    assert self.capabilities
    # Support is assumed if, and only if, the server
    # has announced a protocol version.
    if self.capabilities.version:
        self.execute('CHECKSCRIPT', script)
        return
    raise SieveCapabilityError('CHECKSCRIPT: Not supported')
def copyscript(self, source: str, target: str,
               backup: Optional[int] = None):
    """Copy `source` to `target` by way of a temporary file.

    `source` is downloaded, spooled as UTF-8 to a temporary file
    that is kept in memory up to :attr:`memory` bytes, and then
    uploaded as `target`.

    Arguments:
        source: Source name.
        target: Target name.
        backup: How many backups to keep (default: :attr:`backup`).

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    with SpooledTemporaryFile(max_size=self.memory, mode='bw+') as spool:
        content = self.getscript(source)
        spool.write(content.encode('utf8'))
        # Rewind, so that the upload starts at the beginning.
        spool.seek(0)
        self.putscript(spool, target, backup=backup)
def deletescript(self, script: str):
    """Delete `script` from the server.

    The cached script list is discarded, even if the
    name turns out to be invalid or deletion fails.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    # Invalidate the cache used by listscripts().
    self._scripts = None
    name = script
    self.validname(name, check=True)
    self.execute('DELETESCRIPT', name)
    self.logger.info('Removed %s', name)
def editscripts(self, command: list[str], scripts: list[str], *args,
                catch: Optional[Callable[[Exception, str], bool]] = None,
                check: bool = True, create: bool = True,
                **kwargs) -> subprocess.CompletedProcess:
    """Download `scripts`, edit them with `command`, and re-upload them.

    The scripts are saved to a temporary directory and their filenames
    are appended to the `command`, which is then passed to
    :func:`subprocess.run`. Scripts the files of which have been
    changed are then re-uploaded to the server. If the server has
    closed the connection in the meantime, the connection is
    re-established automatically.

    If :meth:`putscript` raises an error and `catch` has been given,
    then the error and the name of the offending script are passed to
    `catch`, which should return `True` if the `command` should be
    re-invoked for that script and `False` otherwise. Either way,
    the error will be suppressed.

    For example:

    >>> mgr.editscripts(['vi'], ['foo.sieve'])

    >>> cp = mgr.editscripts(['cmp'], ['a.sieve', 'b.sieve'], check=False)
    >>> if cp.returncode != 0:
    >>>     print('a.sieve and b.sieve differ')

    Arguments:
        command: Command to run.
        scripts: Scripts to edit.
        catch: Error handler.
        check: See :func:`subprocess.run`.
        create: Create scripts that do not exist?
        args: Positional arguments for :func:`subprocess.run`.
        kwargs: Keywords arguments for :func:`subprocess.run`.

    Returns:
        The result of the last invocation of `command`.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveOperationError: At least one script contains a syntax error.
                             [#editscripts-catch]_
        SieveProtocolError: Server violated the ManageSieve protocol.
        ValueError: Script name contains path separator.

    .. [#editscripts-catch] Only raised if `catch` has *not* been given.
    """
    for name in scripts:
        self.validname(name, check=True)
        if path.sep in name:
            raise ValueError(f'{name}: Filename contains {path.sep}')
    with TemporaryDirectory() as tmpdir:
        # Script name, local filename, and change time after download.
        pending: list[tuple[str, str, float]] = []
        for name in scripts:
            fname = path.join(tmpdir, name)
            with open(fname, 'w', encoding='utf8') as file:
                try:
                    file.write(self.getscript(name))
                except SieveOperationError as err:
                    # A missing script is fine if it may be created;
                    # servers that give no response code are asked
                    # whether the script exists.
                    if not create:
                        raise
                    if err.code:
                        if not err.matches('NONEXISTENT'):
                            raise
                    elif self.scriptexists(name):
                        raise
            pending.append((name, fname, os.stat(fname).st_ctime))
        while True:
            fnames = [fname for _, fname, _ in pending]
            cp = subprocess.run(command + fnames, *args,
                                check=check, **kwargs)
            retry: list[tuple[str, str, float]] = []
            for name, fname, ctime in pending:
                if os.stat(fname).st_ctime <= ctime:
                    # Unchanged, nothing to upload.
                    continue
                with open(fname, 'rb') as file:
                    try:
                        self._withreopen(self.putscript, file, name)
                    except SieveOperationError as err:
                        if catch is None:
                            raise
                        if catch(err, name):
                            retry.append((name, fname, ctime))
            if not retry:
                return cp
            pending = retry
    # NOTREACHED
def getactive(self) -> Optional[str]:
    """Get the name of the active script.

    Returns:
        The name of the first script that :meth:`listscripts`
        reports as active or `None` if there is no such script.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    scripts = self.listscripts()
    return next((name for name, active in scripts if active), None)
def getscript(self, script: str) -> str:
    """Download `script` and return its content.

    The server is expected to reply with exactly one line of data
    that consists of exactly one string, the script.

    For example:

    >>> with open('foo.sieve', 'w', encoding='utf8') as file:
    >>>     file.write(mgr.getscript('foo.sieve'))

    Arguments:
        script: Script name.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    self.validname(script, check=True)
    try:
        # pylint: disable=unbalanced-tuple-unpacking
        _, data = self.execute('GETSCRIPT', script)
        (line,) = data
        (content,) = line
    except ValueError as err:
        raise SieveProtocolError('Unexpected data') from err
    if isinstance(content, Atom):
        raise SieveProtocolError('Unexpected atom')
    if not isinstance(content, str):
        raise SieveProtocolError('Unexpected ' + type(content).__name__)
    return content
def havespace(self, script: str, size: int):
    """Ask the server whether there is enough space for `script`.

    Returns normally if the server replies "OK".

    Arguments:
        script: Script name.
        size: Script size in bytes.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveOperationError: There is *not* enough space.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    self.validname(script, check=True)
    request = ('HAVESPACE', script, size)
    self.execute(*request)
def listscripts(self, cached: bool = False) -> list[tuple[str, bool]]:
    """List scripts and whether they are the active script.

    The list is cached only once it has been retrieved and parsed
    in full. If the request or parsing the response fails, the
    cache is left as it was, so that an incomplete list is never
    returned for `cached` = `True`.

    For example:

    >>> mgr.listscripts()
    [('foo.sieve', False), ('bar.sieve', True)]

    >>> scripts = [script for script, _ in mgr.listscripts()]

    Arguments:
        cached: Return cached response? [#cached]_

    Returns:
        A list of script name/status tuples.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. [#cached] The cache is cleared after :meth:`copyscript`,
                 :meth:`deletescript`, :meth:`putscript`,
                 :meth:`renamescript`, :meth:`setactive`, and
                 :meth:`unsetactive`.
    """
    if cached and self._scripts is not None:
        return self._scripts
    scripts: list[tuple[str, bool]] = []
    for line in self.execute('LISTSCRIPTS')[1]:
        try:
            name = line[0]
        except IndexError as err:
            raise SieveProtocolError('No data') from err
        if not isinstance(name, str):
            raise SieveProtocolError('Expected string')
        try:
            status = line[1]
        except IndexError:
            # No status means that the script is not active.
            active = False
        else:
            if not isinstance(status, str):
                raise SieveProtocolError('Expected string')
            active = status.casefold() == 'active'
        scripts.append((name, active))
    # Publish the list only now that it is known to be complete.
    self._scripts = scripts
    return scripts
def logout(self):
    """Log out and close the connection.

    If logging out fails with an operation or a protocol error, the
    connection is shut down instead; connection errors are ignored.

    .. note::
        :meth:`logout` should be called to close the connection
        unless :class:`SieveManager` is used as a context manager.

    .. warning::
        Logging out is unsafe after a :exc:`ProtocolError`.
        Use :meth:`shutdown` instead.
    """
    connected = self.sock is not None
    if connected:
        try:
            self.execute('LOGOUT')
            self.logger.info('Logged out')
        except (OperationError, ProtocolError):
            # Best effort; the connection is closed below regardless.
            with suppress(OSError):
                self.shutdown()
        except ConnectionError:
            # The connection is gone already.
            pass
    self.close()
def noop(self, tag: Optional[str] = None) -> Optional[str]:
    """Request a no-op.

    For example:

    >>> mgr.noop('foo')
    'foo'

    Arguments:
        tag: String for the server to echo back.

    Returns:
        Server echo, that is, the second element of the response
        code, or `None` if the response code has no such element.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveCapabilityError: "NOOP" not supported.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    assert self.capabilities
    if not self.capabilities.version:
        raise SieveCapabilityError('NOOP: not supported')
    words = (tag,) if tag is not None else ()
    response, _ = self.execute('NOOP', *words)
    try:
        echo = response.code[1]
    except IndexError:
        return None
    # The echo must be a proper string, not an atom.
    if isinstance(echo, str) and not isinstance(echo, Atom):
        return echo
    raise SieveProtocolError('Expected string')
def putscript(self, source: Union[str, IO], target: str,
              backup: Optional[int] = None):
    """Upload `source` to the server as `target`.

    `target` is backed up first. That there is nothing to back up
    because `target` does not exist yet is not an error.

    The server should reject syntactically invalid scripts and
    may issue a :attr:`warning` for semantically invalid ones.
    Updates should be atomic.

    For example:

    >>> mgr.putscript('# empty', 'foo.sieve')

    >>> with open('foo.sieve', 'br') as file:
    >>>     mgr.putscript(file, 'foo.sieve')

    Arguments:
        source: Script (*not* script name).
        target: Script name.
        backup: How many backups to keep (default: :attr:`backup`).

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveOperationError: `Script` contains syntax errors.
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. important::
        Sieve scripts must be encoded in UTF-8.
    """
    self.validname(target, check=True)
    keep = self.backup if backup is None else backup
    try:
        self.backupscript(target, keep=keep)
    except SieveOperationError as err:
        if err.code:
            if not err.matches('NONEXISTENT'):
                raise
        elif any(name == target for name, _ in self.listscripts()):
            # No response code was given, but the script exists,
            # so the backup must have failed for another reason.
            raise
    self._scripts = None
    self.execute('PUTSCRIPT', target, source)
    self.logger.info('Uploaded %s', target)
def renamescript(self, source: str, target: str, emulate: bool = True):
    """Rename `source` to `target`.

    Some servers do not support the "RENAMESCRIPT" command.
    On such servers, renaming is emulated by downloading `source`,
    re-uploading it as `target`, marking `target` as the active script
    if `source` is the active script, and then deleting `source`.

    For example:

    >>> mgr.renamescript('foo.sieve', 'bar.sieve', emulate=False)

    Arguments:
        source: Script name.
        target: Script name.
        emulate: Emulate "RENAMESCRIPT" if the server does not support it?

    Raises:
        SieveCapabilityError: "RENAMESCRIPT" not supported. [#emulate]_
        SieveOperationError: `source` does not exist or `target` exists.

    .. [#emulate] Only raised if `emulate` is `False`.
    """
    assert self.capabilities
    self.validname(source, check=True)
    self.validname(target, check=True)
    if self.capabilities.version:
        self._scripts = None
        self.execute('RENAMESCRIPT', source, target)
        self.logger.info('Renamed %s to %s', source, target)
        return
    if not emulate:
        raise SieveCapabilityError('RENAMESCRIPT: Not supported')
    # Maps script names to whether they are active.
    states = dict(self.listscripts())
    if target in states:
        raise SieveOperationError(
            code=(Atom('alreadyexists'),),
            message=f'{target}: {os.strerror(EEXIST)}'
        )
    if source not in states:
        raise SieveOperationError(
            code=(Atom('nonexistent'),),
            message=f'{source}: {os.strerror(ENOENT)}'
        )
    self.copyscript(source, target, backup=0)
    if states[source]:
        self.setactive(target)
    self.deletescript(source)
def scriptexists(self, script: str, cached: bool = False) -> bool:
    """Tell whether a script named `script` is on the server.

    Arguments:
        script: Script name.
        cached: Return cached response? [#cached]_

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    self.validname(script, check=True)
    for name, _ in self.listscripts(cached=cached):
        if name == script:
            return True
    return False
def setactive(self, script: str):
    """Make `script` the active script.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    self.validname(script, check=True)
    # The cached script list records which script is active,
    # so it is dropped before the command is sent.
    self._scripts = None
    self.execute('SETACTIVE', script)
    self.logger.info('Activated %s', script)
def unauthenticate(self):
    """Drop the current authentication.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveCapabilityError: "UNAUTHENTICATE" not supported.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    capabilities = self.capabilities
    assert capabilities
    if not capabilities.unauthenticate:
        raise SieveCapabilityError('UNAUTHENTICATE: Not supported')
    self.execute('UNAUTHENTICATE')
    # Forget who was logged in and whose scripts were managed.
    self.login = ''
    self.owner = ''
    self.logger.info('Un-authenticated')
def unsetactive(self):
    """Deactivate whichever script is currently active.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    self._scripts = None
    # "SETACTIVE" is sent with an empty script name.
    self.execute('SETACTIVE', '')
    self.logger.info('Deactivated active script')
@classmethod
def validname(cls, script: str, check: bool = False) -> bool:
    """Tell whether `script` is a valid script name.

    Arguments:
        script: Script name
        check: Raise an error if `script` is not a valid script name?

    Raises:
        ValueError: `script` is *not* valid. [#validname-check]_

    .. [#validname-check] Only raised if `check` is `True`.
    """
    valid = bool(cls._isname(script))
    if check and not valid:
        raise ValueError(escapectrl(script) + ': Bad name')
    return valid
backup: int = 0
"""How many backups to keep."""

# U+007F (DELETE) is excluded along with the C0 and C1 control characters
# and the line and paragraph separators (RFC 5804, sec. 1.6).
_isname: Callable[..., Optional[re.Match]] = re.compile(
    '[^\u0000-\u001f\u007f-\u009f\u2028\u2029]+'
).fullmatch
"""Check whether a string is a valid script name.

Matches non-empty strings that contain neither control characters
(U+0000-U+001F, U+007F-U+009F) nor line or paragraph separators
(U+2028, U+2029).

.. seealso::
    :rfc:`5198` (sec. 2)
        Definition of unicode format for network interchange.
    :rfc:`5804` (sec. 1.6)
        ManageSieve script names.
"""

_scripts: Optional[list[tuple[str, bool]]] = None
"""Scripts returned by the last :meth:`listscripts`."""
class SieveSASLAdapter(BaseSASLAdapter):
    """Adapter to send SASL messages over a :class:`SieveConn`."""

    def __init__(self, connection: 'SieveConn'):
        """Initialize the adapter."""
        self.conn = connection

    def abort(self):
        """Cancel the authentication exchange and read the server's reply.

        Raises:
            SieveProtocolError: Server violated the ManageSieve protocol.
            SieveError: Server did not respond with "OK".
        """
        assert self.conn
        # RFC 5804 (sec. 2.1) cancels an exchange with a string that holds
        # a single "*". It must be sent as is: were it base64-encoded
        # like regular SASL data, the server would decode it as data.
        self.conn.sendline(b'*')    # type: ignore[arg-type]
        self.end()

    def begin(self, name: str, data: Optional[bytes] = None):
        """Send "AUTHENTICATE" for the mechanism `name`.

        Arguments:
            name: Mechanism name (sent in upper case).
            data: Initial response, if any (sent base64-encoded).
        """
        assert self.conn
        args: list[Any] = [Atom('AUTHENTICATE'), name.upper()]
        if data is not None:
            args.append(b64encode(data))
        self.conn.sendline(*args)   # type: ignore[arg-type]

    def end(self):
        """Read the response that concludes the exchange.

        Raises:
            SieveProtocolError: Server violated the ManageSieve protocol.
            SieveError: Server did not respond with "OK".
        """
        assert self.conn
        try:
            res = Response.fromline(self.conn.receiveline())
        except ValueError as err:
            raise SieveProtocolError(str(err)) from err
        if res.response != 'OK':
            raise res.toerror()

    def send(self, data: bytes):
        """Send `data` base64-encoded."""
        assert self.conn
        conn = self.conn
        conn.sendline(b64encode(data))  # type: ignore[arg-type]

    def receive(self) -> bytes:
        """Receive a base64-encoded string and return it decoded.

        Raises:
            SieveProtocolError: Server violated the ManageSieve protocol.
            SieveError: Server sent a response instead of SASL data.
        """
        assert self.conn
        try:
            line = self.conn.receiveline()
        except ValueError as err:
            raise SieveProtocolError(str(err)) from err
        # A line that starts with an atom is a response, not SASL data.
        # An empty line falls through to the unpacking below, so that it
        # is reported as a protocol error rather than as an IndexError.
        if line and isinstance(line[0], Atom):
            res = Response.fromline(line)
            raise res.toerror()
        try:
            word, = line
        except ValueError as err:
            raise SieveProtocolError('Unexpected data') from err
        if isinstance(word, Atom) or not isinstance(word, str):
            raise SieveProtocolError('Expected string')
        return b64decode(word)

    @property
    def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
        """Socket of the underlying connection."""
        assert self.conn
        assert self.conn.sock
        return self.conn.sock

    @sock.setter
    def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
        assert self.conn
        self.conn.sock = sock

    conn: Optional[SieveConn] = None
    """Underlying connection."""
CapabilitiesT = TypeVar('CapabilitiesT', bound='Capabilities')
""":class:`Capabilities` type variable."""


@dataclass
class Capabilities():
    """Server capabilities."""

    @classmethod
    def fromlines(cls: type[CapabilitiesT],
                  lines: Iterable[Line]) -> CapabilitiesT:
        """Create a :class:`Capabilities` object from a server response.

        Each line consists of a capability name, which is matched
        case-insensitively, and, for most capabilities, a single value.
        Capabilities that are not known are stored in
        :attr:`notunderstood`.

        Raises:
            SieveProtocolError: A line is empty, a capability name is
                not a string, or a capability has a malformed value.
        """
        strings = ('implementation', 'language', 'owner', 'version')
        lists = ('notify', 'sasl', 'sieve')
        flags = ('starttls', 'unauthenticate')

        def getvalue(words: Line) -> str:
            # Everything after the capability name must be one string.
            try:
                (value,) = words[1:]
            except ValueError as err:
                raise SieveProtocolError('Expected word') from err
            if isinstance(value, str):
                return value
            raise SieveProtocolError('Expected string')

        obj = cls()
        for words in lines:
            try:
                key = words[0].casefold()   # type: ignore[union-attr]
            except IndexError as err:
                raise SieveProtocolError('Expected word') from err
            except AttributeError as err:
                raise SieveProtocolError('Expected string') from err
            value: Any
            if key in strings:
                value = getvalue(words)
            elif key in lists:
                value = tuple(getvalue(words).casefold().split())
            elif key == 'maxredirects':
                value = int(getvalue(words))
            elif key in flags:
                value = True
            else:
                # Unknown capability: keep its first value or,
                # if it has none, record that it was announced.
                obj.notunderstood[key] = words[1] if len(words) > 1 else True
                continue
            setattr(obj, key, value)
        return obj

    implementation: Optional[str] = None
    """Server application (e.g. "Dovecot Pigeonhole")."""

    sieve: tuple[str, ...] = ()
    """Supported Sieve modules."""

    language: Optional[str] = None
    """Natural language used for messages (:rfc:`5646` tag)."""

    maxredirects: Optional[int] = None
    """Maximum redirects per operation."""

    notify: tuple[str, ...] = ()
    """URI schema parts for supported notification methods."""

    owner: str = ''
    """Canonical name of the user whose scripts are managed."""

    sasl: tuple[str, ...] = ()
    """Supported authentication methods."""

    starttls: bool = False
    """Is "STARTTLS" available?"""

    unauthenticate: bool = False
    """Is "UNAUTHENTICATE" available?"""

    version: Optional[str] = None
    """ManageSieve protocol version."""

    notunderstood: dict = dataclasses.field(default_factory=dict)
    """Capabilities not understood by SieveManager."""
ResponseT = TypeVar('ResponseT', bound='Response')
"""Type variable for :class:`Response`."""


@dataclass(frozen=True)
class Response():
    """Server response to a command.

    .. seealso::
        :rfc:`5804` (secs. 1.2, 1.3, 4, 6.4, and passim)
            ManageSieve responses
    """

    @classmethod
    def fromline(cls: type[ResponseT], line: Line) -> ResponseT:
        """Create a :class:`Response` object from a :class:`Line`.

        The first word must be an atom (the response). It may be
        followed by a response code (a sequence, as second word only)
        and a message (a string, as second or third word).

        Raises:
            SieveProtocolError: `line` is not a well-formed response.
        """
        # pylint: disable=redefined-outer-name
        response = None
        code = cls.code
        message = cls.message
        for index, word in enumerate(line):
            if isinstance(word, Atom):
                if index != 0:
                    raise SieveProtocolError('Malformed response')
                response = word
            elif isinstance(word, str):
                if index not in (1, 2):
                    raise SieveProtocolError('Malformed response')
                message = word
            elif isinstance(word, Sequence) and index == 1:
                code = tuple(word)
            else:
                raise SieveProtocolError('Malformed response')
        if response is None:
            raise SieveProtocolError('Expected atom')
        return cls(response=response, code=code, message=message)

    def __str__(self) -> str:
        """:attr:`message` or, if no message was returned, a stub message."""
        if self.message:
            return self.message
        return f'server says {self.response}'

    def matches(self, *categories: str) -> bool:
        """Check if :attr:`code` matches any of the given `categories`.

        Returns `False` if :attr:`code` is empty.
        Matching is case-insensitive.

        For example:

        >>> with open('script.sieve') as script:
        >>>     try:
        >>>         mgr.putscript(script, script.name)
        >>>     except SieveOperationError as err:
        >>>         if err.matches('QUOTA'):
        >>>             print('over quota')

        Print more informative messages:

        >>> with open('script.sieve') as script:
        >>>     try:
        >>>         mgr.putscript(script, script.name)
        >>>     except SieveOperationError as err:
        >>>         if err.matches('QUOTA/MAXSCRIPTS'):
        >>>             print('too many scripts')
        >>>         elif err.matches('QUOTA/MAXSIZE'):
        >>>             print(f'{script.name} is too large')
        >>>         elif err.matches('QUOTA'):
        >>>             print('over quota')
        """
        if not self.code:
            return False
        rescode = self.code[0]
        assert isinstance(rescode, str)

        def iswithin(category: str) -> bool:
            # A category matches itself and each of its sub-categories.
            pattern = re.escape(category.removesuffix('/')) + r'(/|$)'
            return bool(re.match(pattern, rescode, flags=re.IGNORECASE))

        return any(iswithin(category) for category in categories)

    def toerror(self) -> 'SieveError':
        """Convert a :class:`Response` into an error."""
        if self.response == 'BYE':
            errcls = SieveConnectionError
        else:
            errcls = SieveOperationError
        return errcls(self.response, self.code, self.message)

    response: Atom
    """'OK', 'NO', or 'BYE'.

    ======== ===========================
    Response Meaning
    ======== ===========================
    'OK'     Success
    'NO'     Failure
    'BYE'    Connection closed by server
    ======== ===========================
    """

    code: tuple[Word, ...] = ()
    """Response code.

    ManageSieve response codes are lists of categories, separated by
    slashes ("/"), where each category is the super-category of the
    next (e.g., "quota/maxsize").

    Some response codes carry data (e.g., ``TAG "SYNC-123"``).

    See :rfc:`5804` (sec. 1.3) for a list of response codes.

    .. warning::
        Servers need *not* return response codes.
    """

    message: Optional[str] = None
    """Human-readable message.

    .. warning::
        Servers need *not* return a message.
    """
@dataclass(frozen=True)
class SRV():
    """Immutable DNS SRV record.

    .. seealso::
        :rfc:`2782`
            DNS SRV
    """

    # Fields are named after the record fields defined in RFC 2782.
    priority: int
    weight: int
    host: str
    port: int
URLT = TypeVar('URLT', bound='URL')
"""Type variable for :class:`URL`."""


@dataclass(frozen=True)
class URL():
    """Sieve URL.

    .. seealso::
        :rfc:`5804` (sec. 3)
            Sieve URL Scheme
    """

    @classmethod
    def fromstr(cls: type[URLT], url: str) -> URLT:
        """Create a :class:`URL` object from a URL string.

        If `url` does not start with a scheme or "//",
        "sieve://" is prepended before it is parsed.

        For example:

        >>> URL.fromstr('sieve://user@imap.foo.example')
        URL(hostname='imap.foo.example', scheme='sieve',
            username='user', password=None, port=None,
            owner=None, scriptname=None)

        Raises:
            ValueError: Not a valid Sieve URL.
        """
        if not re.match(r'([a-z][a-z0-9+.-]*:)?//', url):
            url = 'sieve://' + url
        parts = urllib.parse.urlsplit(url)
        if parts.query or parts.fragment:
            raise ValueError(f'{url}: Not a Sieve URL')
        if not parts.hostname:
            raise ValueError(f'{url}: No host')
        if not (isinetaddr(parts.hostname) or ishostname(parts.hostname)):
            raise ValueError(f'{parts.hostname}: Neither address nor hostname')
        # The path reads "/owner/scriptname"; both parts are optional.
        # If there are fewer than two parts, unpacking fails and the
        # whole path (sans the leading slash) is taken to be the owner.
        try:
            owner, scriptname = parts.path.split('/', maxsplit=2)[1:]
        except ValueError:
            owner, scriptname = parts.path[1:], None
        return cls(
            scheme=parts.scheme,
            username=parts.username,
            password=parts.password,
            hostname=parts.hostname,
            port=parts.port,
            owner=owner if owner else None,
            scriptname=scriptname if scriptname else None
        )

    def __str__(self):
        """Get a string representation of the URL.

        The scheme defaults to "sieve" and the host to "localhost".
        Hosts that contain a colon (i.e., IPv6 addresses) are enclosed
        in square brackets, so that the address can be told apart from
        the port and the result can be parsed again.
        """
        url = f'{self.scheme}://' if self.scheme else 'sieve://'
        if self.username:
            url += self.username
            if self.password is not None:
                url += f':{self.password}'
            url += '@'
        host = self.hostname if self.hostname else 'localhost'
        if ':' in host and not host.startswith('['):
            host = f'[{host}]'
        url += host
        if self.port is not None:
            url += f':{self.port}'
        if self.owner:
            url += f'/{self.owner}'
            if self.scriptname:
                url += f'/{self.scriptname}'
        return url

    hostname: str
    scheme: str = 'sieve'
    username: Optional[str] = None
    password: Optional[str] = None
    port: Optional[int] = None
    owner: Optional[str] = None
    scriptname: Optional[str] = None
2660#
2661# Authentication
2662#
class BasePwdAuth(BaseAuth, ABC):
    """Base class for password-based authentication mechanisms.

    Prepares credentials, so that subclasses need only
    implement :meth:`exchange`. For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: PlainAuth
    """

    def __init__(self, connection: BaseSASLAdapter,
                 authcid: str, password: str, authzid: str = '',
                 prepare: SASLPrep = SASLPrep.ALL):
        """Prepare authentication.

        `authcid`, `password`, and `authzid` are prepared according to
        :rfc:`3454` and :rfc:`4013` if ``prepare & SASLPrep.USERNAMES``
        and/or ``prepare & SASLPrep.PASSWORDS`` respectively
        evaluate to true.

        Arguments:
            connection: Connection over which to authenticate.
            authcid: Authentication ID (user to login as).
            password: Password.
            authzid: Authorization ID (user whose rights to acquire).
            prepare: Which credentials to prepare.

        Raises:
            ValueError: Bad characters in username or password.
        """
        super().__init__(connection, authcid, authzid, prepare)
        # The password is stored as given unless asked to prepare it.
        if prepare & SASLPrep.PASSWORDS:
            self.password = self.prepare(password)
        else:
            self.password = password

    password: str
    """Password."""
class BaseScramAuth(BasePwdAuth, ABC):
    """Base class for SCRAM authentication mechanisms.

    Implements :meth:`exchange`, so that subclasses need only define a digest.
    For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: ScramSHA1Auth

    .. seealso::
        :rfc:`5802`
            Salted Challenge Response Authentication Mechanism (SCRAM).
        :rfc:`7677`
            SCRAM-SHA-256 and SCRAM-SHA-256-PLUS.
        https://datatracker.ietf.org/doc/html/draft-melnikov-scram-bis
            Updated recommendations for implementing SCRAM.
        https://datatracker.ietf.org/doc/html/draft-melnikov-scram-sha-512
            SCRAM-SHA-512 and SCRAM-SHA-512-PLUS.
        https://datatracker.ietf.org/doc/html/draft-melnikov-scram-sha3-512
            SCRAM-SHA3-512 and SCRAM-SHA3-512-PLUS.
        https://csb.stevekerrison.com/post/2022-01-channel-binding
            Discussion of TLS channel binding.
        https://csb.stevekerrison.com/post/2022-05-scram-detail
            Discussion of SCRAM.
    """

    def exchange(self):
        """Exchange SCRAM messages with the server.

        Raises:
            SASLCapabilityError: Server rejected the client-first message.
            SASLSecurityError: Server nonce does not extend the client
                nonce or the server signature could not be verified.
        """
        # Compare to
        # * https://github.com/stevekerrison/auth-examples
        # * https://github.com/horazont/aiosasl

        def todict(msg: bytes) -> dict[bytes, bytes]:
            # Messages are comma-separated lists of "key=value" attributes.
            return dict([a.split(b'=', maxsplit=1) for a in msg.split(b',')])

        def escape(b: bytes) -> bytes:
            # "=" must be escaped first, lest the "=" of "=2C" be escaped.
            return b.replace(b'=', b'=3D').replace(b',', b'=2C')

        # Parameters
        authcid = self.authcid.encode('utf8')
        authzid = self.authzid.encode('utf8')
        password = self.password.encode('utf8')
        chan_bind_type = self.cbtype.encode('utf8')
        chan_bind_data = self.cbdata
        c_nonce_len = self.noncelen
        digest = self.digest

        # Send client-first message
        chan_bind_attr = b'p=%s' % chan_bind_type if chan_bind_type else b'n'
        # RFC 5802 (sec. 7) defines the authorization ID as
        # authzid = "a=" saslname; it is empty if there is none.
        authz_attr = b'a=' + escape(authzid) if authzid else b''
        c_first_prefix = chan_bind_attr + b',' + authz_attr
        c_nonce = b64encode(secrets.token_bytes(c_nonce_len))
        c_first_bare = b'n=%s,r=%s' % (escape(authcid), c_nonce)
        c_first = c_first_prefix + b',' + c_first_bare
        self.send(c_first)

        # Receive server-first message
        try:
            s_first = self.receive()
        except SieveOperationError as err:
            # pylint: disable=bad-exception-cause (???)
            raise SASLCapabilityError(f'{self.name}: {err}') from err
        s_first_dict = todict(s_first)
        iters = int(s_first_dict[b'i'])
        s_nonce = s_first_dict[b'r']
        salt = b64decode(s_first_dict[b's'])
        # RFC 5802 (sec. 5.1): the client must verify that the nonce
        # returned by the server starts with the nonce it has sent.
        # The server is also expected to have appended a nonce of its own.
        if len(s_nonce) <= len(c_nonce) or not s_nonce.startswith(c_nonce):
            host = self.sock.getpeername()[0]
            raise SASLSecurityError(f'{host}: Verification failed')

        # Send client-final message
        salted_pwd = hashlib.pbkdf2_hmac(digest, password, salt, iters)
        c_key = hmac.digest(salted_pwd, b'Client Key', digest)
        stored_key = hashlib.new(digest, c_key).digest()
        chan_bind = b64encode(c_first_prefix + b',' + chan_bind_data)
        c_final_prefix = b'c=%s,r=%s' % (chan_bind, s_nonce)
        auth_message = b','.join((c_first_bare, s_first, c_final_prefix))
        c_signature = hmac.digest(stored_key, auth_message, digest)
        c_proof = b64encode(bytes(a ^ b for a, b in zip(c_key, c_signature)))
        c_final = c_final_prefix + b',p=%s' % c_proof
        self.send(c_final)

        # Receive server-final message
        s_final = self.receive()
        s_key = hmac.digest(salted_pwd, b'Server Key', digest)
        s_signature = hmac.digest(s_key, auth_message, digest)
        s_final_dict = todict(s_final)
        # Compare in constant time.
        verifier = b64decode(s_final_dict[b'v'])
        if not hmac.compare_digest(s_signature, verifier):
            host = self.sock.getpeername()[0]
            raise SASLSecurityError(f'{host}: Verification failed')

    @property
    @abstractmethod
    def digest(self) -> str:
        """Digest name as used by :mod:`hashlib` and :mod:`hmac`."""

    cbtype: str = ''
    """TLS channel-binding type."""

    cbdata: bytes = b''
    """TLS channel-binding data."""

    noncelen: int = 18
    """Client nonce length in bytes."""
class BaseScramPlusAuth(BaseScramAuth, ABC):
    """Base class for SCRAM mechanisms with channel binding.

    For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: ScramSHA1PlusAuth
    """

    # Channel-binding is, for the most part, implemented in BaseScramAuth.
    def __init__(self, *args, **kwargs):
        """Prepare authentication and select a channel-binding type.

        The first of "tls-exporter", "tls-unique", and
        "tls-server-endpoint" that :mod:`ssl` supports and for which
        the socket provides channel-binding data is used.

        Raises:
            SASLProtocolError: The connection is not secured with TLS.
            TLSCapabilityError: No usable channel-binding type.
        """
        super().__init__(*args, **kwargs)
        sock = self.sock
        if not isinstance(sock, ssl.SSLSocket):
            raise SASLProtocolError('Non-TLS channel cannot be bound')
        for cbtype in ('tls-exporter', 'tls-unique', 'tls-server-endpoint'):
            if cbtype not in ssl.CHANNEL_BINDING_TYPES:
                continue
            cbdata = sock.get_channel_binding(cbtype)
            if cbdata:
                self.cbtype = cbtype
                self.cbdata = cbdata
                return
        raise TLSCapabilityError('No supported channel-binding type')
class AuthzUnsupportedMixin():
    """Mixin for SASL mechanisms that do not support authorization.

    For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: LoginAuth
    """

    def __init__(self, *args, **kwargs):
        """Prepare authentication.

        Raises:
            SASLCapabilityError: :attr:`authzid` is set.
        """
        assert isinstance(self, BaseAuth)
        super().__init__(*args, **kwargs)
        # Checked only after the superclass has set up the credentials.
        if not self.authzid:
            return
        raise SASLCapabilityError(f'{self.name}: No authorization')
class CramMD5Auth(AuthzUnsupportedMixin, BasePwdAuth):
    """CRAM-MD5 authentication.

    .. seealso::
        :rfc:`2195` (sec. 2)
            Definition of CRAM-MD5.
    """

    name = 'CRAM-MD5'
    obsolete = True

    def exchange(self):
        """Answer the challenge with the user name and a keyed MD5 digest."""
        challenge = self.receive()
        key = self.password.encode('utf8')
        # The challenge is digested with the password as key.
        hexdigest = hmac.new(key, challenge, hashlib.md5).hexdigest()
        reply = f'{self.authcid} {hexdigest}'
        self.send(reply.encode('utf8'))
class ExternalAuth(BaseAuth):
    """EXTERNAL authentication.

    .. seealso::
        :rfc:`4422` (App. A)
            Definition of the EXTERNAL mechanism.
    """

    name = 'EXTERNAL'

    def __call__(self):
        """Authenticate."""
        # The authorization ID, if any, is passed on to begin().
        if self.authzid:
            self.begin(self.authzid.encode('utf8'))
        else:
            self.begin()
        self.receive()
        self.send(b'')
        self.end()

    def exchange(self):
        """No-op."""
class LoginAuth(AuthzUnsupportedMixin, BasePwdAuth):
    """LOGIN authentication.

    .. seealso::
        https://datatracker.ietf.org/doc/draft-murchison-sasl-login
            Definition of the LOGIN mechanism.
    """

    def __init__(self, *args, **kwargs):
        """Prepare authentication.

        Arguments:
            connection: Connection over which to authenticate.
            authcid: Authentication ID (user to login as).
            password: Password.
            authzid: Authorization ID (user whose rights to acquire).
            prepare: Which credentials to prepare.

        Raises:
            ValueError: Password contains CR, LF, or NUL.
        """
        super().__init__(*args, **kwargs)
        # Every character of the password must be checked. A set that
        # holds the password as a whole would only ever match passwords
        # that consist of nothing but a single forbidden character.
        if any(char in self.password for char in '\r\n\0'):
            raise ValueError('Password contains CR, LF, or NUL')

    def exchange(self):
        """Send the user name and the password, each upon a challenge."""
        self.receive()
        self.send(self.authcid.encode('utf8'))
        self.receive()
        self.send(self.password.encode('utf8'))

    obsolete = True
    name = 'LOGIN'
class PlainAuth(BasePwdAuth):
    """PLAIN authentication.

    .. seealso::
        :rfc:`4616`
            PLAIN authentication mechanism.
    """

    name = 'PLAIN'

    def exchange(self):
        """Send the NUL-separated credentials in a single message."""
        credentials = (self.authzid, self.authcid, self.password)
        message = '\0'.join(credentials)
        self.send(message.encode('utf8'))
class ScramSHA1Auth(BaseScramAuth):
    """SCRAM-SHA-1 authentication."""

    name = 'SCRAM-SHA-1'
    order = -10

    @property
    def digest(self) -> str:
        """Name of the SHA-1 digest."""
        return 'sha1'
class ScramSHA1PlusAuth(BaseScramPlusAuth, ScramSHA1Auth):
    """SCRAM-SHA-1-PLUS authentication (SCRAM-SHA-1 with channel binding)."""

    order = -1000
    name = 'SCRAM-SHA-1-PLUS'
class ScramSHA224Auth(BaseScramAuth):
    """SCRAM-SHA-224 authentication."""

    name = 'SCRAM-SHA-224'
    order = -20

    @property
    def digest(self) -> str:
        """Name of the SHA-224 digest."""
        return 'sha224'
class ScramSHA224PlusAuth(BaseScramPlusAuth, ScramSHA224Auth):
    """SCRAM-SHA-224-PLUS authentication (with channel binding)."""

    order = -2000
    name = 'SCRAM-SHA-224-PLUS'
class ScramSHA256Auth(BaseScramAuth):
    """SCRAM-SHA-256 authentication."""

    name = 'SCRAM-SHA-256'
    order = -30

    @property
    def digest(self) -> str:
        """Name of the SHA-256 digest."""
        return 'sha256'
class ScramSHA256PlusAuth(BaseScramPlusAuth, ScramSHA256Auth):
    """SCRAM-SHA-256-PLUS authentication (with channel binding)."""

    order = -3000
    name = 'SCRAM-SHA-256-PLUS'
class ScramSHA384Auth(BaseScramAuth):
    """SCRAM-SHA-384 authentication."""

    name = 'SCRAM-SHA-384'
    order = -40

    @property
    def digest(self) -> str:
        """Name of the SHA-384 digest."""
        return 'sha384'
class ScramSHA384PlusAuth(BaseScramPlusAuth, ScramSHA384Auth):
    """SCRAM-SHA-384-PLUS authentication (with channel binding)."""

    order = -4000
    name = 'SCRAM-SHA-384-PLUS'
class ScramSHA512Auth(BaseScramAuth):
    """SCRAM-SHA-512 authentication."""

    name = 'SCRAM-SHA-512'
    order = -50

    @property
    def digest(self) -> str:
        """Name of the SHA-512 digest."""
        return 'sha512'
class ScramSHA512PlusAuth(BaseScramPlusAuth, ScramSHA512Auth):
    """SCRAM-SHA-512-PLUS authentication (with channel binding)."""

    order = -5000
    name = 'SCRAM-SHA-512-PLUS'
3023# pylint: disable=invalid-name
class ScramSHA3_512Auth(BaseScramAuth):
    """SCRAM-SHA3-512 authentication."""

    name = 'SCRAM-SHA3-512'
    order = -60

    @property
    def digest(self) -> str:
        """Name of the SHA3-512 digest."""
        return 'sha3_512'
3035# pylint: disable=invalid-name
class ScramSHA3_512PlusAuth(BaseScramPlusAuth, ScramSHA3_512Auth):
    """SCRAM-SHA3-512-PLUS authentication (with channel binding)."""

    order = -6000
    name = 'SCRAM-SHA3-512-PLUS'
3042#
3043# SieveManager Shell
3044#
3046class BaseShell():
3047 """Base class for interactive shells.
3049 :class:`BaseShell` is similar-ish to :class:`cmd.Cmd`. However, lines read
3050 from standard input are :meth:`expanded <expand>` before being passed to
3051 methods that implement commands. :class:`BaseShell` also provides an
3052 extensible :meth:`completion system <complete>` as well as a built-in
3053 :meth:`help system <do_help>`. :attr:`aliases` can be extended, too.
3055 Define a :samp:`do_{command}` method to add `command`.
3057 For example:
3059 >>> class Calculator(BaseShell):
3060 >>> def do_add(self, n, m):
3061 >>> \"\"\"add n m - add n and m\"\"\"
3062 >>> print(n + m)
3063 >>>
3064 >>> def do_sum(self, *numbers):
3065 >>> \"\"\"sum [n ...] - \"\"\"
3066 >>> print sum(numbers)
3067 >>>
3068 >>> aliases = {
3069 >>> '+': 'sum'
3070 >>> }
3071 >>>
3072 >>> calc = Calculator()
3073 >>> calc.executeline('add 0 1')
3074 1
3075 >>> calc.executeline('sum 0 1 2')
3076 3
3077 >>> calc.executeline('+ 0 1 2 3')
3078 6
3079 >>> calc.executeline('add 0')
3080 usage: add n m
3081 >>> calc.executeline('help add')
3082 add n m - add n and m
3083 """
3085 # Shell behaviour
def __init__(self):
    """Initialize a :class:`BaseShell` object.

    The commands returned by :meth:`getcommands`
    are stored as a tuple in :attr:`commands`.
    """
    commands = self.getcommands()
    self.commands = tuple(commands)
@staticmethod
def columnize(words: Sequence[str], file: Optional[TextIO] = None,
              width: Optional[int] = None):
    """Print `words` in columns to `file`.

    Words are arranged column by column, that is,
    they read from top to bottom and then from left to right.

    Arguments:
        words: Words.
        file: Output file (default: :data:`sys.stdout`).
        width: Terminal width in chars (default: current terminal width).
    """
    # Defaults are looked up when the function is called, not when it is
    # defined, so that redirections of standard output and changes of
    # the terminal size are honoured.
    if file is None:
        file = sys.stdout
    if width is None:
        width = shutil.get_terminal_size().columns
    if (nwords := len(words)) > 0:
        colwidth = max(map(len, words)) + 1
        # The last column need not be padded, hence `width + 1`.
        maxncols = max((width + 1) // colwidth, 1)
        nrows = math.ceil(nwords / maxncols)
        rows: tuple[list[str], ...] = tuple([] for _ in range(nrows))
        for i, word in enumerate(words):
            rows[i % nrows].append(word)
        for row in rows:
            line = ''.join([col.ljust(colwidth) for col in row])
            print(line.rstrip(), file=file)
def complete(self, text: str, n: int) -> Optional[str]:
    """Completion function for :func:`readline.set_completer`.

    Tab-completion for command names (e.g., "exit") is built in. To
    enable tab-completion for the arguments of some `command`, define
    a method :samp:`complete_{command}`, which takes the index of the
    argument that should be completed and the given `text` and returns
    a sequence of :class:`str`-:class:`bool` pairs, where the string
    is a completion and the boolean indicates whether a space should
    be appended to that completion.

    For example:

    >>> class Foo(BaseShell):
    >>>     def do_cmd(self, arg1, arg2):
    >>>         pass
    >>>
    >>>     def complete_cmd(self, argidx, text):
    >>>         if argidx == 1:
    >>>             return [(s, True) for s in ('foo', 'bar', 'baz')]
    >>>         if argidx == 2:
    >>>             return [('quux/', False)]
    >>>         return []
    >>>
    >>> foo = Foo()
    >>> foo.complete('', 0)
    'cmd '
    >>> foo.complete('', 1)
    None
    >>> foo.complete('cmd ', 0)
    'foo '
    >>> foo.complete('cmd ', 1)
    'bar '
    >>> foo.complete('cmd ', 2)
    'baz '
    >>> foo.complete('cmd ', 3)
    None
    >>> foo.complete('cmd b', 0)
    'bar '
    >>> foo.complete('cmd b', 1)
    'baz '
    >>> foo.complete('cmd b', 2)
    None
    >>> foo.complete('cmd bar ', 0)
    'quux/'
    >>> foo.complete('cmd bar ', 1)
    None

    Arguments:
        text: Possibly partial word to be completed.
        n: Index of the completion to return.

    Returns:
        Either the `n`-th completion for `text` or
        ``None`` if there is no such completion.

    .. admonition:: Side-effects

        * Logging of messages with a priority lower than
          :data:`logging.ERROR` is suppressed during tab-completion.
        * A BEL is printed to the controlling terminal if
          no completion for `text` is found.
    """
    # Completions are computed when the first one is requested
    # and then served from `self._completions`.
    if n == 0:
        logger = self.logger
        loglevel = logger.level
        if loglevel < logging.ERROR:
            logger.setLevel(logging.ERROR)
        try:
            args = self._getargs()
            if args:
                command = self.aliases.get(args[0], args[0])
                completer = getattr(self, f'complete_{command}', None)
                # Reset first, so that stale completions are not
                # served if the completer raises an error.
                self._completions = []
                if completer:
                    pattern = text + '*'
                    self._completions = sorted(
                        shlex.quote(word) + (' ' if space else '')
                        for word, space in completer(len(args), text)
                        if fnmatch.fnmatchcase(word, pattern)
                    )
            else:
                self._completions = sorted(
                    command + ' ' for command in self.commands
                    if command.startswith(text)
                )
            if text and not self._completions:
                bell()
        finally:
            logger.setLevel(loglevel)
    if n < len(self._completions):
        try:
            return self._completions[n]
        except IndexError:
            return None
    return None
@staticmethod
def confirm(prompt: str, default: ConfirmEnum = ConfirmEnum.NO,
            multi: bool = False, attempts: int = 3) -> ConfirmEnum:
    """Ask the user to confirm something.

    An empty answer selects `default`. Answers are case-insensitive.

    Arguments:
        prompt: Prompt.
        default: Default.
        multi: Give choices "all" and "none"?
        attempts: How often to try before raising a :exc:`ValueError`.

    Raises:
        ValueError: Unrecognized answer.

    .. note::
        Prompts are printed to/answers are read from :file:`/dev/tty`,
        regardless of I/O redirection.
    """
    assert attempts > 0
    answers = {
        'y': ConfirmEnum.YES,
        'yes': ConfirmEnum.YES,
        'n': ConfirmEnum.NO,
        'no': ConfirmEnum.NO,
    }
    if multi:
        answers['all'] = ConfirmEnum.ALL
        answers['none'] = ConfirmEnum.NONE
        hint = 'Enter "yes", "no", "all", or "none"\n'
    else:
        hint = 'Enter "yes" or "no"\n'
    with TermIO() as tty:
        for _ in range(attempts):
            tty.write(prompt + ' ')
            answer = tty.readline().strip().casefold()
            if answer == '':
                return default
            if answer in answers:
                return answers[answer]
            tty.write(hint)
        raise ValueError('Too many retries')
    # NOTREACHED
def enter(self) -> int:
    """Read and execute commands from standard input.

    Reading stops at the end of the file or when
    a command raises :exc:`StopIteration`. If there is no terminal,
    reading also stops as soon as a command returns a non-zero value.

    Returns:
        Return value of the last command.

    .. admonition:: Side-effects

        Entering the shell binds :kbd:`Tab` to :func:`readline.complete`.
    """
    interactive = self.hasatty()
    log = self.logger.info
    if interactive:
        # Remember the completer settings, so they can be restored.
        saved_completer = readline.get_completer()
        saved_delims = readline.get_completer_delims()
        readline.set_auto_history(True)
        readline.set_completer(self.complete)
        readline.set_completer_delims(' ')
        readline.parse_and_bind('tab: complete')
        # libedit uses another syntax for key bindings than GNU readline.
        if readline.__doc__ and 'libedit' in readline.__doc__:
            readline.parse_and_bind('python:bind ^I rl_complete')
        log('Enter "? [command]" for help and "exit" to exit')
    self.retval = 0
    try:
        running = True
        while running:
            try:
                if interactive:
                    print(self.getprompt(), end='')
                line = input().strip()
            except EOFError:
                break
            try:
                self.retval = self.executeline(line)
            except StopIteration:
                break
            # Without a terminal, the first failure ends the session.
            running = interactive or self.retval == 0
    finally:
        if interactive:
            readline.set_completer(saved_completer)
            readline.set_completer_delims(saved_delims)
    return self.retval
def execute(self, command: str, *args: str) -> int:
    """Run the shell command `command` with `args`.

    For example:

    >>> shell.execute('ls', 'foo', 'bar')
    0

    Arguments:
        command: Command name or alias.
        args: Arguments to the command.

    Returns:
        Return value of the command (0 if it returned ``None``).

    Raises:
        ShellUsageError: Command not found or arguments invalid.

    .. note::
        Patterns are expanded by :meth:`executeline`, not here;
        :meth:`execute` treats them as ordinary characters.
    """
    handler_name = 'do_' + self.aliases.get(command, command)
    try:
        handler = getattr(self, handler_name)
    except AttributeError as err:
        raise ShellUsageError(f'{command}: No such command') from err
    # Check the arguments against the signature before calling, so that
    # a TypeError raised inside the command is not mistaken for misuse.
    try:
        inspect.signature(handler).bind(*args)
    except TypeError as err:
        raise ShellUsageError(self.getusage(handler)) from err
    result = handler(*args)
    self.retval = 0 if result is None else result
    return self.retval
def executeline(self, line: str) -> int:
    """:meth:`Expand <expand>` `line` and :meth:`execute` it.

    A line that starts with punctuation (e.g., "!" or "?") is split
    into that punctuation, which is the command, and the remainder.

    For example:

    >>> shell.executeline('ls foo bar')
    0

    Returns:
        Return value (0 for lines that contain no command).

    Raises:
        ShellUsageError: Command not found or arguments invalid.
    """
    punct = re.match(r'\s*([^\w\s#_]+)\s*(.*)', line)
    if punct:
        command, rest = punct.group(1), punct.group(2)
        args = self.expand(rest)
    else:
        try:
            command, *args = self.expand(line)
        except ValueError:
            return 0
    return self.execute(command, *args)
def executescript(self, script: TextIO) -> int:
    """Run `script` line by line using :meth:`executeline`.

    Execution stops at the first command that returns a non-zero
    value or raises :exc:`StopIteration`.

    For example:

    >>> with open('scriptfile') as scriptfile:
    >>>     shell.executescript(scriptfile)
    0

    Returns:
        Return value of the last command that was run.

    Raises:
        ShellUsageError: Command not found or arguments invalid.
    """
    self.retval = 0
    for line in script:
        try:
            status = self.executeline(line)
        except StopIteration:
            break
        self.retval = status
        if status != 0:
            break
    return self.retval
def expand(self, line: str) -> list[str]:
    """Split `line` into words and expand the patterns among them.

    Similar to :manpage:`wordexp(3)`. The candidates that a pattern is
    matched against are supplied by the command's completer, that is,
    the method :samp:`complete_{command}`.

    For example:

    >>> class Foo(BaseShell)
    >>>     def complete_cmd(self, _, text):
    >>>         return [(s, True) for s in ('foo', 'bar', 'baz')]
    >>>
    >>> foo = Foo()
    >>> foo.expand('cmd *')
    ['cmd', 'bar', 'baz', 'foo']

    Returns:
        The command followed by its expanded arguments,
        or an empty list if `line` contains no words.

    Raises:
        ShellOperationError: Pattern does not match.
        ShellUsageError: Command has no completer, but a pattern was given.
    """
    try:
        command, *args = self.split(line)
    except ValueError:
        return []
    try:
        complete = getattr(self, f'complete_{command}')
    except AttributeError:
        # Only called, and hence only raises, if a pattern is given.
        def complete(_argidx, _text):
            raise ShellUsageError(f'{command} does not accept patterns')
    expanded: list[str] = [command]
    for idx, word in enumerate(args, start=1):
        if not isinstance(word, ShellPattern):
            expanded.append(word)
            continue
        candidates = (name for name, _ in complete(idx, ''))  # type: ignore
        matches = sorted(word.expand(candidates))
        if not matches:
            raise ShellOperationError(f'{word}: No matches')
        expanded += matches
    return expanded
@classmethod
def getcommands(cls) -> Iterator[str]:
    """Yield the names of the shell commands that `cls` provides.

    A command is any attribute named :samp:`do_{name}`.

    For example:

    >>> class Foo(BaseShell)
    >>>     def do_cmd(self, arg1, arg2):
    >>>         pass
    >>>
    >>> tuple(Foo.getcommands())
    ('cmd', 'exit', 'help')
    >>> foo = Foo()
    >>> foo.commands
    ('cmd', 'exit', 'help')
    """
    for attr in dir(cls):
        prefix, sep, name = attr.partition('_')
        if sep and prefix == 'do' and name:
            yield name
def getprompt(self) -> str:
    """Return the string that prompts the user for a command.

    .. tip::

        Override this method to change the prompt.
    """
    return '> '
@staticmethod
def getusage(func: Callable) -> Optional[str]:
    """Derive a usage message from `func`'s docstring.

    :func:`getusage` assumes that the docstring has the form
    :samp:`{command} {args} - {description}`. The usage message is
    the text before the first dash that is surrounded by whitespace
    or, if there is no such dash, the whole docstring. Dashes inside
    words (e.g., "-f" or "page-by-page") do not count as separators.
    Leading and trailing whitespace is stripped.

    For example:

    >>> def frobnicate(foo, bar):
    >>>     \"\"\"frobnicate foo bar - frobnicate foo with bar\"\"\"
    >>>     ...
    >>>
    >>> BaseShell.getusage(frobnicate)
    'usage: frobnicate foo bar'

    Returns:
        The usage message or ``None`` if `func` has no (usable) docstring.
    """
    doc = func.__doc__
    if not doc:
        return None
    # Splitting at the *last* hyphen would cut descriptions such as
    # "display scripts page-by-page" in half.
    synopsis = re.split(r'\s+-\s+', doc, maxsplit=1)[0].strip()
    if not synopsis:
        return None
    return 'usage: ' + synopsis
@staticmethod
def hasatty() -> bool:
    """Check whether standard input is connected to a terminal."""
    try:
        fd = sys.stdin.fileno()
    except io.UnsupportedOperation:
        # Standard input has been replaced with a file-less stream.
        return False
    return os.isatty(fd)
@staticmethod
def split(line: str) -> list['ShellWord']:
    """Split `line` into words, roughly as a POSIX shell would.

    Single and double quotes as well as backslashes quote characters,
    "#" starts a comment, and words that contain an unquoted "*", "?",
    or "[" are returned as :class:`ShellPattern` objects.

    Raises:
        ShellDataError: Quotation marks are unbalanced.
    """
    glob_chars = '*?['
    unbracket = re.compile(r'\[(.)\]')
    words: list[ShellWord] = []
    word: ShellWord = ''        # Word being assembled.
    keep_empty: bool = False    # Saw quotes, so '' is a word, too.
    is_glob: bool = False       # Saw an unquoted pattern character.
    in_quote: str = ''          # Quotation mark that is open, if any.
    after_backslash: bool = False

    def protect(char: str) -> str:
        # Quoted pattern characters are wrapped in brackets, so that
        # they match themselves if the word turns out to be a pattern.
        return f'[{char}]' if char in glob_chars else char

    def flush():
        nonlocal word, keep_empty, is_glob
        if word or keep_empty:
            if is_glob:
                words.append(ShellPattern(word))
            else:
                # Not a pattern after all; undo the protection.
                words.append(unbracket.sub(r'\1', word))
        word = ''
        keep_empty = False
        is_glob = False

    last = len(line) - 1
    for pos, char in enumerate(line):
        if in_quote:
            if after_backslash:
                if char in ('\\', '"', '\n'):
                    word += char
                else:
                    word += f'\\{char}'
                after_backslash = False
            elif char == in_quote:
                in_quote = ''
            elif char == '\\' and in_quote == '"':
                after_backslash = True
            else:
                word += protect(char)
        elif after_backslash:
            word += protect(char)
            after_backslash = False
        elif char in ('"', "'"):
            in_quote = char
            keep_empty = True
        elif char == '\\':
            after_backslash = True
        elif char.isspace():
            if pos == last:
                break
            # Only the last of a run of whitespace ends the word.
            if not line[pos + 1].isspace():
                flush()
        elif char == '#':
            break
        else:
            if char in glob_chars:
                is_glob = True
            word += char
    if in_quote:
        raise ShellDataError('Unbalanced quotes')
    flush()
    return words
3538 # Basic commands
def do_exit(self):
    """exit - exit the shell"""
    # StopIteration is the signal that ends the command loop.
    raise StopIteration
def do_help(self, name: Optional[str] = None):
    """help [command] - list commands/show help for command"""
    # Without an argument, list the available commands.
    if not name:
        self.columnize(self.commands)
        return
    try:
        handler = getattr(self, f'do_{name}')
    except AttributeError as err:
        raise ShellUsageError(f'{name}: Unknown command') from err
    print(handler.__doc__)
3553 # Completers
def complete_help(self, *_) -> tuple[tuple[str, bool], ...]:
    """Offer every shell command as a completion for "help"."""
    pairs = []
    for name in self.commands:
        pairs.append((name, True))
    return tuple(pairs)
3558 # Private methods
@classmethod
def _getargs(cls) -> list[ShellWord]:
    """:meth:`Split <split>` the part of the input line that
    precedes the word currently being completed."""
    preceding = readline.get_line_buffer()[:readline.get_begidx()]
    return cls.split(preceding)
3566 # Attributes
aliases: dict[str, str] = {'!': 'sh', '?': 'help'}
"""Maps aliases to the names of the :attr:`commands` they stand for."""

commands: tuple[str, ...]
"""Names of the shell commands. Populated by :meth:`__init__`."""

logger: logging.Logger = logging.getLogger(__name__)
"""Logger.

:const:`logging.INFO` is used for the help message
that is shown when the shell is entered.
"""

retval: int = 0
"""Return value of the command that completed most recently."""

_completions: list[str] = []
"""Completions computed most recently."""

_unescpattern: ClassVar[re.Pattern] = re.compile(r'\\([*?\[\]])')
"""Regular expression that strips the escapes from fnmatch patterns."""
3598# pylint: disable=too-many-public-methods
3599class SieveShell(BaseShell):
3600 """Shell around a `SieveManager` connection."""
def __init__(self, manager: 'SieveManager', clobber: bool = True,
             confirm: ShellCmd = ShellCmd.ALL):
    """Set up a shell that operates on `manager`.

    Arguments:
        manager: Connection to a ManageSieve server.
        clobber: Overwrite files?
        confirm: Shell commands that require confirmation.
    """
    super().__init__()
    self.manager = manager
    self.clobber = clobber
    self.reqconfirm = confirm
3616 # Methods
def getprompt(self, *args, **kwargs):
    # Prefix the base prompt with the URL of the connection, if there
    # is one, and flag connections that are not protected by TLS.
    prompt = super().getprompt(*args, **kwargs)
    mgr = self.manager
    if not mgr:
        return prompt
    url = mgr.geturl()
    if url is None:
        return prompt
    security = '' if mgr.tls else '(insecure) '
    return security + str(url) + prompt
def enter(self) -> int:
    # Wraps the base class's command loop so that, in interactive
    # sessions, recoverable errors are logged and the loop is
    # re-entered instead of terminating the shell. Connection-level
    # errors, and every error if standard input is not a terminal,
    # are propagated to the caller.
    # pylint: disable=redefined-outer-name
    error = self.logger.error
    while True:
        try:
            return super().enter()
        except (ConnectionError, DNSError, ProtocolError, SecurityError):
            raise
        # pylint: disable=broad-exception-caught
        except Exception as err:
            if not self.hasatty():
                raise
            if isinstance(err, (GetoptError, UsageError)):
                error('%s', err)
                self.retval = 2
            elif isinstance(err, subprocess.CalledProcessError):
                error('%s exited with status %d',
                      err.cmd[0], err.returncode)
                self.retval = 1
            elif isinstance(err, OSError):
                # Not every OSError carries an errno (e.g., socket
                # timeouts do not), so fall back to the message
                # rather than asserting that there is one.
                if err.errno:
                    reason = os.strerror(err.errno)
                else:
                    reason = str(err)
                isfileerr = isinstance(err, (FileNotFoundError,
                                             FileExistsError))
                if isfileerr and err.filename is not None:
                    error('%s: %s', err.filename, reason)
                else:
                    error('%s', reason)
                self.retval = 1
            elif isinstance(err, (Error, ValueError)):
                for line in str(err).splitlines():
                    error('%s', line)
                self.retval = 1
            else:
                raise
    # NOTREACHED
def execute(self, *args, **kwargs) -> int:
    # Runs the command and then logs the manager's `warning`
    # attribute, if it is set, line by line.
    retval = super().execute(*args, **kwargs)
    warning = self.manager.warning
    if warning:
        warn = self.logger.warning
        for line in warning.splitlines():
            warn('%s', escapectrl(line))
    return retval
def editscripts(self, editor: list[str], *args: str):
    """Edit scripts with the given `editor`.

    Arguments:
        editor: Command that starts the editor.
        args: Script names; "-a" selects the active script instead.
    """
    mgr = self.manager

    def retry(err: Exception, script: str) -> bool:
        # Show what went wrong and offer to edit the script again.
        print(str(err).rstrip('\r\n'), file=sys.stderr)
        answer = self.confirm(f'Re-edit {script}?',
                              default=ConfirmEnum.YES)
        return bool(answer)

    opts, scripts = getopt(list(args), 'a')
    for opt, _ in opts:
        if opt == '-a':
            active = mgr.getactive()
            scripts = [active] if active else []
    if not scripts:
        self.logger.error('No scripts given')
        return
    mgr.editscripts(editor, list(scripts), catch=retry)
3684 # Shell commands
def do_activate(self, script: str):
    """activate script - mark script as active"""
    mgr = self.manager
    mgr.setactive(script)
3689 # pylint: disable=redefined-loop-name
def do_caps(self):
    """caps - show server capabilities"""
    # The capabilities are printed as a YAML document.
    scalars = ('implementation', 'language',
               'maxredirects', 'owner', 'version')
    lists = ('notify', 'sasl', 'sieve')
    flags = ('starttls', 'unauthenticate')

    def quote(item):
        return yamlescape(item) if isinstance(item, str) else item

    print('---')
    caps = self.manager.capabilities
    if caps is not None:
        for key, value in caps.__dict__.items():
            # Capabilities that are unset, empty, or false are omitted.
            if not value:
                continue
            if key in scalars:
                print(f'{key}: {quote(value)}')
            elif key in lists:
                joined = ', '.join(str(quote(item)) for item in value)
                print(f'{key}: [{joined}]')
            elif key in flags:
                print(f'{key}: yes')
        for key, value in caps.notunderstood.items():
            if value is not None:
                print(f'{key}: {quote(value)}')
    print('...')
def do_cat(self, *scripts: str):
    """cat [script ...] - concatenate scripts on standard output"""
    fetch = self.manager.getscript
    for name in scripts:
        content = fetch(name)
        sys.stdout.write(content)
def do_cd(self, localdir: str = HOME):
    """cd [localdir] - change local directory"""
    os.chdir(localdir)
    report = self.logger.info
    report('Changed directory to %s', localdir)
3726 # pylint: disable=redefined-loop-name
def do_cert(self):
    """cert - show the server's TLS certificate."""
    # The certificate, as returned by getpeercert(), is printed as a
    # YAML document.
    indent = ' ' * 4
    sock = self.manager.sock
    if not isinstance(sock, ssl.SSLSocket):
        raise ShellUsageError('Not a secure connection')
    cert = sock.getpeercert()
    assert cert

    def quote(item: Any) -> Any:
        return yamlescape(item) if isinstance(item, str) else item

    print('---')
    for key, value in sorted(cert.items()):
        if key in ('OCSP', 'caIssuers', 'crlDistributionPoints'):
            # Plain lists.
            print(f'{key}:')
            for item in sorted(value):
                print(f'{indent}- {quote(item)}')
        elif key in ('issuer', 'subject'):
            # Lists of groups of name/value pairs; the first pair of
            # each group shares its line with the list marker.
            print(f'{key}:')
            for pairs in sorted(value):
                print(f'{indent}- ', end='')
                for i, (k, v) in enumerate(pairs):
                    v = quote(v)
                    if i == 0:
                        print(f'{k}: {v}')
                    else:
                        print(f'{indent} {k}: {v}\n')
        elif key == 'subjectAltName':
            print(f'{key}:')
            for k, v in sorted(value):
                print(f'{indent}- {k}: {yamlescape(v)}')
        elif isinstance(value, str):
            print(f'{key}: {yamlescape(value)}')
        elif isinstance(value, int):
            print(f'{key}: {value}')
    print('...')
def do_check(self, localscript: str):
    """check localscript - check whether localscript is valid"""
    # The file is opened in text mode, as it is for "put"; combining
    # binary mode with an encoding makes open() raise a ValueError.
    with open(localscript, encoding='utf8') as file:
        # checkscript performs a blocking send/sendline,
        # but holding a lock should be okay in an interactive app.
        fcntl.flock(file.fileno(), LOCK_SH | LOCK_NB)
        self.manager.checkscript(file)
    self.logger.info('%s is valid', localscript)
def do_cmp(self, *args: str) -> int:
    """cmp [-s] script1 [...] scriptN - compare scripts"""
    # Returns 0 if all scripts are equal and 1 otherwise; with "-s",
    # nothing is printed. Scripts are compared line by line and
    # column by column, so line endings are not taken into account.
    opts, scripts = getopt(list(args), 's')
    silent = any(opt == '-s' for opt, _ in opts)
    if len(scripts) < 2:
        message = self.getusage(self.do_cmp)
        assert message
        raise ShellUsageError(message)
    prefix = ', '.join(scripts)

    def differ(line: int, column: int) -> int:
        if not silent:
            print(f'{prefix}: line {line}, column {column} differs')
        return 1

    contents = [self.manager.getscript(script).splitlines()
                for script in scripts]
    for i, lines in enumerate(itertools.zip_longest(*contents), start=1):
        # zip_longest pads scripts that have run out of lines with
        # None, which cannot be compared column by column.
        if None in lines:
            return differ(i, 1)
        for j, chars in enumerate(itertools.zip_longest(*lines), start=1):
            if any(char != chars[0] for char in chars[1:]):
                return differ(i, j)
    if not silent:
        print(f'{prefix}: equal')
    return 0
def do_cp(self, *args: str):
    """cp [-f|-i] source target - re-upload source as target"""
    mgr = self.manager
    clobber = self.clobber
    confirm = bool(self.reqconfirm & ShellCmd.CP)
    opts, scripts = getopt(list(args), 'Cfi')
    for opt, _ in opts:
        # Both options permit overwriting; "-i" asks first.
        if opt in ('-f', '-i'):
            clobber = True
            confirm = opt == '-i'
    try:
        source, target = scripts
    except ValueError as err:
        message = self.getusage(self.do_cp)
        assert message
        raise ShellUsageError(message) from err
    if mgr.scriptexists(target):
        if not clobber:
            raise FileExistsError(EEXIST, os.strerror(EEXIST), target)
        if confirm and not self.confirm(f'Overwrite {target}?'):
            return
    mgr.copyscript(source, target)
def do_deactivate(self):
    """deactivate - deactivate the active script"""
    mgr = self.manager
    mgr.unsetactive()
def do_diff(self, *args: str) -> int:
    """diff <options> script1 script2 - show how scripts differ"""
    # The options are handed to diff(1) as they are;
    # diff's exit status is the return value.
    pairs, scripts = getopt(list(args), 'C:U:bcu')
    opts = tuple(part for pair in pairs for part in pair if part)
    if len(scripts) != 2:
        message = self.getusage(self.do_diff)
        assert message
        raise ShellUsageError(message)
    command = ['diff', *opts]
    return self.manager.editscripts(command, scripts, check=False).returncode
def do_echo(self, *args: str):
    """echo word [...] - print words to standard output."""
    print(' '.join(args))
def do_ed(self, *args: str):
    """ed [-a] script [...] - edit scripts with a line editor"""
    # Option handling is left to editscripts.
    editor = EDITOR
    self.editscripts(editor, *args)
def do_get(self, *args: str):
    """get [-a] [-f|-i] [-o file] [script ...] - download script"""
    # Options: "-a" selects the active script, "-f" overwrites without
    # asking, "-i" asks before overwriting, "-o" names the local file.
    mgr = self.manager
    opts, sources = getopt(list(args), 'afio:')
    output = ''
    clobber = self.clobber
    confirm = bool(self.reqconfirm & ShellCmd.GET)
    multi = len(sources) > 1
    for opt, arg in opts:
        if opt == '-a':
            sources = [active] if (active := mgr.getactive()) else []
        elif opt == '-f':
            clobber = True
            confirm = False
        elif opt == '-i':
            clobber = True
            confirm = True
        elif opt == '-o':
            if multi:
                raise ShellUsageError('-o: Too many sources')
            output = arg
    answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
    keep = mgr.backup
    for src in sources:
        # Try not to make a backup if the source doesn't exist.
        # Writing to a temporary file would create a worse race condition.
        if keep > 0:
            if not mgr.scriptexists(src):
                raise FileNotFoundError(ENOENT, os.strerror(ENOENT), src)
        targ = output if output else src
        # O_EXCL makes the download fail if the file springs into
        # existence between the check below and the call to open.
        flags = O_CREAT | O_EXCL | O_WRONLY | O_TRUNC
        if path.exists(targ):
            if not clobber:
                raise FileExistsError(EEXIST, os.strerror(EEXIST), targ)
            if answer not in (ConfirmEnum.ALL, ConfirmEnum.NONE):
                answer = self.confirm(f'Overwrite {targ}?', multi=multi)
            if answer:
                def getfiles() -> Iterator[str]:
                    # pylint: disable=cell-var-from-loop
                    return readdir(path.dirname(targ), path.isfile)
                backup(targ, keep, getfiles, shutil.copy, os.remove)
                flags &= ~O_EXCL
            else:
                continue
        fd = os.open(targ, flags, mode=0o644)
        # Hand the descriptor to a file object before locking, so that
        # it is closed even if the lock cannot be acquired.
        with os.fdopen(fd, 'w', encoding='utf8') as file:
            # getscript performs a blocking read,
            # but holding a lock should be okay in an interactive app.
            fcntl.flock(fd, LOCK_SH | LOCK_NB)
            file.write(mgr.getscript(src))
        self.logger.info('Downloaded %s as %s', src, targ)
def do_ls(self, *args: str):
    """ls [-1al] [script ...] - list scripts"""
    # Options: "-1" prints one script per line (the default if standard
    # input is not a terminal), "-a" prints the active script only,
    # "-l" prefixes each script with "a" if it is active and "-" if not.
    opts, fnames = getopt(list(args), '1al')
    given = {opt for opt, _ in opts}
    active = '-a' in given
    long = '-l' in given
    one = (not self.hasatty()) or '-1' in given
    if active:
        if script := self.manager.getactive():
            print(script)
        else:
            self.logger.warning('No active script')
        return
    scripts = self.manager.listscripts()
    if fnames:
        existing = {fname for fname, _ in scripts}
        # Report the first missing script in the order given.
        for name in fnames:
            if name not in existing:
                raise FileNotFoundError(ENOENT, os.strerror(ENOENT), name)
        scripts = [s for s in scripts if s[0] in fnames]
    scripts = sorted(scripts)
    if long:
        for fname, isactive in scripts:
            print('a' if isactive else '-', fname)
    elif one:
        for fname, _ in scripts:
            print(fname)
    else:
        words = [f + ('*' if a else '') for f, a in scripts]
        self.columnize(words)
def do_more(self, *args: str):
    """more <options> script [...] - display scripts page-by-page."""
    # "-a" selects the active script; the remaining options
    # are handed to the pager as they are.
    mgr = self.manager
    pairs, scripts = getopt(list(args), 'aceis')
    opts = [part for pair in pairs for part in pair if part]
    if '-a' in opts:
        active = mgr.getactive()
        if active is None:
            self.logger.warning('No active script')
            return
        opts.remove('-a')
        scripts = [active]
    elif not scripts:
        message = self.getusage(self.do_more)
        assert message
        raise ShellUsageError(message)
    mgr.editscripts(PAGER + opts, list(scripts))
def do_mv(self, *args: str):
    """mv [-f|-i] source target - rename source to target"""
    mgr = self.manager
    clobber = self.clobber
    confirm = bool(self.reqconfirm & ShellCmd.MV)
    opts, scripts = getopt(list(args), 'Cfi')
    for opt, _ in opts:
        # Both options permit overwriting; "-i" asks first.
        if opt in ('-f', '-i'):
            clobber = True
            confirm = opt == '-i'
    try:
        source, target = scripts
    except ValueError as err:
        message = self.getusage(self.do_mv)
        assert message
        raise ShellUsageError(message) from err
    if mgr.scriptexists(target):
        if not clobber:
            raise FileExistsError(EEXIST, os.strerror(EEXIST), target)
        if confirm and not self.confirm(f'Overwrite {target}?'):
            return
        # The existing target is backed up and deleted before the
        # source is renamed.
        mgr.backupscript(target)
        mgr.deletescript(target)
    mgr.renamescript(source, target, emulate=True)
def do_put(self, *args: str):
    """put [-f|-i] [-a] [-o name] [localscript ...] - upload scripts"""
    # Options: "-a" replaces the active script and activates the
    # upload, "-f" overwrites without asking, "-i" asks before
    # overwriting, "-o" names the remote script.
    mgr: SieveManager = self.manager
    clobber: bool = self.clobber
    confirm: bool = bool(self.reqconfirm & ShellCmd.PUT)
    active: Optional[str] = None
    activate: bool = False
    output: str = ''
    opts, sources = getopt(list(args), 'Cafio:')
    multi = len(sources) > 1
    for opt, arg in opts:
        if opt == '-a':
            if multi:
                raise ShellUsageError('-a: Only one script can be active')
            activate = True
            active = mgr.getactive()
            if active:
                output = active
        elif opt in ('-f', '-i'):
            clobber = True
            confirm = opt == '-i'
        elif opt == '-o':
            if multi:
                raise ShellUsageError('-o: Too many sources')
            output = arg
    answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
    for src in sources:
        targ = output or src
        if mgr.scriptexists(targ):
            if not clobber:
                raise FileExistsError(EEXIST, os.strerror(EEXIST), targ)
            if answer not in (ConfirmEnum.ALL, ConfirmEnum.NONE):
                answer = self.confirm(f'Overwrite {targ}?', multi=multi)
            if not answer:
                continue
        with open(src, encoding='utf8') as file:
            # putscript performs a blocking send/sendline,
            # but holding a lock should be okay in an interactive app.
            fcntl.flock(file.fileno(), LOCK_SH | LOCK_NB)
            mgr.putscript(file, targ)
        if activate and targ != active:
            mgr.setactive(targ)
def do_python(self):
    """python - enter Python read-evaluate-print loop"""
    # The interpreter's namespace is an ObjWrapper around the manager.
    # The shell's completer is swapped for a Python one while the
    # interpreter runs; the history is cleared on entry and on exit.
    interactive = self.hasatty()
    namespace = ObjWrapper(self.manager)
    if interactive:
        oldcompleter = readline.get_completer()
        completer = rlcompleter.Completer(namespace)
        readline.set_completer(completer.complete)
        # clear_history is not available with every readline library.
        with suppress(AttributeError):
            readline.clear_history()
        readline.set_auto_history(True)
    banner = (f'Python {sys.version}\n'
              'Enter "help()" for help and "exit()" to exit')
    try:
        with suppress(SystemExit):
            code.interact(local=namespace, banner=banner, exitmsg='')
    finally:
        if interactive:
            readline.set_completer(oldcompleter)
            with suppress(AttributeError):
                readline.clear_history()
def do_rm(self, *args: str):
    """rm [-f|-i] [script ...] - remove script"""
    # "-f" removes without asking, "-i" asks for each script.
    confirm = bool(self.reqconfirm & ShellCmd.RM)
    opts, scripts = getopt(list(args), 'fi')
    for opt, _ in opts:
        if opt in ('-f', '-i'):
            confirm = opt == '-i'
    multi = len(scripts) > 1
    answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
    remove = self.manager.deletescript
    for script in scripts:
        # Once the user has answered "all", stop asking.
        if answer is not ConfirmEnum.ALL:
            answer = self.confirm(f'Remove {script}?', multi=multi)
        if answer is ConfirmEnum.NONE:
            self.logger.info('Stopped')
            break
        if answer:
            remove(script)
def do_sh(self, *args: str):
    """sh [command] [argument ...] - run system command or system shell"""
    if args:
        command = args
    else:
        # Without a command, start the user's login shell.
        command = (pwd.getpwuid(os.getuid()).pw_shell,)
    subprocess.run(command, check=True)
def do_su(self, user: str):
    """su user - manage scripts of user."""
    # Re-authenticates with the arguments that the manager recorded,
    # except that `user` becomes the owner.
    mgr = self.manager
    # pylint: disable=protected-access
    _, (args, kwargs) = mgr._getstate()
    kwargs['owner'] = user
    if mgr.login:
        mgr.unauthenticate()
    # Some ManageSieve servers reject the first
    # "AUTHENTICATE" after an "UNAUTHENTICATE".
    for attempt in (1, 2):
        try:
            mgr.authenticate(*args, **kwargs)
        except SieveOperationError:
            if attempt == 2:
                raise
        else:
            break
def do_vi(self, *args: str):
    """vi [-a] script [...] - edit scripts with a visual editor"""
    # Option handling is left to editscripts.
    editor = VISUAL
    self.editscripts(editor, *args)
def do_xargs(self, command: str, *args: str):
    """xargs cmd [arg ...] - call cmd with arguments from standard input"""
    try:
        func = getattr(self, f'do_{command}')
    except AttributeError as err:
        raise ShellUsageError(f'{command}: No such command') from err

    def readarg() -> str:
        return sys.stdin.readline().rstrip('\n')

    lines = []
    with suppress(EOFError):
        # Each line is one argument; reading stops
        # at the first empty line or the end of the file.
        lines.extend(iter(readarg, ''))
    return func(*args, *lines)
4113 # Globbing and tab-completion
@staticmethod
def complete_dirs(_: int, text: str) -> list[tuple[str, bool]]:
    """Complete local directory names.

    Returns:
        Pairs of a directory name, with a slash appended, and ``False``.
    """
    parent = path.dirname(text)
    completions = []
    for dirname in readdir(parent, path.isdir):
        completions.append((dirname + '/', False))
    return completions
@staticmethod
def complete_files(_: int, text: str) -> list[tuple[str, bool]]:
    """Complete local filenames.

    Returns:
        For each directory, its name with a slash appended and ``False``;
        for each other file, its name and ``True``.
    """
    parent = path.dirname(text)
    completions = []
    for fname in readdir(parent):
        if path.isdir(fname):
            completions.append((fname + '/', False))
        else:
            completions.append((fname, True))
    return completions
def complete_scripts(self, *_) -> list[tuple[str, bool]]:
    """Complete the names of the scripts on the server.

    The list of scripts is requested with ``cached=True``.
    """
    listing = self.manager.listscripts(cached=True)
    return [(name, True) for name, _ in listing]
# Completers for commands that take the names of remote scripts.
complete_activate = complete_cat = complete_cmp = complete_cp = \
    complete_diff = complete_ed = complete_get = complete_ls = \
    complete_more = complete_mv = complete_rm = complete_vi = \
    complete_scripts

# Completer for cd, which takes a local directory.
complete_cd = complete_dirs

# Completers for commands that take local files.
complete_check = complete_put = complete_files

# Properties
clobber: bool
"""Whether existing files may be overwritten."""

reqconfirm: ShellCmd
"""Shell commands that ask for confirmation."""

manager: SieveManager
"""Connection to a ManageSieve server."""
class ObjWrapper(dict):
    """Namespace for :func:`code.interact` that exposes an object.

    The namespace contains the module's globals, the attributes of
    the wrapped object and its classes, and the wrapper's own
    attributes, each overriding the ones before.

    Arguments:
        obj: Object to wrap.
    """

    def __init__(self, obj: Any):
        """Populate the namespace from `obj`."""
        namespace: dict[str, Any] = dict(globals())
        for cls in obj.__class__.__mro__:
            namespace.update(cls.__dict__)
        namespace.update((name, getattr(obj, name)) for name in dir(obj))
        namespace.update((name, getattr(self, name)) for name in dir(self))
        super().__init__(namespace)

    @staticmethod
    def exit():
        """Leave the Python read-evaluate-print loop."""
        raise SystemExit

    # Needed, or else `help` ignores the wrapper.
    @staticmethod
    def help(*args, **kwargs):
        """Show help."""
        help(*args, **kwargs)
4218#
4219# Configuration
4220#
BaseConfigT = TypeVar('BaseConfigT', bound='BaseConfig')
"""Type variable for :class:`BaseConfig`."""


class BaseConfig(UserDict):
    """Base class for configurations.

    Configuration variables are attributes of the class;
    their values are stored in the underlying dictionary.
    """

    def __or__(self: BaseConfigT, other) -> BaseConfigT:
        merged = self.__class__()
        merged |= self
        merged |= other
        return merged

    def __ior__(self: BaseConfigT, other) -> BaseConfigT:
        super().__ior__(other)
        # `other` need not be a configuration, that is, have sections.
        with suppress(AttributeError):
            for name, section in other._sections.items():
                UserDict.__ior__(self._sections[name], section)
        return self

    def loadfile(self, fname: str):
        """Read configuration variables from the file `fname`.

        Blank lines and lines that start with "#" are ignored. The
        statement named by :attr:`_section` starts a section; variables
        that follow are set in that section. While the file is read,
        the working directory is the directory that contains the file.

        Raises:
            ClientConfigError: Syntax error.
        """
        target = self
        origdir = os.getcwd()
        with open(fname) as file:
            basedir = path.dirname(fname)
            if basedir:
                os.chdir(basedir)
            try:
                for lineno, line in enumerate(file, start=1):
                    statement = line.strip()
                    if not statement or statement.startswith('#'):
                        continue
                    try:
                        key, value = statement.split(maxsplit=1)
                        if key != self._section:
                            target.set(key, value)
                            continue
                        sections = self._sections
                        if value not in sections:
                            sections[value] = self.__class__()
                        target = sections[value]
                    except (AttributeError, TypeError, ValueError) as err:
                        message = f'{fname}:{lineno}: {err}'
                        raise ClientConfigError(message) from err
            finally:
                os.chdir(origdir)

    def parse(self, expr: str):
        """Split `expr` into a name and a value and set the variable.

        `Expr` is split at the first equals sign ("=").
        :samp:`{var}` is equivalent to :samp:`{var}=yes`,
        :samp:`no{var}` to :samp:`{var}=no`.

        Raises:
            AttributeError: Bad variable.
            ValueError: Value is empty or otherwise invalid.
        """
        value: Union[bool, str]
        name, equals, text = expr.partition('=')
        if equals:
            value = text
        elif expr.startswith('no'):
            name, value = expr[2:], False
        else:
            value = True
        if value == '':
            raise ValueError(f'{name}: Empty')
        self.set(name, value)

    def set(self, name: str, value):
        """Set the configuration variable `name` to `value`.

        Raises:
            AttributeError: Bad variable.
        """
        if name.startswith('_'):
            raise AttributeError(f'{name}: Private variable')
        # Only attributes that exist already are variables.
        try:
            current = getattr(self, name)
        except AttributeError as err:
            raise AttributeError(f'{name}: No such variable') from err
        if callable(current):
            raise AttributeError(f'{name}: Not a variable')
        try:
            setattr(self, name, value)
        except AttributeError as err:
            raise AttributeError(f'{name}: Read-only variable') from err
        except (TypeError, ValueError) as err:
            raise err.__class__(f'{name}: {err}')

    @property
    def sections(self: BaseConfigT) -> dict[str, BaseConfigT]:
        """Sections found in the configuration files loaded so far."""
        return self._sections

    _section: ClassVar[str]
    """Name of the statement that starts a section."""

    _sections: dict[str, Any] = {}
    """Sections found in the configuration files loaded so far."""
class BaseVar(ABC):
    """Base class for :class:`BaseConfig` attributes.

    A variable is a descriptor that stores its value in the
    configuration object, which is a mapping, under its own name.

    For example:

    >>> @dataclasses.dataclass
    >>> class FooConfig(BaseConfig):
    >>>     foo = BoolVar(default=False)
    >>>     bar = NumVar(cls=int, default=0)
    >>>
    >>> foo = FooConfig(bar=1)
    >>> foo.foo
    False
    >>> foo.bar
    1
    >>> foo.foo = 'yes'
    >>> foo.foo
    True
    >>> foo.bar = '2'
    >>> foo.bar
    2
    """

    def __init__(self, default: Any = None):
        """Initialize a configuration variable.

        Arguments:
            default: Value of the variable while it is unset.
        """
        self.default = default

    def __get__(self, obj: Optional[BaseConfig], _: type) -> Any:
        # Accessed through the class rather than an instance: return
        # the descriptor itself, as properties do, instead of failing
        # with a TypeError when trying to subscript None.
        if obj is None:
            return self
        try:
            return obj[self.name]
        except KeyError:
            return self.default

    def __set__(self, obj: BaseConfig, value: Any):
        # Setting a variable to None unsets it,
        # so that it reverts to its default.
        if value is None:
            with suppress(KeyError):
                del obj[self.name]
        else:
            obj[self.name] = value

    def __set_name__(self, _: object, name: str):
        self.name = name

    name: str
    """Variable name."""

    default: Any
    """Default value."""
class ExpandingVarMixin():
    """Mixin for variables that do word expansion."""

    def expand(self, obj: BaseConfig, value: str) -> str:
        """Expand '~' and configuration variables in `value`.

        ``$name`` and ``${name}`` are replaced with the value of the
        attribute `name` of `obj`, ``$$`` with a literal dollar sign
        (see :class:`string.Template`). The result is then passed
        through :func:`os.path.expanduser`.

        Raises:
            ValueError: A referenced variable does not exist, is not set,
                or is neither an :class:`int` nor a :class:`str`,
                or `value` is not a valid template.
        """
        assert isinstance(self, BaseVar)
        template = string.Template(value)
        # template.get_identifiers is only available in Python >= v3.11.
        # Scanning with the template's own pattern finds the same names
        # and, unlike a hand-written regex, skips the escape "$$name".
        varnames: set[str] = set()
        for match in template.pattern.finditer(value):
            name = match.group('named') or match.group('braced')
            if name is not None:
                varnames.add(name)
        variables = {}
        for name in varnames:
            try:
                var = getattr(obj, name)
            except AttributeError as err:
                raise ValueError(f'${name}: No such variable') from err
            if var is None:
                raise ValueError(f'${name}: Not set')
            if not isinstance(var, (int, str)):
                raise ValueError(f'${name}: Not a scalar')
            variables[name] = var
        return path.expanduser(template.substitute(variables))
class ListVarMixin():
    """Mixin for variables whose value is a comma-separated list."""

    splititems: Callable = re.compile(r'\s*,\s*').split
    """Split a string at commas, dropping whitespace around each comma."""
class BoolVar(BaseVar):
    """Convert "yes" and "no" to :class:`bool`."""

    def __set__(self, obj: BaseConfig, value: Union[bool, str]):
        # Each boolean is paired with the spellings that stand for it.
        spellings = ((True, (True, 'yes')), (False, (False, 'no')))
        for boolean, accepted in spellings:
            if value in accepted:
                super().__set__(obj, boolean)
                return
        raise ValueError(f'{value}: Not a boolean')
class CmdVar(BaseVar, ExpandingVarMixin):
    """Split up value into a list using :func:`shlex.split`.

    Words are stored as given and expanded each time the variable is read.
    """

    def __set__(self, obj: BaseConfig, value: str):
        words = shlex.split(value, posix=True)
        super().__set__(obj, words)

    def __get__(self, obj: BaseConfig, objtype: type) -> Optional[list[str]]:
        words = super().__get__(obj, objtype)
        if words is None:
            return None
        return [self.expand(obj, word) for word in words]
class EnumVar(BaseVar):
    """Convert the name of an enumeration member to that member.

    Names are compared case-insensitively.
    """

    def __init__(self, *args, cls: type[enum.Enum], **kwargs):
        """Initialize the variable.

        Arguments:
            cls: Enumeration type.
            default: Default value.
        """
        assert issubclass(cls, enum.Enum)
        super().__init__(*args, **kwargs)
        self.cls = cls

    def __set__(self, obj: BaseConfig, value: Union[enum.Enum, str]):
        if isinstance(value, enum.Enum):
            super().__set__(obj, value)
            return
        if not isinstance(value, str):  # type: ignore
            raise TypeError(f'{type(value)}: Not an enumeration')
        wanted = value.casefold()
        found = next((member for member in self.cls
                      if member.name.casefold() == wanted), None)
        if found is None:
            raise ValueError(f'{value}: No such item')
        super().__set__(obj, found)
class FilenameVar(BaseVar):
    """Expand ``~user`` and make filenames absolute."""

    def __set__(self, obj: BaseConfig, value: Optional[str]):
        """Store `value` as an absolute filename.

        Arguments:
            obj: Configuration to store the filename in.
            value: Filename or ``None`` to unset the variable.

        Raises:
            TypeError: `value` is neither a string nor ``None``.
        """
        # The branches must be exclusive; otherwise every assignment,
        # however valid, would end in the TypeError below.
        if value is None:
            super().__set__(obj, None)
        elif isinstance(value, str):
            super().__set__(obj, path.abspath(path.expanduser(value)))
        else:
            raise TypeError(f'{value}: Not a str')
class FlagVar(BaseVar, ListVarMixin):
    """Convert comma-separated values to an :class:`int`.

    Each item names a member of :attr:`cls` (case-insensitively);
    the values of the named members are OR-ed together.
    """

    def __init__(self, *args, cls: type[enum.IntEnum], **kwargs):
        assert issubclass(cls, enum.IntEnum)
        super().__init__(*args, **kwargs)
        self.cls = cls

    def __set__(self, obj: BaseConfig, value: Union[str, int, enum.IntEnum]):
        if isinstance(value, (int, enum.IntFlag)):
            super().__set__(obj, value)
            return
        if not isinstance(value, str):  # type: ignore
            raise TypeError(f'{value}: Neither an int nor a str')
        bits = 0
        for name in self.splititems(value):
            wanted = name.casefold()
            found = next((member for member in self.cls
                          if wanted == member.name.casefold()), None)
            if found is None:
                raise ValueError(f'{name}: No such item')
            bits |= found.value
        super().__set__(obj, bits)

    cls: type[enum.IntEnum]
    """Enumeration type"""
class HostVar(BaseVar):
    """Check whether value is a valid hostname or internet address."""

    def __set__(self, obj: BaseConfig, value: Optional[str]):
        # None unsets the variable and needs no validation.
        if value is not None:
            if not isinstance(value, str):
                raise TypeError(f'{value}: Not a string')
            if not isinetaddr(value) and not ishostname(value):
                raise ValueError(f'{value}: Neither hostname nor address')
        super().__set__(obj, value)
class NumVar(BaseVar):
    """Convert value to a number of type :attr:`cls`."""

    def __init__(self, *args, cls: type = int,
                 minval: Optional[Union[float, int]] = None,
                 maxval: Optional[Union[float, int]] = None,
                 **kwargs):
        """Initialize the variable.

        Arguments:
            cls: Number type.
            minval: Smallest permissible value.
            maxval: Greatest permissible value.
            default: Default value.
        """
        super().__init__(*args, **kwargs)
        self.cls = cls
        self.minval = minval
        self.maxval = maxval

    def __set__(self, obj: BaseConfig, value: Union[int, float, str]):
        try:
            number = self.cls(value)
        except ValueError as err:
            raise ValueError(f'{value}: Not a number') from err
        lower, upper = self.minval, self.maxval
        if lower is not None and number < lower:
            raise ValueError(f'{value} < {lower}')
        if upper is not None and number > upper:
            raise ValueError(f'{value} > {upper}')
        super().__set__(obj, number)

    cls: type
    """Number type."""

    minval: Optional[Union[float, int]]
    """Minimum value."""

    maxval: Optional[Union[float, int]]
    """Maximum value."""
class SASLMechVar(BaseVar, ListVarMixin):
    """Convert SASL mechanism names to :class:`BaseAuth` subclasses.

    A string is split at commas; each item is matched against the
    case-folded mechanism names with :func:`fnmatch.fnmatchcase`.
    """

    def __set__(self, obj: BaseConfig,
                value: Union[Iterable[type[BaseAuth]], str]):
        if isinstance(value, str):
            classes = BaseAuth.getmechs(obsolete=True)
            mechs = []
            for pattern in self.splititems(value.casefold()):
                matches = [cls for cls in classes
                           if fnmatch.fnmatchcase(cls.name.casefold(),
                                                  pattern)]
                for cls in matches:
                    # A mechanism may be selected by one item only.
                    if cls in mechs:
                        raise ValueError(f'{cls.name}: Duplicate')
                if not matches:
                    raise ValueError(f'{pattern}: No matches')
                mechs.extend(matches)
        elif isinstance(value, Sequence):
            mechs = value   # type: ignore[assignment]
        else:
            raise TypeError(f'{value}: Not a SASL mechanism')
        super().__set__(obj, mechs)
class UniqueVar(BaseVar):
    """Variable the value of which must be unique.

    Values that have been assigned are recorded in :attr:`values`.
    """

    def __set__(self, obj: BaseConfig, value: str):
        taken = type(self).values
        if value in taken:
            raise ValueError(f'{value}: Already in use')
        super().__set__(obj, value)
        # Only record the value once it has actually been stored.
        taken.add(value)

    values: ClassVar[set] = set()
SieveConfigT = TypeVar('SieveConfigT', bound='SieveConfig')
"""Type variable for :class:`SieveConfig`."""
class SieveConfig(BaseConfig):
    """Configuration for the SieveManager command-line client."""

    @classmethod
    def fromfiles(cls: type[SieveConfigT], *fnames: str) -> SieveConfigT:
        """Create a new configuration from `fnames`.

        Arguments:
            fnames: Filenames (default: :data:`CONFIGFILES`)

        Raises:
            FileNotFoundError: A given file could not be found.
        """
        obj = cls()
        for fname in (fnames if fnames else CONFIGFILES):
            try:
                obj.loadfile(fname)
            except FileNotFoundError:
                # Default files may be missing,
                # files that were asked for explicitly may not.
                if fnames:
                    raise
        return obj

    def __init__(self, *args, **kwargs):
        """Create a new configuration.

        Arguments:
            args: Positional arguments used to initialize the back-end.
            kwargs: Keyword arguments used as initial configuration values.
        """
        super().__init__(*args)
        for key, value in kwargs.items():
            setattr(self, key, value)

    def getmanager(self, **variables) -> SieveManager:
        """Open a :class:`SieveManager` connection with this configuration.

        Mechanisms derived from :class:`ExternalAuth` are tried first,
        then mechanisms derived from :class:`BasePwdAuth`.

        Arguments:
            variables: Configuration variables.

        Raises:
            ShellOperationError: All supported SASL mechanisms failed.
            netrc.NetrcParseError: :file:`.netrc` could not be parsed.
        """

        conf = self | self.__class__(**variables)
        mgr = SieveManager(backup=conf.backups, memory=conf.memory)

        # Helper to obtain passwords and passphrases
        def getpass_(passmgr: Optional[list[str]], prompt: str) -> str:
            if passmgr and (pass_ := readoutput(*passmgr, logger=mgr.logger)):
                return pass_
            return askpass(prompt)

        # Logging level
        mgr.logger.setLevel(conf.verbosity)

        # TLS
        sslcontext = mgr.sslcontext
        # Both values must be read before they are tested: binding them
        # inside an "or" would leave cafile unbound whenever cadir is set.
        cadir, cafile = conf.cadir, conf.cafile
        if cadir or cafile:
            sslcontext.load_verify_locations(cafile, cadir)
        if cert := conf.cert:
            def getpassphrase():
                return getpass_(conf.getpassphrase, 'Certificate passphrase: ')
            sslcontext.load_cert_chain(cert, conf.key, getpassphrase)
        if conf.x509strict:
            sslcontext.verify_flags |= ssl.VERIFY_X509_STRICT

        # Connect
        mgr.open(conf.host, port=conf.port, timeout=conf.timeout,
                 tls=conf.tls, ocsp=conf.ocsp)

        # Authenticate
        logauth = conf.verbosity <= LogLevel.AUTH
        if (sasl := [s for s in conf.saslmechs if ExternalAuth in s.__mro__]):
            # A capability error only means that these mechanisms are
            # not available, so the password-based ones are tried next.
            with suppress(SASLCapabilityError):
                mgr.authenticate(conf.login, owner=conf.owner,
                                 prepare=conf.saslprep, sasl=sasl,
                                 logauth=logauth)
                return mgr
        if (sasl := [s for s in conf.saslmechs if BasePwdAuth in s.__mro__]):
            password = (conf.password if conf.password else
                        getpass_(conf.getpassword, 'Password: '))
            with suppress(SASLCapabilityError):
                mgr.authenticate(conf.login, password, owner=conf.owner,
                                 prepare=conf.saslprep, sasl=sasl,
                                 logauth=logauth)
                return mgr
        raise ShellOperationError('SASL mechanisms exhausted')

    def getshell(self, manager: SieveManager, **variables):
        """Get a configured :class:`SieveShell` that wraps `manager`.

        Arguments:
            manager: Connection that the shell should operate on.
            variables: Configuration variables.
        """
        conf = self | self.__class__(**variables)
        return SieveShell(manager, clobber=conf.clobber, confirm=conf.confirm)

    def loadfile(self, fname: str):
        """Read configuration from `fname`.

        The file is loaded first and its permissions are checked
        afterwards, because whether the file may be group- or
        world-readable depends on whether it contains a password.

        Raises:
            ClientConfigError: Syntax error.
            ClientSecurityError: Permissions are insecure.
        """
        super().loadfile(fname)
        mode = os.stat(fname).st_mode
        if mode & 0o22:
            raise ClientSecurityError(f'{fname}: Is group- or world-writable')
        if mode & 0o44:
            for account in (self, *self._sections.values()):
                if 'password' in account:
                    message = f'{fname}: Is group- or world-readable'
                    raise ClientSecurityError(message)

    def loadaccount(self, host: str = 'localhost',
                    login: Optional[str] = None):
        """Load the section for `login` on `host`.

        `host` may be the alias of a section. A missing login is taken
        from :file:`.netrc` or else is the current user's; a missing
        password is taken from :file:`.netrc`, if there is one.
        """
        self |= self.__class__(host=host, login=login)
        hosts = readnetrc(self.netrc)

        # Host
        for name, section in self.sections.items():
            if section.alias == self.host:
                try:
                    self.host = section['host']
                except KeyError:
                    # Section names may take the form "login@host".
                    self.host = name.rsplit('@', maxsplit=1)[-1]
                break
        with suppress(KeyError):
            self |= self.sections[self.host]

        # Login
        if not self.login:
            try:
                self.login = hosts[self.host][0]
            except KeyError:
                self.login = getpass.getuser()
        with suppress(KeyError):
            self |= self.sections[f'{self.login}@{self.host}']

        # Password
        if not self.password:
            with suppress(KeyError):
                self.password = hosts[self.host][2]

    alias: UniqueVar = UniqueVar()
    """Alias for a host."""

    backups = NumVar(cls=int, default=0, minval=0)
    """How many backups to keep."""

    cadir = FilenameVar()
    """Custom CA directory."""

    cafile = FilenameVar()
    """Custom CA file."""

    clobber = BoolVar(default=True)
    """Overwrite files?"""

    confirm = FlagVar(default=ShellCmd.ALL, cls=ShellCmd)
    """Which shell commands must be confirmed?"""

    cert = FilenameVar()
    """Client TLS certificate."""

    getpassphrase = CmdVar()
    """Command that prints the passphrase for the TLS key."""

    getpassword = CmdVar()
    """Command that prints a password."""

    host = HostVar(default='localhost')
    """Host to connect to by default."""

    key = FilenameVar()
    """Client TLS key."""

    login = BaseVar()
    """User to login as (authentication ID)."""

    memory = NumVar(default=524_288, minval=0)
    """How much memory to use for temporary data."""

    netrc: Optional[str] = os.getenv('NETRC')
    """Filename of the .netrc file."""

    ocsp = BoolVar(default=True)
    """Check whether server certificate was revoked?"""

    owner = BaseVar(default='')
    """User whose scripts to manage (authorization ID)."""

    password = BaseVar()
    """Password to login with."""

    port = NumVar(default=4190, minval=0, maxval=65535)
    """Port to connect to by default."""

    saslmechs = SASLMechVar(default=BasePwdAuth.getmechs())
    """How to authenticate."""

    saslprep = FlagVar(default=SASLPrep.ALL, cls=SASLPrep)
    """Which credentials to prepare."""

    timeout = NumVar(default=socket.getdefaulttimeout(), cls=float, minval=0)
    """Network timeout."""

    tls = BoolVar(default=True)
    """Use TLS?"""

    verbosity = EnumVar(default=LogLevel.INFO, cls=LogLevel)
    """Logging level."""

    x509strict = BoolVar(default=True)
    """Be strict when verifying TLS certificates?"""

    _section = 'account'
4815#
4816# Terminal I/O
4817#
class TermIO(io.TextIOWrapper):
    """Text I/O on the controlling terminal (:file:`/dev/tty`)."""

    def __init__(self, *args, **kwargs):
        """Open the controlling terminal for reading and writing.

        Arguments are passed on to :class:`io.TextIOWrapper`.
        """
        tty = io.FileIO('/dev/tty', 'r+')
        super().__init__(tty, *args, **kwargs)
4827#
4828# Logging
4829#
LogIOWrapperT = TypeVar('LogIOWrapperT', bound='LogIOWrapper')
"""Type variable bound to :class:`LogIOWrapper`."""
class LogIOWrapper():
    """Logger for file-like objects.

    Reads and writes are passed on to the wrapped file. The data is
    also collected in a buffer per direction and logged line by line.
    Attributes that are not defined here are looked up on the file.
    """

    @classmethod
    def wrap(cls: type[LogIOWrapperT],
             file: Union[BinaryIO, io.BufferedRWPair],
             logger: logging.Logger = logging.getLogger(__name__),
             level: int = logging.DEBUG,
             formats: tuple[str, str] = ('S: %s', 'C: %s'),
             encoding: str = 'utf8') \
            -> Union[BinaryIO, io.BufferedRWPair, LogIOWrapperT]:
        """Wrap a file in a :class:`LogIOWrapper` if logging is enabled.

        Takes the same arguments as :meth:`__init__`.

        Returns:
            The file or a :class:`LogIOWrapper` that wraps the file.
        """
        if logger.isEnabledFor(level):
            return cls(file, encoding, level, logger, formats)
        return file

    def __init__(self, file: Union[BinaryIO, io.BufferedRWPair],
                 encoding: str = 'utf8', level: int = logging.DEBUG,
                 logger: logging.Logger = logging.getLogger(__name__),
                 formats: tuple[str, str] = ('S: %s', 'C: %s')):
        """Log I/O to `file`.

        Arguments:
            file: File-like object opened in binary mode.
            encoding: `file`'s encoding.
            level: Logging priority.
            logger: Logger.
            formats: Message formats; "%s" is replaced with I/O.
                The first format is used for data that is read,
                the second for data that is written.
        """
        splitlines = re.compile(rb'\r?\n').split
        buffers = (bytearray(), bytearray())

        def extv(buf: bytearray, vec: Iterable[Iterable[int]]) -> None:
            for elem in vec:
                buf.extend(elem)

        # `arg` selects what is logged: the wrapped function's return
        # value (None) or its positional argument with that index.
        # `ext` appends the selected data to the logging buffer.
        def getdecorator(
            buf: bytearray, fmt: str, arg=None,
            ext: Callable[[bytearray, Iterable], None] = bytearray.extend
        ) -> Callable[[Callable[..., T]], Callable[..., T]]:
            def decorator(func: Callable[..., T]) -> Callable[..., T]:
                def wrapper(*args, **kwargs) -> T:
                    retval = func(*args, **kwargs)
                    if not self.quiet:
                        data = retval if arg is None else args[arg]
                        ext(buf, data)  # type: ignore
                        # Log every complete line and keep the
                        # unterminated rest for the next call.
                        ptr: Union[bytes, bytearray] = buf
                        while True:
                            try:
                                line, ptr = splitlines(ptr, maxsplit=1)
                            except ValueError:
                                break
                            self.log(line, fmt)
                        buf[:] = ptr
                    return retval
                return wrapper
            return decorator

        logread = getdecorator(buffers[0], formats[0])
        # NOTE(review): this logs the caller's whole buffer, which may
        # be larger than the number of bytes actually read -- confirm.
        logreadinto = getdecorator(buffers[0], formats[0], arg=0)
        logreadv = getdecorator(buffers[0], formats[0], ext=extv)
        logwrite = getdecorator(buffers[1], formats[1], arg=0)
        logwritev = getdecorator(buffers[1], formats[1], arg=0, ext=extv)

        self.read = logread(file.read)
        self.readline = logread(file.readline)
        self.readlines = logreadv(file.readlines)
        self.write = logwrite(file.write)
        self.writelines = logwritev(file.writelines)

        if isinstance(file, io.RawIOBase):
            self.readall = logread(file.readall)
            self.readinto = logreadinto(file.readinto)  # type: ignore

        if isinstance(file, io.BufferedIOBase):
            self.read1 = logread(file.read1)
            # readinto1 returns a byte count, not data, so it is the
            # buffer that was passed in that has to be logged.
            self.readinto1 = logreadinto(file.readinto1)
            self.readinto = logreadinto(file.readinto)

        self.buffers = buffers
        self.encoding = encoding
        self.formats = formats
        self.file = file
        self.level = level
        self.logger = logger

    def __del__(self):
        try:
            file = self.file
        except AttributeError:
            # __init__ failed before the file was stored.
            return
        if not file.closed:
            file.flush()
            file.close()
        if not self.quiet:
            # Log whatever was not terminated by a newline.
            for buf, fmt in zip(self.buffers, self.formats):
                if buf:
                    self.log(buf, fmt)

    def __getattr__(self, name):
        # Looking up a missing `file` on the file itself
        # would recurse until the stack is exhausted.
        if name == 'file':
            raise AttributeError(name)
        return getattr(self.file, name)

    def __iter__(self):
        return self

    def __next__(self):
        if line := self.readline():
            return line
        raise StopIteration()

    def log(self, line: Union[bytearray, bytes], fmt: str):
        """Log `line` with `fmt`.

        Trailing line breaks are stripped, the line is decoded
        using :attr:`encoding`, and control characters are escaped.
        """
        decoded = escapectrl(line.rstrip(b'\r\n').decode(self.encoding))
        self.logger.log(self.level, fmt, decoded)

    buffers: tuple[bytearray, bytearray]
    """Logging buffers."""

    encoding: str
    """:attr:`file`'s encoding."""

    file: Union[BinaryIO, io.BufferedRWPair]
    """Underlying file-like object."""

    formats: tuple[str, str]
    """Logging formats."""

    level: int
    """Logging level."""

    logger: logging.Logger
    """Logger."""

    quiet: bool = False
    """Suppress logging of I/O?"""

    read: Callable[..., bytes]
    """Read from :attr:`file`."""

    readinto: Callable[..., int]
    """Read from :attr:`file` into a buffer."""

    readline: Callable[..., bytes]
    """Read a line from :attr:`file`."""

    readlines: Callable[..., list[bytes]]
    """Read all lines from :attr:`file`."""

    write: Callable[..., int]
    """Write to :attr:`file`."""

    writelines: Callable[..., None]
    """Write lines to :attr:`file`."""
4993#
4994# Signal handling
4995#
SignalHandlingFunc = Callable[[int, Optional[types.FrameType]], Any]
"""Signature of functions that handle signals."""

SignalHandler = Optional[Union[SignalHandlingFunc, int]]
"""Anything that may be registered as, or returned as, a signal handler."""
@dataclass(frozen=True)
class SignalCaught(Exception):
    """A signal was caught."""

    @classmethod
    def throw(cls, signo: int, frame: Optional[types.FrameType]):
        """Raise a :exc:`SignalCaught` exception."""
        raise cls(signo, frame)

    @classmethod
    def register(cls, signals: Iterable[int]) -> tuple[SignalHandler, ...]:
        """Register :meth:`throw` as handler for the given `signals`.

        Arguments:
            signals: Signals to register :meth:`throw` as handler for.

        Returns:
            Old signal handlers.
        """
        return tuple(signal.signal(s, cls.throw) for s in signals)

    @classmethod
    def catch(cls, *signals: int) -> \
            Callable[[Callable[..., T]], Callable[..., T]]:
        """Decorator that :meth:`handles <handle>` `signals`.

        If one of the given `signals` is caught, :exc:`SignalCaught` is raised.
        If that exception is not caught, the process group is terminated,
        the signal handler reset, and the signal re-raised.
        The previous signal handlers are restored when the decorated
        function returns or raises.
        """
        def decorator(func: Callable[..., T]) -> Callable[..., T]:
            # pylint: disable=inconsistent-return-statements
            def wrapper(*args, **kwargs) -> T:  # type: ignore[return]
                handlers = cls.register(signals)
                try:
                    return func(*args, **kwargs)
                except cls as exc:
                    logging.critical(exc)
                    # SIGTERM is ignored so that terminating the
                    # process group does not terminate this process.
                    signal.signal(SIGTERM, SIG_IGN)
                    os.killpg(os.getpgrp(), SIGTERM)
                    signal.signal(exc.signo, SIG_DFL)
                    signal.raise_signal(exc.signo)
                finally:
                    for signo, handler in zip(signals, handlers):
                        signal.signal(signo, handler)
                # NOTREACHED
            for name in dir(func):
                # The wrapper must keep its own code. Assigning __code__
                # succeeds if both functions happen to have the same number
                # of free variables, and would then replace the wrapper's
                # body with the wrapped function's.
                if name == '__code__':
                    continue
                with suppress(AttributeError, TypeError, ValueError):
                    setattr(wrapper, name, getattr(func, name))
            return wrapper
        return decorator

    def __str__(self):
        desc = signal.strsignal(self.signo)
        return desc.split(':')[0] if desc else f'caught signal {self.signo}'

    signo: int
    """Signal number."""

    frame: Optional[types.FrameType] = None
    """Stack frame."""
5068#
5069# Errors
5070#
5072# Error types
class Error(Exception):
    """Root of the error hierarchy."""
class CapabilityError(Error):
    """Ancestor of all capability errors."""
class ConfigError(Error):
    """Ancestor of all configuration errors."""
class DataError(Error):
    """Ancestor of all data errors."""
class OperationError(Error):
    """Ancestor of all operation errors."""
class ProtocolError(Error):
    """Ancestor of all protocol errors.

    .. danger::
        Carrying on after a :exc:`ProtocolError`
        may cause undefined behaviour.
    """
class SecurityError(Error):
    """Ancestor of all security errors.

    .. danger::
        Carrying on after a :exc:`SecurityError`
        compromises transmitted data.
    """
class SoftwareError(Error):
    """Ancestor of all software errors."""
class UsageError(Error):
    """Ancestor of all usage errors."""
5117# Client errors
class ClientError(Error):
    """Ancestor of all client errors."""
class ClientConfigError(ClientError, ConfigError):
    """The client's configuration is in error."""
class ClientConnectionError(ClientError, ConnectionError):
    """Connection error on the client's side."""
class ClientOperationError(ClientError, OperationError):
    """Operation error on the client's side."""
class ClientSecurityError(ClientError, SecurityError):
    """Security error on the client's side."""
class ClientSoftwareError(ClientError, SoftwareError):
    """Software error in the client (i.e., a bug)."""
5142# DNS errors
class DNSError(Error):
    """Ancestor of all DNS errors."""
# Derives from DNSError and DataError, like the other DNS errors do
# from their respective bases, so that handlers for either catch it.
class DNSDataError(DNSError, DataError):
    """DNS data error."""
class DNSOperationError(DNSError, OperationError):
    """A DNS operation failed."""
class DNSSoftwareError(DNSError, SoftwareError):
    """Software error related to DNS."""
5159# HTTP errors
class HTTPError(Error):
    """Ancestor of all HTTP errors."""
class HTTPOperationError(HTTPError, OperationError):
    """An HTTP operation failed."""
# NOTE(review): unlike ShellUsageError, this derives from ProtocolError,
# not from UsageError -- confirm that this is intended.
class HTTPUsageError(HTTPError, ProtocolError):
    """HTTP was used in an unsupported way."""
5172# OCSP errors
class OCSPError(Error):
    """Ancestor of all OCSP errors."""
class OCSPDataError(OCSPError, DataError):
    """Data error related to OCSP."""
class OCSPOperationError(OCSPError, OperationError):
    """An OCSP operation failed."""
5185# SASL errors
class SASLError(Error):
    """Ancestor of all SASL errors."""
# Derives from SASLError and CapabilityError, like the other SASL errors
# do from their respective bases, so that handlers for either catch it.
class SASLCapabilityError(SASLError, CapabilityError):
    """SASL capability error."""
class SASLProtocolError(SASLError, ProtocolError):
    """The server violated the SASL protocol."""
class SASLSecurityError(SASLError, SecurityError):
    """Security error related to SASL."""
5202# Shell errors
class ShellError(Error):
    """Ancestor of all shell errors."""
class ShellDataError(ShellError, DataError):
    """Data error in the shell."""
class ShellOperationError(ShellError, OperationError):
    """A shell operation failed."""
class ShellUsageError(ShellError, UsageError):
    """The shell was used incorrectly."""
5219# ManageSieve errors
class SieveError(Error):
    """Ancestor of all ManageSieve errors."""
class SieveCapabilityError(SieveError, CapabilityError):
    """The server does not support a capability."""
class SieveConnectionError(Response, SieveError, ConnectionError):
    """Server said "BYE"."""

    # pylint: disable=redefined-outer-name
    def __init__(self, response: Atom = Atom('BYE'),
                 code: tuple[Word, ...] = (),
                 message: Optional[str] = None):
        """Create the error; `response` defaults to the atom "BYE"."""
        super().__init__(response=response, code=code, message=message)
class SieveOperationError(Response, SieveError, OperationError):
    """Server said "NO"."""

    # pylint: disable=redefined-outer-name
    def __init__(self, response: Atom = Atom('NO'),
                 code: tuple[Word, ...] = (),
                 message: Optional[str] = None):
        """Create the error; `response` defaults to the atom "NO"."""
        super().__init__(response=response, code=code, message=message)
class SieveProtocolError(SieveError, ProtocolError):
    """Server violated the ManageSieve protocol."""
5252# TLS errors
class TLSError(Error):
    """Ancestor of all TLS errors."""
class TLSCapabilityError(TLSError, CapabilityError):
    """Capability error related to TLS."""
class TLSSecurityError(TLSError, SecurityError):
    """Security error related to TLS."""
class TLSSoftwareError(TLSError, SoftwareError):
    """Software error related to TLS."""
5269#
5270# Helpers
5271#
def askpass(prompt: str) -> str:
    """Read a password from the controlling terminal.

    `prompt` is printed to the terminal before the password is read.
    """
    tty = TermIO()
    with tty:
        return getpass.getpass(prompt, stream=tty)
def backup(file: str, keep: int, getfiles: Callable[[], Iterable[str]],
           copy: Callable[[str, str], Any], remove: Callable[[str], Any]):
    """Back up `file` the way Emacs does.

    `keep` = 0
        No backup is made.

    `keep` = 1
        :file:`file` is backed up as :file:`file~`.

    `keep` > 1
        :file:`file` is backed up as :file:`file.~{n}~`, where
        `n` starts with 1 and increments with each backup.
        The oldest backups are removed, so that, counting
        the new one, at most `keep` backups remain.

    Arguments:
        file: File to back up.
        keep: How many copies to keep.
        copy: Function that copies the file.
        getfiles: Function that returns a list of files.
        remove: Function that removes a file.

    Raises:
        ValueError: `keep` is < 0.
    """
    if keep < 0:
        raise ValueError('keep: must be >= 0')
    if keep == 0:
        return
    if keep == 1:
        copy(file, file + '~')
        return
    matchbackup = re.compile(re.escape(file) + r'\.~(\d+)~').fullmatch
    numbered = []
    for fname in getfiles():
        found = matchbackup(fname)
        if found:
            numbered.append((int(found.group(1)), fname))
    numbered.sort()
    # Make room for the backup that is about to be added.
    for _, surplus in numbered[:-(keep - 1)]:
        remove(surplus)
    counter = numbered[-1][0] + 1 if numbered else 1
    copy(file, f'{file}.~{counter}~')
def bell():
    """Print a BEL to the controlling terminal, if there is one.

    Failing to open or write to the terminal is not an error.
    """
    try:
        with TermIO() as tty:
            print('\a', end='', file=tty)
    except OSError:
        # Catching FileNotFoundError alone does not suffice: if there is
        # no controlling terminal, opening /dev/tty fails with a
        # different OSError (e.g., ENXIO) even though the file exists.
        pass
def certrevoked(cert, logger: logging.Logger = logging.getLogger(__name__)) \
        -> bool:
    """Check if `cert` has been revoked.

    Each issuer certificate named in `cert` is downloaded and each
    OCSP responder named in `cert` is queried, until a responder gives
    a successful and current response that says whether `cert` is good
    or revoked. Download and query errors are logged and skipped.

    Arguments:
        cert: Certificate to check.
        logger: Logger that errors are reported to.

    Returns:
        `True` if `cert` has been revoked, `False` if it is good.

    Raises:
        OCSPDataError: `cert` contains no authority information.
        OCSPOperationError: no authoritative response.

    .. seealso::
        :rfc:`5019`
            Lightweight OCSP Profile
        :rfc:`6960`
            Online Certificate Status Protocol (OCSP)
    """
    try:
        issuers, responders = getcertauthinfo(cert)
    except ExtensionNotFound as err:
        raise OCSPDataError('no authority information') from err
    for caurl in issuers:
        try:
            der = httpget(caurl)
        except (urllib.error.URLError, HTTPError) as err:
            logger.error(err)
            continue
        ca = x509.load_der_x509_certificate(der)
        builder = ocsp.OCSPRequestBuilder()
        # SHA1 is mandated by RFC 5019.
        req = builder.add_certificate(cert, ca, SHA1()).build()  # nosec B303
        b64req = b64encode(req.public_bytes(Encoding.DER)).decode('ascii')
        # Base64 may contain "/", "+", and "=", which must be
        # URL-encoded when the request is sent with GET (RFC 6960, A.1).
        reqpath = urllib.parse.quote(b64req, safe='')
        for responder in responders:
            # The request is appended to the responder's URL; urljoin
            # would instead replace the last segment of the URL's path.
            statusurl = responder.rstrip('/') + '/' + reqpath
            try:
                res = ocsp.load_der_ocsp_response(httpget(statusurl))
            except (urllib.error.URLError, HTTPError) as err:
                # Network errors are no more fatal
                # for responders than they are for issuers.
                logger.error(err)
                continue
            if res.response_status == OCSPResponseStatus.SUCCESSFUL:
                try:
                    now = datetime.datetime.now(tz=datetime.UTC)  # novermin
                    end = res.next_update_utc       # type: ignore
                    if now < res.this_update_utc:   # type: ignore
                        continue
                    if end is not None and now >= end:
                        continue
                except AttributeError:
                    # Either datetime.UTC or the *_utc attributes are
                    # missing; fall back to the deprecated naive ones.
                    warnings.filterwarnings(
                        action='ignore',
                        category=CryptographyDeprecationWarning
                    )
                    # The naive timestamps are in UTC, so the current
                    # time must be naive UTC as well, not local time.
                    now = datetime.datetime.now(
                        datetime.timezone.utc
                    ).replace(tzinfo=None)
                    if now < res.this_update:
                        continue
                    if (end := res.next_update) is not None and now >= end:
                        continue
                if res.certificate_status == OCSPCertStatus.REVOKED:
                    return True
                if res.certificate_status == OCSPCertStatus.GOOD:
                    return False
    raise OCSPOperationError('no authoritative OCSP response')
def escapectrl(chars: str) -> str:
    r"""Replace control characters with ``\uXXXX`` escapes.

    Every character whose Unicode category starts with "C"
    is replaced; all other characters are kept as they are.
    """
    return ''.join(
        fr'\u{ord(char):04x}'
        if unicodedata.category(char).startswith('C') else char
        for char in chars
    )
def httpget(url: str) -> bytes:
    """Download a file from `url` using HTTP.

    Redirections are followed, provided that they point to HTTP URLs.

    Raises:
        HTTPUsageError: `url` is not an HTTP URL.
        HTTPOperationError: "GET" failed.
    """
    while True:
        if not url.startswith('http://'):
            raise HTTPUsageError(f'{url}: not an HTTP URL')
        with urllib.request.urlopen(url) as res:    # nosec B310
            if res.status == 200:
                return res.read()
            location = None
            if res.status in (301, 302, 303, 307, 308):
                location = res.getheader('Location')
            if not location:
                # `url` must still name the URL that was requested here,
                # so the redirection target is kept in a separate variable.
                raise HTTPOperationError(f'GET {url}: {res.reason}')
        url = location
    # NOTREACHED
def getcertauthinfo(cert) -> tuple[list[str], list[str]]:
    """Read the authority information access extension of `cert`.

    Returns:
        CA issuer URLs and OCSP responder base URLs.
    """
    exts = cert.extensions.get_extension_for_class(AuthorityInformationAccess)
    issuers: list[str] = []
    responders: list[str] = []
    for desc in exts.value:
        method = desc.access_method
        if method == AuthorityInformationAccessOID.CA_ISSUERS:
            target = issuers
        elif method == AuthorityInformationAccessOID.OCSP:
            target = responders
        else:
            # Other access methods are of no interest.
            continue
        target.append(desc.access_location.value)
    return issuers, responders
def getfilesize(file: IO) -> int:
    """Count the bytes between the current position and the end of `file`.

    The size is taken from the file descriptor if `file` has one and
    is found by seeking otherwise; the position is restored afterwards.
    """
    try:
        start = file.tell()
    except io.UnsupportedOperation:
        start = 0
    try:
        total = os.fstat(file.fileno()).st_size
    except io.UnsupportedOperation:
        # No file descriptor: seek to the end and back again.
        total = file.seek(0, SEEK_END)
        file.seek(start, SEEK_SET)
    return total - start
def isdnsname(name: str) -> bool:
    """Check whether `name` has the length of a valid DNS name.

    A name may be at most 253 characters long and each of its labels
    between 1 and 63 characters; a single trailing dot is permitted.

    .. seealso::
        :rfc:`1035` (sec. 2.3.1)
            Domain names - Preferred name syntax
        :rfc:`2181` (sec. 11)
            Clarifications to the DNS Specification - Name syntax
    """
    if len(name) > 253:
        return False
    labels = name.removesuffix('.').split('.')
    return all(1 <= len(label) <= 63 for label in labels)
def ishostname(name: str) -> bool:
    """Check whether `name` is a valid hostname.

    Labels must consist of ASCII letters, digits, and hyphens,
    and must neither start nor end with a hyphen. `name` must
    also pass :func:`isdnsname`.

    .. seealso::
        :rfc:`921`
            Domain Name System Implementation Schedule
        :rfc:`952`
            Internet host table specification
        :rfc:`1123` (sec. 2.1)
            Host Names and Numbers
    """
    # Labels are separated by mandatory dots, so that the expression
    # can match a given name in one way only; if the dot were optional
    # inside a repeated group, rejecting a name could take exponential
    # time. re.ASCII stops re.I from matching non-ASCII letters that
    # case-fold to ASCII ones (e.g., the Kelvin sign).
    label = r'(?!-)[a-z0-9-]+(?<!-)'
    hostname = fr'{label}(?:\.{label})*\.?'
    return (bool(re.fullmatch(hostname, name, re.I | re.ASCII))
            and isdnsname(name))
def isinetaddr(addr: str) -> bool:
    """Tell whether `addr` parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(addr)
    except ValueError:
        valid = False
    else:
        valid = True
    return valid
def nwise(iterable: Iterable, n: Any) -> Iterator[tuple]:
    """Slide a window of `n` elements over `iterable`.

    Arguments:
        iterable: Elements to iterate over.
        n: Size of the window.

    Returns:
        An iterator over overlapping n-tuples; nothing is yielded
        if `iterable` has fewer than `n` elements.
    """
    source = iter(iterable)
    # Pre-fill the window with all but one element; each further element
    # completes a tuple and, thanks to `maxlen`, pushes out the oldest one.
    head = itertools.islice(source, n - 1)
    window = deque(head, maxlen=n)
    for elem in source:
        window.append(elem)
        yield tuple(window)
def randomize(elems: Sequence[T], weights: Iterable[int]) -> list[T]:
    """Randomise the order of `elems`.

    Elements are drawn one by one without replacement; the chance that
    an element is drawn next is proportional to its weight. Elements
    that cannot be drawn that way, because only weights of zero remain,
    are appended in random order.

    Arguments:
        elems: Elements to reorder.
        weights: Weight of each element, in the order of `elems`.

    Returns:
        A new list that contains every element of `elems` exactly once.

    Raises:
        ValueError: A weighted draw was made, but there are not
            as many `weights` as `elems`.
    """
    weights = list(weights)
    indices = list(range(len(elems)))
    drawn: set[int] = set()
    randomized = []
    # random.choices raises a ValueError if the weights do not add up to
    # a positive number, so weighted drawing has to stop once every
    # element with a positive weight has been drawn.
    while len(drawn) < len(indices) and sum(weights) > 0:
        i, = random.choices(indices, weights, k=1)  # noqa DUO102 # nosec B311
        randomized.append(elems[i])
        drawn.add(i)
        weights[i] = 0          # Take the element out of the draw.
    rest = [i for i in indices if i not in drawn]
    random.shuffle(rest)        # nosec B311
    randomized.extend(elems[i] for i in rest)
    return randomized
def readdir(dirname: str, predicate: Optional[Callable[[str], bool]] = None) \
        -> Iterator[str]:
    """Yield the path of every entry in `dirname` that `predicate` accepts.

    Arguments:
        dirname: Directory to list; the empty string refers
            to the current directory.
        predicate: Called with `dirname` joined with each entry;
            if omitted, every entry is yielded.

    Returns:
        An iterator over `dirname` joined with each accepted entry.
    """
    entries = os.listdir(dirname or '.')
    fnames = (path.join(dirname, entry) for entry in entries)
    if predicate is None:
        yield from fnames
    else:
        yield from filter(predicate, fnames)
def readnetrc(fname: Optional[str]) -> dict[str, tuple[str, str, str]]:
    """Load the entries of a .netrc file.

    Arguments:
        fname: Filename (default: :file:`~/.netrc`)

    Returns:
        Mapping from hosts to login-account-password 3-tuples;
        empty if no `fname` was given and the default file does not exist.

    Raises:
        FileNotFoundError: `fname` was given but not found.
        netrc.NetrcParseError: Syntax error.
    """
    try:
        if not fname:
            # The default file is optional, a given one is not.
            try:
                return netrc.netrc().hosts
            except FileNotFoundError:
                return {}
        return netrc.netrc(fname).hosts
    except netrc.NetrcParseError as err:
        # Parse errors are only logged on Python versions before 3.10.
        if sys.version_info >= (3, 10):
            raise
        logging.error(err)
    return {}
def readoutput(*command: str, encoding: str = ENCODING,
               logger: logging.Logger = logging.getLogger(__name__)) -> str:
    """Run `command` and return what it printed to standard output.

    Arguments:
        command: Program to run and its arguments.
        encoding: Encoding to decode the output with.
        logger: Logger for debugging messages.

    Returns:
        Decoded output without trailing whitespace or, if `command`
        exited with a non-zero status, the empty string.

    Raises:
        subprocess.CalledProcessError: `command` exited with a status >= 127.
    """
    logger.debug('exec: %s', ' '.join(command))
    try:
        output = subprocess.run(command, capture_output=True,
                                check=True).stdout
    except subprocess.CalledProcessError as err:
        status = err.returncode
        if status < 127:
            logger.debug('%s exited with status %d', command[0], status)
            return ''
        raise
    return output.rstrip().decode(encoding)
# pylint: disable=redundant-returns-doc
def resolvesrv(host: str) -> Iterator[SRV]:
    """Look up the DNS SRV records for `host`.

    Arguments:
        host: Hostname (e.g., :samp:`_sieve._tcp.imap.foo.example`)

    Returns:
        An iterator over `SRV` records sorted by their priority
        and randomized according to their weight.

    Raises:
        DNSDataError: `host` is not a valid DNS name.
        DNSOperationError: Lookup error.
        DNSSoftwareError: dnspython_ is not available.

    .. note::
        Requires dnspython_.
    """
    if not HAVE_DNSPYTHON:
        raise DNSSoftwareError('dnspython unavailable')
    if not isdnsname(host):
        raise DNSDataError('hostname or label is too long')
    try:
        answer = dns.resolver.resolve(host, 'SRV')

        # A-record addresses from the additional section, keyed by name.
        addresses: dict[str, list] = defaultdict(list)
        for extra in answer.response.additional:
            addresses[extra.name.to_text()].extend(
                item.address for item in extra.items
                if item.rdtype == dns.rdatatype.A
            )

        # Records grouped by priority; a target is replaced by its
        # addresses if the response supplied any.
        groups: dict[int, list[SRV]] = defaultdict(list)
        for rec in answer:                              # type: ignore
            target = rec.target.to_text()               # type: ignore
            endpoints = addresses.get(target) or [target.rstrip('.')]
            groups[rec.priority].extend(                # type: ignore
                SRV(rec.priority, rec.weight, endpoint, rec.port)
                for endpoint in endpoints
            )

        for priority in sorted(groups):
            group = groups[priority]
            weights = [srv.weight for srv in group]
            # Weighted randomisation requires a positive total weight.
            yield from (randomize(group, weights)
                        if sum(weights) > 0 else group)
    except DNSException as err:
        raise DNSOperationError(str(err)) from err
def yamlescape(data: str):
    """Strip `data` and, if needed, quote it for use as a YAML scalar.

    Arguments:
        data: Text to escape.

    Returns:
        `data` without surrounding whitespace; double-quoted if it is
        empty, reads "no", starts with a YAML indicator, or contains
        ": " or " #".
    """
    indicators = ('-', '?', ':', ',', '[', ']', '{', '}', '#', '&',
                  '*', '!', '|', '>', "'", '"', '%', '@', '`')
    text = data.strip()
    if text == '':
        return '""'
    if text.casefold() == 'no':
        return f'"{text}"'
    ambiguous = text.startswith(indicators) or ': ' in text or ' #' in text
    if not ambiguous:
        return text
    # Backslashes must be doubled before quotes are escaped.
    escaped = text.replace('\\', '\\\\').replace('"', '\\\"')
    return f'"{escaped}"'
5632#
5633# Main
5634#
# pylint: disable=too-many-branches, too-many-statements
@SignalCaught.catch(SIGHUP, SIGINT, SIGTERM)
def main() -> NoReturn:
    """sievemgr - manage remote Sieve scripts

    Usage:  sievemgr [server] [command] [argument ...]
            sievemgr -e expression [...] [server]
            sievemgr -s file [server]

    Options:
        -C             Do not overwrite existing files.
        -c file        Read configuration from file.
        -d             Enable debugging mode.
        -e expression  Execute expression on the server.
        -f             Overwrite and remove files without confirmation.
        -i             Confirm removing or overwriting files.
        -o key=value   Set configuration key to value.
        -q             Be quieter.
        -s file        Execute expressions read from file.
        -v             Be more verbose.

    -e, -o, -q, and -v can be given multiple times.
    See sievemgr(1) for the complete list.

    Report bugs to: <https://github.com/odkr/sievemgr/issues>
    Home page: <https://odkr.codeberg.page/sievemgr>
    """
    # The docstring is passed to showhelp for -h/--help,
    # so implementation notes go into comments instead.
    progname = path.basename(sys.argv[0])
    logging.basicConfig(format=f'{progname}: %(message)s')

    # Options
    try:
        opts, args = getopt(sys.argv[1:], 'CN:Vc:de:fhio:qs:v',
                            ['help', 'version'])
    except GetoptError as err:
        error('%s', err, status=2)

    optconf = SieveConfig()
    debug = False
    exprs: list[str] = []
    configfiles: list[str] = []
    script: Optional[TextIO] = None
    volume = 0
    for opt, arg in opts:
        try:
            if opt in ('-h', '--help'):
                showhelp(main)
            elif opt in ('-V', '--version'):
                showversion()
            elif opt == '-C':
                optconf.clobber = False
            elif opt == '-N':
                optconf.netrc = arg
            elif opt == '-c':
                configfiles.append(arg)
            elif opt == '-d':
                # Errors are re-raised below if debugging is enabled.
                debug = True
                optconf.verbosity = LogLevel.DEBUG
            elif opt == '-e':
                exprs.append(arg)
            elif opt == '-f':
                optconf.confirm = ShellCmd.NONE
            elif opt == '-i':
                optconf.confirm = ShellCmd.ALL
            elif opt == '-o':
                optconf.parse(arg)
            elif opt == '-q':
                volume -= 1
            elif opt == '-s':
                # Report a missing or unreadable script
                # as an error rather than with a traceback.
                try:
                    # pylint: disable=consider-using-with
                    script = open(arg)
                except OSError as err:
                    error('%s: %s', arg,
                          os.strerror(err.errno) if err.errno else err)
            elif opt == '-v':
                volume += 1
        except (AttributeError, TypeError, ValueError) as err:
            error('option %s: %s', opt, err, status=2)

    # Arguments
    url = URL.fromstr(args.pop(0)) if args else None
    command = args.pop(0) if args else ''

    if script:
        if command:
            error('-s cannot be used together with a command', status=2)
        if exprs:
            error('-e and -s cannot be combined', status=2)

    # Configuration
    try:
        conf = SieveConfig().fromfiles(*configfiles) | optconf
    except FileNotFoundError as err:
        assert err.errno
        error('%s: %s', err.filename, os.strerror(err.errno))
    except (ConfigError, SecurityError) as err:
        error('%s', err)
    conf.loadaccount(host=url.hostname if url else conf.host,
                     login=url.username if url else None)
    # Apply the command-line options again after loading the account.
    conf |= optconf

    if url:
        if url.username:
            conf.login = url.username
        if url.password:
            conf.password = url.password
        if url.owner:
            conf.owner = url.owner

    # Logging
    conf.verbosity = conf.verbosity.fromdelta(volume)
    logger = logging.getLogger()
    logger.setLevel(conf.verbosity)

    # Infos
    for line in _ABOUT.strip().splitlines():
        logging.info('%s', line)

    # Shell
    try:
        with conf.getmanager() as mgr:
            shell = conf.getshell(mgr)
            if exprs:
                for expr in exprs:
                    shell.executeline(expr)
            elif script:
                shell.executescript(script)
            elif command:
                shell.execute(command, *args)
            else:
                shell.enter()
    except (socket.herror, socket.gaierror, ConnectionError) as err:
        # The second argument is used as the message if there is one;
        # errors raised with fewer arguments are logged as they are.
        error('%s', err.args[1] if len(err.args) > 1 else err)
    except (MemoryError, ssl.SSLError) as err:
        error('%s', err)
    except OSError as err:
        error('%s', os.strerror(err.errno) if err.errno else err)
    except subprocess.CalledProcessError as err:
        error('%s exited with status %d', err.cmd[0], err.returncode)
    except Error as err:
        if debug:
            raise
        error('%s', err)
    sys.exit(shell.retval)
def error(*args, status: int = 1, **kwargs) -> NoReturn:
    """Log an error message and exit with `status`.

    Arguments:
        args: Positional arguments for :func:`logging.error`.
        status: Exit status.
        kwargs: Keyword arguments for :func:`logging.error`.

    Raises:
        SystemExit: Always.
    """
    logging.error(*args, **kwargs)
    raise SystemExit(status)
def showhelp(func: Callable) -> NoReturn:
    """Print the docstring of `func` and :func:`exit <sys.exit>`.

    The prefix shared by all non-empty lines between the first and the
    last line is removed from every line; a last line that is empty
    after that is not printed.

    Arguments:
        func: Function to print the docstring of.
    """
    doc = func.__doc__
    assert doc
    lines = doc.splitlines()
    prefix = path.commonprefix(list(filter(None, lines[1:-1])))
    texts = [line.removeprefix(prefix) for line in lines]
    # Drop what is left of the line that holds the closing quotes.
    if not texts[-1]:
        texts.pop()
    for text in texts:
        print(text)
    sys.exit()
def showversion() -> NoReturn:
    """Print the stripped :attr:`_ABOUT` text and :func:`exit <sys.exit>`."""
    about = _ABOUT.strip()
    print(about)
    raise SystemExit
5809#
5810# Boilerplate
5811#
# Attach a handler that discards records to this module's logger.
logging.getLogger(__name__).addHandler(logging.NullHandler())

# Run the command-line interface if this file is executed as a script.
if __name__ == '__main__':
    main()