Coverage for sievemgr.py: 44%

2693 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-12 00:39 +0100

1#!/usr/bin/env python3 

2"""Client for managing Sieve scripts remotely using the ManageSieve protocol""" 

3 

4# 

5# Copyright 2023 and 2024 Odin Kroeger 

6# 

7# This file is part of SieveManager. 

8# 

9# SieveManager is free software: you can redistribute it and/or 

10# modify it under the terms of the GNU General Public License as 

11# published by the Free Software Foundation, either version 3 of 

12# the License, or (at your option) any later version. 

13# 

14# SieveManager is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with SieveManager. If not, see <https://www.gnu.org/licenses/>. 

21# 

22 

23 

24# 

25# Modules 

26# 

27 

28from __future__ import annotations 

29from abc import ABC, abstractmethod 

30from base64 import b64decode, b64encode 

31from collections import UserDict, defaultdict, deque 

32from collections.abc import Iterable 

33from contextlib import AbstractContextManager, suppress 

34from dataclasses import dataclass 

35from errno import (ECONNABORTED, EEXIST, EINPROGRESS, EISCONN, 

36 ENOENT, ENOTCONN, ETIMEDOUT) 

37from fcntl import LOCK_SH, LOCK_NB 

38from getopt import GetoptError, getopt 

39from inspect import Parameter 

40from os import O_CREAT, O_EXCL, O_WRONLY, O_TRUNC, SEEK_END, SEEK_SET 

41from os import path 

42from signal import SIG_DFL, SIG_IGN, SIGHUP, SIGINT, SIGTERM 

43from tempfile import SpooledTemporaryFile, TemporaryDirectory 

44from typing import (Any, BinaryIO, Callable, ClassVar, Final, IO, Iterator, 

45 NoReturn, Optional, Union, Sequence, TextIO, TypeVar) 

46 

47import code 

48import dataclasses 

49import datetime 

50import enum 

51import fcntl 

52import fnmatch 

53import getpass 

54import hashlib 

55import hmac 

56import inspect 

57import io 

58import ipaddress 

59import itertools 

60import locale 

61import logging 

62import math 

63import netrc 

64import os 

65import pwd 

66import random 

67import re 

68import readline 

69import rlcompleter 

70import secrets 

71import select 

72import shlex 

73import shutil 

74import signal 

75import socket 

76import subprocess # nosec B404 

77import ssl 

78import string 

79import stringprep 

80import sys 

81import threading 

82import types 

83import unicodedata 

84import urllib.error 

85import urllib.parse 

86import urllib.request 

87import warnings 

88 

89try: 

90 from cryptography import x509 

91 from cryptography.hazmat.primitives.hashes import SHA1 

92 from cryptography.hazmat.primitives.serialization import Encoding 

93 from cryptography.x509 import (AuthorityInformationAccess, 

94 ExtensionNotFound, ocsp) 

95 from cryptography.x509.oid import AuthorityInformationAccessOID 

96 from cryptography.x509.ocsp import OCSPResponseStatus, OCSPCertStatus 

97 from cryptography.utils import CryptographyDeprecationWarning 

98 

99 HAVE_CRYPTOGRAPHY: Final = True # type: ignore 

100except ImportError: 

101 HAVE_CRYPTOGRAPHY: Final = False # type: ignore 

102 

103try: 

104 import dns.rdatatype 

105 import dns.resolver 

106 from dns.exception import DNSException 

107 

108 HAVE_DNSPYTHON = True # type: ignore 

109except ImportError: 

110 HAVE_DNSPYTHON = False # type: ignore 

111 

112if sys.version_info < (3, 9): 

113 sys.exit('SieveManager requires Python 3.9 or later') 

114 

115 

116# 

117# Metadata 

118# 

119 

# Package metadata.  ``__all__`` lists every name once; 'BaseAuth' and
# 'ExternalAuth' used to appear twice.
__version__ = '0.7.4.7+devel'
__author__ = 'Odin Kroeger'
__copyright__ = '2023 and 2024 Odin Kroeger'
__license__ = 'GPLv3+'
__all__ = [
    # ABCs
    'BaseAuth',
    'BaseSASLAdapter',

    # ManageSieve
    'SieveManager',
    'Atom',
    'Line',
    'Word',
    'Capabilities',
    'Response',
    'URL',

    # SASL ('BaseAuth' is listed under ABCs above)
    'BasePwdAuth',
    'BaseScramAuth',
    'BaseScramPlusAuth',
    'CramMD5Auth',
    'ExternalAuth',
    'LoginAuth',
    'PlainAuth',
    'ScramSHA1Auth',
    'ScramSHA1PlusAuth',
    'ScramSHA224Auth',
    'ScramSHA224PlusAuth',
    'ScramSHA256Auth',
    'ScramSHA256PlusAuth',
    'ScramSHA384Auth',
    'ScramSHA384PlusAuth',
    'ScramSHA512Auth',
    'ScramSHA512PlusAuth',
    'ScramSHA3_512Auth',
    'ScramSHA3_512PlusAuth',
    'SASLPrep',

    # Errors
    'Error',
    'ProtocolError',
    'SecurityError',
    'CapabilityError',
    'ConfigError',
    'DataError',
    'OperationError',
    'UsageError',
    'ClientError',
    'ClientConfigError',
    'ClientConnectionError',
    'ClientOperationError',
    'ClientSecurityError',
    'OCSPError',
    'OCSPDataError',
    'OCSPOperationError',
    'SASLError',
    'SASLCapabilityError',
    'SASLProtocolError',
    'SASLSecurityError',
    'SieveError',
    'SieveCapabilityError',
    'SieveConnectionError',
    'SieveOperationError',
    'SieveProtocolError',
    'TLSError',
    'TLSCapabilityError',
    'TLSSecurityError'
]

192 

193 

194# 

195# Globals 

196# 

197 

_ABOUT: Final[str] = f'SieveManager {__version__}\nCopyright {__copyright__}'
"""About message."""

DEBUG: bool = False
"""Print stack traces even for expected error types?"""

EDITOR: list[str] = shlex.split(os.getenv('EDITOR', 'ed'), posix=True)
""":envvar:`EDITOR` or :command:`ed` if :envvar:`EDITOR` is unset."""

ENCODING: str = locale.getpreferredencoding(do_setlocale=False)
"""Encoding."""

# Only consult the password database if HOME is unset: pwd.getpwuid()
# raises KeyError for UIDs that have no passwd entry, which must not
# break the import when the environment already says where home is.
HOME: str = (os.environ['HOME'] if 'HOME' in os.environ
             else pwd.getpwuid(os.getuid()).pw_dir)
"""Home directory."""

PAGER: list[str] = shlex.split(os.getenv('PAGER', 'more'), posix=True)
""":envvar:`PAGER` or :command:`more` if :envvar:`PAGER` is unset."""

VISUAL: list[str] = shlex.split(os.getenv('VISUAL', 'vi'), posix=True)
""":envvar:`VISUAL` or :command:`vi` if :envvar:`VISUAL` is unset."""

XDG_CONFIG_HOME: Final[str] = os.getenv('XDG_CONFIG_HOME', f'{HOME}/.config')
"""X Desktop Group (XDG) base configuration directory."""

CONFIGFILES: Final[tuple[str, ...]] = (
    '/etc/sieve/config',
    '/etc/sieve.cf',
    f'{XDG_CONFIG_HOME}/sieve/config',
    f'{HOME}/.sieve/config',
    f'{HOME}/.sieve.cf'
)
"""Default configuration files."""

230 

231 

232# 

233# Types 

234# 

235 

class Atom(str):
    """ManageSieve keyword (e.g., ``LISTSCRIPTS``, ``OK``).

    Atoms compare case-insensitively with other strings.
    Compared with anything that is not a string, they are simply unequal.

    Because ``__eq__`` is overridden without ``__hash__``,
    atoms are *not* hashable.
    """

    # pylint: disable=eq-without-hash
    def __eq__(self, other) -> bool:
        # Words may also be None, numbers, or lists; defer to Python's
        # default handling instead of failing on a missing casefold().
        if not isinstance(other, str):
            return NotImplemented
        return self.casefold() == other.casefold()

    def __ne__(self, other) -> bool:
        if not isinstance(other, str):
            return NotImplemented
        return self.casefold() != other.casefold()

245 

246 

AuthMech = type['BaseAuth']
"""Type alias: any class derived from :class:`BaseAuth`."""

249 

250 

class AuthState(enum.IntEnum):
    """Phases that an authentication exchange passes through."""

    PREAUTH = 1
    """No "AUTHENTICATE" command has been issued yet."""

    SENT = 2
    """A message has been sent; a reply may be received."""

    RECEIVED = 3
    """A message has been received; a reply may be sent."""

    DONE = 4
    """The exchange is over."""

265 

266 

class ConfirmEnum(enum.IntEnum):
    """Possible results of :meth:`BaseShell.confirm`."""

    NO = 0
    YES = 1
    ALL = 2
    NONE = 3

    def __bool__(self) -> bool:
        # Only the two affirmative answers count as true.
        affirmative = (ConfirmEnum.YES, ConfirmEnum.ALL)
        return self in affirmative

277 

278 

Line = list['Word']
"""Type alias: a :class:`list` of :class:`Word`-s."""

281 

282 

class LogLevel(enum.IntEnum):
    """Logging levels that :class:`SieveConfig` supports."""

    AUTH = logging.DEBUG // 2
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    WARNING = logging.WARNING
    ERROR = logging.ERROR

    def fromdelta(self, delta: int) -> 'LogLevel':
        """Get the :class:`LogLevel` that is `delta` steps more verbose.

        Negative deltas yield less verbose levels. Deltas that point
        past either end of the scale yield the level at that end:

        >>> LogLevel.INFO.fromdelta(-1)
        <LogLevel.WARNING: 30>
        >>> LogLevel.INFO.fromdelta(0)
        <LogLevel.INFO: 20>
        >>> LogLevel.INFO.fromdelta(1)
        <LogLevel.DEBUG: 10>
        >>> # Out-of-bounds deltas do not raise an error
        >>> LogLevel.INFO.fromdelta(-math.inf)
        <LogLevel.ERROR: 40>
        >>> LogLevel.INFO.fromdelta(math.inf)
        <LogLevel.AUTH: 5>
        """
        # Members are defined from most to least verbose.
        members = tuple(type(self))
        last = len(members) - 1
        wanted = members.index(self) - delta
        if wanted < 0:
            position = 0
        elif wanted > last:
            position = last
        else:
            position = wanted
        return members[position]

312 

313 

class SASLPrep(enum.IntEnum):
    """Selects the credentials that are prepared for authentication.

    .. seealso::
        :rfc:`3454`
            Preparation of Internationalized Strings
        :rfc:`4013`
            Stringprep Profile for User Names and Passwords
        :rfc:`4422` (sec. 4)
            SASL protocol requirements
    """

    NONE = 0
    USERNAMES = 1 << 0
    PASSWORDS = 1 << 1
    ALL = USERNAMES | PASSWORDS

330 

331 

class ShellCmd(enum.IntEnum):
    """Shell commands that may overwrite or remove files."""

    NONE = 0
    CP = 1 << 0
    GET = 1 << 1
    MV = 1 << 2
    PUT = 1 << 3
    RM = 1 << 4
    ALL = CP | GET | MV | PUT | RM

342 

343 

class ShellPattern(str):
    """Pattern as used by :class:`BaseShell`."""

    def expand(self, tokens: Iterable[str]) -> Iterator[str]:
        """Yield those `tokens` that match the pattern.

        Tokens that start with a dot never match.
        Matching is case-sensitive.
        """
        for candidate in tokens:
            if candidate.startswith('.'):
                continue
            if fnmatch.fnmatchcase(candidate, self):
                yield candidate

352 

353 

ShellWord = Union[ShellPattern, str]
"""Type alias: a :class:`ShellPattern` or a plain `str`."""


Word = Union[Atom, None, int, str, Line]
"""Type alias: :class:`Atom`, ``None``, ``int``, ``str``, or :class:`Line`."""


T = TypeVar('T')
"""Generic type variable."""


#
# Abstract base classes
#

BaseAuthT = TypeVar('BaseAuthT', bound='BaseAuth')
"""Type variable bound to :class:`BaseAuth`."""

372 

373 

class BaseAuth(ABC):
    """Base class for authentication mechanisms.

    The ManageSieve "AUTHENTICATE" command performs a `Simple Authentication
    and Security Layer`_ (SASL) protocol exchange. SASL is a framework and
    comprises different authentication mechanisms ("SASL mechanisms").

    :meth:`SieveManager.authenticate` does *not* implement any such
    mechanism, but delegates the SASL protocol exchange to classes that do.
    Such classes must subclass :class:`BaseAuth` *and* have a :attr:`name`
    attribute that indicates the mechanism they implement.

    :class:`BaseAuth` provides methods to prepare strings according
    to :rfc:`3454` and :rfc:`4013` and a layer over :class:`BaseSASLAdapter`
    objects that calls :meth:`BaseSASLAdapter.begin` and
    :meth:`BaseSASLAdapter.end` transparently.

    Credentials must be prepared in :meth:`__init__`. Subclasses
    should pass `adapter`, `authcid`, `authzid`, and `prepare` to
    :code:`super().__init__` and use :meth:`prepare` to prepare the
    remaining credentials. For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: BasePwdAuth.__init__
        :dedent: 4

    The SASL exchange must be implemented in :meth:`exchange`.
    Subclasses should use :meth:`send` and :meth:`receive` to
    exchange SASL messages. For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: PlainAuth.exchange
        :dedent: 4
    """

    @staticmethod
    # pylint: disable=redefined-outer-name (string)
    def prepare(string: str) -> str:
        """Prepare `string` according to :rfc:`3454` and :rfc:`4013`.

        Returns:
            Prepared `string`.

        Raises:
            ValueError: `String` is malformed.
        """
        # Bidirectional check: a string that contains right-to-left
        # characters must start and end with one and must not contain
        # any left-to-right characters.
        if any(rlcat := list(map(stringprep.in_table_d1, string))):
            if not (rlcat[0] and rlcat[-1]):
                raise ValueError(f'{string}: Malformed RandLCat string')
            if any(map(stringprep.in_table_d2, string)):
                raise ValueError(f'{string}: Mixes RandLCat and LCat')
        prep = ''
        for i, char in enumerate(string, start=1):
            if stringprep.in_table_b1(char):
                # Characters of table B.1 are dropped.
                pass
            elif stringprep.in_table_c12(char):
                # Characters of table C.1.2 are replaced with a space.
                prep += ' '
            elif stringprep.in_table_c21_c22(char):
                raise ValueError(f'{string}:{i}: Control character')
            elif stringprep.in_table_c3(char):
                raise ValueError(f'{string}:{i}: Private use character')
            elif stringprep.in_table_c4(char):
                raise ValueError(f'{string}:{i}: Non-character code point')
            elif stringprep.in_table_c5(char):
                raise ValueError(f'{string}:{i}: Surrogate code point')
            elif stringprep.in_table_c6(char):
                raise ValueError(f'{string}:{i}: Not plain text')
            elif stringprep.in_table_c7(char):
                raise ValueError(f'{string}:{i}: Not canonical')
            elif stringprep.in_table_c8(char):
                raise ValueError(f'{string}:{i}: Changes display/deprecated')
            elif stringprep.in_table_c9(char):
                raise ValueError(f'{string}:{i}: Tagging character')
            elif stringprep.in_table_a1(char):
                raise ValueError(f'{string}:{i}: Unassigned code point')
            else:
                prep += char
        return unicodedata.ucd_3_2_0.normalize('NFKC', prep)

    def __init__(self, adapter: BaseSASLAdapter,
                 authcid: str, authzid: str = '',
                 prepare: SASLPrep = SASLPrep.ALL):
        """Prepare authentication.

        `authcid` and `authzid` are prepared according to :rfc:`3454` and
        :rfc:`4013` if ``prepare & SASLPrep.USERNAMES`` evaluates to true.

        Arguments:
            adapter: SASL adapter over which to authenticate.
            authcid: Authentication ID (user to login as).
            authzid: Authorization ID (user whose rights to acquire).
            prepare: Which credentials to prepare.

        Raises:
            ValueError: Bad characters in username.
        """
        # Only the username bit matters here; the local name shadows
        # the prepare() method, which is still reachable via self.
        prepare &= SASLPrep.USERNAMES   # type: ignore[assignment]
        self.adapter = adapter
        self.authcid = self.prepare(authcid) if prepare else authcid
        self.authzid = self.prepare(authzid) if prepare else authzid

    # pylint: disable=useless-return
    def __call__(self) -> Optional[Any]:
        """Authenticate as :attr:`authcid`.

        :attr:`authcid` is authorized as :attr:`authzid`
        if :attr:`authzid` is set (proxy authentication).

        Returns:
            Data returned by the server, if any.

        Raises:
            ConnectionError: Server has closed the connection.
            OperationError: Authentication failed.
            SASLCapabilityError: Some feature is not supported.
            SASLProtocolError: Server violated the SASL protocol.
            SASLSecurityError: Server verification failed.
            TLSCapabilityError: Channel-binding is not supported.

        .. note::
            Calls :meth:`exchange` and :meth:`end`.
        """
        self.exchange()
        # The server had the last word; reply with an empty message
        # so that the exchange can be concluded.
        if self.state == AuthState.RECEIVED:
            self.send(b'')
        self.end()
        return None

    def abort(self):
        """Abort authentication.

        Raises:
            ProtocolError: Protocol violation.
        """
        self.adapter.abort()
        self.state = AuthState.DONE

    def begin(self, data: Optional[bytes] = None):
        """Begin authentication.

        Arguments:
            data: Optional client-first message.

        Raises:
            ConnectionError: Connection was closed.
            ProtocolError: Protocol violation.
        """
        if self.state == AuthState.PREAUTH:
            self.adapter.begin(self.name, data)
            self.state = AuthState.SENT
        else:
            raise SASLProtocolError(f'SASL state {self.state}: Unexpected')

    def end(self):
        """Conclude authentication.

        Raises:
            ConnectionError: Connection was closed.
            OperationError: Authentication failed.
            ProtocolError: Protocol violation.
        """
        if self.state == AuthState.SENT:
            self.adapter.end()
        else:
            raise SASLProtocolError(f'SASL state {self.state}: Unexpected')

    @abstractmethod
    def exchange(self):
        """Exchange SASL messages."""

    def send(self, data: bytes):
        """Encode and send an SASL message.

        Raises:
            ConnectionError: Connection was closed.
            ProtocolError: Protocol violation.

        .. note::
            Calls :meth:`begin` if needed.
        """
        if self.state == AuthState.PREAUTH:
            self.begin(data)
        elif self.state == AuthState.RECEIVED:
            self.adapter.send(data)
            self.state = AuthState.SENT
        else:
            raise SASLProtocolError(f'SASL state {self.state}: Unexpected')

    @classmethod
    def getmechs(cls: type[BaseAuthT], sort: bool = True,
                 obsolete: bool = False) -> list[type[BaseAuthT]]:
        """Get authentication classes that subclass this class.

        Abstract subclasses are not returned themselves,
        but their concrete subclasses are.

        Arguments:
            sort: Sort mechanisms by :attr:`order`?
            obsolete: Return obsolete mechanisms?
        """
        mechs: list[type[BaseAuthT]] = []
        for subcls in cls.__subclasses__():
            # pylint: disable=bad-indentation
            if (not subcls.__abstractmethods__
                    and (obsolete or not subcls.obsolete)):
                mechs.append(subcls)
            mechs.extend(subcls.getmechs(sort=False, obsolete=obsolete))
        if sort:
            mechs.sort(key=lambda s: s.order)
        return mechs

    # pylint: disable=missing-raises-doc
    def receive(self) -> bytes:
        """Receive and decode an SASL message.

        If the server concludes the exchange with an "OK" response
        that carries a "SASL" response code, the data given in that
        code is returned and :attr:`state` is set to ``DONE``.

        Raises:
            ConnectionError: Connection was closed.
            OperationError: Authentication failed.
            ProtocolError: Protocol violation.

        .. note::
            Calls :meth:`begin` if needed.
        """
        if self.state == AuthState.PREAUTH:
            self.begin()
        if self.state == AuthState.SENT:
            try:
                data = self.adapter.receive()
                self.state = AuthState.RECEIVED
            except SieveOperationError as err:
                if err.response != 'OK':
                    raise
                if not err.matches('SASL'):
                    # pylint: disable=raise-missing-from
                    raise SASLProtocolError('Expected data')
                try:
                    word = err.code[1]
                # Indexing raises IndexError if the response code
                # carries no data; report that as a protocol error, too.
                except (IndexError, ValueError) as valuerr:
                    raise SASLProtocolError('Unexpected data') from valuerr
                if isinstance(word, Atom) or not isinstance(word, str):
                    # pylint: disable=raise-missing-from
                    raise SASLProtocolError('Expected string')
                data = b64decode(word)
                self.state = AuthState.DONE
            return data
        raise SASLProtocolError(f'SASL state {self.state}: Unexpected')

    @property
    def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
        """Underlying socket."""
        assert self.adapter
        return self.adapter.sock

    @sock.setter
    def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
        assert self.adapter
        self.adapter.sock = sock

    adapter: BaseSASLAdapter
    """Underlying SASL adapter."""

    authcid: str
    """Authentication ID (user to login as)."""

    authzid: str = ''
    """Authorization ID (user whose rights to acquire)."""

    name: ClassVar[str]
    """Mechanism name."""

    obsolete: bool = False
    """Is this mechanism obsolete?"""

    order: int = 0
    """Mechanism precedence."""

    state: AuthState = AuthState.PREAUTH
    """Current authentication state."""

649 

650 

class BaseSASLAdapter(ABC):
    """Abstract interface for exchanging SASL messages.

    SASL is embedded in some other protocol, so every SASL message
    must be translated to that protocol. This class declares the
    kinds of messages that occur in an SASL protocol exchange;
    translators between SASL and the underlying protocol must
    subclass it and implement each of them.

    .. seealso::
        :class:`BaseAuth`
            Abstract base class for SASL mechanisms.
        :rfc:`4422`
            Simple Authentication and Security Layer (SASL)
    """

    @abstractmethod
    def abort(self):
        """Cancel the authentication exchange.

        Raises:
            ProtocolError: Protocol violation.
        """

    @abstractmethod
    def begin(self, name: str, data: Optional[bytes] = None):
        """Start an authentication exchange.

        Arguments:
            name: Name of the SASL mechanism.
            data: Client-first message, if any.

        Raises:
            ConnectionError: Connection was closed.
            ProtocolError: Protocol violation.
        """

    @abstractmethod
    def end(self):
        """Finish the authentication exchange.

        Raises:
            ConnectionError: Connection was closed.
            OperationError: Authentication failed.
            ProtocolError: Protocol violation.
        """

    @abstractmethod
    def send(self, data: bytes):
        """Encode `data` as SASL message and send it.

        Raises:
            ConnectionError: Connection was closed.
            ProtocolError: Protocol violation.
        """

    @abstractmethod
    def receive(self) -> bytes:
        """Receive an SASL message and return its decoded payload.

        Raises:
            ConnectionError: Connection was closed.
            OperationError: Authentication failed.
            ProtocolError: Protocol violation.
        """

    @property
    @abstractmethod
    def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
        """Socket that the exchange runs over."""

    @sock.setter
    @abstractmethod
    def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
        pass

725 

726 

727# 

728# ACAP 

729# 

730 

class BaseACAPConn(ABC):
    """Base class for ACAP parsers/serializers.

    The ManageSieve protocol uses the syntax and data types of the
    Application Control Access Protocol (ACAP). This class provides
    a :meth:`parser <receiveline>` for converting ACAP lines into
    Python objects and a :meth:`serializer <sendline>` for converting
    Python objects into ACAP lines.

    .. seealso::
        :rfc:`2244` (secs. 2.2 and 2.6)
            ACAP commands, responses, and data formats.
        :rfc:`5804` (secs. 1.2 and 4)
            ManageSieve syntax.
    """

    # pylint: disable=missing-raises-doc
    def receiveline(self) -> 'Line':
        """Receive a line and parse it.

        ================== =================
        ACAP type          Python type
        ================== =================
        Atom               :class:`Atom`
        Literal            :class:`str`
        Nil                ``None``
        Number             :class:`int`
        Parenthesised List :class:`list`
        String             :class:`str`
        ================== =================

        Quoted double quotes and backslashes in strings
        (``\\"`` and ``\\\\``) are unquoted.

        For example:

        >>> mgr.sendline(Atom('listscripts'))
        >>> mgr.receiveline()
        ['foo.sieve', 'ACTIVE']
        >>> mgr.receiveline()
        ['bar.sieve']
        >>> mgr.receiveline()
        ['baz.sieve']
        >>> mgr.receiveline()
        ['OK', 'Listscripts completed.']

        Raises:
            ValueError: Line is malformed.
        """
        assert self.file
        words: Line = []
        stack: list[Line] = []
        ptr: Line = words
        while line := self.file.readline().decode('utf8'):
            # Stays -1 unless the line ends with a literal, in which
            # case the logical line continues after that literal.
            size: int = -1
            for token in self._lex(line):
                key = token.lastgroup
                value = token.group(key)    # type: ignore[arg-type]
                if key == 'leftparen':
                    parens: list[Word] = []
                    stack.append(ptr)
                    ptr.append(parens)
                    ptr = parens
                elif key == 'rightparen':
                    try:
                        ptr = stack.pop()
                    except IndexError as err:
                        raise ValueError('Unexpected parenthesis') from err
                elif key == 'atom':
                    if value.casefold() == 'nil':
                        ptr.append(None)
                    else:
                        ptr.append(Atom(value))
                elif key == 'number':
                    ptr.append(int(value))
                elif key == 'string':
                    # Undo the quoting of '"' and '\'.
                    ptr.append(self._unquote(r'\1', value))
                elif key == 'literal':
                    size = int(value)
                    literal = self.file.read(size).decode('utf8')
                    ptr.append(literal)
                elif key == 'garbage':
                    raise ValueError('Unrecognized data')
                else:
                    # NOTREACHED
                    raise ClientSoftwareError('Unknown data type')
            if size == -1:
                break
        if stack:
            raise ValueError('Unbalanced parantheses')
        return words

    def sendline(self, *objs: Union[IO[Any], 'Word'], whole: bool = True):
        """Convert `objs` to ACAP types and send them.

        ================== ======================================
        Python type        ACAP type
        ================== ======================================
        :class:`Atom`      Atom
        :class:`typing.IO` Literal
        ``None``           Nil
        :class:`bytes`     Literal or String [#literal]_
        :class:`list`      Parenthesised List
        :class:`int`       Number [#nums]_
        :class:`str`       Literal or String [#literal]_ [#utf8]_
        ================== ======================================

        For example:

        >>> mgr.sendline(Atom('havespace'), 'script.sieve', 12345)
        >>> mgr.receiveline()
        ['OK', 'Putscript would succeed.']

        The low-level interface can be used to pipeline commands:

        >>> mgr.isconn(check=True)
        >>> with open('foo.sieve') as script:
        >>>     mgr.sendline(Atom('putscript'), 'foo.sieve', script)
        >>> mgr.sendline(Atom('logout'))
        >>> for _ in range(2):
        >>>     mgr.collect(check=True)

        Arguments:
            objs: Objects to serialize.
            whole: Conclude data with CRLF?

        Raises:
            ValueError: Number is not within the range [0, 4,294,967,295].
            TypeError: Object cannot be represented as ACAP data type.

        .. [#literal] Depending on content. Data that contains null
                      bytes, line breaks, double quotes, or backslashes,
                      or that is longer than 1020 octets, is sent as
                      literal.
        .. [#nums] Numbers must be within the range [0, 4,294,967,295]
        .. [#utf8] Strings are encoded in UTF-8 and normalised to form C.
        """
        assert self.file

        write = self.file.write
        normalize = unicodedata.normalize
        isstr = self._isstr

        def encode(s: str) -> bytes:
            return normalize('NFC', s).encode('utf8')

        def writestr(b: bytes):
            write(b'"%s"' % b if isstr(b) else b'{%d+}\r\n%s' % (len(b), b))

        for i, obj in enumerate(objs):
            if i > 0:
                write(b' ')
            if obj is None:
                write(b'NIL')
            # Atoms are strings, so they must be checked for first.
            elif isinstance(obj, Atom):
                write(encode(obj))
            elif isinstance(obj, int):
                if not 0 <= obj < 4_294_967_296:
                    raise ValueError(f'{obj}: Not in [0, 4,294,967,295]')
                write(encode(str(obj)))
            elif isinstance(obj, bytes):
                writestr(obj)
            elif isinstance(obj, str):
                writestr(encode(obj))
            elif isinstance(obj, (IO, io.IOBase, SpooledTemporaryFile)):
                write(b'{%d+}\r\n' % getfilesize(obj))
                while block := obj.read(io.DEFAULT_BUFFER_SIZE):
                    write(encode(block) if isinstance(block, str) else block)
            elif isinstance(obj, Iterable):     # type: ignore
                write(b'(')
                self.sendline(*obj, whole=False)
                write(b')')
            else:
                raise TypeError(type(obj).__name__ + ': Not an ACAP type')

        if whole:
            write(b'\r\n')
            self.file.flush()

    @property
    @abstractmethod
    def file(self) -> Optional[Union[IO, io.BufferedRWPair, 'LogIOWrapper']]:
        """File-like access to the underlying socket."""

    @file.setter
    @abstractmethod
    def file(self,
             file: Optional[Union[IO, io.BufferedRWPair, 'LogIOWrapper']]):
        pass

    # Within quoted strings, a backslash quotes the character that
    # follows it, so that strings may contain double quotes.
    _lex: ClassVar[Callable] = re.compile('|'.join((
        r'\b(?P<atom>[a-z][^(){}\s\\]*)\b',
        r'\b(?P<number>\d+)\b',
        r'"(?P<string>(?:\\[^\0\r\n]|[^\0\r\n"\\])*)"',
        r'{(?P<literal>\d+)\+?}',
        r'(?P<leftparen>\()',
        r'(?P<rightparen>\))',
        r'(?P<garbage>\S+)'
    )), flags=re.IGNORECASE).finditer
    """ACAP Lexer."""

    _unquote: ClassVar[Callable[..., str]] = re.compile(r'\\(["\\])').sub
    """Replace quoted double quotes and backslashes (call with ``r'\\1'``)."""

    # Strings may be 1024 octets long, but I limit the length to 1020 octets
    # to allow for errors (two octets for quotations marks, one for the
    # terminating null byte, one for other off-by-one errors).
    # Backslashes are excluded, too: they would have to be quoted in
    # strings, whereas literals can carry them as they are.
    _isstr: Callable[..., Optional[re.Match]] = \
        re.compile(br'[^\0\r\n"\\]{0,1020}').fullmatch
    """Check whether bytes can be represented as ACAP string."""

931 

932 

933# 

934# ManageSieve 

935# 

936 

937class SieveConn(BaseACAPConn): 

938 """Low-level connection to a ManageSieve server. 

939 

940 For example: 

941 

942 >>> conn = SieveConn('imap.foo.example') 

943 >>> conn.authenticate('user', 'password') 

944 >>> with open('script.sieve', 'br') as file: 

945 >>> conn.execute('putscript', file.name, file) 

946 >>> conn.execute('logout') 

947 

948 .. warning:: 

949 :class:`SieveConn` is not thread-safe. 

950 

951 .. seealso:: 

952 :rfc:`2244` 

953 Application Configuration Access Protocol 

954 :rfc:`2782` 

955 DNS SRV 

956 :rfc:`5804` 

957 ManageSieve 

958 """ 

959 

960 def __init__(self, *args, **kwargs): 

961 """Create a :class:`SieveConn` object. 

962 

963 `args` and `kwargs` are passed to :meth:`open` if given. 

964 Otherwise, no connection is established. 

965 

966 For example: 

967 

968 >>> with SieveConn('imap.host.example') as conn: 

969 >>> conn.authenticate('user', 'password') 

970 >>> ... 

971 

972 >>> with SieveConn() as conn: 

973 >>> conn.open('imap.host.example') 

974 >>> conn.authenticate('user', 'password') 

975 >>> ... 

976 

977 Arguments: 

978 args: Positional arguments for :meth:`open`. 

979 kwargs: Keyword arguments for :meth:`open`. 

980 

981 Raises: 

982 ClientConnectionError: Socket error. 

983 SieveCapabilityError: "STARTTLS" not supported. 

984 SieveProtocolError: Server violated the ManageSieve protocol. 

985 TLSSecurityError: Server certificate has been revoked. 

986 """ 

987 if args or kwargs: 

988 self.open(*args, **kwargs) 

989 

990 def __del__(self): 

991 """Shut the connection down.""" 

992 with suppress(OSError): 

993 self.shutdown() 

994 

def authenticate(self, login: str, *auth, owner: str = '',
                 sasl: Union[AuthMech, Iterable[AuthMech]] = (),
                 logauth: bool = False, **kwauth):
    """Authenticate as `login`.

    Which credentials are needed depends on the SASL_ mechanisms given
    in `sasl` (e.g., password-based or external). If `sasl` is empty,
    the mechanisms returned by :meth:`BasePwdAuth.getmechs` are used.
    Mechanisms are tried in the order given; those that the server
    does not advertise are skipped.

    Arguments that are not recognized are handed to the constructor
    of each SASL mechanism. Password-based mechanisms need a password:

    >>> mgr.authenticate('user', 'password')

    The "EXTERNAL" mechanism, by contrast, takes no arguments:

    >>> mgr.authenticate('user', sasl=ExternalAuth)

    If an `owner` is given, the scripts of that `owner` are managed
    instead of those owned by `login`. This requires elevated privileges.

    Arguments:
        login: User to login as (authentication ID).
        owner: User whose scripts to manage (authorization ID).
        sasl: SASL mechanisms (default: :meth:`BasePwdAuth.getmechs`).
        logauth: Log authentication exchange?
        auth: Positional arguments for SASL mechanism constructors.
        kwauth: Keyword arguments for SASL mechanism constructors.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SASLCapabilityError: Authentication mechanisms exhausted.
        SASLProtocolError: Server violated the SASL protocol.
        SASLSecurityError: Server could not be verified.
        SieveConnectionError: Server has closed the connection.
        SieveOperationError: Authentication failed.
        SieveProtocolError: Server violated the ManageSieve protocol.
        ValueError: Bad characters in credentials.

    .. note::
        If a mechanism raises :exc:`SASLCapabilityError` (e.g., because
        an `owner` was given, but the mechanism does not support proxy
        authentication), the error is logged and authentication is
        attempted with the next mechanism.
    """
    # Response codes that only disqualify the current mechanism.
    # TRANSITION-NEEDED need not be treated specially.
    skippable = ('AUTH-TOO-WEAK', 'ENCRYPT-NEEDED', 'TRANSITION-NEEDED')

    kwauth['authzid'] = owner
    logger = self.logger
    # Stored so that _getstate() can reproduce this call later.
    self._auth = auth
    self._kwauth = kwauth
    self._logauth = logauth
    if not sasl:
        self._sasl = BasePwdAuth.getmechs()
    elif isinstance(sasl, Iterable):
        self._sasl = sasl
    else:
        self._sasl = (sasl,)

    def attempt():
        # pylint: disable=consider-using-with
        if not self.lock.acquire(blocking=False):
            raise ClientOperationError(os.strerror(EINPROGRESS))
        # Keep credentials out of the log unless asked otherwise.
        if not logauth and isinstance(self.file, LogIOWrapper):
            self.file.quiet = True
        try:
            for mech in self._sasl:
                assert self.capabilities
                if mech.name.casefold() not in self.capabilities.sasl:
                    continue
                adapter = SieveSASLAdapter(self)
                try:
                    session = mech(adapter, login, *auth, **kwauth)
                    newcaps = session()
                    if newcaps:
                        self.capabilities = newcaps
                except SASLCapabilityError as err:
                    logger.error(err)
                    continue
                except SieveOperationError as err:
                    if not err.matches(*skippable):
                        raise
                    logger.error(err)
                    continue
                self.login = session.authcid
                logger.info('Authenticated as %s using %s',
                            session.authcid, mech.name.upper())
                authzid = session.authzid
                if authzid:
                    self.owner = authzid
                    logger.info('Authorized as %s', authzid)
                return
            raise SASLCapabilityError('SASL mechanisms exhausted')
        finally:
            # Always re-enable logging and free the connection.
            if isinstance(self.file, LogIOWrapper):
                self.file.quiet = False
            self.lock.release()

    self.isconn(check=True)
    self._withfollow(attempt)

1095 

def close(self):
    """Close the client side of the connection and reset its state.

    The file object, the polling object, and the socket are released
    in that order; afterwards the stored credentials, capabilities,
    and address are cleared.

    .. warning::
        Call only when the server has closed the connection.
    """
    stream = self.file
    if stream is not None:
        stream.close()
        self.file = None
    poller = self.poll
    if poller is not None:
        assert self.sock
        poller.unregister(self.sock)
        self.poll = None
    sock = self.sock
    if sock is not None:
        sock.close()
        self.sock = None
    # Forget everything that described the old connection.
    self._auth, self._kwauth = (), {}
    self.capabilities = self.host = self.port = None
    self.login = self.owner = ''

1119 

def collect(self, check: bool = False) -> tuple['Response', list[Line]]:
    """Read lines until the server's response to the last command.

    Lines are gathered until one starts with an atom; that line is
    parsed as the response and returned with the lines before it.
    :attr:`warning` is set to the response message if the response
    carries a "WARNINGS" code and cleared otherwise.

    For example:

    >>> conn.sendline(Atom('listscripts'))
    >>> conn.collect()
    (Response(response=Atom('OK'), code=(), message=None),
     [['foo.sieve', 'ACTIVE'], ['bar.sieve'], ['baz.sieve']])

    Arguments:
        check: Raise an error if the response is not "OK"?

    Raises:
        ClientConnectionError: Socket error.
        SieveConnectionError: Server said "BYE". [#collect-check]_
        SieveOperationError: Server said "NO". [#collect-check]_
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. [#collect-check] Only raised if `check` is `True`.
    """
    data: list[Line] = []
    while True:
        try:
            line = self.receiveline()
        except ValueError as err:
            raise SieveProtocolError(str(err)) from err
        if not line or not isinstance(line[0], Atom):
            # Not a response line yet, so it is part of the data.
            data.append(line)
            continue
        res = Response.fromline(line)
        if check and res.response != 'OK':
            raise res.toerror()
        if res.matches('WARNINGS'):
            self.warning = res.message
        else:
            self.warning = None
        return res, data

1155 

def execute(self, command: str, *args: Union[IO, Word]) \
        -> tuple['Response', list[Line]]:
    """Send `command` with `args` and return the server's response.

    The command is upper-cased and sent as an atom.

    For example:

    >>> conn.execute('listscripts')
    (Response(response=Atom('OK'), code=(), message=None),
     [['foo.sieve', 'ACTIVE'], ['bar.sieve'], ['baz.sieve']])

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server said "BYE".
        SieveOperationError: Server said "NO".
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. note::
        Referrals are followed automatically.
    """
    assert command

    def attempt() -> tuple['Response', list[Line]]:
        # pylint: disable=consider-using-with
        if not self.lock.acquire(blocking=False):
            raise ClientOperationError(os.strerror(EINPROGRESS))
        try:
            self.isconn(check=True)
            self.sendline(Atom(command.upper()), *args)
            reply = self.collect()
        finally:
            self.lock.release()
        # Checked only after the lock is released, so that following
        # a referral can use the connection again.
        if reply[0].response != 'OK':
            raise reply[0].toerror()
        return reply

    return self._withfollow(attempt)

1192 

def geturl(self) -> Optional['URL']:
    """Return the URL of the current connection, if there is one.

    The port is left out if it is 4190.

    For example:

    >>> with SieveManager('imap.foo.example') as mgr:
    >>>     mgr.authenticate('user', 'password')
    >>>     mgr.geturl()
    'sieve://user@imap.foo.example'

    Returns:
        The URL or `None` if :attr:`host` is not set.

    .. note::
        Only changes to the connection state effected by :meth:`open`,
        :meth:`close`, :meth:`shutdown`, :meth:`authenticate`,
        :meth:`unauthenticate`, and referrals are tracked.
    """
    if not self.host:
        return None
    port = None if self.port == 4190 else self.port
    return URL(hostname=self.host, port=port,
               username=self.login, owner=self.owner)

1214 

def isconn(self, check: bool = False) -> bool:
    """Check whether :attr:`sock` is connected.

    The socket counts as disconnected if polling it reports an
    error, a hang-up, or an invalid descriptor.

    Arguments:
        check: Raise an error if :attr:`sock` is *not* connected.

    Returns:
        `True` if the socket is connected and `False` otherwise.

    Raises:
        ClientConnectionError: Socket error. [#isconn-check]_

    .. [#isconn-check] Only raised if `check` is `True`.
    """
    assert self.poll
    (_, events), = self.poll.poll()
    if events & select.POLLERR:
        errno, message = ENOTCONN, 'Socket error'
    elif events & select.POLLHUP:
        errno, message = ETIMEDOUT, os.strerror(ETIMEDOUT)
    elif events & select.POLLNVAL:
        errno, message = ENOTCONN, os.strerror(ENOTCONN)
    else:
        return True
    if check:
        raise ClientConnectionError(errno, message)
    # Report the failure instead of claiming to be connected.
    return False

1239 

1240 # pylint: disable=missing-raises-doc, redefined-outer-name 

def open(self, host: str, port: int = 4190,
         source: tuple[str, int] = ('', 0),
         timeout: Optional[float] = socket.getdefaulttimeout(),
         tls: bool = True, ocsp: bool = True):
    """Connect to `host` at `port` and read the server's greeting.

    The capabilities announced in the greeting are stored in
    :attr:`capabilities`. Unless `tls` is `False`, TLS is started
    once the greeting has been read.

    Arguments:
        host: Server name or address.
        port: Server port.
        source: Source address and port.
        timeout: Timeout in seconds.
        tls: Secure the connection?
        ocsp: Check whether the server certificate was revoked?

    Raises:
        ClientOperationError: Another operation is already in progress.
        ConnectionError: Connection failed.
        SieveCapabilityError: "STARTTLS" not supported.
        SieveProtocolError: Server violated the ManageSieve protocol.
        TLSSecurityError: Server certificate has been revoked.
    """
    # pylint: disable=consider-using-with
    locked = self.lock.acquire(blocking=False)
    if not locked:
        raise ClientOperationError(os.strerror(EINPROGRESS))
    try:
        self._connect(host, port, source, timeout)
        _, greeting = self.collect(check=True)
        self._source = source
        self.capabilities = Capabilities.fromlines(greeting)
        self.logger.info('Connected to %s:%d', host, port)
    finally:
        self.lock.release()
    # _starttls() takes the lock itself, so it has to run after release.
    if tls:
        self._starttls(ocsp=ocsp)

1274 

def shutdown(self):
    """Shut the connection down without logging out.

    Both directions of the socket are shut down, then the client
    side of the connection is closed.

    .. note::
        Use only when :meth:`logging out <logout>` would be unsafe.
    """
    sock = self.sock
    if sock is not None:
        sock.shutdown(socket.SHUT_RDWR)
        self.logger.info('Shut connection down')
    self.close()

1285 

def _connect(self, host: str, port: int = 4190,
             source: tuple[str, int] = ('', 0),
             timeout: Optional[float] = socket.getdefaulttimeout()):
    """Connect to `host` at `port`.

    If `host` is an internet address, it is connected to directly.
    Otherwise, the hosts listed in the SRV records for
    ``_sieve._tcp.<host>.`` are tried in the order in which they are
    resolved until one of them accepts the connection; if the SRV
    lookup fails, `host` itself is connected to.

    Arguments:
        host: Server address.
        port: Server port.
        source: Source address and port.
        timeout: Timeout in seconds.

    Raises:
        ClientConnectionError: Already connected or no connection made.
        ConnectionError: Connection failed.
    """
    def connect(host: str):
        self.sock = socket.create_connection(
            (host, port),
            timeout=timeout, source_address=source
        )

    if self.sock or self.file:
        raise ClientConnectionError(EISCONN, os.strerror(EISCONN))
    if isinetaddr(host):
        connect(host)
    else:
        try:
            # This is the algorithm specified by RFC 2782, NOT the one
            # specified by RFC 5804, sec. 1.8, which is wrong.
            # NOTE(review): the port of the SRV record is not used;
            # confirm whether that is intended.
            records = list(resolvesrv(f'_sieve._tcp.{host}.'))
            for rec in records:
                try:
                    connect(rec.host)
                except OSError:
                    # Only the failure of the last candidate is fatal.
                    if rec == records[-1]:
                        raise
                else:
                    # Connected; trying further records would replace
                    # (and leak) the socket that was just opened.
                    break
        except DNSError:
            connect(host)
    if not self.sock:
        raise ClientConnectionError(ECONNABORTED,
                                    os.strerror(ECONNABORTED))
    file = self.sock.makefile('rwb')    # type: ignore[attr-defined]
    self.file = LogIOWrapper.wrap(file, self.logger)
    self.poll = select.poll()
    self.poll.register(self.sock)
    self.host = host
    self.port = port

1332 

def _follow(self, url: str):
    """Close the connection, :meth:`open <open>` `url`, and reauthenticate.

    Apart from host and port, which are taken from `url`, the new
    connection is opened and authenticated with the arguments that
    :meth:`_getstate` reports for the current one.

    Raises:
        ClientConnectionError: Socket error.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    try:
        target = URL.fromstr(url)
    except ValueError as err:
        raise SieveProtocolError(err) from err
    self.logger.info('Referred to %s', url)
    # The state must be read before close() wipes it.
    openstate, authstate = self._getstate()
    openargs, openkwargs = openstate
    authargs, authkwargs = authstate
    self.close()
    port = target.port or 4190
    # The first two positional arguments are the old host and port.
    self.open(target.hostname, port, *openargs[2:], **openkwargs)
    self.authenticate(*authargs, **authkwargs)

1349 

def _getstate(self) -> Iterator[tuple[list, dict[str, Any]]]:
    """Get arguments to re-establish the current connection.

    Yields one pair of positional and keyword arguments for
    :meth:`open` and then one for :meth:`authenticate`. The value for
    each parameter is read from the attribute of the same name or,
    if there is none, from the attribute prefixed with an underscore.

    For example:

    >>> mgr.geturl()
    sieve://user@imap.foo.example
    >>> (oargs, okwargs), (auth, kwauth) = mgr._getstate()
    >>> mgr.shutdown()
    >>> mgr.geturl()
    >>> mgr.open(*oargs, **okwargs)
    >>> mgr.authenticate(*auth, **kwauth)
    >>> mgr.geturl()
    sieve://user@imap.foo.example
    """
    positional = (Parameter.POSITIONAL_ONLY,
                  Parameter.POSITIONAL_OR_KEYWORD)
    for method in (self.open, self.authenticate):
        posargs: list = []
        keywords: dict[str, Any] = {}
        params = inspect.signature(method).parameters   # type: ignore
        for name, param in params.items():
            try:
                value = getattr(self, name)
            except AttributeError:
                value = getattr(self, f'_{name}')
            kind = param.kind
            if kind in positional:
                posargs.append(value)
            elif kind == Parameter.VAR_POSITIONAL:
                posargs.extend(value)
            elif kind == Parameter.KEYWORD_ONLY:
                keywords[name] = value
            elif kind == Parameter.VAR_KEYWORD:
                keywords.update(value)
        yield (posargs, keywords)

1384 

1385 # pylint: disable=redefined-outer-name 

def _starttls(self, ocsp: bool = True):
    """Start TLS encryption.

    Sends "STARTTLS", wraps :attr:`sock` using :attr:`sslcontext`,
    and replaces :attr:`capabilities` with those that the server
    announces over the secured connection.

    Arguments:
        ocsp: Check whether the server certificate was revoked?

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveCapabilityError: "STARTTLS" not supported.
        TLSSecurityError: Server certificate has been revoked.
        TLSSoftwareError: Server did not present a certificate.

    .. note::
        Called automatically by :meth:`open` unless `tls` is `False`.
    """
    assert self.sock
    assert self.capabilities
    if not self.capabilities.starttls:
        raise SieveCapabilityError('STARTTLS: not supported')
    self.execute('STARTTLS')
    # pylint: disable=consider-using-with
    if not self.lock.acquire(blocking=False):
        raise ClientOperationError(os.strerror(EINPROGRESS))
    try:
        host = self.host
        tlssock = self.sslcontext.wrap_socket(
            self.sock,      # type: ignore[arg-type]
            server_hostname=host
        )
        self.sock = tlssock
        stream = tlssock.makefile('rwb')
        self.file = LogIOWrapper.wrap(stream, self.logger)  # type: ignore
        # Capabilities are read again over the secured connection.
        _, lines = self.collect(check=True)
        self.capabilities = Capabilities.fromlines(lines)
        self.ocsp = ocsp
        proto = tlssock.version()
        cipher = tlssock.cipher()
        if proto and cipher:
            self.logger.info('Started %s using %s', proto, cipher[0])
        if ocsp and not HAVE_CRYPTOGRAPHY:
            self.logger.error('Module "cryptography" not found')
        elif ocsp:
            der = tlssock.getpeercert(binary_form=True)
            if not der:
                raise TLSSoftwareError('No peer certificate')
            cert = x509.load_der_x509_certificate(der)
            if certrevoked(cert, logger=self.logger):
                raise TLSSecurityError(f'{host}: Certificate revoked')
    finally:
        self.lock.release()

1436 

def _withfollow(self, func: Callable[..., T], *args, **kwargs) -> T:
    """Call `func` and follow referrals.

    If `func` raises a :exc:`SieveConnectionError` that carries a
    "REFERRAL" code, the referral is followed and `func` is called
    again; this is repeated until `func` returns or raises another
    error.

    For example:

    >>> mgr._withfollow(mgr.execute, 'listscripts')

    Arguments:
        func: Function to call.
        args: Positional arguments for `func`.
        kwargs: Keyword arguments for `func`.

    Returns:
        The return value of `func`.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server said "BYE".
        SieveOperationError: Server said "NO".
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. seealso::
        :rfc:`5804` (sec. 1.3)
            ManageSieve "REFERRAL" response codes
    """
    while True:
        try:
            return func(*args, **kwargs)
        except SieveConnectionError as err:
            if not err.matches('REFERRAL'):
                raise
            # The referral URL is the second element of the code.
            try:
                url = err.code[1]
            except IndexError as exc:
                raise SieveProtocolError('Unexpected data') from exc
            if isinstance(url, Atom) or not isinstance(url, str):
                # pylint: disable=raise-missing-from
                raise SieveProtocolError('Expected string')
            self._follow(url)

1479 

def _withreopen(self, func: Callable[..., T], *args, **kwargs) -> T:
    """Call `func` and retry once if the connection has failed.

    The arguments needed to re-establish the connection are recorded
    before `func` is called. If `func` fails with a connection,
    timeout, or name resolution error, the connection is closed,
    re-opened, and re-authenticated, and `func` is called once more.

    For example:

    >>> mgr._withreopen(mgr.execute, 'listscripts')

    Arguments:
        func: Function to call.
        args: Positional arguments for `func`.
        kwargs: Keyword arguments for `func`.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server said "BYE".
        SieveOperationError: Server said "NO".
        SieveProtocolError: Server violated the ManageSieve protocol.

    Returns:
        The return value of `func`.
    """
    openstate, authstate = self._getstate()
    openargs, openkwargs = openstate
    authargs, authkwargs = authstate
    recoverable = (ConnectionError, TimeoutError,
                   socket.herror, socket.gaierror)
    try:
        return func(*args, **kwargs)
    except recoverable:
        self.close()
        self.open(*openargs, **openkwargs)
        self.authenticate(*authargs, **authkwargs)
        return func(*args, **kwargs)

1510 

@property
def timeout(self) -> Optional[float]:
    """Connection timeout in seconds.

    `None` if there is no socket. Assigning a timeout while there
    is no socket has no effect.

    Set timeout to 500 ms:

    >>> mgr.timeout = 0.5

    .. note::
        The timeout can only be set while a connection is open.
    """
    return self.sock.gettimeout() if self.sock else None


@timeout.setter
def timeout(self, secs: Optional[float]):
    sock = self.sock
    if sock:
        sock.settimeout(secs)

1530 

@property
def tls(self) -> Optional[str]:   # type: ignore[override]
    """TLS version or `None` if :attr:`sock` is not a TLS socket."""
    sock = self.sock
    return sock.version() if isinstance(sock, ssl.SSLSocket) else None

1537 

capabilities: Optional[Capabilities] = None
"""Server capabilities."""

file: Optional[Union[IO, io.BufferedRWPair, LogIOWrapper]] = None
"""File-like access to :attr:`sock`."""

host: Optional[str] = None
"""Remote address."""

# NOTE(review): created once in the class body, so every instance that
# does not rebind `lock` uses the same lock object -- confirm intended.
lock: threading.Lock = threading.Lock()
"""Operation lock."""

logger: logging.Logger = logging.getLogger(__name__)
"""Logger to use.

Messages are logged with the following priorities:

======================  =====================================
Priority                Used for
======================  =====================================
:const:`logging.ERROR`  Non-fatal errors
:const:`logging.INFO`   State changes
:const:`logging.DEBUG`  Data sent to/received from the server
======================  =====================================

Suppress logging:

>>> from logging import getLogger
>>> getLogger('sievemgr').setLevel(logging.CRITICAL)

Use a custom logger:

>>> from logging import getLogger
>>> logger = getLogger('foo')
>>> logger.addHandler(logging.NullHandler())
>>> mgr.logger = logger

Print data sent to/received from the server to standard error:

>>> from logging import getLogger
>>> getLogger('sievemgr').setLevel(logging.DEBUG)
>>> mgr.listscripts()
C: LISTSCRIPTS
S: "foo.sieve" ACTIVE
S: "bar.sieve"
S: "baz.sieve"
S: OK "Listscripts completed"
[('foo.sieve', True), ('bar.sieve', False), ('baz.sieve', False)]
"""

login: str = ''
"""Login name (authentication ID)."""

# Declared without a default here; assigned when TLS is started.
ocsp: bool
"""Check whether the server certificate was revoked?"""

owner: str = ''
"""User whose scripts are managed (authorization ID)."""

poll: Optional[select.poll] = None
"""Polling object for :attr:`sock`."""

port: Optional[int] = None
"""Remote port."""

sock: Optional[socket.SocketType] = None
"""Underlying socket."""

# NOTE(review): created once in the class body, so changes made through
# an instance affect every instance that has not rebound `sslcontext`
# -- confirm intended.
sslcontext: ssl.SSLContext = ssl.create_default_context()
"""Settings for negotiating Transport Layer Security (TLS).

Disable workarounds for broken X.509 certificates:

>>> with SieveManager() as mgr:
>>>     mgr.sslcontext.verify_flags |= ssl.VERIFY_X509_STRICT
>>>     mgr.open('imap.foo.example')
>>>     ...

Load client certificate/key pair:

>>> with SieveManager() as mgr:
>>>     mgr.sslcontext.load_cert_chain(cert='cert.pem')
>>>     mgr.open('imap.foo.example')
>>>     ...

Use a custom certificate authority:

>>> with SieveManager() as mgr:
>>>     mgr.sslcontext.load_verify_locations(cafile='ca.pem')
>>>     mgr.open('imap.foo.example')
>>>     ...
"""

warning: Optional[str] = None
"""Warning issued in response to the last "CHECKSCRIPT" or "PUTSCRIPT".

For example:

>>> with open('script.sieve', 'br') as file:
>>>     mgr.execute('putscript', file, 'script.sieve')
(Response(response=Atom('OK'), code=('warnings',),
 message='line 7: may need to be frobnicated'), [])
>>> mgr.warning
'line 7: may need to be frobnicated'

.. note::
    Only set by :meth:`collect`, :meth:`execute`,
    :meth:`checkscript`, and :meth:`putscript`.

.. seealso::
    :rfc:`5804` (sec. 1.3)
        ManageSieve "WARNINGS" response code.
"""

_auth: tuple = ()
"""Positional arguments for SASL mechanism constructors."""

_kwauth: dict[str, Any] = {}
"""Keyword arguments for SASL mechanism constructors."""

# Declared without a default here; assigned by authenticate().
_logauth: bool
"""Log the authentication exchange?"""

_sasl: Iterable[AuthMech] = ()
"""SASL mechanisms."""

_source: tuple[str, int] = ('', 0)
"""Source address and port."""

1665 

1666 

1667class SieveManager(SieveConn, AbstractContextManager): 

1668 """Connection to a ManageSieve server. 

1669 

1670 For example: 

1671 

1672 >>> with SieveManager('imap.foo.example') as mgr: 

1673 >>> mgr.authenticate('user', 'password') 

1674 >>> with open('sieve.script', 'br') as script: 

1675 >>> mgr.putscript(script, 'sieve.script') 

1676 >>> mgr.setactive('sieve.script') 

1677 

1678 .. warning:: 

1679 :class:`SieveManager` is not thread-safe. 

1680 """ 

1681 

1682 # pylint: disable=redefined-outer-name 

def __init__(self, *args, backup: int = 0,
             memory: int = 524_288, **kwargs):
    """Create a :class:`SieveManager` object.

    `args` and `kwargs` are forwarded to the parent constructor,
    which passes them to :meth:`open` if given. Otherwise, no
    connection is established.

    Arguments:
        backup: How many backups to keep by default.
        memory: See `max_size` in :class:`tempfile.SpooledTemporaryFile`.
        args: Positional arguments for :meth:`open`.
        kwargs: Keyword arguments for :meth:`open`.

    Raises:
        ClientConnectionError: Socket error.
        SieveCapabilityError: "STARTTLS" not supported.
        SieveProtocolError: Server violated the ManageSieve protocol.
        TLSSecurityError: Server certificate has been revoked.
    """
    # The parent is initialised first, so the attributes below are
    # only set once that has succeeded.
    super().__init__(*args, **kwargs)
    self.backup: int = backup
    self.memory: int = memory

1705 

def __exit__(self, _exctype, excvalue, _traceback):
    """Exit the context and close the connection appropriately.

    After a connection or timeout error the connection is simply
    closed; after a protocol error it is shut down. In every other
    case a logout is attempted, falling back to a shutdown and,
    should that fail too, to closing the connection.
    """
    if isinstance(excvalue, (ConnectionError, TimeoutError)):
        self.close()
        return
    if isinstance(excvalue, ProtocolError):
        self.shutdown()
        return
    try:
        self.logout()
    except ClientOperationError:
        try:
            self.shutdown()
        except OSError:
            self.close()

1720 

def backupscript(self, script: str, keep: int = 1):
    """Make an Emacs-style backup of `script`.

    `keep` = 0
        Do nothing.

    `keep` = 1
        :file:`script` is backed up as :file:`script~`.

    `keep` > 1
        :file:`script` is backed up as :file:`script.~{n}~`.
        `n` starts with 1 and increments with each backup.
        Old backups are deleted if there are more than `keep` backups.

    For example:

    >>> mgr.listscripts()
    [('script.sieve', True)]
    >>> mgr.backupscript('script.sieve', keep=0)
    >>> mgr.listscripts()
    [('script.sieve', True)]

    >>> mgr.backupscript('script.sieve', keep=1)
    >>> mgr.listscripts()
    [('script.sieve', True), ('script.sieve~', False)]

    >>> mgr.listscripts()
    [('script.sieve', True)]
    >>> mgr.backupscript('script.sieve', keep=2)
    >>> mgr.backupscript('script.sieve', keep=2)
    >>> mgr.listscripts()
    [('script.sieve', True),
     ('script.sieve.~1~', False),
     ('script.sieve.~2~', False)]
    >>> mgr.backupscript('script.sieve', keep=2)
    >>> mgr.listscripts()
    [('script.sieve', True),
     ('script.sieve.~2~', False),
     ('script.sieve.~3~', False)]

    Arguments:
        script: Script name.
        keep: How many backups to keep.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    def getfiles() -> Iterator[str]:
        # Lazy: the server is only queried once iteration starts.
        yield from (name for name, _ in self.listscripts())

    def copy(src: str, targ: str):
        # Backups must not themselves be backed up.
        self.copyscript(src, targ, backup=0)

    backup(script, keep, getfiles, copy, self.deletescript)

1783 

def checkscript(self, script: Union[str, IO]):
    """Check whether `script` is valid.

    Syntax errors trigger a :exc:`SieveOperationError`.
    Semantic errors are reported in :attr:`warning`.

    For example:

    >>> checkscript('foo')
    Traceback (most recent call last):
        [...]
    SieveOperationError: line 1: error: expected end of command ';'
    error: parse failed.

    >>> checkscript('# foo')
    >>>

    Arguments:
        script: Script (*not* script name).

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveCapabilityError: "CHECKSCRIPT" not supported.
        SieveConnectionError: Server has closed the connection.
        SieveOperationError: `script` contains syntax errors.
        SieveProtocolError: Server violated the ManageSieve protocol.

    .. important::
        Sieve scripts must be encoded in UTF-8.
    """
    caps = self.capabilities
    assert caps
    # Support is inferred from the server announcing a version.
    if not caps.version:
        raise SieveCapabilityError('CHECKSCRIPT: Not supported')
    self.execute('CHECKSCRIPT', script)

1819 

def copyscript(self, source: str, target: str,
               backup: Optional[int] = None):
    """Download `source` and re-upload it as `target`.

    The script is buffered, encoded as UTF-8, in a temporary file
    that is kept in memory up to :attr:`memory` bytes.

    Arguments:
        source: Source name.
        target: Target name.
        backup: How many backups to keep (default: :attr:`backup`).

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    with SpooledTemporaryFile(max_size=self.memory, mode='bw+') as buffer:
        content = self.getscript(source)
        buffer.write(content.encode('utf8'))
        # Rewind so that the upload starts at the first byte.
        buffer.seek(0)
        self.putscript(buffer, target, backup=backup)

1839 

def deletescript(self, script: str):
    """Delete `script`.

    The cached script list is discarded before anything else is
    done, that is, even if the deletion subsequently fails.

    Arguments:
        script: Script name.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    self._scripts = None
    self.validname(script, check=True)
    self.execute('DELETESCRIPT', script)
    self.logger.info('Removed %s', script)

1853 

def editscripts(self, command: list[str], scripts: list[str], *args,
                catch: Optional[Callable[[Exception, str], bool]] = None,
                check: bool = True, create: bool = True,
                **kwargs) -> subprocess.CompletedProcess:
    """Download `scripts`, edit them with `command`, and re-upload them.

    The `scripts` are saved to a temporary directory and the names of
    the resulting files are appended to the `command`, which is then
    passed to :func:`subprocess.run`. Scripts that have been changed
    are then re-uploaded to the server. If the server has closed the
    connection in the meantime, the connection is re-established
    automatically.

    If :meth:`putscript` raises a :exc:`SieveOperationError` and
    `catch` has been given, then the error and the name of the
    offending script are passed to `catch`, which should return `True`
    if the `command` should be re-invoked for that script and `False`
    otherwise. Either way, the error will be suppressed.

    For example:

    >>> mgr.editscripts(['vi'], ['foo.sieve'])

    >>> cp = mgr.editscripts(['cmp'], ['a.sieve', 'b.sieve'], check=False)
    >>> if cp.returncode != 0:
    >>>     print('a.sieve and b.sieve differ')

    Arguments:
        command: Command to run.
        scripts: Scripts to edit.
        catch: Error handler.
        check: See :func:`subprocess.run`.
        create: Create scripts that do not exist?
        args: Positional arguments for :func:`subprocess.run`.
        kwargs: Keyword arguments for :func:`subprocess.run`.

    Returns:
        The result of the last invocation of `command`.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveOperationError: At least one script contains a syntax error.
                             [#editscripts-catch]_
        SieveProtocolError: Server violated the ManageSieve protocol.
        ValueError: Script name contains path separator.

    .. [#editscripts-catch] Only raised if `catch` has *not* been given.
    """
    # Validate every name before anything is downloaded.
    for script in scripts:
        self.validname(script, check=True)
        # Script names double as filenames inside the temporary directory.
        if path.sep in script:
            raise ValueError(f'{script}: Filename contains {path.sep}')
    with TemporaryDirectory() as tmpdir:
        fnames = []
        ctimes = []
        for script in scripts:
            fname = path.join(tmpdir, script)
            with open(fname, 'w', encoding='utf8') as file:
                try:
                    file.write(self.getscript(script))
                except SieveOperationError as err:
                    # The file stays empty if the script may be created
                    # and the error is down to the script not existing.
                    if not create:
                        raise
                    if not err.code:
                        # No response code to go by, so ask the server.
                        if self.scriptexists(script):
                            raise
                    elif not err.matches('NONEXISTENT'):
                        raise
            fnames.append(fname)
            # Baseline for detecting edits; taken after the file is closed.
            ctimes.append(os.stat(fname).st_ctime)
        while True:
            cp = subprocess.run(command + fnames, *args,
                                check=check, **kwargs)
            retry: list[tuple[str, str, float]] = []
            for script, fname, ctime in zip(scripts, fnames, ctimes):
                # Only files changed since the download are uploaded.
                if os.stat(fname).st_ctime > ctime:
                    with open(fname, 'rb') as file:
                        try:
                            self._withreopen(self.putscript, file, script)
                        except SieveOperationError as err:
                            if catch is None:
                                raise
                            if catch(err, script):
                                retry.append((script, fname, ctime))
            if not retry:
                return cp
            # Run the command again, but only for the scripts to retry.
            scripts, fnames, ctimes = map(list, zip(*retry))
    # NOTREACHED

1939 

def getactive(self) -> Optional[str]:
    """Get the name of the active script.

    Returns:
        The name of the first script that :meth:`listscripts` reports
        as active or `None` if there is no such script.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    listing = self.listscripts()
    return next((name for name, active in listing if active), None)

1953 

def getscript(self, script: str) -> str:
    """Download `script`.

    For example:

    >>> with open('foo.sieve', 'w', encoding='utf8') as file:
    >>>     file.write(mgr.getscript('foo.sieve'))

    Arguments:
        script: Script name.

    Returns:
        The content of the script.

    Raises:
        ClientConnectionError: Socket error.
        ClientOperationError: Another operation is already in progress.
        SieveConnectionError: Server has closed the connection.
        SieveProtocolError: Server violated the ManageSieve protocol.
    """
    self.validname(script, check=True)
    try:
        _, lines = self.execute('GETSCRIPT', script)
        # Exactly one line holding exactly one word is expected.
        (line,) = lines
        (content,) = line
    except ValueError as err:
        raise SieveProtocolError('Unexpected data') from err
    if isinstance(content, Atom):
        raise SieveProtocolError('Unexpected atom')
    if not isinstance(content, str):
        raise SieveProtocolError('Unexpected ' + type(content).__name__)
    return content

1982 

1983 def havespace(self, script: str, size: int): 

1984 """Check whether there is enough space for `script`. 

1985 

1986 Arguments: 

1987 script: Script name. 

1988 size: Script size in bytes. 

1989 

1990 Raises: 

1991 ClientConnectionError: Socket error. 

1992 ClientOperationError: Another operation is already in progress. 

1993 SieveConnectionError: Server has closed the connection. 

1994 SieveOperationError: There is *not* enough space. 

1995 SieveProtocolError: Server violated the ManageSieve protocol. 

1996 """ 

1997 self.validname(script, check=True) 

1998 self.execute('HAVESPACE', script, size) 

1999 

2000 def listscripts(self, cached: bool = False) -> list[tuple[str, bool]]: 

2001 """List scripts and whether they are the active script. 

2002 

2003 For example: 

2004 

2005 >>> mgr.listscripts() 

2006 [('foo.sieve', False), ('bar.sieve', True)] 

2007 

2008 >>> scripts = [script for script, _ in mgr.listscripts()] 

2009 

2010 Arguments: 

2011 cached: Return cached response? [#cached]_ 

2012 

2013 Returns: 

2014 A list of script name/status tuples. 

2015 

2016 Raises: 

2017 ClientConnectionError: Socket error. 

2018 ClientOperationError: Another operation is already in progress. 

2019 SieveConnectionError: Server has closed the connection. 

2020 SieveProtocolError: Server violated the ManageSieve protocol. 

2021 

2022 .. [#cached] The cache is cleared after :meth:`copyscript`, 

2023 :meth:`deletescript`, :meth:`putscript`, 

2024 :meth:`renamescript`, :meth:`setactive`, and 

2025 :meth:`unsetactive`. 

2026 """ 

2027 if not cached or self._scripts is None: 

2028 self._scripts = [] 

2029 for line in self.execute('LISTSCRIPTS')[1]: 

2030 try: 

2031 name = line[0] 

2032 except IndexError as err: 

2033 raise SieveProtocolError('No data') from err 

2034 if not isinstance(name, str): 

2035 raise SieveProtocolError('Expected string') 

2036 try: 

2037 status = line[1] 

2038 if not isinstance(status, str): 

2039 raise SieveProtocolError('Expected string') 

2040 active = status.casefold() == 'active' 

2041 except IndexError: 

2042 active = False 

2043 self._scripts.append((name, active)) 

2044 return self._scripts 

2045 

    def logout(self):
        """Log out.

        Does nothing unless :attr:`sock` is set. Otherwise sends
        "LOGOUT" and then closes the connection, whether or not
        the command succeeded.

        .. note::
            :meth:`logout` should be called to close the connection
            unless :class:`SieveManager` is used as a context manager.

        .. warning::
            Logging out is unsafe after a :exc:`ProtocolError`.
            Use :meth:`shutdown` instead.
        """
        if self.sock is not None:
            try:
                self.execute('LOGOUT')
                self.logger.info('Logged out')
            except (OperationError, ProtocolError):
                # "LOGOUT" was refused or answered with garbage:
                # fall back to shutting the connection down,
                # ignoring socket errors raised while doing so.
                with suppress(OSError):
                    self.shutdown()
            except ConnectionError:
                # Connection errors during "LOGOUT" are ignored;
                # the connection is closed below either way.
                pass
            self.close()

2067 

2068 def noop(self, tag: Optional[str] = None) -> Optional[str]: 

2069 """Request a no-op. 

2070 

2071 For example: 

2072 

2073 >>> mgr.noop('foo') 

2074 'foo' 

2075 

2076 Arguments: 

2077 tag: String for the server to echo back. 

2078 

2079 Returns: 

2080 Server echo. 

2081 

2082 Raises: 

2083 ClientConnectionError: Socket error. 

2084 ClientOperationError: Another operation is already in progress. 

2085 SieveCapabilityError: "NOOP" not supported. 

2086 SieveConnectionError: Server has closed the connection. 

2087 SieveProtocolError: Server violated the ManageSieve protocol. 

2088 """ 

2089 assert self.capabilities 

2090 if not self.capabilities.version: 

2091 raise SieveCapabilityError('NOOP: not supported') 

2092 args = () if tag is None else (tag,) 

2093 res, _ = self.execute('NOOP', *args) 

2094 try: 

2095 data = res.code[1] 

2096 except IndexError: 

2097 return None 

2098 if isinstance(data, Atom) or not isinstance(data, str): 

2099 raise SieveProtocolError('Expected string') 

2100 return data 

2101 

2102 def putscript(self, source: Union[str, IO], target: str, 

2103 backup: Optional[int] = None): 

2104 """Upload `source` to the server as `target`. 

2105 

2106 The server should reject syntactically invalid scripts and 

2107 may issue a :attr:`warning` for semantically invalid ones. 

2108 Updates should be atomic. 

2109 

2110 For example: 

2111 

2112 >>> mgr.putscript('# empty', 'foo.sieve') 

2113 

2114 >>> with open('foo.sieve', 'br') as file: 

2115 >>> mgr.putscript(file, 'foo.sieve') 

2116 

2117 Arguments: 

2118 source: Script (*not* script name). 

2119 target: Script name. 

2120 backup: How many backups to keep (default: :attr:`backup`). 

2121 

2122 Raises: 

2123 ClientConnectionError: Socket error. 

2124 ClientOperationError: Another operation is already in progress. 

2125 SieveConnectionError: Server has closed the connection. 

2126 SieveOperationError: `Script` contains syntax errors. 

2127 SieveProtocolError: Server violated the ManageSieve protocol. 

2128 

2129 .. important:: 

2130 Sieve scripts must be encoded in UTF-8. 

2131 """ 

2132 self.validname(target, check=True) 

2133 try: 

2134 keep = self.backup if backup is None else backup 

2135 self.backupscript(target, keep=keep) 

2136 except SieveOperationError as err: 

2137 if not err.code: 

2138 for script, _ in self.listscripts(): 

2139 if script == target: 

2140 raise 

2141 elif not err.matches('NONEXISTENT'): 

2142 raise 

2143 self._scripts = None 

2144 self.execute('PUTSCRIPT', target, source) 

2145 self.logger.info('Uploaded %s', target) 

2146 

2147 def renamescript(self, source: str, target: str, emulate: bool = True): 

2148 """Rename `source` to `target`. 

2149 

2150 Some servers do not the support the "RENAMESCRIPT" command. 

2151 On such servers, renaming is emulated by downloading `source`, 

2152 re-uploading it as `target`, marking `target` as the active script 

2153 if `source` is the active script, and then deleting `source`. 

2154 

2155 For example: 

2156 

2157 >>> mgr.renamescript('foo.sieve', 'bar.sieve', emulate=False) 

2158 

2159 Arguments: 

2160 source: Script name. 

2161 target: Script name. 

2162 emulate: Emulate "RENAMESCRIPT" if the server does not support it? 

2163 

2164 Raises: 

2165 SieveCapabilityError: "RENAMESCRIPT" not supported. [#emulate]_ 

2166 SieveOperationError: `source` does not exist or `target` exists. 

2167 

2168 .. [#emulate] Only raised if `emulate` is `False`. 

2169 """ 

2170 assert self.capabilities 

2171 self.validname(source, check=True) 

2172 self.validname(target, check=True) 

2173 if self.capabilities.version: 

2174 self._scripts = None 

2175 self.execute('RENAMESCRIPT', source, target) 

2176 self.logger.info('Renamed %s to %s', source, target) 

2177 elif emulate: 

2178 sourceactive: Optional[bool] = None 

2179 for script, active in self.listscripts(): 

2180 if script == source: 

2181 sourceactive = active 

2182 if script == target: 

2183 raise SieveOperationError( 

2184 code=(Atom('alreadyexists'),), 

2185 message=f'{target}: {os.strerror(EEXIST)}' 

2186 ) 

2187 if sourceactive is None: 

2188 raise SieveOperationError( 

2189 code=(Atom('nonexistent'),), 

2190 message=f'{source}: {os.strerror(ENOENT)}' 

2191 ) 

2192 self.copyscript(source, target, backup=0) 

2193 if sourceactive: 

2194 self.setactive(target) 

2195 self.deletescript(source) 

2196 else: 

2197 raise SieveCapabilityError('RENAMESCRIPT: Not supported') 

2198 

2199 def scriptexists(self, script: str, cached: bool = False) -> bool: 

2200 """Check if `script` exists. 

2201 

2202 Arguments: 

2203 script: Script name. 

2204 cached: Return cached response? [#cached]_ 

2205 

2206 Raises: 

2207 ClientConnectionError: Socket error. 

2208 ClientOperationError: Another operation is already in progress. 

2209 SieveConnectionError: Server has closed the connection. 

2210 SieveProtocolError: Server violated the ManageSieve protocol. 

2211 """ 

2212 self.validname(script, check=True) 

2213 return any(s == script for s, _ in self.listscripts(cached=cached)) 

2214 

2215 def setactive(self, script: str): 

2216 """Mark `script` as the active script. 

2217 

2218 Raises: 

2219 ClientConnectionError: Socket error. 

2220 ClientOperationError: Another operation is already in progress. 

2221 SieveConnectionError: Server has closed the connection. 

2222 SieveProtocolError: Server violated the ManageSieve protocol. 

2223 """ 

2224 self.validname(script, check=True) 

2225 self._scripts = None 

2226 self.execute('SETACTIVE', script) 

2227 self.logger.info('Activated %s', script) 

2228 

2229 def unauthenticate(self): 

2230 """Unauthenticate. 

2231 

2232 Raises: 

2233 ClientConnectionError: Socket error. 

2234 ClientOperationError: Another operation is already in progress. 

2235 SieveCapabilityError: "UNAUTHENTICATE" not supported. 

2236 SieveConnectionError: Server has closed the connection. 

2237 SieveProtocolError: Server violated the ManageSieve protocol. 

2238 """ 

2239 assert self.capabilities 

2240 if not self.capabilities.unauthenticate: 

2241 raise SieveCapabilityError('UNAUTHENTICATE: Not supported') 

2242 self.execute('UNAUTHENTICATE') 

2243 self.login = '' 

2244 self.owner = '' 

2245 self.logger.info('Un-authenticated') 

2246 

2247 def unsetactive(self): 

2248 """Deactivate the active script. 

2249 

2250 Raises: 

2251 ClientConnectionError: Socket error. 

2252 ClientOperationError: Another operation is already in progress. 

2253 SieveConnectionError: Server has closed the connection. 

2254 SieveProtocolError: Server violated the ManageSieve protocol. 

2255 """ 

2256 self._scripts = None 

2257 self.execute('SETACTIVE', '') 

2258 self.logger.info('Deactivated active script') 

2259 

2260 @classmethod 

2261 def validname(cls, script: str, check: bool = False) -> bool: 

2262 """Check whether `script` is a valid script name. 

2263 

2264 Arguments: 

2265 script: Script name 

2266 check: Raise an error if `script` is not a valid script name? 

2267 

2268 Raises: 

2269 ValueError: `script` is *not* valid. [#validname-check]_ 

2270 

2271 .. [#validname-check] Only raised if `check` is `True`. 

2272 """ 

2273 if cls._isname(script): 

2274 return True 

2275 if check: 

2276 raise ValueError(escapectrl(script) + ': Bad name') 

2277 return False 

2278 

2279 backup: int = 0 

2280 """How many backups to keep.""" 

2281 

2282 _isname: Callable[..., Optional[re.Match]] = re.compile( 

2283 '[^\u0000-\u001f\u0080-\u009f\u2028\u2029]+' 

2284 ).fullmatch 

2285 """Check whether a string is a valid script name. 

2286 

2287 .. seealso:: 

2288 :rfc:`5198` (sec. 2) 

2289 Definition of unicode format for network interchange. 

2290 :rfc:`5804` (sec. 1.6) 

2291 ManageSieve script names. 

2292 """ 

2293 

2294 _scripts: Optional[list[tuple[str, bool]]] = None 

2295 """Scripts returned by the last :meth:`listscripts`.""" 

2296 

2297 

class SieveSASLAdapter(BaseSASLAdapter):
    """Adapter to send SASL messages over a :class:`SieveConn`.

    SASL messages are Base64-encoded when
    sent and Base64-decoded when received.
    """

    def __init__(self, connection: 'SieveConn'):
        """Initialize the adapter.

        Arguments:
            connection: Connection over which to send SASL messages.
        """
        self.conn = connection

    def abort(self):
        """Cancel the authentication exchange.

        Raises:
            SieveProtocolError: Server violated the ManageSieve protocol.
            SieveError: Server did not respond with "OK".
        """
        assert self.conn
        # RFC 5804 (sec. 2.1) has the client cancel an exchange by
        # sending a string that contains only "*". That string must
        # go out as-is; :meth:`send` would Base64-encode it.
        self.conn.sendline('*')
        self.end()

    def begin(self, name: str, data: Optional[bytes] = None):
        """Send "AUTHENTICATE" for the mechanism `name`.

        Arguments:
            name: Mechanism name (sent in upper case).
            data: Initial response, if any (sent Base64-encoded).
        """
        assert self.conn
        args: list[Any] = [Atom('AUTHENTICATE'), name.upper()]
        if data is not None:
            args.append(b64encode(data))
        self.conn.sendline(*args)   # type: ignore[arg-type]

    def end(self):
        """Receive the response that concludes the exchange.

        Raises:
            SieveProtocolError: Server violated the ManageSieve protocol.
            SieveError: Server did not respond with "OK".
        """
        assert self.conn
        try:
            res = Response.fromline(self.conn.receiveline())
        except ValueError as err:
            raise SieveProtocolError(str(err)) from err
        if res.response != 'OK':
            raise res.toerror()

    def send(self, data: bytes):
        """Send `data` Base64-encoded."""
        assert self.conn
        conn = self.conn
        conn.sendline(b64encode(data))  # type: ignore[arg-type]

    def receive(self) -> bytes:
        """Receive a SASL message and Base64-decode it.

        Raises:
            SieveProtocolError: Server violated the ManageSieve protocol.
            SieveError: Server sent a response instead of a message.
        """
        assert self.conn
        try:
            line = self.conn.receiveline()
        except ValueError as err:
            raise SieveProtocolError(str(err)) from err
        # A line that starts with an atom is a response, not a
        # message. The emptiness check keeps an empty line from raising
        # an IndexError here; it is rejected as unexpected data below.
        if line and isinstance(line[0], Atom):
            res = Response.fromline(line)
            raise res.toerror()
        try:
            word, = line
        except ValueError as err:
            raise SieveProtocolError('Unexpected data') from err
        if isinstance(word, Atom) or not isinstance(word, str):
            raise SieveProtocolError('Expected string')
        return b64decode(word)

    @property
    def sock(self) -> Union[socket.SocketType, ssl.SSLSocket]:
        """Socket of the underlying connection."""
        assert self.conn
        assert self.conn.sock
        return self.conn.sock

    @sock.setter
    def sock(self, sock: Union[socket.SocketType, ssl.SSLSocket]):
        assert self.conn
        self.conn.sock = sock

    conn: Optional[SieveConn] = None
    """Underlying connection."""

2360 

2361 

CapabilitiesT = TypeVar('CapabilitiesT', bound='Capabilities')
""":class:`Capabilities` type variable."""


@dataclass
class Capabilities():
    """Capabilities announced by a ManageSieve server."""

    implementation: Optional[str] = None
    """Server application (e.g. "Dovecot Pigeonhole")."""

    sieve: tuple[str, ...] = ()
    """Supported Sieve modules."""

    language: Optional[str] = None
    """Natural language used for messages (:rfc:`5646` tag)."""

    maxredirects: Optional[int] = None
    """Maximum redirects per operation."""

    notify: tuple[str, ...] = ()
    """URI schema parts for supported notification methods."""

    owner: str = ''
    """Canonical name of the user whose scripts are managed."""

    sasl: tuple[str, ...] = ()
    """Supported authentication methods."""

    starttls: bool = False
    """Is "STARTTLS" available?"""

    unauthenticate: bool = False
    """Is "UNAUTHENTICATE" available?"""

    version: Optional[str] = None
    """ManageSieve protocol version."""

    notunderstood: dict = dataclasses.field(default_factory=dict)
    """Capabilities not understood by SieveManager."""

    @classmethod
    def fromlines(cls: type[CapabilitiesT],
                  lines: Iterable[Line]) -> CapabilitiesT:
        """Create a :class:`Capabilities` object from a server response.

        Capability names are matched case-insensitively.

        Arguments:
            lines: Lines of the response, one capability per line.

        Raises:
            SieveProtocolError: A line is empty, a capability name or
                value is not a string, or a known capability that
                takes a value comes with no or more than one value.
        """
        strings = ('implementation', 'language', 'owner', 'version')
        lists = ('notify', 'sasl', 'sieve')
        flags = ('starttls', 'unauthenticate')

        def single(words: Line) -> str:
            """Get the one and only value that follows the name."""
            try:
                value, = words[1:]
            except ValueError as err:
                raise SieveProtocolError('Expected word') from err
            if not isinstance(value, str):
                raise SieveProtocolError('Expected string')
            return value

        caps = cls()
        for words in lines:
            try:
                key = words[0].casefold()   # type: ignore[union-attr]
            except IndexError as err:
                raise SieveProtocolError('Expected word') from err
            except AttributeError as err:
                raise SieveProtocolError('Expected string') from err
            if key in strings:
                setattr(caps, key, single(words))
            elif key in lists:
                # Space-separated lists are case-folded and stored as tuples.
                setattr(caps, key, tuple(single(words).casefold().split()))
            elif key == 'maxredirects':
                caps.maxredirects = int(single(words))
            elif key in flags:
                setattr(caps, key, True)
            else:
                # Unknown capabilities are recorded with their first
                # value or, if they have none, with `True`.
                caps.notunderstood[key] = words[1] if len(words) > 1 else True
        return caps

2438 

2439 

ResponseT = TypeVar('ResponseT', bound='Response')
"""Type variable for :class:`Response`."""


@dataclass(frozen=True)
class Response():
    """Response of the server to a command.

    .. seealso::
        :rfc:`5804` (secs. 1.2, 1.3, 4, 6.4, and passim)
            ManageSieve responses
    """

    @classmethod
    def fromline(cls: type[ResponseT], line: Line) -> ResponseT:
        """Create a :class:`Response` object from a :class:`Line`.

        The first word must be an atom (the response). A sequence in
        the second position is taken as the response code, a string in
        the second or third position as the message.

        Raises:
            SieveProtocolError: `line` is not a well-formed response.
        """
        # pylint: disable=redefined-outer-name
        response = None
        code = cls.code
        message = cls.message
        for pos, word in enumerate(line):
            # Atoms are tested for first, so an atom is
            # never mistaken for a message or a response code.
            if isinstance(word, Atom):
                wellformed = pos == 0
                if wellformed:
                    response = word
            elif isinstance(word, str):
                wellformed = pos in (1, 2)
                if wellformed:
                    message = word
            elif isinstance(word, Sequence):
                wellformed = pos == 1
                if wellformed:
                    code = tuple(word)
            else:
                wellformed = False
            if not wellformed:
                raise SieveProtocolError('Malformed response')
        if response is None:
            raise SieveProtocolError('Expected atom')
        return cls(response=response, code=code, message=message)

    def __str__(self) -> str:
        """:attr:`message` or, if no message was returned, a stub message."""
        return self.message or f'server says {self.response}'

    def matches(self, *categories: str) -> bool:
        """Check if :attr:`code` matches any of the given `categories`.

        A category matches the response code itself as well as
        each of its sub-categories; a trailing slash is ignored.
        Returns `False` if :attr:`code` is empty.
        Matching is case-insensitive.

        For example:

        >>> with open('script.sieve') as script:
        >>>     try:
        >>>         mgr.putscript(script, script.name)
        >>>     except SieveOperationError as err:
        >>>         if err.matches('QUOTA'):
        >>>             print('over quota')

        Print more informative messages:

        >>> with open('script.sieve') as script:
        >>>     try:
        >>>         mgr.putscript(script, script.name)
        >>>     except SieveOperationError as err:
        >>>         if err.matches('QUOTA/MAXSCRIPTS'):
        >>>             print('too many scripts')
        >>>         elif err.matches('QUOTA/MAXSIZE'):
        >>>             print(f'{script.name} is too large')
        >>>         elif err.matches('QUOTA'):
        >>>             print('over quota')
        """
        if not self.code:
            return False
        rescode = self.code[0]
        assert isinstance(rescode, str)

        def covers(category: str) -> bool:
            # Anchor at a category boundary, so
            # that "QUO" does not match "QUOTA".
            pattern = re.escape(category.removesuffix('/')) + r'(/|$)'
            return re.match(pattern, rescode, flags=re.IGNORECASE) is not None

        return any(covers(category) for category in categories)

    def toerror(self) -> 'SieveError':
        """Convert a :class:`Response` into an error."""
        if self.response == 'BYE':
            return SieveConnectionError(self.response, self.code,
                                        self.message)
        return SieveOperationError(self.response, self.code, self.message)

    response: Atom
    """'OK', 'NO', or 'BYE'.

    ======== ===========================
    Response Meaning
    ======== ===========================
    'OK'     Success
    'NO'     Failure
    'BYE'    Connection closed by server
    ======== ===========================
    """

    code: tuple[Word, ...] = ()
    """Response code.

    ManageSieve response codes are lists of categories, separated by
    slashes ("/"), where each category is the super-category of the
    next (e.g., "quota/maxsize").

    Some response codes carry data (e.g., ``TAG "SYNC-123"``).

    See :rfc:`5804` (sec. 1.3) for a list of response codes.

    .. warning::
        Servers need *not* return response codes.
    """

    message: Optional[str] = None
    """Human-readable message.

    .. warning::
        Servers need *not* return a message.
    """

2560 

2561 

@dataclass(frozen=True)
class SRV():
    """Immutable record holding the fields of a DNS SRV record.

    .. seealso::
        :rfc:`2782`
            DNS SRV
    """

    priority: int
    """Priority of the record."""

    weight: int
    """Weight of the record."""

    host: str
    """Host that the record points to."""

    port: int
    """Port that the record points to."""

2575 

2576 

URLT = TypeVar('URLT', bound='URL')
"""Type variable for :class:`URL`."""


@dataclass(frozen=True)
class URL():
    """Sieve URL.

    .. seealso::
        :rfc:`5804` (sec. 3)
            Sieve URL Scheme
    """

    @classmethod
    def fromstr(cls: type[URLT], url: str) -> URLT:
        """Create a :class:`URL` object from a URL string.

        "sieve://" is prepended if `url` does not start with "//"
        or with a scheme followed by "//".

        For example:

        >>> URL.fromstr('sieve://user@imap.foo.example')
        URL(hostname='imap.foo.example', scheme='sieve',
            username='user', password=None, port=None,
            owner=None, scriptname=None)

        Raises:
            ValueError: Not a valid Sieve URL.
        """
        if not re.match(r'([a-z][a-z0-9+.-]*:)?//', url):
            url = 'sieve://' + url
        parts = urllib.parse.urlsplit(url)
        if parts.query or parts.fragment:
            raise ValueError(f'{url}: Not a Sieve URL')
        if not parts.hostname:
            raise ValueError(f'{url}: No host')
        if not (isinetaddr(parts.hostname) or ishostname(parts.hostname)):
            raise ValueError(f'{parts.hostname}: Neither address nor hostname')
        try:
            # "/owner/scriptname"; the script name may contain slashes.
            owner, scriptname = parts.path.split('/', maxsplit=2)[1:]
        except ValueError:
            # Fewer than two segments: the path, if any, names the owner.
            owner, scriptname = parts.path[1:], None
        return cls(
            scheme=parts.scheme,
            username=parts.username,
            password=parts.password,
            hostname=parts.hostname,
            port=parts.port,
            owner=owner if owner else None,
            scriptname=scriptname if scriptname else None
        )

    def __str__(self):
        """Get a string representation of the URL.

        The scheme defaults to "sieve" and the host to "localhost".
        Hostnames that contain a colon (i.e., IPv6 addresses) are
        enclosed in square brackets, so that the host can be told
        apart from the port when the string is parsed again.
        """
        url = f'{self.scheme}://' if self.scheme else 'sieve://'
        if self.username:
            url += self.username
            if self.password is not None:
                url += f':{self.password}'
            url += '@'
        host = self.hostname if self.hostname else 'localhost'
        # :meth:`fromstr` stores IPv6 addresses without their brackets
        # (`urlsplit` strips them), so they have to be added back.
        if ':' in host and not host.startswith('['):
            host = f'[{host}]'
        url += host
        if self.port is not None:
            url += f':{self.port}'
        if self.owner:
            url += f'/{self.owner}'
        if self.scriptname:
            url += f'/{self.scriptname}'
        return url

    hostname: str
    scheme: str = 'sieve'
    username: Optional[str] = None
    password: Optional[str] = None
    port: Optional[int] = None
    owner: Optional[str] = None
    scriptname: Optional[str] = None

2658 

2659 

2660# 

2661# Authentication 

2662# 

2663 

class BasePwdAuth(BaseAuth, ABC):
    """Base class for authentication mechanisms that use a password.

    Prepares credentials, so that subclasses need only
    implement :meth:`exchange`. For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: PlainAuth
    """

    def __init__(self, connection: BaseSASLAdapter,
                 authcid: str, password: str, authzid: str = '',
                 prepare: SASLPrep = SASLPrep.ALL):
        """Prepare authentication.

        `authcid`, `password`, and `authzid` are prepared according to
        :rfc:`3454` and :rfc:`4013` if ``prepare & SASLPrep.USERNAMES``
        and/or ``prepare & SASLPrep.PASSWORDS`` respectively
        evaluate to true.

        Arguments:
            connection: Connection over which to authenticate.
            authcid: Authentication ID (user to login as).
            password: Password.
            authzid: Authorization ID (user whose rights to acquire).
            prepare: Which credentials to prepare.

        Raises:
            ValueError: Bad characters in username or password.
        """
        super().__init__(connection, authcid, authzid, prepare)
        # The superclass is handed the usernames;
        # only the password remains to be dealt with.
        if prepare & SASLPrep.PASSWORDS:
            self.password = self.prepare(password)
        else:
            self.password = password

    password: str
    """Password."""

2700 

2701 

class BaseScramAuth(BasePwdAuth, ABC):
    """Base class for SCRAM authentication mechanisms.

    Implements :meth:`exchange`, so that subclasses need only define a digest.
    For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: ScramSHA1Auth

    .. seealso::
        :rfc:`5802`
            Salted Challenge Response Authentication Mechanism (SCRAM).
        :rfc:`7677`
            SCRAM-SHA-256 and SCRAM-SHA-256-PLUS.
        https://datatracker.ietf.org/doc/html/draft-melnikov-scram-bis
            Updated recommendations for implementing SCRAM.
        https://datatracker.ietf.org/doc/html/draft-melnikov-scram-sha-512
            SCRAM-SHA-512 and SCRAM-SHA-512-PLUS.
        https://datatracker.ietf.org/doc/html/draft-melnikov-scram-sha3-512
            SCRAM-SHA3-512 and SCRAM-SHA3-512-PLUS.
        https://csb.stevekerrison.com/post/2022-01-channel-binding
            Discussion of TLS channel binding.
        https://csb.stevekerrison.com/post/2022-05-scram-detail
            Discussion of SCRAM.
    """

    def exchange(self):
        """Perform the SCRAM message exchange.

        Sends the client-first message, derives the client proof from
        the server-first message, sends the client-final message, and
        verifies the server signature in the server-final message.

        Raises:
            SASLCapabilityError: Server rejected the client-first message.
            SASLProtocolError: Server sent a malformed message.
            SASLSecurityError: Server nonce or signature failed verification.
        """
        # Compare to
        # * https://github.com/stevekerrison/auth-examples
        # * https://github.com/horazont/aiosasl

        def todict(msg: bytes) -> dict[bytes, bytes]:
            """Split a message into its "name=value" attributes."""
            return dict(a.split(b'=', maxsplit=1) for a in msg.split(b','))

        def escape(b: bytes) -> bytes:
            """Escape "=" and "," ("=" first, lest "=2C" be escaped again)."""
            return b.replace(b'=', b'=3D').replace(b',', b'=2C')

        # Parameters
        authcid = self.authcid.encode('utf8')
        authzid = self.authzid.encode('utf8')
        password = self.password.encode('utf8')
        chan_bind_type = self.cbtype.encode('utf8')
        chan_bind_data = self.cbdata
        c_nonce_len = self.noncelen
        digest = self.digest

        # Send client-first message
        chan_bind_attr = b'p=%s' % chan_bind_type if chan_bind_type else b'n'
        # RFC 5802 (sec. 7) defines the GS2 header as
        # `gs2-cbind-flag "," [authzid] ","` with `authzid = "a=" saslname`,
        # so a non-empty authorization ID must carry the "a=" name.
        authz_attr = b'a=' + escape(authzid) if authzid else b''
        c_first_prefix = chan_bind_attr + b',' + authz_attr
        c_nonce = b64encode(secrets.token_bytes(c_nonce_len))
        c_first_bare = b'n=%s,r=%s' % (escape(authcid), c_nonce)
        c_first = c_first_prefix + b',' + c_first_bare
        self.send(c_first)

        # Receive server-first message
        try:
            s_first = self.receive()
        except SieveOperationError as err:
            # pylint: disable=bad-exception-cause (???)
            raise SASLCapabilityError(f'{self.name}: {err}') from err
        try:
            s_first_dict = todict(s_first)
            iters = int(s_first_dict[b'i'])
            s_nonce = s_first_dict[b'r']
            salt = b64decode(s_first_dict[b's'])
        except (KeyError, ValueError) as err:
            raise SASLProtocolError(
                f'{self.name}: Malformed server-first message'
            ) from err
        # RFC 5802 (sec. 5.1) requires the client to verify that the
        # nonce sent by the server starts with the client's own nonce.
        if not s_nonce.startswith(c_nonce):
            raise SASLSecurityError(
                f'{self.name}: Server nonce does not extend client nonce'
            )

        # Send client-final message
        salted_pwd = hashlib.pbkdf2_hmac(digest, password, salt, iters)
        c_key = hmac.digest(salted_pwd, b'Client Key', digest)
        stored_key = hashlib.new(digest, c_key).digest()
        chan_bind = b64encode(c_first_prefix + b',' + chan_bind_data)
        c_final_prefix = b'c=%s,r=%s' % (chan_bind, s_nonce)
        auth_message = b','.join((c_first_bare, s_first, c_final_prefix))
        c_signature = hmac.digest(stored_key, auth_message, digest)
        c_proof = b64encode(bytes(a ^ b for a, b in zip(c_key, c_signature)))
        c_final = c_final_prefix + b',p=%s' % c_proof
        self.send(c_final)

        # Receive server-final message
        s_final = self.receive()
        s_key = hmac.digest(salted_pwd, b'Server Key', digest)
        s_signature = hmac.digest(s_key, auth_message, digest)
        try:
            verifier = b64decode(todict(s_final)[b'v'])
        except (KeyError, ValueError) as err:
            raise SASLProtocolError(
                f'{self.name}: Malformed server-final message'
            ) from err
        # Compare in constant time, as is good practice for MACs.
        if not hmac.compare_digest(s_signature, verifier):
            host = self.sock.getpeername()[0]
            raise SASLSecurityError(f'{host}: Verification failed')

    @property
    @abstractmethod
    def digest(self) -> str:
        """Digest name as used by :mod:`hashlib` and :mod:`hmac`."""

    cbtype: str = ''
    """TLS channel-binding type."""

    cbdata: bytes = b''
    """TLS channel-binding data."""

    noncelen: int = 18
    """Client nonce length in bytes."""

2801 

2802 

class BaseScramPlusAuth(BaseScramAuth, ABC):
    """Base class for SCRAM mechanisms that use channel binding.

    For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: ScramSHA1PlusAuth
    """

    # Channel-binding is, for the most part, implemented in BaseScramAuth.
    def __init__(self, *args, **kwargs):
        """Prepare authentication and select a channel-binding type.

        Sets :attr:`cbtype` and :attr:`cbdata` to the first candidate
        type for which the TLS socket returns channel-binding data.

        Raises:
            SASLProtocolError: Connection is not secured with TLS.
            TLSCapabilityError: No channel-binding type yields data.
        """
        super().__init__(*args, **kwargs)
        if not isinstance(self.sock, ssl.SSLSocket):
            raise SASLProtocolError('Non-TLS channel cannot be bound')
        # Candidates are tried in this order.
        candidates = ('tls-exporter', 'tls-unique', 'tls-server-endpoint')
        for cbtype in candidates:
            if cbtype not in ssl.CHANNEL_BINDING_TYPES:
                continue
            cbdata = self.sock.get_channel_binding(cbtype)
            if cbdata:
                self.cbtype = cbtype
                self.cbdata = cbdata
                return
        raise TLSCapabilityError('No supported channel-binding type')

2825 

2826 

class AuthzUnsupportedMixin():
    """Mixin for SASL mechanisms that cannot convey an authorization ID.

    For example:

    .. literalinclude:: ../sievemgr.py
        :pyobject: LoginAuth
    """

    def __init__(self, *args, **kwargs):
        """Prepare authentication.

        Arguments are passed on to the next class
        in the method resolution order as they are.

        Raises:
            SASLCapabilityError: :attr:`authzid` is set.
        """
        assert isinstance(self, BaseAuth)
        super().__init__(*args, **kwargs)
        # :attr:`authzid` is available only after initialisation.
        if not self.authzid:
            return
        raise SASLCapabilityError(f'{self.name}: No authorization')

2845 

2846 

class CramMD5Auth(AuthzUnsupportedMixin, BasePwdAuth):
    """CRAM-MD5 authentication.

    .. seealso::
        :rfc:`2195` (sec. 2)
            Definition of CRAM-MD5.
    """

    name = 'CRAM-MD5'
    obsolete = True

    def exchange(self):
        """Answer the challenge with the username and a keyed MD5 digest."""
        challenge = self.receive()
        # The password is the key, the challenge the message.
        key = self.password.encode('utf8')
        hexdigest = hmac.new(key, challenge, hashlib.md5).hexdigest()
        reply = f'{self.authcid} {hexdigest}'
        self.send(reply.encode('utf8'))

2864 

2865 

class ExternalAuth(BaseAuth):
    """EXTERNAL authentication.

    .. seealso::
        :rfc:`4422` (App. A)
            Definition of the EXTERNAL mechanism.
    """

    name = 'EXTERNAL'

    def __call__(self):
        """Authenticate.

        The authorization ID, if set, is sent as initial response.
        """
        if self.authzid:
            self.begin(self.authzid.encode('utf8'))
        else:
            self.begin()
        self.receive()
        self.send(b'')
        self.end()

    def exchange(self):
        """No-op."""

2886 

2887 

class LoginAuth(AuthzUnsupportedMixin, BasePwdAuth):
    """LOGIN authentication.

    .. seealso::
        https://datatracker.ietf.org/doc/draft-murchison-sasl-login
            Definition of the LOGIN mechanism.
    """

    def __init__(self, *args, **kwargs):
        """Prepare authentication.

        Arguments:
            connection: Connection over which to authenticate.
            authcid: Authentication ID (user to login as).
            password: Password.
            authzid: Authorization ID (user whose rights to acquire).
            prepare: Which credentials to prepare.

        Raises:
            ValueError: Password contains CR, LF, or NUL.
        """
        super().__init__(*args, **kwargs)
        # Test the *characters* of the password. (`{self.password}` would
        # be a set that holds the password as a whole, so the check would
        # only catch passwords that consist of nothing but a single CR,
        # LF, or NUL.)
        if any(char in self.password for char in ('\r', '\n', '\0')):
            raise ValueError('Password contains CR, LF, or NUL')

    def exchange(self):
        """Answer the first prompt with the username, the second with the password."""
        self.receive()
        self.send(self.authcid.encode('utf8'))
        self.receive()
        self.send(self.password.encode('utf8'))

    obsolete = True
    name = 'LOGIN'

2921 

2922 

class PlainAuth(BasePwdAuth):
    """PLAIN authentication.

    .. seealso::
        :rfc:`4616`
            PLAIN authentication mechanism.
    """

    name = 'PLAIN'

    def exchange(self):
        """Send the NUL-separated authorization ID, username, and password."""
        fields = (self.authzid, self.authcid, self.password)
        message = '\0'.join(fields).encode('utf8')
        self.send(message)

2936 

2937 

class ScramSHA1Auth(BaseScramAuth):
    """SCRAM authentication with SHA-1 as digest."""

    name = 'SCRAM-SHA-1'
    order = -10

    @property
    def digest(self) -> str:
        """Digest name as used by :mod:`hashlib` and :mod:`hmac`."""
        return 'sha1'

2947 

2948 

class ScramSHA1PlusAuth(BaseScramPlusAuth, ScramSHA1Auth):
    """SCRAM authentication with SHA-1 as digest and channel binding."""

    order = -1000
    name = 'SCRAM-SHA-1-PLUS'

2953 

2954 

class ScramSHA224Auth(BaseScramAuth):
    """SCRAM authentication with SHA-224 as digest."""

    name = 'SCRAM-SHA-224'
    order = -20

    @property
    def digest(self) -> str:
        """Digest name as used by :mod:`hashlib` and :mod:`hmac`."""
        return 'sha224'

2964 

2965 

class ScramSHA224PlusAuth(BaseScramPlusAuth, ScramSHA224Auth):
    """SCRAM-SHA-224-PLUS authentication.

    Defines no ``digest`` of its own; it is looked up on the base classes.
    """
    name = 'SCRAM-SHA-224-PLUS'
    order = -2000

2970 

2971 

class ScramSHA256Auth(BaseScramAuth):
    """SCRAM-SHA-256 authentication."""

    @property
    def digest(self) -> str:
        """Name of the digest used by this mechanism (``'sha256'``)."""
        return 'sha256'

    # Mechanism name.
    name = 'SCRAM-SHA-256'
    # NOTE(review): presumably ranks mechanisms by preference -- confirm
    # where `order` is consumed.
    order = -30

2981 

2982 

class ScramSHA256PlusAuth(BaseScramPlusAuth, ScramSHA256Auth):
    """SCRAM-SHA-256-PLUS authentication.

    Defines no ``digest`` of its own; it is looked up on the base classes.
    """
    name = 'SCRAM-SHA-256-PLUS'
    order = -3000

2987 

2988 

class ScramSHA384Auth(BaseScramAuth):
    """SCRAM-SHA-384 authentication."""

    @property
    def digest(self) -> str:
        """Name of the digest used by this mechanism (``'sha384'``)."""
        return 'sha384'

    # Mechanism name.
    name = 'SCRAM-SHA-384'
    # NOTE(review): presumably ranks mechanisms by preference -- confirm
    # where `order` is consumed.
    order = -40

2998 

2999 

class ScramSHA384PlusAuth(BaseScramPlusAuth, ScramSHA384Auth):
    """SCRAM-SHA-384-PLUS authentication.

    Defines no ``digest`` of its own; it is looked up on the base classes.
    """
    name = 'SCRAM-SHA-384-PLUS'
    order = -4000

3004 

3005 

class ScramSHA512Auth(BaseScramAuth):
    """SCRAM-SHA-512 authentication."""

    @property
    def digest(self) -> str:
        """Name of the digest used by this mechanism (``'sha512'``)."""
        return 'sha512'

    # Mechanism name.
    name = 'SCRAM-SHA-512'
    # NOTE(review): presumably ranks mechanisms by preference -- confirm
    # where `order` is consumed.
    order = -50

3015 

3016 

class ScramSHA512PlusAuth(BaseScramPlusAuth, ScramSHA512Auth):
    """SCRAM-SHA-512-PLUS authentication.

    Defines no ``digest`` of its own; it is looked up on the base classes.
    """
    name = 'SCRAM-SHA-512-PLUS'
    order = -5000

3021 

3022 

3023# pylint: disable=invalid-name 

class ScramSHA3_512Auth(BaseScramAuth):
    """SCRAM-SHA3-512 authentication."""

    @property
    def digest(self) -> str:
        """Name of the digest used by this mechanism (``'sha3_512'``)."""
        return 'sha3_512'

    # Mechanism name.
    name = 'SCRAM-SHA3-512'
    # NOTE(review): presumably ranks mechanisms by preference -- confirm
    # where `order` is consumed.
    order = -60

3033 

3034 

3035# pylint: disable=invalid-name 

class ScramSHA3_512PlusAuth(BaseScramPlusAuth, ScramSHA3_512Auth):
    """SCRAM-SHA3-512-PLUS authentication.

    Defines no ``digest`` of its own; it is looked up on the base classes.
    """
    name = 'SCRAM-SHA3-512-PLUS'
    order = -6000

3040 

3041 

3042# 

3043# SieveManager Shell 

3044# 

3045 

3046class BaseShell(): 

3047 """Base class for interactive shells. 

3048 

3049 :class:`BaseShell` is similar-ish to :class:`cmd.Cmd`. However, lines read 

3050 from standard input are :meth:`expanded <expand>` before being passed to 

3051 methods that implement commands. :class:`BaseShell` also provides an 

3052 extensible :meth:`completion system <complete>` as well as a built-in 

3053 :meth:`help system <do_help>`. :attr:`aliases` can be extended, too. 

3054 

3055 Define a :samp:`do_{command}` method to add `command`. 

3056 

3057 For example: 

3058 

3059 >>> class Calculator(BaseShell): 

3060 >>> def do_add(self, n, m): 

3061 >>> \"\"\"add n m - add n and m\"\"\" 

3062 >>> print(n + m) 

3063 >>> 

3064 >>> def do_sum(self, *numbers): 

3065 >>> \"\"\"sum [n ...] - \"\"\" 

3066 >>> print sum(numbers) 

3067 >>> 

3068 >>> aliases = { 

3069 >>> '+': 'sum' 

3070 >>> } 

3071 >>> 

3072 >>> calc = Calculator() 

3073 >>> calc.executeline('add 0 1') 

3074 1 

3075 >>> calc.executeline('sum 0 1 2') 

3076 3 

3077 >>> calc.executeline('+ 0 1 2 3') 

3078 6 

3079 >>> calc.executeline('add 0') 

3080 usage: add n m 

3081 >>> calc.executeline('help add') 

3082 add n m - add n and m 

3083 """ 

3084 

3085 # Shell behaviour 

def __init__(self) -> None:
    """Initialize a :class:`BaseShell` object.

    Stores the names yielded by :meth:`getcommands` as the
    :attr:`commands` tuple.
    """
    self.commands = tuple(self.getcommands())

3089 

@staticmethod
def columnize(words: Sequence[str], file: TextIO = sys.stdout,
              width: int = shutil.get_terminal_size().columns):
    """Print `words` in columns to `file`.

    Words are laid out column by column (reading down, then right).
    Every column is as wide as the longest word plus one space, and
    trailing whitespace is removed from each printed line.

    Arguments:
        words: Words.
        file: Output file.
        width: Terminal width in chars.
    """
    count = len(words)
    if count == 0:
        return
    cell = 1 + max(len(word) for word in words)
    # At least one column, even if the longest word exceeds the width.
    percol = max(1, (width + 1) // cell)
    nrows = math.ceil(count / percol)
    for start in range(nrows):
        # Row `start` holds every `nrows`-th word beginning at `start`.
        cells = [word.ljust(cell) for word in words[start::nrows]]
        print(''.join(cells).rstrip(), file=file)

3110 

def complete(self, text: str, n: int) -> Optional[str]:
    """Completion function for :func:`readline.set_completer`.

    Command names are completed out of the box. To complete the
    arguments of some `command`, define :samp:`complete_{command}`;
    it is called with the index of the argument being completed and
    `text` and must return :class:`str`-:class:`bool` pairs, where the
    string is a candidate and the boolean says whether a space should
    be appended to it. Candidates are filtered by matching them against
    `text` followed by ``*``, shell-quoted, and sorted.

    The list of completions is computed when `n` is 0 and cached;
    calls with other values of `n` index into that cached list.

    Arguments:
        text: Possibly partial word to be completed.
        n: Index of the completion to return.

    Returns:
        Either the `n`-th completion for `text` or
        ``None`` if there is no such completion.

    .. admonition:: Side-effects

        * Logging of messages with a priority lower than
          :data:`logging.ERROR` is suppressed during tab-completion.
        * A BEL is printed to the controlling terminal if
          no completion for `text` is found.
    """
    if n == 0:
        log = self.logger
        saved = log.level
        if saved < logging.ERROR:
            log.setLevel(logging.ERROR)
        try:
            words = self._getargs()
            if words:
                name = self.aliases.get(words[0], words[0])
                completer = getattr(self, f'complete_{name}', None)
                # Reset first so that a failing completer
                # does not leave stale completions behind.
                self._completions = []
                if completer:
                    candidates = [
                        shlex.quote(word) + (' ' if space else '')
                        for word, space in completer(len(words), text)
                        if fnmatch.fnmatchcase(word, text + '*')
                    ]
                    candidates.sort()
                    self._completions = candidates
            else:
                self._completions = sorted(
                    name + ' ' for name in self.commands
                    if name.startswith(text)
                )
            if text and not self._completions:
                bell()
        finally:
            log.setLevel(saved)
    try:
        return self._completions[n]
    except IndexError:
        return None

3203 

@staticmethod
def confirm(prompt: str, default: ConfirmEnum = ConfirmEnum.NO,
            multi: bool = False, attempts: int = 3) -> ConfirmEnum:
    """Ask the user a yes/no question.

    An empty answer selects `default`. Answers are compared
    case-insensitively after stripping surrounding whitespace.

    Arguments:
        prompt: Prompt.
        default: Default.
        multi: Give choices "all" and "none"?
        attempts: How often to try before raising a :exc:`ValueError`.

    Raises:
        ValueError: Unrecognized answer.

    .. note::
        Prompts are printed to/answers are read from :file:`/dev/tty`,
        regardless of I/O redirection.
    """
    assert attempts > 0
    choices = {
        'y': ConfirmEnum.YES,
        'yes': ConfirmEnum.YES,
        'n': ConfirmEnum.NO,
        'no': ConfirmEnum.NO,
    }
    if multi:
        choices['all'] = ConfirmEnum.ALL
        choices['none'] = ConfirmEnum.NONE
        hint = 'Enter "yes", "no", "all", or "none"\n'
    else:
        hint = 'Enter "yes" or "no"\n'
    with TermIO() as tty:
        for _ in range(attempts):
            tty.write(prompt + ' ')
            answer = tty.readline().strip().casefold()
            if not answer:
                return default
            if answer in choices:
                return choices[answer]
            tty.write(hint)
    raise ValueError('Too many retries')
    # NOTREACHED

3243 

def enter(self) -> int:
    """Start reading commands from standard input.

    Reading stops at the end of the file or when
    a command raises :exc:`StopIteration`. If standard input
    is not a terminal, reading also stops after the first
    command that returns a non-zero value.

    Returns:
        Return value of the most recently executed command.

    .. admonition:: Side-effects

        Entering the shell binds :kbd:`Tab` to :func:`readline.complete`.
    """
    hasatty = self.hasatty()
    info = self.logger.info
    if hasatty:
        # Remember the completer settings so they can be restored on exit.
        oldcompleter = readline.get_completer()
        olddelims = readline.get_completer_delims()
        readline.set_auto_history(True)
        readline.set_completer(self.complete)
        readline.set_completer_delims(' ')
        readline.parse_and_bind('tab: complete')
        # A libedit-backed readline module gets an extra binding for Tab.
        if readline.__doc__ and 'libedit' in readline.__doc__:
            readline.parse_and_bind('python:bind ^I rl_complete')
        info('Enter "? [command]" for help and "exit" to exit')
    self.retval = 0
    try:
        while True:
            try:
                if hasatty:
                    print(self.getprompt(), end='')
                line = input().strip()
            except EOFError:
                break
            try:
                self.retval = self.executeline(line)
            except StopIteration:
                break
            # Non-interactive input is aborted on the first failure.
            if self.retval != 0 and not hasatty:
                break
    finally:
        if hasatty:
            readline.set_completer(oldcompleter)
            readline.set_completer_delims(olddelims)
    return self.retval

3286 

def execute(self, command: str, *args: str) -> int:
    """Run `command` with `args`.

    `command` is first looked up in :attr:`aliases`; the method
    :samp:`do_{command}` is then called with `args`, provided that
    `args` fit its signature. A return value of ``None`` counts as 0.
    The result is also stored in :attr:`retval`.

    For example:

    >>> shell.execute('ls', 'foo', 'bar')
    0

    Arguments:
        command: Command name.
        args: Arguments to the command.

    Returns:
        Return value.

    Raises:
        ShellUsageError: Command not found or arguments invalid.

    .. note::
        Patterns are expanded by :meth:`executeline`.
        Patterns passed to :meth:`execute` are
        treated as ordinary characters.
    """
    resolved = self.aliases.get(command, command)
    try:
        handler = getattr(self, f'do_{resolved}')
    except AttributeError as err:
        raise ShellUsageError(f'{command}: No such command') from err
    # Check the arguments against the signature before calling, so that
    # a TypeError raised *inside* the command is not taken for misuse.
    try:
        inspect.signature(handler).bind(*args)
    except TypeError as err:
        raise ShellUsageError(self.getusage(handler)) from err
    result = handler(*args)
    self.retval = 0 if result is None else result
    return self.retval

3321 

def executeline(self, line: str) -> int:
    """:meth:`Expand <expand>` `line` and :meth:`execute` it.

    A line that starts with a run of symbol characters (e.g., "!" or
    "?") is split into that run, which is taken to be the command,
    and the remainder, which is expanded on its own. Lines that
    expand to nothing are ignored.

    For example:

    >>> shell.executeline('ls foo bar')
    0

    Returns:
        Return value.

    Raises:
        ShellUsageError: Command not found or arguments invalid.
    """
    symbolic = re.match(r'\s*([^\w\s#_]+)\s*(.*)', line)
    if symbolic:
        command, rest = symbolic.groups()
        return self.execute(command, *self.expand(rest))
    try:
        command, *args = self.expand(line)
    except ValueError:
        # Nothing to unpack: the line is blank or a comment.
        return 0
    return self.execute(command, *args)

3345 

def executescript(self, script: TextIO) -> int:
    """Run the lines of `script` through :meth:`executeline`.

    Execution stops at the first line that returns a non-zero
    value or raises :exc:`StopIteration`.

    For example:

    >>> with open('scriptfile') as scriptfile:
    >>>     shell.executescript(scriptfile)
    0

    Returns:
        Return value.

    Raises:
        ShellUsageError: Command not found or arguments invalid.
    """
    self.retval = 0
    for line in script:
        try:
            status = self.executeline(line)
        except StopIteration:
            break
        self.retval = status
        if self.retval != 0:
            break
    return self.retval

3370 

def expand(self, line: str) -> list[str]:
    """Expand the words that comprise `line`.

    Similar to :manpage:`wordexp(3)`. `line` is :meth:`split`
    into words; every word that is a pattern is replaced with the
    sorted matches among the candidates provided by the command's
    :samp:`complete_{command}` method.

    For example:

    >>> class Foo(BaseShell):
    >>>     def complete_cmd(self, _, text):
    >>>         return [(s, True) for s in ('foo', 'bar', 'baz')]
    >>>
    >>> foo = Foo()
    >>> foo.expand('cmd *')
    ['cmd', 'bar', 'baz', 'foo']

    Raises:
        ShellOperationError: Pattern does not match.
        ShellUsageError: Pattern given to a command without a completer.
    """
    try:
        command, *args = self.split(line)
    except ValueError:
        return []
    completer = getattr(self, f'complete_{command}', None)
    words: list[str] = [command]
    for index, word in enumerate(args, start=1):
        if not isinstance(word, ShellPattern):
            words.append(word)
            continue
        if completer is None:
            raise ShellUsageError(f'{command} does not accept patterns')
        candidates = (name for name, _ in completer(index, ''))
        matches = sorted(word.expand(candidates))
        if not matches:
            raise ShellOperationError(f'{word}: No matches')
        words.extend(matches)
    return words

3411 

@classmethod
def getcommands(cls) -> Iterator[str]:
    """Yield the names of the shell commands that `cls` provides.

    A command is any attribute named :samp:`do_{command}`
    with a non-empty `command` part.

    For example:

    >>> class Foo(BaseShell):
    >>>     def do_cmd(self, arg1, arg2):
    >>>         pass
    >>>
    >>> tuple(Foo.getcommands())
    ('cmd', 'exit', 'help')
    >>> foo = Foo()
    >>> foo.commands
    ('cmd', 'exit', 'help')
    """
    for attr in dir(cls):
        prefix, sep, name = attr.partition('_')
        if sep and prefix == 'do' and name:
            yield name

3433 

def getprompt(self) -> str:
    """Get the shell prompt.

    .. tip::

        Override this method to change the prompt.
    """
    return '> '

3442 

@staticmethod
def getusage(func: Callable) -> Optional[str]:
    """Derive a usage message from `func`'s docstring.

    :func:`getusage` assumes that the docstring has the form
    :samp:`{command} {args} - {description}`. The usage message
    is either the text up to the first dash that has a space on
    either side (" - ") or, if the docstring does not contain such
    a separator, the whole docstring. Leading and trailing
    whitespace is stripped.

    Dashes inside options ("[-f|-i]") and inside the description
    ("re-upload", "page-by-page") are therefore not mistaken for
    the separator.

    For example:

    >>> def frobnicate(foo, bar):
    >>>     \"\"\"frobnicate foo bar - frobnicate foo with bar\"\"\"
    >>>     ...
    >>>
    >>> BaseShell.getusage(frobnicate)
    'usage: frobnicate foo bar'

    Returns:
        The usage message or ``None`` if `func` has no docstring
        or the synopsis is empty.
    """
    doc = func.__doc__
    if not doc:
        return None
    synopsis = doc.partition(' - ')[0].strip()
    if not synopsis:
        return None
    return 'usage: ' + synopsis

3465 

@staticmethod
def hasatty() -> bool:
    """Tell whether standard input is connected to a terminal.

    Returns ``False`` if standard input has no file descriptor.
    """
    try:
        descriptor = sys.stdin.fileno()
    except io.UnsupportedOperation:
        return False
    return os.isatty(descriptor)

3473 

@staticmethod
def split(line: str) -> list['ShellWord']:
    """Split `line` into words in the way a POSIX-compliant shell would.

    Single and double quotes as well as backslash escapes are honoured.
    Words that contain an unquoted ``*``, ``?``, or ``[`` are returned
    as :class:`ShellPattern` objects; quoted or escaped occurrences of
    these characters are protected by wrapping them in brackets, and
    un-wrapped again for words that are not patterns.

    A ``#`` starts a comment only at the beginning of a word;
    within a word (e.g., ``foo#bar``) it is an ordinary character.

    Raises:
        ShellDataError: Quotation marks are unbalanced.
    """
    addempty: bool = False      # Has the current word been quoted?
    buffer: ShellWord = ''      # Characters of the current word.
    escaped: bool = False       # Was the previous character a backslash?
    tokens: list[ShellWord] = []
    quoted: str = ''            # Active quotation mark, if any.
    ispattern: bool = False     # Does the current word contain a wildcard?

    unquotepattern = re.compile(r'\[(.)\]').subn

    def addtoken():
        # Quotes make even an empty word count (e.g., '').
        if buffer or addempty:
            token = (ShellPattern(buffer) if ispattern else
                     unquotepattern(r'\1', buffer)[0])
            tokens.append(token)

    for i, char in enumerate(line):
        if quoted:
            if escaped:
                # Within double quotes, a backslash only
                # escapes backslash, double quote, and newline.
                if char in ('\\', '"', '\n'):
                    buffer += char
                else:
                    buffer += f'\\{char}'
                escaped = False
            elif char == quoted:
                quoted = ''
            elif char == '\\' and quoted == '"':
                escaped = True
            else:
                buffer += f'[{char}]' if char in '*?[' else char
        elif escaped:
            buffer += f'[{char}]' if char in '*?[' else char
            escaped = False
        elif char in ('"', "'"):
            quoted = char
            addempty = True
        elif char == '\\':
            escaped = True
        elif char.isspace():
            # Only the last character of a run of whitespace ends a word.
            try:
                if line[i + 1].isspace():
                    continue
            except IndexError:
                break
            addtoken()
            buffer = ''
            addempty = False
            ispattern = False
        elif char == '#' and not buffer and not addempty:
            # Comment: only recognized where a new word would begin.
            break
        else:
            if char in '*?[':
                ispattern = True
            buffer += char
    if quoted:
        raise ShellDataError('Unbalanced quotes')
    addtoken()
    return tokens

3537 

3538 # Basic commands 

def do_exit(self):
    """exit - exit the shell"""
    # enter() and executescript() leave their read loops on StopIteration.
    raise StopIteration()

3542 

def do_help(self, name: Optional[str] = None):
    """help [command] - list commands/show help for command"""
    # Without an argument, list the available commands in columns.
    if not name:
        self.columnize(self.commands)
        return
    # Resolve aliases as execute() and complete() do, so that help is
    # also available for aliases (e.g., "help ?"). The error message
    # still shows the name as the user typed it.
    target = self.aliases.get(name, name)
    try:
        print(getattr(self, f'do_{target}').__doc__)
    except AttributeError as err:
        raise ShellUsageError(f'{name}: Unknown command') from err

3552 

3553 # Completers 

def complete_help(self, *_) -> tuple[tuple[str, bool], ...]:
    """Completer for help.

    Offers every name in :attr:`commands`, each flagged to be followed
    by a space; the positional arguments are ignored.
    """
    return tuple((c, True) for c in self.commands)

3557 

3558 # Private methods 

@classmethod
def _getargs(cls) -> list[ShellWord]:
    """:meth:`Split <split>` line before current completion scope.

    Returns:
        The words of the readline buffer up to, but excluding,
        the index returned by :func:`readline.get_begidx`.
    """
    begin = readline.get_begidx()
    buffer = readline.get_line_buffer()
    return cls.split(buffer[:begin])

3565 

3566 # Attributes 

3567 aliases: dict[str, str] = { 

3568 '!': 'sh', 

3569 '?': 'help' 

3570 } 

3571 """Mapping of aliases to :attr:`commands`.""" 

3572 

3573 commands: tuple[str, ...] 

3574 """Shell commands. Populated by :meth:`__init__`.""" 

3575 

3576 logger: logging.Logger = logging.getLogger(__name__) 

3577 """Logger. 

3578 

3579 Messages are logged with the following priorities: 

3580 

3581 ====================== ====================================== 

3582 Priority Used for 

3583 ====================== ====================================== 

3584 :const:`logging.INFO` Help message when the shell is entered 

3585 ====================== ====================================== 

3586 """ 

3587 

3588 retval: int = 0 

3589 """Return value of the most recently completed command.""" 

3590 

3591 _completions: list[str] = [] 

3592 """Most recent completions.""" 

3593 

3594 _unescpattern: ClassVar[re.Pattern] = re.compile(r'\\([*?\[\]])') 

3595 """Regular expression that un-escapes fnmatch patterns.""" 

3596 

3597 

3598# pylint: disable=too-many-public-methods 

3599class SieveShell(BaseShell): 

3600 """Shell around a `SieveManager` connection.""" 

3601 

def __init__(self, manager: 'SieveManager', clobber: bool = True,
             confirm: ShellCmd = ShellCmd.ALL) -> None:
    """Initialize a :class:`SieveShell` object.

    Arguments:
        manager: Connection to a ManageSieve server.
        clobber: Overwrite files?
        confirm: Shell commands that require confirmation.
    """
    super().__init__()
    self.clobber = clobber
    # Stored as `reqconfirm`, which leaves the name `confirm`
    # to the inherited confirm() method.
    self.reqconfirm = confirm
    self.manager = manager

3615 

3616 # Methods 

def getprompt(self, *args, **kwargs):
    """Get the shell prompt, prefixed with the URL of the connection.

    If the manager reports a URL, the base prompt is prefixed with it,
    and additionally with "(insecure) " unless ``manager.tls`` is set.
    """
    base = super().getprompt(*args, **kwargs)
    manager = self.manager
    if not manager:
        return base
    url = manager.geturl()
    if url is None:
        return base
    marker = '' if manager.tls else '(insecure) '
    return marker + str(url) + base

3623 

def enter(self) -> int:
    """Read and execute commands, reporting errors instead of exiting.

    Connection, DNS, protocol, and security errors are always
    re-raised, as is any error if standard input is not a terminal.
    Otherwise, usage errors, failed subprocesses, operating system
    errors, and the application's own errors are logged,
    :attr:`retval` is set to 2 (usage errors) or 1 (other errors),
    and the shell is re-entered. Unrecognized exceptions are re-raised.

    Returns:
        Return value of the most recently executed command.
    """
    # pylint: disable=redefined-outer-name
    error = self.logger.error

    def reason(err: OSError) -> str:
        # OSErrors need not carry an errno (e.g., socket timeouts);
        # fall back to the exception's own message in that case.
        return os.strerror(err.errno) if err.errno else str(err)

    while True:
        try:
            return super().enter()
        except (ConnectionError, DNSError, ProtocolError, SecurityError):
            raise
        # pylint: disable=broad-exception-caught
        except Exception as err:
            if not self.hasatty():
                raise
            if isinstance(err, (GetoptError, UsageError)):
                error('%s', err)
                self.retval = 2
            elif isinstance(err, subprocess.CalledProcessError):
                error('%s exited with status %d',
                      err.cmd[0], err.returncode)
                self.retval = 1
            elif isinstance(err, (FileNotFoundError, FileExistsError)):
                error('%s: %s', err.filename, reason(err))
                self.retval = 1
            elif isinstance(err, OSError):
                error('%s', reason(err))
                self.retval = 1
            elif isinstance(err, (Error, ValueError)):
                for line in str(err).splitlines():
                    error('%s', line)
                self.retval = 1
            else:
                raise
    # NOTREACHED

3658 

def execute(self, *args, **kwargs) -> int:
    """Execute a command, then log any warning held by the manager.

    If ``self.manager.warning`` is set once the command has finished,
    each of its lines is passed through ``escapectrl`` and logged
    with warning priority.

    Returns:
        Return value of the command.
    """
    retval = super().execute(*args, **kwargs)
    if warning := self.manager.warning:
        for line in warning.splitlines():
            self.logger.warning('%s', escapectrl(line))
    return retval

3665 

def editscripts(self, editor: list[str], *args: str):
    """Edit scripts with the given `editor`.

    Arguments:
        editor: Command line of the editor.
        args: Script names, optionally preceded by ``-a``,
            which selects the active script instead.

    If no script remains to be edited, an error is logged.
    """
    mgr = self.manager

    def retry(err: Exception, script: str) -> bool:
        # Show the error, then ask whether to edit the script again;
        # an empty answer counts as "yes".
        print(str(err).rstrip('\r\n'), file=sys.stderr)
        return bool(self.confirm(f'Re-edit {script}?',
                                 default=ConfirmEnum.YES))

    (opts, scripts) = getopt(list(args), 'a')
    for opt, _ in opts:
        if opt == '-a':
            # -a replaces whatever scripts were named on the command line.
            scripts = [active] if (active := mgr.getactive()) else []
    if scripts:
        mgr.editscripts(editor, list(scripts), catch=retry)
    else:
        self.logger.error('No scripts given')

3683 

3684 # Shell commands 

def do_activate(self, script: str):
    """activate script - mark script as active"""
    # The docstring doubles as help and usage text; see do_help/getusage.
    self.manager.setactive(script)

3688 

3689 # pylint: disable=redefined-loop-name 

def do_caps(self):
    """caps - show server capabilities"""
    # Output is framed by "---" and "..." and written as "key: value"
    # lines; string values are passed through yamlescape().
    print('---')
    if (caps := self.manager.capabilities) is not None:
        for key, value in caps.__dict__.items():
            # Unset/empty capabilities are not shown.
            if not value:
                continue
            if key in ('implementation', 'language',
                       'maxredirects', 'owner', 'version'):
                # Scalar capabilities.
                if isinstance(value, str):
                    value = yamlescape(value)
                print(f'{key}: {value}')
            elif key in ('notify', 'sasl', 'sieve'):
                # Capabilities that are iterated and shown as a list.
                items = [yamlescape(i) if isinstance(i, str) else str(i)
                         for i in value]
                # pylint: disable=consider-using-f-string
                print('{}: [{}]'.format(key, ', '.join(items)))
            elif key in ('starttls', 'unauthenticate'):
                # Only reached for truthy values, so these print "yes".
                print(f'{key}: yes')
        # Entries of `notunderstood` are printed as they are.
        for key, value in caps.notunderstood.items():
            if value is not None:
                if isinstance(value, str):
                    value = yamlescape(value)
                print(f'{key}: {value}')
    print('...')

3715 

def do_cat(self, *scripts: str):
    """cat [script ...] - concatenate scripts on standard output"""
    # Scripts are written as returned by getscript(); nothing is
    # inserted between them.
    for script in scripts:
        sys.stdout.write(self.manager.getscript(script))

3720 

def do_cd(self, localdir: str = HOME):
    """cd [localdir] - change local directory"""
    # Changes the working directory of the process; without
    # an argument, the directory named by HOME is used.
    os.chdir(localdir)
    self.logger.info('Changed directory to %s', localdir)

3725 

3726 # pylint: disable=redefined-loop-name 

def do_cert(self):
    """cert - show the server's TLS certificate."""
    # The certificate, as returned by SSLSocket.getpeercert(), is
    # printed as a YAML document framed by "---" and "...".
    #
    # Raises ShellUsageError if the connection is not secured by TLS.
    indent = ' ' * 4
    mgr = self.manager
    if not isinstance(mgr.sock, ssl.SSLSocket):
        raise ShellUsageError('Not a secure connection')
    cert = mgr.sock.getpeercert()
    assert cert
    value: Any
    print('---')
    for key, value in sorted(cert.items()):
        if key in ('OCSP', 'caIssuers', 'crlDistributionPoints'):
            # Sequences of strings.
            print(f'{key}:')
            for item in sorted(value):
                if isinstance(item, str):
                    item = yamlescape(item)
                print(f'{indent}- {item}')
        elif key in ('issuer', 'subject'):
            # Sequences of sequences of key-value pairs.
            print(f'{key}:')
            for pairs in sorted(value):
                print(f'{indent}- ', end='')
                for i, (k, v) in enumerate(pairs):
                    if isinstance(v, str):
                        v = yamlescape(v)
                    if i == 0:
                        print(f'{k}: {v}')
                    else:
                        # Further pairs must line up with the first one,
                        # that is, two columns right of the "-", or the
                        # list item is not a valid YAML mapping.
                        print(f'{indent}  {k}: {v}')
        elif key == 'subjectAltName':
            # Sequence of key-value pairs.
            print(f'{key}:')
            for k, v in sorted(value):
                v = yamlescape(v)
                print(f'{indent}- {k}: {v}')
        elif isinstance(value, str):
            value = yamlescape(value)
            print(f'{key}: {value}')
        elif isinstance(value, int):
            print(f'{key}: {value}')
    print('...')

3766 

def do_check(self, localscript: str):
    """check localscript - check whether localscript is valid"""
    # Binary mode does not take an `encoding` argument; passing
    # one makes open() raise a ValueError before anything is read.
    with open(localscript, 'rb') as file:
        # checkscript performs a blocking send/sendline,
        # but holding a lock should be okay in an interactive app.
        fcntl.flock(file.fileno(), LOCK_SH | LOCK_NB)
        self.manager.checkscript(file)
    self.logger.info('%s is valid', localscript)

3775 

def do_cmp(self, *args: str) -> int:
    """cmp [-s] script1 [...] scriptN - compare scripts"""
    # Returns 0 if all scripts are equal and 1 otherwise. Unless -s is
    # given, the first difference (or "equal") is reported on standard
    # output. Scripts are compared line by line, so they may differ in
    # their line endings and still count as equal.
    #
    # Raises ShellUsageError if fewer than two scripts are given.
    opts, scripts = getopt(list(args), 's')
    silent = any(opt == '-s' for opt, _ in opts)
    if len(scripts) < 2:
        message = self.getusage(self.do_cmp)
        assert message
        raise ShellUsageError(message)
    prefix = ', '.join(scripts)
    texts = [self.manager.getscript(script).splitlines()
             for script in scripts]
    for i, lines in enumerate(itertools.zip_longest(*texts), start=1):
        first = lines[0]
        if all(line == first for line in lines[1:]):
            continue
        # A script that has run out of lines is padded with None; it
        # differs from the others right at the start of the line. (None
        # cannot be zipped, so it must not reach the column search.)
        column = 1
        if None not in lines:
            for column, chars in enumerate(itertools.zip_longest(*lines),
                                           start=1):
                if any(char != chars[0] for char in chars[1:]):
                    break
        if not silent:
            print(f'{prefix}: line {i}, column {column} differs')
        return 1
    if not silent:
        print(f'{prefix}: equal')
    return 0

3801 

def do_cp(self, *args: str):
    """cp [-f|-i] source target - re-upload source as target"""
    # Defaults come from the shell's settings: `clobber` says whether
    # existing scripts may be overwritten at all, `confirm` whether
    # the user is asked first.
    clobber = self.clobber
    confirm = bool(self.reqconfirm & ShellCmd.CP)
    # NOTE(review): "-C" is accepted by getopt but no branch below
    # handles it -- confirm whether it should clear `clobber`.
    opts, scripts = getopt(list(args), 'Cfi')
    for opt, _ in opts:
        if opt == '-f':
            clobber = True
            confirm = False
        elif opt == '-i':
            clobber = True
            confirm = True
    try:
        source, target = scripts
    except ValueError as err:
        # Anything but exactly two operands is a usage error.
        message = self.getusage(self.do_cp)
        assert message
        raise ShellUsageError(message) from err
    if self.manager.scriptexists(target):
        if not clobber:
            raise FileExistsError(EEXIST, os.strerror(EEXIST), target)
        # Declining to overwrite silently skips the copy.
        if confirm and not self.confirm(f'Overwrite {target}?'):
            return
    self.manager.copyscript(source, target)

3826 

def do_deactivate(self):
    """deactivate - deactivate the active script"""
    # The docstring doubles as help and usage text; see do_help/getusage.
    self.manager.unsetactive()

3830 

def do_diff(self, *args: str) -> int:
    """diff <options> script1 script2 - show how scripts differ"""
    # Options are parsed only to be handed on to the "diff" command:
    # the (option, argument) pairs are flattened into one tuple,
    # with the empty arguments of flag options filtered out.
    pairs, scripts = getopt(list(args), 'C:U:bcu')
    opts = tuple(filter(bool, itertools.chain(*pairs)))
    if len(scripts) != 2:
        message = self.getusage(self.do_diff)
        assert message
        raise ShellUsageError(message)
    # check=False: the return code is handed back to the caller
    # as the return value of the command.
    cp = self.manager.editscripts(['diff', *opts], scripts, check=False)
    return cp.returncode

3841 

def do_echo(self, *args: str):
    """echo word [...] - print words to standard output."""
    # Words are separated by single spaces and followed by a newline.
    print(*args)

3845 

def do_ed(self, *args: str):
    """ed [-a] script [...] - edit scripts with a line editor"""
    # Option parsing and error handling are left to editscripts().
    self.editscripts(EDITOR, *args)

3849 

def do_get(self, *args: str):
    """get [-a] [-f|-i] [-o file] [script ...] - download script"""
    # Options:
    #   -a       download the active script instead of those named
    #   -f       overwrite existing files without asking
    #   -i       ask before overwriting existing files
    #   -o file  save the (single) script as `file`
    #
    # Raises ShellUsageError if -o is combined with multiple sources,
    # FileNotFoundError if backups are kept and a source is missing,
    # and FileExistsError if a target exists and must not be clobbered.
    mgr = self.manager
    opts, sources = getopt(list(args), 'afio:')
    output = ''
    clobber = self.clobber
    confirm = bool(self.reqconfirm & ShellCmd.GET)
    multi = len(sources) > 1
    for opt, arg in opts:
        if opt == '-a':
            sources = [active] if (active := mgr.getactive()) else []
        elif opt == '-f':
            clobber = True
            confirm = False
        elif opt == '-i':
            clobber = True
            confirm = True
        elif opt == '-o':
            if multi:
                raise ShellUsageError('-o: Too many sources')
            output = arg
    # ALL means "overwrite without asking"; anything else
    # but NONE makes the loop below prompt the user.
    answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
    keep = mgr.backup
    for src in sources:
        # Try not to make a backup if the source doesn't exist.
        # Writing to a temporary file would create a worse race condition.
        if keep > 0:
            if not mgr.scriptexists(src):
                raise FileNotFoundError(ENOENT, os.strerror(ENOENT), src)
        targ = output if output else src
        flags = O_CREAT | O_EXCL | O_WRONLY | O_TRUNC
        if path.exists(targ):
            if not clobber:
                raise FileExistsError(EEXIST, os.strerror(EEXIST), targ)
            if answer not in (ConfirmEnum.ALL, ConfirmEnum.NONE):
                answer = self.confirm(f'Overwrite {targ}?', multi=multi)
            if answer:
                def getfiles() -> Iterator[str]:
                    # pylint: disable=cell-var-from-loop
                    return readdir(path.dirname(targ), path.isfile)
                backup(targ, keep, getfiles, shutil.copy, os.remove)
                # The file is known to exist, so do not insist on creating it.
                flags &= ~O_EXCL
            else:
                continue
        fd = os.open(targ, flags, mode=0o644)
        with os.fdopen(fd, 'w', encoding='utf8') as file:
            # Lock only once the file object owns the descriptor, so
            # that the descriptor is closed even if locking fails.
            #
            # getscript performs a blocking read,
            # but holding a lock should be okay in an interactive app.
            fcntl.flock(fd, LOCK_SH | LOCK_NB)
            file.write(mgr.getscript(src))
        self.logger.info('Downloaded %s as %s', src, targ)

3901 

def do_ls(self, *args: str):
    """ls [-1al] [script ...] - list scripts"""
    # Options:
    #   -1  one script per line (default if input is not a terminal)
    #   -a  show the active script only
    #   -l  long format: "a" (active) or "-", followed by the name
    #
    # Raises FileNotFoundError for the first named script
    # that does not exist on the server.
    active = False
    long = False
    one = not self.hasatty()
    opts, fnames = getopt(list(args), '1al')
    for opt, _ in opts:
        if opt == '-1':
            one = True
        elif opt == '-a':
            active = True
        elif opt == '-l':
            long = True
    if active:
        if script := self.manager.getactive():
            print(script)
        else:
            self.logger.warning('No active script')
        return
    scripts = self.manager.listscripts()
    if fnames:
        existing = {fname for fname, _ in scripts}
        # Check in the order given, so that the script
        # reported as missing does not vary between runs.
        for name in fnames:
            if name not in existing:
                raise FileNotFoundError(ENOENT, os.strerror(ENOENT), name)
        scripts = [s for s in scripts if s[0] in fnames]
    scripts.sort()
    if long:
        for fname, isactive in scripts:
            print('a' if isactive else '-', fname)
    elif one:
        for script, _ in scripts:
            print(script)
    else:
        # The active script is marked with a trailing asterisk.
        words = [f + ('*' if a else '') for f, a in scripts]
        self.columnize(words)

3938 

def do_more(self, *args: str):
    """more <options> script [...] - display scripts page-by-page."""
    mgr = self.manager
    # Flatten the parsed (option, argument) pairs into a list of
    # options to hand on to the pager, dropping empty arguments.
    pairs, scripts = getopt(list(args), 'aceis')
    opts = list(filter(bool, itertools.chain(*pairs)))
    if '-a' in opts:
        # -a selects the active script and is not passed on to the pager.
        active = mgr.getactive()
        if active is None:
            self.logger.warning('No active script')
            return
        scripts = [active]
        opts.remove('-a')
    elif not scripts:
        message = self.getusage(self.do_more)
        assert message
        raise ShellUsageError(message)
    mgr.editscripts(PAGER + opts, list(scripts))

3956 

def do_mv(self, *args: str):
    """mv [-f|-i] source target - rename source to target"""
    # Defaults come from the shell's settings: `clobber` says whether
    # existing scripts may be overwritten at all, `confirm` whether
    # the user is asked first.
    clobber = self.clobber
    confirm = bool(self.reqconfirm & ShellCmd.MV)
    mgr = self.manager
    # NOTE(review): "-C" is accepted by getopt but no branch below
    # handles it -- confirm whether it should clear `clobber`.
    opts, scripts = getopt(list(args), 'Cfi')
    for opt, _ in opts:
        if opt == '-f':
            clobber = True
            confirm = False
        elif opt == '-i':
            clobber = True
            confirm = True
    try:
        source, target = scripts
    except ValueError as err:
        # Anything but exactly two operands is a usage error.
        message = self.getusage(self.do_mv)
        assert message
        raise ShellUsageError(message) from err
    if mgr.scriptexists(target):
        if not clobber:
            raise FileExistsError(EEXIST, os.strerror(EEXIST), target)
        if confirm and not self.confirm(f'Overwrite {target}?'):
            return
        # The existing target is backed up and deleted
        # before the source is renamed to it.
        mgr.backupscript(target)
        mgr.deletescript(target)
    mgr.renamescript(source, target, emulate=True)

3984 

def do_put(self, *args: str):
    """put [-f|-i] [-a] [-o name] [localscript ...] - upload scripts"""
    # -a uploads to (and activates) the active script, -o names the
    # remote script, -f/-i overwrite without/after confirmation.
    # NOTE(review): "-C" is accepted by getopt but has no effect here.
    active: Optional[str] = None
    clobber: bool = self.clobber
    confirm: bool = bool(self.reqconfirm & ShellCmd.PUT)
    mgr: SieveManager = self.manager
    output: str = ''
    activate: bool = False
    opts, sources = getopt(list(args), 'Cafio:')
    multi = len(sources) > 1
    for opt, arg in opts:
        if opt == '-a':
            if multi:
                raise ShellUsageError('-a: Only one script can be active')
            active = mgr.getactive()
            activate = True
            if active:
                # Replace the currently active script.
                output = active
        elif opt in ('-f', '-i'):
            clobber = True
            confirm = opt == '-i'
        elif opt == '-o':
            if multi:
                raise ShellUsageError('-o: Too many sources')
            output = arg
    answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
    for src in sources:
        targ = output or src
        if mgr.scriptexists(targ):
            if not clobber:
                raise FileExistsError(EEXIST, os.strerror(EEXIST), targ)
            # "All" and "none" answers are remembered for later scripts.
            if answer not in (ConfirmEnum.ALL, ConfirmEnum.NONE):
                answer = self.confirm(f'Overwrite {targ}?', multi=multi)
            if not answer:
                continue
        with open(src, encoding='utf8') as file:
            # checkscript performs a blocking send/sendline,
            # but holding a lock should be okay in an interactive app.
            fcntl.flock(file.fileno(), LOCK_SH | LOCK_NB)
            mgr.putscript(file, targ)
        if activate and targ != active:
            mgr.setactive(targ)

4030 

def do_python(self):
    """python - enter Python read-evaluate-print loop"""
    # The REPL gets its own tab-completer and a clean history; the
    # previous completer is restored and the history is cleared again
    # when the REPL exits, however it exits.
    hasatty = self.hasatty()
    wrapper = ObjWrapper(self.manager)
    if hasatty:
        oldcompleter = readline.get_completer()
        readline.set_completer(rlcompleter.Completer(wrapper).complete)
        # Each call is guarded on its own, so that a readline build that
        # lacks clear_history() still gets automatic history enabled.
        with suppress(AttributeError):
            readline.clear_history()
        with suppress(AttributeError):
            readline.set_auto_history(True)
    try:
        # exit() raises SystemExit, which must only leave the REPL.
        with suppress(SystemExit):
            banner = (f'Python {sys.version}\n'
                      'Enter "help()" for help and "exit()" to exit')
            code.interact(local=wrapper, banner=banner, exitmsg='')
    finally:
        if hasatty:
            readline.set_completer(oldcompleter)
            with suppress(AttributeError):
                readline.clear_history()

4051 

def do_rm(self, *args: str):
    """rm [-f|-i] [script ...] - remove script"""
    # -f removes without asking, -i asks before every removal.
    confirm = bool(self.reqconfirm & ShellCmd.RM)
    opts, scripts = getopt(list(args), 'fi')
    for opt, _ in opts:
        if opt in ('-f', '-i'):
            confirm = opt == '-i'
    answer = ConfirmEnum.NO if confirm else ConfirmEnum.ALL
    multi = len(scripts) > 1
    for script in scripts:
        # An "all" answer is remembered; anything else is asked anew.
        if answer is not ConfirmEnum.ALL:
            answer = self.confirm(f'Remove {script}?', multi=multi)
        if answer is ConfirmEnum.NONE:
            self.logger.info('Stopped')
            break
        if answer:
            self.manager.deletescript(script)

4071 

def do_sh(self, *args: str):
    """sh [command] [argument ...] - run system command or system shell"""
    # Without a command, run the login shell from the password database.
    # A non-zero exit status raises subprocess.CalledProcessError.
    command = args if args else (pwd.getpwuid(os.getuid()).pw_shell,)
    subprocess.run(command, check=True)

4077 

def do_su(self, user: str):
    """su user - manage scripts of user."""
    # Re-authenticates with the arguments of the previous login, but
    # with `user` as the owner (authorization ID).
    mgr = self.manager
    # pylint: disable=protected-access
    _, (args, kwargs) = mgr._getstate()
    kwargs['owner'] = user
    if mgr.login:
        mgr.unauthenticate()
    # Some ManageSieve servers reject the first
    # "AUTHENTICATE" after an "UNAUTHENTICATE".
    retries = 1
    while True:
        try:
            mgr.authenticate(*args, **kwargs)
        except SieveOperationError:
            if not retries:
                raise
            retries -= 1
        else:
            break

4096 

def do_vi(self, *args: str):
    """vi [-a] script [...] - edit scripts with a visual editor"""
    # Thin wrapper: all option handling happens in editscripts().
    # VISUAL is a module-level constant, presumably the editor command.
    self.editscripts(VISUAL, *args)

4100 

def do_xargs(self, command: str, *args: str):
    """xargs cmd [arg ...] - call cmd with arguments from standard input"""
    try:
        func = getattr(self, f'do_{command}')
    except AttributeError as err:
        raise ShellUsageError(f'{command}: No such command') from err
    # Every input line becomes one extra argument; reading stops at
    # end-of-file or at the first empty line.
    extra = []
    with suppress(EOFError):
        for raw in iter(sys.stdin.readline, ''):
            word = raw.rstrip('\n')
            if not word:
                break
            extra.append(word)
    return func(*args, *extra)

4112 

4113 # Globbing and tab-completion 

@staticmethod
def complete_dirs(_: int, text: str) -> list[tuple[str, bool]]:
    """Complete local directory names.

    Returns one ``(name + '/', False)`` pair for each entry that
    :func:`readdir` yields for the directory part of `text`.
    """
    parent = path.dirname(text)
    completions = []
    for dirname in readdir(parent, path.isdir):
        completions.append((dirname + '/', False))
    return completions

4119 

@staticmethod
def complete_files(_: int, text: str) -> list[tuple[str, bool]]:
    """Complete local filenames.

    Directories are returned as ``(name + '/', False)``,
    other files as ``(name, True)``.
    """
    completions = []
    for fname in readdir(path.dirname(text)):
        if path.isdir(fname):
            completions.append((fname + '/', False))
        else:
            completions.append((fname, True))
    return completions

4125 

def complete_scripts(self, *_) -> list[tuple[str, bool]]:
    """Complete remote filenames.

    Returns a ``(name, True)`` pair for every script in the manager's
    cached script listing.
    """
    completions = []
    for script, _ in self.manager.listscripts(cached=True):
        completions.append((script, True))
    return completions

4129 

# Per-command completers: each "complete_<cmd>" attribute is an alias for
# one of the three completion functions defined above. Commands that
# take remote scripts use complete_scripts, "cd" uses complete_dirs, and
# "check" and "put", which take local files, use complete_files.
complete_activate = complete_scripts
"""Completer for activate."""

complete_cat = complete_scripts
"""Completer for cat."""

complete_cd = complete_dirs
"""Completer for cd."""

complete_cmp = complete_scripts
"""Completer for cmp."""

complete_cp = complete_scripts
"""Completer for cp."""

complete_check = complete_files
"""Completer for check."""

complete_diff = complete_scripts
"""Completer for diff."""

complete_ed = complete_scripts
"""Completer for ed."""

complete_get = complete_scripts
"""Completer for get."""

complete_ls = complete_scripts
"""Completer for ls."""

complete_more = complete_scripts
"""Completer for more."""

complete_mv = complete_scripts
"""Completer for mv."""

complete_put = complete_files
"""Completer for put."""

complete_rm = complete_scripts
"""Completer for rm."""

complete_vi = complete_scripts
"""Completer for vi."""

# Properties
# (annotation-only declarations; no value is assigned at this point)
clobber: bool
"""Overwrite files?"""

reqconfirm: ShellCmd
"""Commands that require confirmation."""

manager: SieveManager
"""Connection to a ManageSieve server."""

4184 

4185 

class ObjWrapper(dict):
    """Namespace for :func:`code.interact` that exposes an object.

    The namespace is filled, in this order, with the module's globals,
    the class dictionaries of the object's type (in method resolution
    order), the object's own attributes, and the wrapper's attributes.
    Later entries replace earlier ones of the same name.

    Arguments:
        obj: Object to wrap.
    """

    def __init__(self, obj: Any):
        """Initialize a proxy."""
        namespace: dict[str, Any] = dict(globals())
        for klass in obj.__class__.__mro__:
            namespace.update(klass.__dict__)
        namespace.update((name, getattr(obj, name)) for name in dir(obj))
        namespace.update((name, getattr(self, name)) for name in dir(self))
        super().__init__(namespace)

    @staticmethod
    def exit():
        """Exit the Python read-evaluate-print loop."""
        raise SystemExit()

    # Needed, or else `help` ignores the wrapper.
    @staticmethod
    def help(*args, **kwargs):
        """Show help."""
        # Inside the function body, `help` resolves to the built-in.
        help(*args, **kwargs)

4216 

4217 

4218# 

4219# Configuration 

4220# 

4221 

BaseConfigT = TypeVar('BaseConfigT', bound='BaseConfig')
"""Type variable for :class:`BaseConfig`."""


class BaseConfig(UserDict):
    """Base class for configurations.

    Variables are attributes of the configuration. Configuration files
    may group variables into sections, which are configurations of the
    same class and are kept in :attr:`sections`.
    """

    def __or__(self: BaseConfigT, other) -> BaseConfigT:
        merged = self.__class__()
        for operand in (self, other):
            merged.__ior__(operand)
        return merged

    def __ior__(self: BaseConfigT, other) -> BaseConfigT:
        super().__ior__(other)
        # `other` need not be a configuration; plain mappings have
        # no sections to merge.
        with suppress(AttributeError):
            for name, section in other._sections.items():
                UserDict.__ior__(self._sections[name], section)
        return self

    def loadfile(self, fname: str):
        """Read configuration variables from `fname`.

        Each non-empty line that does not start with "#" is split into
        a name and a value at the first run of whitespace. The section
        statement switches to the named section, which is created if
        need be; every other statement sets a variable in the current
        section. The file's directory is the working directory while
        the file is read.

        Raises:
            ClientConfigError: Syntax error.
        """
        target = self
        origdir = os.getcwd()
        with open(fname) as file:
            basedir = path.dirname(fname)
            if basedir:
                os.chdir(basedir)
            try:
                for lineno, line in enumerate(file, start=1):
                    statement = line.strip()
                    if not statement or statement.startswith('#'):
                        continue
                    try:
                        key, value = statement.split(maxsplit=1)
                        if key != self._section:
                            target.set(key, value)
                            continue
                        if value not in self._sections:
                            self._sections[value] = self.__class__()
                        target = self._sections[value]
                    except (AttributeError, TypeError, ValueError) as err:
                        message = f'{fname}:{lineno}: {err}'
                        raise ClientConfigError(message) from err
            finally:
                os.chdir(origdir)

    def parse(self, expr: str):
        """Split `expr` into a name and a value and set the variable.

        `Expr` is split at the first equals sign ("=").
        :samp:`{var}` is equivalent to :samp:`{var}=yes`,
        :samp:`no{var}` to :samp:`{var}=no`.

        raises:
            AttributeError: Bad variable.
            ValueError: The value is empty.
        """
        value: Union[bool, str]
        name, equals, rest = expr.partition('=')
        if equals:
            value = rest
        elif expr.startswith('no'):
            name, value = expr[2:], False
        else:
            value = True
        if value == '':
            raise ValueError(f'{name}: Empty')
        self.set(name, value)

    def set(self, name: str, value):
        """Set the configuration variable `name` to `value`.

        raises:
            AttributeError: Bad variable.
        """
        if name.startswith('_'):
            raise AttributeError(f'{name}: Private variable')
        try:
            current = getattr(self, name)
        except AttributeError as err:
            raise AttributeError(f'{name}: No such variable') from err
        if callable(current):
            raise AttributeError(f'{name}: Not a variable')
        try:
            setattr(self, name, value)
        except AttributeError as err:
            raise AttributeError(f'{name}: Read-only variable') from err
        except (TypeError, ValueError) as err:
            # Same exception type, but prefixed with the variable name.
            raise type(err)(f'{name}: {err}')

    @property
    def sections(self: BaseConfigT) -> dict[str, BaseConfigT]:
        """Sections in the loaded configuration files."""
        return self._sections

    _section: ClassVar[str]
    """Name of the statement that starts a section."""

    _sections: dict[str, Any] = {}
    """Sections in the loaded configuration files."""

4323 

4324 

class BaseVar(ABC):
    """Base class for :class:`BaseConfig` attributes.

    A variable is a descriptor that stores its value in the
    configuration under its own name. Reading an unset variable
    yields the default; assigning ``None`` unsets the variable.

    For example:

    >>> @dataclasses.dataclass
    >>> class FooConfig(BaseConfig):
    >>>     foo = BoolVar(default=False)
    >>>     bar = NumVar(cls=int, default=0)
    >>>
    >>> foo = FooConfig(bar=1)
    >>> foo.foo
    False
    >>> foo.bar
    1
    >>> foo.foo = 'yes'
    >>> foo.foo
    True
    >>> foo.bar = '2'
    >>> foo.bar
    2
    """

    def __init__(self, default: Any = None):
        """Initialize a configuration variable."""
        self.default = default

    def __get__(self, obj: BaseConfig, _: type) -> Any:
        try:
            stored = obj[self.name]
        except KeyError:
            return self.default
        return stored

    def __set__(self, obj: BaseConfig, value: Any):
        if value is not None:
            obj[self.name] = value
            return
        # None means "unset"; unsetting an unset variable is fine.
        with suppress(KeyError):
            del obj[self.name]

    def __set_name__(self, _: object, name: str):
        self.name = name

    name: str
    """Variable name."""

    default: Any
    """Default value."""

4373 

4374 

class ExpandingVarMixin():
    """Mixin for variables that do word expansion."""

    def expand(self, obj: BaseConfig, value: str) -> str:
        """Expand '~' and configuration variables.

        ``$name`` and ``${name}`` are replaced with the value that the
        variable `name` has in `obj`.

        Raises:
            ValueError: A referenced variable does not exist,
                is not set, or is neither an int nor a str.
        """
        assert isinstance(self, BaseVar)
        template = string.Template(value)
        # template.get_identifiers is only available in Python >= v3.11.
        # pylint: disable=consider-using-f-string
        patterns = (r'\$(%s)' % template.idpattern,
                    r'\$\{(%s)\}' % template.idpattern)
        varnames: set[str] = {
            match.group(1)
            for pattern in patterns
            for match in re.finditer(pattern, value, flags=re.IGNORECASE)
        }
        variables = {}
        for name in varnames:
            try:
                var = getattr(obj, name)
            except AttributeError as err:
                raise ValueError(f'${name}: No such variable') from err
            if var is None:
                raise ValueError(f'${name}: Not set')
            if not isinstance(var, (int, str)):
                raise ValueError(f'${name}: Not a scalar')
            variables[name] = var
        return path.expanduser(template.substitute(variables))

4401 

4402 

class ListVarMixin():
    """Mixin for lists."""

    # Bound `split` method of a pattern that is compiled once, when the
    # class is created; the pattern matches a comma together with any
    # whitespace around it.
    splititems: Callable = re.compile(r'\s*,\s*').split
    """Split a comma-separated list into items."""

4408 

4409 

class BoolVar(BaseVar):
    """Convert "yes" and "no" to :class:`bool`."""

    def __set__(self, obj: BaseConfig, value: Union[bool, str]):
        # Each boolean is paired with the spellings accepted for it.
        for flag, spellings in ((True, (True, 'yes')),
                                (False, (False, 'no'))):
            if value in spellings:
                super().__set__(obj, flag)
                return
        raise ValueError(f'{value}: Not a boolean')

4420 

4421 

class CmdVar(BaseVar, ExpandingVarMixin):
    """Split up value into a list using :func:`shlex.split`.

    Words are stored as they are given and expanded when read.
    """

    def __set__(self, obj: BaseConfig, value: str):
        words = shlex.split(value, posix=True)
        super().__set__(obj, words)

    def __get__(self, obj: BaseConfig, objtype: type) -> Optional[list[str]]:
        words = super().__get__(obj, objtype)
        if words is None:
            return None
        return [self.expand(obj, word) for word in words]

4431 

4432 

class EnumVar(BaseVar):
    """Convert the name of an item to an :class:`enum.Enum` member."""

    def __init__(self, *args, cls: type[enum.Enum], **kwargs):
        """Initialize the variable.

        Arguments:
            name: Variable name.
            cls: Enumeration type.
            default: Default value.
        """
        assert issubclass(cls, enum.Enum)
        super().__init__(*args, **kwargs)
        self.cls = cls

    def __set__(self, obj: BaseConfig, value: Union[enum.Enum, str]):
        if isinstance(value, enum.Enum):
            super().__set__(obj, value)
            return
        if not isinstance(value, str):    # type: ignore
            raise TypeError(f'{type(value)}: Not an enumeration')
        # Member names are matched regardless of case.
        folded = value.casefold()
        found = next((member for member in self.cls
                      if member.name.casefold() == folded), None)
        if found is None:
            raise ValueError(f'{value}: No such item')
        super().__set__(obj, found)

4460 

4461 

class FilenameVar(BaseVar):
    """Expand ``~user`` and make filenames absolute."""

    def __set__(self, obj: BaseConfig, value: Optional[str]):
        """Store `value` as an absolute filename.

        ``None`` unsets the variable.

        Raises:
            TypeError: `value` is neither ``None`` nor a string.
        """
        # The branches must be exclusive: falling through to the
        # `raise` would reject every value, valid ones included.
        if value is None:
            super().__set__(obj, None)
        elif isinstance(value, str):
            super().__set__(obj, path.abspath(path.expanduser(value)))
        else:
            raise TypeError(f'{value}: Not a str')

4471 

4472 

class FlagVar(BaseVar, ListVarMixin):
    """Convert comma-separated values to an :class:`int`."""

    def __init__(self, *args, cls: type[enum.IntEnum], **kwargs):
        assert issubclass(cls, enum.IntEnum)
        super().__init__(*args, **kwargs)
        self.cls = cls

    def __set__(self, obj: BaseConfig, value: Union[str, int, enum.IntEnum]):
        if isinstance(value, (int, enum.IntFlag)):
            super().__set__(obj, value)
            return
        if not isinstance(value, str):    # type: ignore
            raise TypeError(f'{value}: Neither an int nor a str')
        # Map case-folded member names to values; if two names fold
        # to the same key, the member that is defined first wins.
        values: dict = {}
        for member in self.cls:
            values.setdefault(member.name.casefold(), member.value)
        flag = 0
        for name in self.splititems(value):
            key = name.casefold()
            if key not in values:
                raise ValueError(f'{name}: No such item')
            flag |= values[key]
        super().__set__(obj, flag)

    cls: type[enum.IntEnum]
    """Enumeration type"""

4499 

4500 

class HostVar(BaseVar):
    """Check whether value is a valid hostname."""

    def __set__(self, obj: BaseConfig, value: Optional[str]):
        if value is not None:
            if not isinstance(value, str):
                raise TypeError(f'{value}: Not a string')
            if not isinetaddr(value) and not ishostname(value):
                raise ValueError(f'{value}: Neither hostname nor address')
        # Either None, which unsets the variable, or a validated name.
        super().__set__(obj, value)

4513 

4514 

class NumVar(BaseVar):
    """Convert value to a number of type :attr:`cls`."""

    def __init__(self, *args, cls: type = int,
                 minval: Optional[Union[float, int]] = None,
                 maxval: Optional[Union[float, int]] = None,
                 **kwargs):
        """Initialize the variable.

        Arguments:
            name: Variable name.
            cls: Number type.
            minval: Smallest permissible value.
            maxval: Greatest permissible value.
            default: Default value.
        """
        super().__init__(*args, **kwargs)
        self.cls = cls
        self.minval = minval
        self.maxval = maxval

    def __set__(self, obj: BaseConfig, value: Union[int, float, str]):
        try:
            number = self.cls(value)
        except ValueError as err:
            raise ValueError(f'{value}: Not a number') from err
        lower = self.minval
        if lower is not None and number < lower:
            raise ValueError(f'{value} < {lower}')
        upper = self.maxval
        if upper is not None and number > upper:
            raise ValueError(f'{value} > {upper}')
        super().__set__(obj, number)

    cls: type
    """Number type."""

    minval: Optional[Union[float, int]]
    """Minimum value."""

    maxval: Optional[Union[float, int]]
    """Maximum value."""

4555 

4556 

class SASLMechVar(BaseVar, ListVarMixin):
    """Convert SASL mechanism names to :class:`BaseAuth` subclasses."""

    def __set__(self, obj: BaseConfig,
                value: Union[Iterable[type[BaseAuth]], str]):
        if isinstance(value, str):
            known = BaseAuth.getmechs(obsolete=True)
            mechs = []
            # Each item is a shell-style pattern that is matched
            # against the case-folded mechanism names.
            for pattern in self.splititems(value.casefold()):
                matching = []
                for mech in known:
                    if not fnmatch.fnmatchcase(mech.name.casefold(), pattern):
                        continue
                    if mech in mechs:
                        raise ValueError(f'{mech.name}: Duplicate')
                    matching.append(mech)
                if not matching:
                    raise ValueError(f'{pattern}: No matches')
                mechs.extend(matching)
        elif isinstance(value, Sequence):
            mechs = value   # type: ignore[assignment]
        else:
            raise TypeError(f'{value}: Not a SASL mechanism')
        super().__set__(obj, mechs)

4580 

4581 

class UniqueVar(BaseVar):
    """Variable the value of which must be unique."""

    def __set__(self, obj: BaseConfig, value: str):
        # The set of used values lives on the class, so it is shared
        # by every variable of this type.
        used = type(self).values
        if value in used:
            raise ValueError(f'{value}: Already in use')
        super().__set__(obj, value)
        used.add(value)

    values: ClassVar[set] = set()

4593 

4594 

SieveConfigT = TypeVar('SieveConfigT', bound='SieveConfig')


class SieveConfig(BaseConfig):
    """Configuration for the SieveManager command-line client."""

    @classmethod
    def fromfiles(cls: type[SieveConfigT], *fnames: str) -> SieveConfigT:
        """Create a new configuration from `fnames`.

        Arguments:
            fnames: Filenames (default: :data:`CONFIGFILES`)

        Raises:
            FileNotFoundError: A given file could not be found.
        """
        obj = cls()
        for fname in (fnames if fnames else CONFIGFILES):
            try:
                obj.loadfile(fname)
            except FileNotFoundError:
                # Only files that were asked for explicitly must exist.
                if fnames:
                    raise
        return obj

    def __init__(self, *args, **kwargs):
        """Create a new configuration.

        Arguments:
            args: Positional arguments used to initialize the back-end.
            kwargs: Keyword arguments used as initial configuration values.
        """
        super().__init__(*args)
        for key, value in kwargs.items():
            setattr(self, key, value)

    def getmanager(self, **variables) -> SieveManager:
        """Open a :class:`SieveManager` connection with this configuration.

        Arguments:
            variables: Configuration variables.

        Raises:
            ShellOperationError: All supported SASL mechanisms failed.
            netrc.NetrcParseError: :file:`.netrc` could not be parsed.
        """

        conf = self | self.__class__(**variables)
        mgr = SieveManager(backup=conf.backups, memory=conf.memory)

        # Helper to obtain passwords and passphrases
        def getpass_(passmgr: Optional[list[str]], prompt: str) -> str:
            if passmgr and (pass_ := readoutput(*passmgr, logger=mgr.logger)):
                return pass_
            return askpass(prompt)

        # Logging level
        mgr.logger.setLevel(conf.verbosity)

        # TLS
        sslcontext = mgr.sslcontext
        # Both values must be bound before the test: a short-circuiting
        # `(cadir := ...) or (cafile := ...)` would leave `cafile`
        # undefined whenever a CA directory is configured.
        cafile, cadir = conf.cafile, conf.cadir
        if cafile or cadir:
            sslcontext.load_verify_locations(cafile, cadir)
        if cert := conf.cert:
            def getpassphrase():
                return getpass_(conf.getpassphrase, 'Certificate passphrase: ')
            sslcontext.load_cert_chain(cert, conf.key, getpassphrase)
        if conf.x509strict:
            sslcontext.verify_flags |= ssl.VERIFY_X509_STRICT

        # Connect
        mgr.open(conf.host, port=conf.port, timeout=conf.timeout,
                 tls=conf.tls, ocsp=conf.ocsp)

        # Authenticate
        # Mechanisms derived from ExternalAuth are tried first, then
        # those derived from BasePwdAuth. A SASLCapabilityError is
        # suppressed, so that the next group is tried.
        logauth = conf.verbosity <= LogLevel.AUTH
        if (sasl := [s for s in conf.saslmechs if ExternalAuth in s.__mro__]):
            with suppress(SASLCapabilityError):
                mgr.authenticate(conf.login, owner=conf.owner,
                                 prepare=conf.saslprep, sasl=sasl,
                                 logauth=logauth)
                return mgr
        if (sasl := [s for s in conf.saslmechs if BasePwdAuth in s.__mro__]):
            password = (conf.password if conf.password else
                        getpass_(conf.getpassword, 'Password: '))
            with suppress(SASLCapabilityError):
                mgr.authenticate(conf.login, password, owner=conf.owner,
                                 prepare=conf.saslprep, sasl=sasl,
                                 logauth=logauth)
                return mgr
        raise ShellOperationError('SASL mechanisms exhausted')

    def getshell(self, manager: SieveManager, **variables):
        """Get a configured :class:`SieveShell` that wraps `manager`."""
        conf = self | self.__class__(**variables)
        return SieveShell(manager, clobber=conf.clobber, confirm=conf.confirm)

    def loadfile(self, fname: str):
        """Read configuration from `fname`.

        Raises:
            ClientConfigError: Syntax error.
            ClientSecurityError: Permissions are insecure.
        """
        super().loadfile(fname)
        mode = os.stat(fname).st_mode
        if mode & 0o22:
            raise ClientSecurityError(f'{fname}: Is group- or world-writable')
        # A readable file is only a problem if it contains a password.
        if mode & 0o44:
            for account in (self, *self._sections.values()):
                if 'password' in account:
                    message = f'{fname}: Is group- or world-readable'
                    raise ClientSecurityError(message)

    def loadaccount(self, host: str = 'localhost',
                    login: Optional[str] = None):
        """Load the section for `login` on `host`."""
        self |= self.__class__(host=host, login=login)
        hosts = readnetrc(self.netrc)

        # Host
        # `host` may be an alias; resolve it to the section's host or,
        # failing that, to the host part of the section's name.
        for name, section in self.sections.items():
            if section.alias == self.host:
                try:
                    self.host = section['host']
                except KeyError:
                    self.host = name.rsplit('@', maxsplit=1)[-1]
                break
        with suppress(KeyError):
            self |= self.sections[self.host]

        # Login
        if not self.login:
            try:
                self.login = hosts[self.host][0]
            except KeyError:
                self.login = getpass.getuser()
        with suppress(KeyError):
            self |= self.sections[f'{self.login}@{self.host}']

        # Password
        if not self.password:
            with suppress(KeyError):
                self.password = hosts[self.host][2]

    alias: UniqueVar = UniqueVar()
    """Alias for a host."""

    backups = NumVar(cls=int, default=0, minval=0)
    """How many backups to keep."""

    cadir = FilenameVar()
    """Custom CA directory."""

    cafile = FilenameVar()
    """Custom CA file."""

    clobber = BoolVar(default=True)
    """Overwrite files?"""

    confirm = FlagVar(default=ShellCmd.ALL, cls=ShellCmd)
    """Which shell commands must be confirmed?"""

    cert = FilenameVar()
    """Client TLS certificate."""

    getpassphrase = CmdVar()
    """Command that prints the passphrase for the TLS key."""

    getpassword = CmdVar()
    """Command that prints a password."""

    host = HostVar(default='localhost')
    """Host to connect to by default."""

    key = FilenameVar()
    """Client TLS key."""

    login = BaseVar()
    """User to login as (authentication ID)."""

    memory = NumVar(default=524_288, minval=0)
    """How much memory to use for temporary data."""

    netrc: Optional[str] = os.getenv('NETRC')
    """Filename of the .netrc file."""

    ocsp = BoolVar(default=True)
    """Check whether server certificate was revoked?"""

    owner = BaseVar(default='')
    """User whose scripts to manage (authorization ID)."""

    password = BaseVar()
    """Password to login with."""

    port = NumVar(default=4190, minval=0, maxval=65535)
    """Port to connect to by default."""

    saslmechs = SASLMechVar(default=BasePwdAuth.getmechs())
    """How to authenticate."""

    saslprep = FlagVar(default=SASLPrep.ALL, cls=SASLPrep)
    """Which credentials to prepare."""

    timeout = NumVar(default=socket.getdefaulttimeout(), cls=float, minval=0)
    """Network timeout."""

    tls = BoolVar(default=True)
    """Use TLS?"""

    verbosity = EnumVar(default=LogLevel.INFO, cls=LogLevel)
    """Logging level."""

    x509strict = BoolVar(default=True)
    """Be strict when verifying TLS certificates?"""

    _section = 'account'

4813 

4814 

4815# 

4816# Terminal I/O 

4817# 

4818 

class TermIO(io.TextIOWrapper):
    """I/O for the controlling terminal."""

    def __init__(self, *args, **kwargs):
        """Open the controlling terminal.

        Arguments are passed on to :class:`io.TextIOWrapper`.
        """
        terminal = io.FileIO('/dev/tty', 'r+')
        super().__init__(terminal, *args, **kwargs)

4825 

4826 

4827# 

4828# Logging 

4829# 

4830 

4831LogIOWrapperT = TypeVar('LogIOWrapperT', bound='LogIOWrapper') 

4832"""Type variable for :class:`LogIOWrapper`.""" 

4833 

4834 

4835class LogIOWrapper(): 

4836 """Logger for file-like objects.""" 

4837 

4838 @classmethod 

4839 def wrap(cls: type[LogIOWrapperT], 

4840 file: Union[BinaryIO, io.BufferedRWPair], 

4841 logger: logging.Logger = logging.getLogger(__name__), 

4842 level: int = logging.DEBUG, 

4843 formats: tuple[str, str] = ('S: %s', 'C: %s'), 

4844 encoding: str = 'utf8') \ 

4845 -> Union[BinaryIO, io.BufferedRWPair, LogIOWrapperT]: 

4846 """Wrap a file in a :class:`LogIOWrapper` if logging is enabled. 

4847 

4848 Takes the same arguments as :meth:`__init__`. 

4849 

4850 Returns: 

4851 The file or a :class:`LogIOWrapper` that wraps the file. 

4852 """ 

4853 if logger.isEnabledFor(level): 

4854 return cls(file, encoding, level, logger, formats) 

4855 return file 

4856 

4857 def __init__(self, file: Union[BinaryIO, io.BufferedRWPair], 

4858 encoding: str = 'utf8', level: int = logging.DEBUG, 

4859 logger: logging.Logger = logging.getLogger(__name__), 

4860 formats: tuple[str, str] = ('S: %s', 'C: %s')): 

4861 """Log I/O to `file`. 

4862 

4863 Arguments: 

4864 file: File-like object opened in binary mode. 

4865 encoding: `file`'s encoding. 

4866 level: Logging priority. 

4867 logger: Logger. 

4868 formats: Message formats; "%s" is replaced with I/O. 

4869 """ 

4870 splitlines = re.compile(rb'\r?\n').split 

4871 buffers = (bytearray(), bytearray()) 

4872 

4873 def extv(buf: bytearray, vec: Iterable[Iterable[int]]) -> None: 

4874 for elem in vec: 

4875 buf.extend(elem) 

4876 

4877 def getdecorator( 

4878 buf: bytearray, fmt: str, arg=None, 

4879 ext: Callable[[bytearray, Iterable], None] = bytearray.extend 

4880 ) -> Callable[[Callable[..., T]], Callable[..., T]]: 

4881 def decorator(func: Callable[..., T]) -> Callable[..., T]: 

4882 def wrapper(*args, **kwargs) -> T: 

4883 retval = func(*args, **kwargs) 

4884 if not self.quiet: 

4885 data = retval if arg is None else args[arg] 

4886 ext(buf, data) # type: ignore 

4887 ptr: Union[bytes, bytearray] = buf 

4888 while True: 

4889 try: 

4890 line, ptr = splitlines(ptr, maxsplit=1) 

4891 except ValueError: 

4892 break 

4893 self.log(line, fmt) 

4894 buf[:] = ptr 

4895 return retval 

4896 return wrapper 

4897 return decorator 

4898 

4899 logread = getdecorator(buffers[0], formats[0]) 

4900 logreadinto = getdecorator(buffers[0], formats[0], arg=0) 

4901 logreadv = getdecorator(buffers[0], formats[0], ext=extv) 

4902 logwrite = getdecorator(buffers[1], formats[1], arg=0) 

4903 logwritev = getdecorator(buffers[1], formats[1], arg=0, ext=extv) 

4904 

4905 self.read = logread(file.read) 

4906 self.readline = logread(file.readline) 

4907 self.readlines = logreadv(file.readlines) 

4908 self.write = logwrite(file.write) 

4909 self.writelines = logwritev(file.writelines) 

4910 

4911 if isinstance(file, io.RawIOBase): 

4912 self.readall = logread(file.readall) 

4913 self.readinto = logreadinto(file.readinto) # type: ignore 

4914 

4915 if isinstance(file, io.BufferedIOBase): 

4916 self.read1 = logread(file.read1) 

4917 self.readinto1 = logread(file.readinto1) 

4918 self.readinto = logreadinto(file.readinto) 

4919 

4920 self.buffers = buffers 

4921 self.encoding = encoding 

4922 self.formats = formats 

4923 self.file = file 

4924 self.level = level 

4925 self.logger = logger 

4926 

4927 def __del__(self): 

4928 file = self.file 

4929 if not file.closed: 

4930 file.flush() 

4931 file.close() 

4932 if not self.quiet: 

4933 for buf, fmt in zip(self.buffers, self.formats): 

4934 if buf: 

4935 self.log(buf, fmt) 

4936 

4937 def __getattr__(self, name): 

4938 return getattr(self.file, name) 

4939 

4940 def __iter__(self): 

4941 return self 

4942 

4943 def __next__(self): 

4944 if line := self.readline(): 

4945 return line 

4946 raise StopIteration() 

4947 

4948 def log(self, line: Union[bytearray, bytes], fmt: str): 

4949 """Log `line` with `fmt`.""" 

4950 decoded = escapectrl(line.rstrip(b'\r\n').decode(self.encoding)) 

4951 self.logger.log(self.level, fmt, decoded) 

4952 

    buffers: tuple[bytearray, bytearray]
    """Logging buffers (one for data read, one for data written)."""

    encoding: str
    """:attr:`file`'s encoding."""

    file: Union[BinaryIO, io.BufferedRWPair]
    """Underlying file-like object."""

    formats: tuple[str, str]
    """Logging formats (one for data read, one for data written)."""

    level: int
    """Logging level."""

    logger: logging.Logger
    """Logger."""

    quiet: bool = False
    """Be quiet, that is, do *not* log I/O?"""

    read: Callable[..., bytes]
    """Read from :attr:`file`."""

    readinto: Callable[..., int]
    """Read from :attr:`file` into a buffer."""

    readline: Callable[..., bytes]
    """Read a line from :attr:`file`."""

    readlines: Callable[..., list[bytes]]
    """Read all lines from :attr:`file`."""

    write: Callable[..., int]
    """Write to :attr:`file`."""

    writelines: Callable[..., None]
    """Write lines to :attr:`file`."""

4991 

4992 

4993# 

4994# Signal handling 

4995# 

4996 

SignalHandlingFunc = Callable[[int, Union[types.FrameType, None]], Any]
"""Alias for signal handling functions.

Such a function takes a signal number and a stack frame (or `None`).
"""


SignalHandler = Union[SignalHandlingFunc, int, None]
"""Alias for signal handlers.

A handler is a signal handling function, an integer,
or `None`; this is what :func:`signal.signal` returns.
"""

5003 

5004 

@dataclass(frozen=True)
class SignalCaught(Exception):
    """A signal was caught.

    Carries the number of the signal and the stack frame that
    was active when the signal was delivered.
    """

    @classmethod
    def throw(cls, signo: int, frame: Optional[types.FrameType]):
        """Raise a :exc:`SignalCaught` exception.

        Has the signature of a signal handling function,
        so that it can be passed to :func:`signal.signal`.

        Arguments:
            signo: Signal number.
            frame: Stack frame.

        Raises:
            SignalCaught: Always.
        """
        raise cls(signo, frame)

    @classmethod
    def register(cls, signals: Iterable[int]) -> tuple[SignalHandler, ...]:
        """Register :meth:`throw` as handler for the given `signals`.

        Arguments:
            signals: Signals to register :meth:`throw` as handler for.

        Returns:
            Old signal handlers.
        """
        return tuple(signal.signal(s, cls.throw) for s in signals)

    @classmethod
    def catch(cls, *signals: int) -> \
            Callable[[Callable[..., T]], Callable[..., T]]:
        """Decorator that raises :exc:`SignalCaught` for `signals`.

        While the decorated function runs, :meth:`throw` is registered
        as handler for the given `signals`; the previous handlers are
        restored when the function returns or raises.

        If one of the given `signals` is caught, :exc:`SignalCaught` is raised.
        If that exception is not caught, the process group is terminated,
        the signal handler reset, and the signal re-raised.

        Arguments:
            signals: Signals to catch.

        Returns:
            A decorator.
        """
        def decorator(func: Callable[..., T]) -> Callable[..., T]:
            # pylint: disable=inconsistent-return-statements
            def wrapper(*args, **kwargs) -> T:  # type: ignore[return]
                handlers = cls.register(signals)
                try:
                    return func(*args, **kwargs)
                except cls as exc:
                    logging.critical(exc)
                    # SIGTERM is ignored by this process before it is
                    # sent to the process group, this process included.
                    signal.signal(SIGTERM, SIG_IGN)
                    os.killpg(os.getpgrp(), SIGTERM)
                    # Re-raise the signal with its default disposition.
                    signal.signal(exc.signo, SIG_DFL)
                    signal.raise_signal(exc.signo)
                finally:
                    # Restore the handlers returned by `register`.
                    for signo, handler in zip(signals, handlers):
                        signal.signal(signo, handler)
                # NOTREACHED
            # Copy every attribute of `func` that can be copied to the
            # wrapper; attributes that cannot be set are skipped.
            for name in dir(func):
                with suppress(AttributeError, TypeError, ValueError):
                    setattr(wrapper, name, getattr(func, name))
            return wrapper
        return decorator

    def __str__(self):
        # Use the first part of the system's description of the signal
        # (up to the first colon) or, if there is none, a generic message.
        desc = signal.strsignal(self.signo)
        return desc.split(':')[0] if desc else f'caught signal {self.signo}'

    signo: int
    """Signal number."""

    frame: Optional[types.FrameType] = None
    """Stack frame."""

5066 

5067 

5068# 

5069# Errors 

5070# 

5071 

5072# Error types 

class Error(Exception):
    """Base class for errors.

    The other error classes in this section derive from this class.
    """


class CapabilityError(Error):
    """Base class for capability errors."""


class ConfigError(Error):
    """Base class for configuration errors."""


class DataError(Error):
    """Base class for data errors."""


class OperationError(Error):
    """Base class for operation errors."""


class ProtocolError(Error):
    """Base class for protocol errors.

    .. danger::
        Continuing after a :exc:`ProtocolError` may cause undefined behaviour.
    """


class SecurityError(Error):
    """Base class for security errors.

    .. danger::
        Continuing after a :exc:`SecurityError` compromises transmitted data.
    """


class SoftwareError(Error):
    """Base class for software errors."""


class UsageError(Error):
    """Base class for usage errors."""

5115 

5116 

5117# Client errors 

class ClientError(Error):
    """Base class for client errors."""


class ClientConfigError(ClientError, ConfigError):
    """Client configuration error.

    Can be caught as :exc:`ClientError` or as :exc:`ConfigError`.
    """


class ClientConnectionError(ClientError, ConnectionError):
    """Client-side connection error.

    Can be caught as :exc:`ClientError` or as :exc:`ConnectionError`.
    """


class ClientOperationError(ClientError, OperationError):
    """Client-side operation error.

    Can be caught as :exc:`ClientError` or as :exc:`OperationError`.
    """


class ClientSecurityError(ClientError, SecurityError):
    """Client security error.

    Can be caught as :exc:`ClientError` or as :exc:`SecurityError`.
    """


class ClientSoftwareError(ClientError, SoftwareError):
    """Client software error (i.e., a bug).

    Can be caught as :exc:`ClientError` or as :exc:`SoftwareError`.
    """

5140 

5141 

5142# DNS errors 

class DNSError(Error):
    """Base class for DNS errors."""


# Derives from DNSError and DataError, like the other DNS errors derive
# from DNSError and their category, so that handlers for either catch it.
class DNSDataError(DNSError, DataError):
    """DNS data error.

    Can be caught as :exc:`DNSError` or as :exc:`DataError`.
    """


class DNSOperationError(DNSError, OperationError):
    """DNS operation error.

    Can be caught as :exc:`DNSError` or as :exc:`OperationError`.
    """


class DNSSoftwareError(DNSError, SoftwareError):
    """DNS software error.

    Can be caught as :exc:`DNSError` or as :exc:`SoftwareError`.
    """

5157 

5158 

5159# HTTP errors 

class HTTPError(Error):
    """Base class for HTTP errors."""


class HTTPOperationError(HTTPError, OperationError):
    """HTTP operation error.

    Can be caught as :exc:`HTTPError` or as :exc:`OperationError`.
    """


# NOTE(review): derives from ProtocolError, not UsageError, unlike
# ShellUsageError; confirm that this is intended.
class HTTPUsageError(HTTPError, ProtocolError):
    """HTTP usage error.

    Can be caught as :exc:`HTTPError` or as :exc:`ProtocolError`.
    """

5170 

5171 

5172# OCSP errors 

class OCSPError(Error):
    """Base class for OCSP errors."""


class OCSPDataError(OCSPError, DataError):
    """OCSP data error.

    Can be caught as :exc:`OCSPError` or as :exc:`DataError`.
    """


class OCSPOperationError(OCSPError, OperationError):
    """OCSP operation error.

    Can be caught as :exc:`OCSPError` or as :exc:`OperationError`.
    """

5183 

5184 

5185# SASL errors 

class SASLError(Error):
    """Base class for SASL errors."""


# Derives from SASLError and CapabilityError, like the other SASL errors
# derive from SASLError and their category, so that handlers for either
# catch it.
class SASLCapabilityError(SASLError, CapabilityError):
    """SASL capability error.

    Can be caught as :exc:`SASLError` or as :exc:`CapabilityError`.
    """


class SASLProtocolError(SASLError, ProtocolError):
    """Server violated the SASL protocol.

    Can be caught as :exc:`SASLError` or as :exc:`ProtocolError`.
    """


class SASLSecurityError(SASLError, SecurityError):
    """SASL security error.

    Can be caught as :exc:`SASLError` or as :exc:`SecurityError`.
    """

5200 

5201 

5202# Shell errors 

class ShellError(Error):
    """Base class for shell errors."""


class ShellDataError(ShellError, DataError):
    """Shell data error.

    Can be caught as :exc:`ShellError` or as :exc:`DataError`.
    """


class ShellOperationError(ShellError, OperationError):
    """Shell operation error.

    Can be caught as :exc:`ShellError` or as :exc:`OperationError`.
    """


class ShellUsageError(ShellError, UsageError):
    """Shell usage error.

    Can be caught as :exc:`ShellError` or as :exc:`UsageError`.
    """

5217 

5218 

5219# ManageSieve errors 

class SieveError(Error):
    """Base class for ManageSieve errors."""


class SieveCapabilityError(SieveError, CapabilityError):
    """Capability not supported by the server."""


class SieveConnectionError(Response, SieveError, ConnectionError):
    """Server said "BYE".

    Is a response as well as an error; `response` defaults to "BYE".
    """

    # pylint: disable=redefined-outer-name
    def __init__(self, response: Atom = Atom('BYE'),
                 code: tuple[Word, ...] = (),
                 message: Optional[str] = None):
        # All arguments are passed on to the superclass by keyword.
        super().__init__(response=response, code=code, message=message)


class SieveOperationError(Response, SieveError, OperationError):
    """Server said "NO".

    Is a response as well as an error; `response` defaults to "NO".
    """

    # pylint: disable=redefined-outer-name
    def __init__(self, response: Atom = Atom('NO'),
                 code: tuple[Word, ...] = (),
                 message: Optional[str] = None):
        # All arguments are passed on to the superclass by keyword.
        super().__init__(response=response, code=code, message=message)


class SieveProtocolError(SieveError, ProtocolError):
    """Server violated the ManageSieve protocol."""

5250 

5251 

5252# TLS errors 

class TLSError(Error):
    """Base class for TLS errors."""


class TLSCapabilityError(TLSError, CapabilityError):
    """TLS capability error.

    Can be caught as :exc:`TLSError` or as :exc:`CapabilityError`.
    """


class TLSSecurityError(TLSError, SecurityError):
    """TLS security error.

    Can be caught as :exc:`TLSError` or as :exc:`SecurityError`.
    """


class TLSSoftwareError(TLSError, SoftwareError):
    """TLS software error.

    Can be caught as :exc:`TLSError` or as :exc:`SoftwareError`.
    """

5267 

5268 

5269# 

5270# Helpers 

5271# 

5272 

def askpass(prompt: str) -> str:
    """Ask for a password, using a `TermIO` stream for the prompt.

    Arguments:
        prompt: Prompt to show.

    Returns:
        The password that was entered.
    """
    with TermIO() as tty:
        password = getpass.getpass(prompt, stream=tty)
    return password

5277 

5278 

def backup(file: str, keep: int, getfiles: Callable[[], Iterable[str]],
           copy: Callable[[str, str], Any], remove: Callable[[str], Any]):
    """Back up `file` the way Emacs does.

    `keep` = 0
        No backup is made.

    `keep` = 1
        :file:`file` is backed up as :file:`file~`.

    `keep` > 1
        :file:`file` is backed up as :file:`file.~{n}~`, where `n` is
        one more than the highest existing backup number (or 1 if there
        are no backups yet). Old backups are removed first, so that
        at most `keep` backups remain once the new one has been made.

    Arguments:
        file: File to back up.
        keep: How many copies to keep.
        getfiles: Function that returns a list of files.
        copy: Function that copies the file.
        remove: Function that removes a file.

    Raises:
        ValueError: `keep` is < 0.
    """
    if keep < 0:
        raise ValueError('keep: must be >= 0')
    if keep == 0:
        return
    if keep == 1:
        copy(file, file + '~')
        return

    # Collect the numbered backups of `file`, lowest number first.
    pattern = re.compile(re.escape(file) + r'\.~(\d+)~')
    numbered = []
    for candidate in getfiles():
        found = pattern.fullmatch(candidate)
        if found:
            numbered.append((int(found.group(1)), candidate))
    numbered.sort()

    # Make room for the new backup by dropping the oldest ones.
    surplus = numbered[:-(keep - 1)]
    for _, stale in surplus:
        remove(stale)

    serial = numbered[-1][0] + 1 if numbered else 1
    copy(file, f'{file}.~{serial}~')

5315 

5316 

def bell():
    """Print a BEL to a `TermIO` stream.

    A :exc:`FileNotFoundError` is silently ignored.
    """
    with suppress(FileNotFoundError):
        with TermIO() as tty:
            print('\a', end='', file=tty)

5324 

5325 

def certrevoked(cert, logger: logging.Logger = logging.getLogger(__name__)) \
        -> bool:
    """Check if `cert` has been revoked.

    The certificate of each issuer named in `cert` is downloaded and
    used to build an OCSP request, which is then sent to each OCSP
    responder named in `cert`. The first successful and currently
    valid response that says "good" or "revoked" decides the matter.
    Issuers and responders that cannot be reached are logged and skipped.

    Arguments:
        cert: Certificate to check.
        logger: Logger to report download errors to.

    Returns:
        `True` if `cert` has been revoked, `False` if it is good.

    Raises:
        OCSPDataError: `cert` contains no authority information.
        OCSPOperationError: no authoritative response.

    .. seealso::
        :rfc:`5019`
            Lightweight OCSP Profile
        :rfc:`6960`
            Online Certificate Status Protocol (OCSP)
    """
    try:
        issuers, responders = getcertauthinfo(cert)
    except ExtensionNotFound as err:
        raise OCSPDataError('no authority information') from err
    for caurl in issuers:
        try:
            der = httpget(caurl)
        except (urllib.error.URLError, HTTPError) as err:
            logger.error(err)
            continue
        ca = x509.load_der_x509_certificate(der)
        builder = ocsp.OCSPRequestBuilder()
        # SHA1 is mandated by RFC 5019.
        req = builder.add_certificate(cert, ca, SHA1()).build()  # nosec B303
        # pylint: disable=redefined-outer-name
        path = b64encode(req.public_bytes(Encoding.DER)).decode('ascii')
        for responder in responders:
            statusurl = urllib.parse.urljoin(responder, path)
            try:
                res = ocsp.load_der_ocsp_response(httpget(statusurl))
            except (urllib.error.URLError, HTTPError) as err:
                # Network errors must be handled just as they are for
                # issuers, lest one unreachable responder abort the check.
                logger.error(err)
                continue
            if res.response_status == OCSPResponseStatus.SUCCESSFUL:
                # Responses that are not yet or no longer valid are skipped.
                try:
                    now = datetime.datetime.now(tz=datetime.UTC)  # novermin
                    end = res.next_update_utc   # type: ignore
                    if now < res.this_update_utc:   # type: ignore
                        continue
                    if end is not None and now >= end:
                        continue
                except AttributeError:
                    # Fall back to the naive timestamps if `datetime.UTC`
                    # or the `*_utc` attributes are not available.
                    warnings.filterwarnings(
                        action='ignore',
                        category=CryptographyDeprecationWarning
                    )
                    # The naive timestamps are in UTC, so they must be
                    # compared to the current time in UTC, not local time.
                    now = datetime.datetime.now(
                        tz=datetime.timezone.utc
                    ).replace(tzinfo=None)
                    if now < res.this_update:
                        continue
                    if (end := res.next_update) is not None and now >= end:
                        continue
                if res.certificate_status == OCSPCertStatus.REVOKED:
                    return True
                if res.certificate_status == OCSPCertStatus.GOOD:
                    return False
    raise OCSPOperationError('no authoritative OCSP response')

5386 

5387 

def escapectrl(chars: str) -> str:
    r"""Replace control characters with ``\u`` escapes.

    Every character whose Unicode category starts with "C" is
    replaced with a backslash, a "u", and its code point in
    hexadecimal, zero-padded to at least four digits.

    Arguments:
        chars: String to escape.

    Returns:
        The escaped string.
    """
    pieces = []
    for char in chars:
        if unicodedata.category(char).startswith('C'):
            pieces.append(fr'\u{ord(char):04x}')
        else:
            pieces.append(char)
    return ''.join(pieces)

5394 

5395 

def httpget(url: str) -> bytes:
    """Download a file from `url` using HTTP.

    Redirections are followed, but only to other HTTP URLs
    and only up to a fixed number of times.

    Arguments:
        url: URL to download from.

    Returns:
        The body of the response.

    Raises:
        HTTPUsageError: `url` is not an HTTP URL.
        HTTPOperationError: "GET" failed.
    """
    maxredirects = 10
    for _ in range(maxredirects + 1):
        if not url.startswith('http://'):
            raise HTTPUsageError(f'{url}: not an HTTP URL')
        with urllib.request.urlopen(url) as res:    # nosec B310
            if res.status == 200:
                return res.read()
            if res.status in (301, 302, 303, 307, 308):
                # `url` must only be replaced if there is a location,
                # lest the error below report "GET None". Locations
                # may be relative, so resolve them against `url`.
                if location := res.getheader('Location'):
                    url = urllib.parse.urljoin(url, location)
                    continue
            raise HTTPOperationError(f'GET {url}: {res.reason}')
    raise HTTPOperationError(f'GET {url}: too many redirections')

5414 

5415 

def getcertauthinfo(cert) -> tuple[list[str], list[str]]:
    """Read the authority information access extension of `cert`.

    Arguments:
        cert: Certificate.

    Returns:
        CA issuer URLs and OCSP responder base URLs.
    """
    aia = cert.extensions.get_extension_for_class(AuthorityInformationAccess)
    issuers = []
    responders = []
    for description in aia.value:
        method = description.access_method
        if method == AuthorityInformationAccessOID.CA_ISSUERS:
            issuers.append(description.access_location.value)
            continue
        if method == AuthorityInformationAccessOID.OCSP:
            responders.append(description.access_location.value)
    return issuers, responders

5432 

5433 

def getfilesize(file: IO) -> int:
    """Count the bytes between the current position and the end of `file`.

    The size is taken from the file descriptor if there is one;
    otherwise it is found by seeking to the end of `file`, after
    which the previous position is restored.

    Arguments:
        file: File-like object.

    Returns:
        Size of `file` relative to the current position.
    """
    try:
        start = file.tell()
    except io.UnsupportedOperation:
        start = 0
    try:
        total = os.fstat(file.fileno()).st_size
    except io.UnsupportedOperation:
        # No file descriptor, so measure by seeking.
        total = file.seek(0, SEEK_END)
        file.seek(start, SEEK_SET)
    return total - start

5446 

5447 

def isdnsname(name: str) -> bool:
    """Check whether `name` is a valid DNS name.

    A name is valid if it is at most 253 characters long and each of
    its labels is between 1 and 63 characters long. A single trailing
    dot is permitted.

    .. seealso::
        :rfc:`1035` (sec. 2.3.1)
            Domain names - Preferred name syntax
        :rfc:`2181` (sec. 11)
            Clarifications to the DNS Specification - Name syntax
    """
    if len(name) > 253:
        return False
    labels = name.removesuffix('.').split('.')
    return all(1 <= len(label) <= 63 for label in labels)

5459 

5460 

def ishostname(name: str) -> bool:
    """Check whether `name` is a valid hostname.

    A hostname is a DNS name, the labels of which consist only of
    ASCII letters, digits, and hyphens, and neither start nor end
    with a hyphen. A single trailing dot is permitted.

    .. seealso::
        :rfc:`921`
            Domain Name System Implementation Schedule
        :rfc:`952`
            Internet host table specification
        :rfc:`1123` (sec. 2.1)
            Host Names and Numbers
    """
    # Labels must be separated by dots in the expression. If the dot
    # were optional within a repeated group, a run of letters could be
    # split into labels in exponentially many ways, and names that
    # almost match would take exponential time to reject.
    label = r'(?!-)[a-z0-9-]+(?<!-)'
    hostname = fr'{label}(?:\.{label})*\.?'
    return bool(re.fullmatch(hostname, name, re.I)) and isdnsname(name)

5474 

5475 

def isinetaddr(addr: str) -> bool:
    """Check whether `addr` can be parsed as an IP address."""
    try:
        ipaddress.ip_address(addr)
    except ValueError:
        valid = False
    else:
        valid = True
    return valid

5483 

5484 

def nwise(iterable: Iterable, n: Any) -> Iterator[tuple]:
    """Slide a window of `n` elements over `iterable`.

    Arguments:
        iterable: Elements to iterate over.
        n: Size of the window.

    Returns:
        An iterator over tuples of `n` consecutive elements each.
        Nothing is yielded if there are fewer than `n` elements.
    """
    source = iter(iterable)
    # Pre-fill the window, so that the first element
    # taken in the loop below completes the first tuple.
    head = itertools.islice(source, n - 1)
    window = deque(head, maxlen=n)
    for item in source:
        window.append(item)
        yield tuple(window)

5492 

5493 

def randomize(elems: Sequence[T], weights: Iterable[int]) -> list[T]:
    """Randomise the order of `elems`.

    Elements are drawn without replacement; the chance of an element
    being drawn next is proportional to its weight. Elements with a
    weight of zero are never drawn and are instead appended, in their
    original order, after all other elements.

    Arguments:
        elems: Elements.
        weights: Weight of each element, in the same order.

    Returns:
        The elements in randomised order.
    """
    weights = list(weights)
    indices = list(range(len(elems)))
    pending = set(indices)
    randomized: list[T] = []
    # random.choices raises a ValueError if the weights add up to zero,
    # so drawing has to stop once only zero-weight elements are left.
    while pending and sum(weights) > 0:
        i, = random.choices(indices, weights, k=1)  # noqa DUO102 # nosec B311
        randomized.append(elems[i])
        pending.discard(i)
        # An element that has been drawn must not be drawn again.
        weights[i] = 0
    randomized.extend(elems[i] for i in sorted(pending))
    return randomized

5505 

5506 

def readdir(dirname: str, predicate: Optional[Callable[[str], bool]] = None) \
        -> Iterator[str]:
    """List the files in `dirname` that satisfy `predicate`.

    Arguments:
        dirname: Directory to read (the empty string means ".").
        predicate: Function that takes a filename prefixed with `dirname`
            and returns whether to list it (default: list every file).

    Returns:
        An iterator over filenames, each prefixed with `dirname`.
    """
    entries = os.listdir(dirname or '.')
    for entry in entries:
        fname = path.join(dirname, entry)
        if predicate is not None and not predicate(fname):
            continue
        yield fname

5514 

5515 

def readnetrc(fname: Optional[str]) -> dict[str, tuple[str, str, str]]:
    """Parse a .netrc file.

    Arguments:
        fname: Filename (default: :file:`~/.netrc`)

    Returns:
        Mapping from hosts to login-account-password 3-tuples.
        The mapping is empty if `fname` was not given and the
        default file does not exist.

    Raises:
        FileNotFoundError: `fname` was given but not found.
        netrc.NetrcParseError: Syntax error.
    """
    try:
        if fname:
            return netrc.netrc(fname).hosts
        try:
            return netrc.netrc().hosts
        except FileNotFoundError:
            # The default file is optional.
            pass
    except netrc.NetrcParseError as err:
        # Before Python 3.10, syntax errors are only logged.
        if sys.version_info >= (3, 10):
            raise
        logging.error(err)
    return {}

5540 

5541 

def readoutput(*command: str, encoding: str = ENCODING,
               logger: logging.Logger = logging.getLogger(__name__)) -> str:
    """Run `command` and return what it printed to standard output.

    Arguments:
        command: Command and its arguments.
        encoding: Encoding of the output.
        logger: Logger for debugging messages.

    Returns:
        Decoded output, with trailing whitespace stripped, or,
        if `command` exited with a non-zero status, the empty string.

    Raises:
        subprocess.CalledProcessError: `command` exited with a status >= 127.
    """
    logger.debug('exec: %s', ' '.join(command))
    try:
        completed = subprocess.run(command, capture_output=True, check=True)
    except subprocess.CalledProcessError as err:
        status = err.returncode
        if status >= 127:
            raise
        logger.debug('%s exited with status %d', command[0], status)
        return ''
    output = completed.stdout.rstrip()
    return output.decode(encoding)

5562 

5563 

5564# pylint: disable=redundant-returns-doc 

def resolvesrv(host: str) -> Iterator[SRV]:
    """Resolve a DNS SRV record.

    This is a generator: the lookup is performed, and
    errors are raised, only once iteration starts.

    Arguments:
        host: Hostname (e.g., :samp:`_sieve._tcp.imap.foo.example`)

    Returns:
        An iterator over `SRV` records sorted by their priority
        and randomized according to their weight.

    Raises:
        DNSDataError: `host` is not a valid DNS name.
        DNSOperationError: Lookup error.
        DNSSoftwareError: dnspython_ is not available.

    .. note::
        Requires dnspython_.
    """
    if not HAVE_DNSPYTHON:
        raise DNSSoftwareError('dnspython unavailable')
    if not isdnsname(host):
        raise DNSDataError('hostname or label is too long')
    try:
        answer = dns.resolver.resolve(host, 'SRV')
        hosts: dict[str, list] = defaultdict(list)
        byprio: dict[int, list[SRV]] = defaultdict(list)
        # Collect the addresses given in the additional section
        # of the response, by name; only A records are used.
        for rec in answer.response.additional:
            name = rec.name.to_text()
            addrs = [i.address for i in rec.items
                     if i.rdtype == dns.rdatatype.A]
            hosts[name].extend(addrs)
        # Group the records by priority. If addresses were collected
        # for a target, there is one SRV per address; otherwise there
        # is one for the target's name, less any trailing dots.
        for rec in answer:                  # type: ignore
            name = rec.target.to_text()     # type: ignore
            priority = rec.priority         # type: ignore
            weight = rec.weight             # type: ignore
            port = rec.port                 # type: ignore
            if addrs := hosts.get(name):    # type: ignore
                srvs = [SRV(priority, weight, a, port) for a in addrs]
                byprio[priority].extend(srvs)
            else:
                srv = SRV(priority, weight, name.rstrip('.'), port)
                byprio[priority].append(srv)
        # Lower priorities come first. Records of the same priority
        # are only randomized if their weights do not add up to zero.
        for prio in sorted(byprio.keys()):
            srvs = byprio[prio]
            weights = [s.weight for s in srvs]
            if sum(weights) > 0:
                yield from randomize(srvs, weights)
            else:
                yield from srvs
    except DNSException as err:
        raise DNSOperationError(str(err)) from err

5616 

5617 

def yamlescape(data: str):
    """Strip `data` and, if needed, quote it for use as YAML scalar.

    `data` is put in double quotes if it is empty, equals "no"
    regardless of case, starts with a YAML indicator, or contains
    ": " or " #". Backslashes and double quotes are escaped if
    `data` is quoted for one of the latter three reasons.

    Arguments:
        data: Scalar to escape.

    Returns:
        The stripped and, if needed, quoted scalar.
    """
    indicators = '-?:,[]{}#&*!|>\'"%@`'
    text = data.strip()
    if not text:
        return '""'
    if text.casefold() == 'no':
        return f'"{text}"'
    needsquotes = text[0] in indicators or ': ' in text or ' #' in text
    if not needsquotes:
        return text
    escaped = text.replace('\\', '\\\\').replace('"', '\\\"')
    return f'"{escaped}"'

5630 

5631 

5632# 

5633# Main 

5634# 

5635 

5636# pylint: disable=too-many-branches, too-many-statements 

@SignalCaught.catch(SIGHUP, SIGINT, SIGTERM)
def main() -> NoReturn:
    """sievemgr - manage remote Sieve scripts

    Usage:  sievemgr [server] [command] [argument ...]
            sievemgr -e expression [...] [server]
            sievemgr -s file [server]

    Options:
        -C             Do not overwrite existing files.
        -c file        Read configuration from file.
        -d             Enable debugging mode.
        -e expression  Execute expression on the server.
        -f             Overwrite and remove files without confirmation.
        -i             Confirm removing or overwriting files.
        -o key=value   Set configuration key to value.
        -q             Be quieter.
        -s file        Execute expressions read from file.
        -v             Be more verbose.

    -e, -o, -q, and -v can be given multiple times.
    See sievemgr(1) for the complete list.

    Report bugs to: <https://github.com/odkr/sievemgr/issues>
    Home page: <https://odkr.codeberg.page/sievemgr>
    """
    # The docstring above is the text printed for -h; see showhelp().
    progname = path.basename(sys.argv[0])
    logging.basicConfig(format=f'{progname}: %(message)s')

    # Options
    try:
        opts, args = getopt(sys.argv[1:], 'CN:Vc:de:fhio:qs:v',
                            ['help', 'version'])
    except GetoptError as err:
        error('%s', err, status=2)

    optconf = SieveConfig()
    debug = False
    exprs: list[str] = []
    configfiles: list[str] = []
    script: Optional[TextIO] = None
    volume = 0
    for opt, arg in opts:
        try:
            if opt in ('-h', '--help'):
                showhelp(main)
            elif opt in ('-V', '--version'):
                showversion()
            elif opt == '-C':
                optconf.clobber = False
            elif opt == '-N':
                optconf.netrc = arg
            elif opt == '-c':
                configfiles.append(arg)
            elif opt == '-d':
                # In debugging mode, errors are re-raised below,
                # so `debug` must be set for that to ever happen.
                debug = True
                optconf.verbosity = LogLevel.DEBUG
            elif opt == '-e':
                exprs.append(arg)
            elif opt == '-f':
                optconf.confirm = ShellCmd.NONE
            elif opt == '-i':
                optconf.confirm = ShellCmd.ALL
            elif opt == '-o':
                optconf.parse(arg)
            elif opt == '-q':
                volume -= 1
            elif opt == '-s':
                # pylint: disable=consider-using-with
                script = open(arg)
            elif opt == '-v':
                volume += 1
        except (AttributeError, TypeError, ValueError) as err:
            error('option %s: %s', opt, err, status=2)
        except OSError as err:
            # Raised by open() if the file given to -s cannot be read.
            error('%s: %s', arg, os.strerror(err.errno) if err.errno else err)

    # Arguments
    url = URL.fromstr(args.pop(0)) if args else None
    command = args.pop(0) if args else ''

    if script:
        if command:
            error('-s cannot be used together with a command', status=2)
        if exprs:
            error('-e and -s cannot be combined', status=2)

    # Configuration
    try:
        conf = SieveConfig().fromfiles(*configfiles) | optconf
    except FileNotFoundError as err:
        assert err.errno
        error('%s: %s', err.filename, os.strerror(err.errno))
    except (ConfigError, SecurityError) as err:
        error('%s', err)
    conf.loadaccount(host=url.hostname if url else conf.host,
                     login=url.username if url else None)
    # Options are applied once more after the account has been loaded.
    conf |= optconf

    # Credentials given in the URL are applied last.
    if url:
        if url.username:
            conf.login = url.username
        if url.password:
            conf.password = url.password
        if url.owner:
            conf.owner = url.owner

    # Logging
    conf.verbosity = conf.verbosity.fromdelta(volume)
    logger = logging.getLogger()
    logger.setLevel(conf.verbosity)

    # Infos
    for line in _ABOUT.strip().splitlines():
        logging.info('%s', line)

    # Shell
    try:
        with conf.getmanager() as mgr:
            shell = conf.getshell(mgr)
            if exprs:
                for expr in exprs:
                    shell.executeline(expr)
            elif script:
                shell.executescript(script)
            elif command:
                shell.execute(command, *args)
            else:
                shell.enter()
    except (socket.herror, socket.gaierror, ConnectionError) as err:
        # Errors raised by the system carry a number and a message, but
        # other connection errors may carry fewer than two arguments.
        error('%s', err.args[1] if len(err.args) > 1 else err)
    except (MemoryError, ssl.SSLError) as err:
        error('%s', err)
    except OSError as err:
        error('%s', os.strerror(err.errno) if err.errno else err)
    except subprocess.CalledProcessError as err:
        error('%s exited with status %d', err.cmd[0], err.returncode)
    except Error as err:
        if debug:
            raise
        error('%s', err)
    sys.exit(shell.retval)

5776 

5777 

def error(*args, status: int = 1, **kwargs) -> NoReturn:
    """Log an error and exit with `status`.

    Arguments:
        args: Positional arguments for :func:`logging.error`.
        status: Exit status.
        kwargs: Keyword arguments for :func:`logging.error`.

    Raises:
        SystemExit: Always.
    """
    logging.error(*args, **kwargs)
    raise SystemExit(status)

5788 

5789 

def showhelp(func: Callable) -> NoReturn:
    """Print the docstring of `func` and :func:`exit <sys.exit>`.

    What all lines but the first and the last have in common at their
    start, blank lines aside, is removed from every line. The last line
    is not printed if it is empty after that.

    Arguments:
        func: Function to print the docstring of.

    Raises:
        SystemExit: Always.
    """
    assert func.__doc__
    lines = func.__doc__.splitlines()
    indent = path.commonprefix([line for line in lines[1:-1] if line])
    last = len(lines) - 1
    for lineno, line in enumerate(lines):
        text = line.removeprefix(indent)
        if lineno == last and not text:
            continue
        print(text)
    sys.exit()

5801 

5802 

def showversion() -> NoReturn:
    """Print :attr:`_ABOUT`, stripped, and :func:`exit <sys.exit>`.

    Raises:
        SystemExit: Always.
    """
    about = _ABOUT.strip()
    print(about)
    sys.exit()

5807 

5808 

5809# 

5810# Boilerplate 

5811# 

5812 

# Attach a handler that discards records to this module's logger.
logging.getLogger(__name__).addHandler(logging.NullHandler())

# Run the command-line interface if executed as a script.
if __name__ == '__main__':
    main()