Python module

Synopsis

from sievemgr import SieveManager

Description

class sievemgr.SieveManager(*args, backups: int = 0, checksize: int = -1, maxmemory: int = 524_288, **kwargs)[source]

Bases: SieveConn, AbstractContextManager

Connection to a ManageSieve server.

args and kwargs are passed to open() if given. Otherwise, no connection is established.

Parameters:
  • backups – How many backups to keep by default.

  • checksize – Check whether there is enough space before uploading scripts that are larger than this size in bytes. Set to a negative number to disable this check.

  • maxmemory – See max_size in tempfile.SpooledTemporaryFile.

  • args – Positional arguments for open().

  • kwargs – Keyword arguments for open().

For example:

>>> with SieveManager('imap.foo.example') as mgr:
>>>     mgr.authenticate('user', 'password')
>>>     with open('sieve.script', newline='') as script:
>>>         mgr.putscript(script, script.name)
>>>     mgr.setactive('sieve.script')

Warning

SieveManager is not thread-safe.

authenticate(login: str, *auth, owner: str = '', sasl: Iterable[type[BaseAuth]] = (), logauth: bool = False, **kwauth)

Authenticate as login.

How the user is authenticated depends on the type of SASL mechanisms given in sasl (e.g., password-based or external).

If no mechanisms are given, authentication is attempted with every non-obsolete password-based mechanism that is supported, starting with those with better security properties and progressing to those with worse security properties.

Unrecognized arguments are passed on to SASL mechanism constructors. Password-based mechanisms require a password:

>>> mgr.authenticate('user', 'password')

By contrast, the “EXTERNAL” mechanism takes no arguments:

>>> mgr.authenticate('user', sasl=(ExternalAuth,))

If an owner is given, the scripts of that owner are managed instead of those owned by login. This requires elevated privileges.

Parameters:
  • login – User to login as (authentication ID).

  • owner – User whose scripts to manage (authorization ID).

  • sasl – SASL mechanisms (default: BasePwdAuth.getmechs()).

  • logauth – Log authentication exchange?

  • auth – Positional arguments for SASL mechanism constructors.

  • kwauth – Keyword arguments for SASL mechanism constructors.

Note

If an owner is given, but the selected authentication mechanism does not support proxy authentication, an error is logged to the console and authentication is attempted with the next mechanism.

backupscript(script: str, keep: int = 1) -> str

Make an Emacs-style backup of script.

keep = 0

Do nothing.

keep = 1

script is backed up as script~.

keep > 1

script is backed up as script.~n~. n starts with 1 and increments with each backup. Old backups are deleted if there are more than keep backups.

For example:

>>> mgr.listscripts()
[('script.sieve', True)]
>>> mgr.backupscript('script.sieve', keep=0)
>>> mgr.listscripts()
[('script.sieve', True)]
>>> mgr.listscripts()
[('script.sieve', True)]
>>> mgr.backupscript('script.sieve', keep=1)
>>> mgr.listscripts()
[('script.sieve', True), ('script.sieve~', False)]
>>> mgr.listscripts()
[('script.sieve', True)]
>>> mgr.backupscript('script.sieve', keep=2)
>>> mgr.listscripts()
[('script.sieve', True), ('script.sieve.~1~', False)]
>>> mgr.backupscript('script.sieve', keep=2)
>>> mgr.listscripts()
[('script.sieve', True),
 ('script.sieve.~1~', False),
 ('script.sieve.~2~', False)]
>>> mgr.backupscript('script.sieve', keep=2)
>>> mgr.listscripts()
[('script.sieve', True),
 ('script.sieve.~2~', False),
 ('script.sieve.~3~', False)]
Parameters:
  • script – Script name.

  • keep – How many backups to keep.

Returns:

Backup filename or the empty string if no backup was made.
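
The naming scheme described above can be sketched in a few lines. This is only an illustration under stated assumptions: plan_backup is a hypothetical helper, not part of sievemgr, and it only computes names from a list of existing script names rather than talking to a server.

```python
import re


def plan_backup(script, existing, keep=1):
    """Return (backup name, obsolete backups) for an Emacs-style backup.

    Hypothetical helper; sievemgr's own implementation may differ.
    """
    if keep <= 0:
        return '', []                    # keep = 0: do nothing
    if keep == 1:
        return script + '~', []          # keep = 1: single backup "script~"
    # keep > 1: numbered backups "script.~n~"
    pattern = re.compile(re.escape(script) + r'\.~(\d+)~')
    numbers = sorted(int(match.group(1)) for name in existing
                     if (match := pattern.fullmatch(name)))
    number = numbers[-1] + 1 if numbers else 1
    numbers.append(number)
    # Everything except the newest `keep` backups is obsolete.
    obsolete = [f'{script}.~{n}~' for n in numbers[:-keep]]
    return f'{script}.~{number}~', obsolete
```

With two numbered backups already present and keep=2, the sketch names the new backup script.sieve.~3~ and marks script.sieve.~1~ for deletion, which matches the listing above.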

checkscript(script: str | IO)

Check whether script is valid.

Syntax errors trigger a SieveOperationError. Semantic errors are reported in warning.

For example:

>>> mgr.checkscript('foo')
Traceback (most recent call last):
    [...]
SieveOperationError: line 1: error: expected end of command ';'
error: parse failed.
>>> mgr.checkscript('# foo')
>>>

Parameters:

script – Script (not script name).

Important

Sieve scripts must be encoded in UTF-8.

close()

Close the client side of the connection.

Warning

Call only when the server has closed the connection.

copyscript(source: str, target: str, backups: int | None = None)[source]

Download source and re-upload it as target.

Parameters:
  • source – Source name.

  • target – Target name.

  • backups – How many backups to keep (default: the backups attribute).

deletescript(script: str)

Delete script.

getactive() -> str | None

Get the name of the active script.

getscript(script: str) -> str

Download script.

For example:

>>> with open('foo.sieve', 'w', encoding='utf8') as file:
>>>     file.write(mgr.getscript('foo.sieve'))

Parameters:

script – Script name.

geturl() -> URL | None

URL of the current connection.

For example:

>>> with SieveManager('imap.foo.example') as mgr:
>>>     mgr.authenticate('user', 'password')
>>>     mgr.geturl()
'sieve://user@imap.foo.example'

havespace(script: str, size: int)

Check whether there is enough space for script.

Parameters:
  • script – Script name.

  • size – Script size in bytes.

listscripts(cached: bool = False) -> list[ScriptRecord]

List scripts and whether they are the active script.

For example:

>>> mgr.listscripts()
[('foo.sieve', False), ('bar.sieve', True)]
>>> scripts = [script for script, _ in mgr.listscripts()]

Parameters:

cached – Return cached response? [1]

Returns:

A list of script name/status tuples.

localscripts(scripts: Iterable[str], *args, create: bool = False, **kwargs) -> LocalScripts

Temporarily download scripts for editing.

For example:

>>> import sys
>>> from subprocess import run
>>>
>>> with manager.localscripts(('foo', 'bar'), create=True) as local:
>>>     while local.scripts:
>>>         run(['vi'] + local.scripts, check=True)
>>>         try:
>>>             local.reupload()
>>>         except Exception as err:
>>>             print(err, file=sys.stderr)
>>>             if input('Retry? ').casefold() not in ('y', 'yes'):
>>>                 break

logout()

Log out.

Note

logout() should be called to close the connection unless SieveManager is used as a context manager.

Warning

Logging out is unsafe after a ProtocolError. Use shutdown() instead.

noop(tag: str | None = None) -> str | None

Request a no-op.

For example:

>>> mgr.noop('foo')
'foo'

Parameters:

tag – String for the server to echo back.

Returns:

Server echo.

open(host: str, port: int = 4190, source: tuple[str, int] = ('', 0), timeout: float | None = socket.getdefaulttimeout(), tls: bool = True, ocsp: bool = True)

Connect to host at port.

Parameters:
  • host – Server name or address.

  • port – Server port.

  • source – Source address and port.

  • timeout – Timeout in seconds.

  • tls – Secure the connection?

  • ocsp – Check whether the server’s TLS certificate was revoked?

putscript(source: bytes | str | IO, target: str, backups: int | None = None)

Upload source to the server as target.

The server should reject syntactically invalid scripts and may issue a warning for semantically invalid ones. Updates should be atomic.

For example:

>>> mgr.putscript('# empty', 'foo.sieve')
>>> with open('foo.sieve', newline='') as file:
>>>     mgr.putscript(file, 'foo.sieve')

Parameters:
  • source – Script (not script name).

  • target – Script name.

  • backups – How many backups to keep (default: the backups attribute).

Important

Sieve scripts must be encoded in UTF-8.

Attention

File-like objects must be opened in binary mode or with open()’s newline argument set to the empty string.
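
The reason for this requirement can be illustrated with the standard library alone. In text mode, Python's default newline handling rewrites line endings on read, so the script that would be uploaded no longer matches the file on disk. The helper name read_both_ways below is made up for this sketch.

```python
import os
import tempfile


def read_both_ways(data: bytes) -> tuple[str, str]:
    """Write data to a temporary file and read it back in two text modes."""
    with tempfile.NamedTemporaryFile(delete=False) as file:
        file.write(data)
        path = file.name
    try:
        # Default text mode: '\r\n' is translated to '\n' on read.
        with open(path, encoding='utf8') as file:
            translated = file.read()
        # newline='': line endings are passed through untouched.
        with open(path, encoding='utf8', newline='') as file:
            untranslated = file.read()
    finally:
        os.remove(path)
    return translated, untranslated


translated, untranslated = read_both_ways(b'keep;\r\n')
```

Only the second read preserves the CRLF line ending, which is why the examples above open scripts with newline=''.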

renamescript(source: str, target: str, emulate: bool = True)[source]

Rename source to target.

Some servers do not support the “RENAMESCRIPT” command. On such servers, renaming is emulated by downloading source, re-uploading it as target, marking target as the active script if source is the active script, and then deleting source.

For example:

>>> mgr.renamescript('foo.sieve', 'bar.sieve', emulate=False)

Parameters:
  • source – Script name.

  • target – Script name.

  • emulate – Emulate “RENAMESCRIPT” if the server does not support it?

scriptexists(script: str, cached: bool = False) -> bool

Check if script exists.

Parameters:
  • script – Script name.

  • cached – Return cached response? [1]

setactive(script: str)

Mark script as the active script.

shutdown()

Shut the connection down.

Note

Use only when logging out would be unsafe.

unauthenticate()[source]

Unauthenticate.

unsetactive()

Deactivate the active script.

classmethod validname(script: str, check: bool = False) -> bool

Check whether script is a valid script name.

Parameters:
  • script – Script name

  • check – Raise an error if script is not a valid script name?

Raises:

ValueError – script is not valid. [3]

See also

RFC 5198 (sec. 2)

Definition of unicode format for network interchange.

RFC 5804 (sec. 1.6)

ManageSieve script names.
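
As a rough illustration of the character rules in RFC 5804 (sec. 1.6), the check below rejects empty names and names that contain control characters or the Unicode line and paragraph separators. It is a sketch only: is_valid_name is a hypothetical function, and validname() may apply further checks (for instance the net-unicode rules of RFC 5198 or length limits) that are not reproduced here.

```python
def is_valid_name(name: str) -> bool:
    """Approximate the script-name character rules of RFC 5804, sec. 1.6."""
    if not name:
        return False                     # names must not be empty
    for char in name:
        code = ord(char)
        if code <= 0x1F or 0x7F <= code <= 0x9F:
            return False                 # C0/C1 control characters and DEL
        if code in (0x2028, 0x2029):
            return False                 # line and paragraph separators
    return True
```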

backups: int = 0

How many backups to keep.

capabilities: Capabilities | None = None

Server capabilities.

host: str | None = None

Remote address.

logger: Logger = <Logger sievemgr (WARNING)>

Logger to use.

Messages are logged with the following priorities:

Priority         Used for
logging.ERROR    Non-fatal errors
logging.INFO     State changes
logging.DEBUG    Data sent to/received from the server

Suppress logging:

>>> import logging
>>> logging.getLogger('sievemgr').setLevel(logging.CRITICAL)

Use a custom logger:

>>> import logging
>>> logger = logging.getLogger('foo')
>>> logger.addHandler(logging.NullHandler())
>>> mgr.logger = logger

Print data sent to/received from the server to standard error:

>>> import logging
>>> logging.getLogger('sievemgr').setLevel(logging.DEBUG)
>>> mgr.listscripts()
C: LISTSCRIPTS
S: "foo.sieve" ACTIVE
S: "bar.sieve"
S: "baz.sieve"
S: OK "Listscripts completed"
(Response(response=Atom('OK'), code=(), message=None),
 [('foo.sieve', True), ('bar.sieve', False), ('baz.sieve', False)])

login: str = ''

Login name (authentication ID).

owner: str = ''

User whose scripts are managed (authorization ID).

port: int | None = None

Remote port.

sock: socket | None = None

Underlying socket.

property timeout: float | None

Connection timeout in seconds.

Set timeout to 500 ms:

>>> mgr.timeout = 0.5

Note

The timeout can only be set while a connection is open.

property tls: str | None

TLS version.

tlscontext: SSLContext = <ssl.SSLContext object>

Settings for negotiating Transport Layer Security (TLS).

Disable workarounds for broken X.509 certificates:

>>> import ssl
>>> with SieveManager() as mgr:
>>>     mgr.tlscontext.verify_flags |= ssl.VERIFY_X509_STRICT
>>>     mgr.open('imap.foo.example')
>>>     ...

Load client certificate/key pair:

>>> with SieveManager() as mgr:
>>>     mgr.tlscontext.load_cert_chain(certfile='cert.pem')
>>>     mgr.open('imap.foo.example')
>>>     ...

Use a custom certificate authority:

>>> with SieveManager() as mgr:
>>>     mgr.tlscontext.load_verify_locations(cafile='ca.pem')
>>>     mgr.open('imap.foo.example')
>>>     ...

warning: str | None = None

Warning issued in response to the last “CHECKSCRIPT” or “PUTSCRIPT”.

For example:

>>> with open('script.sieve', 'br') as file:
>>>     mgr.execute('putscript', file, 'script.sieve')
(Response(response=Atom('OK'), code=('warnings',),
 message='line 7: may need to be frobnicated'), [])
>>> mgr.warning
'line 7: may need to be frobnicated'

See also

RFC 5804 (sec. 1.3)

ManageSieve “WARNINGS” response code.

class sievemgr.Capabilities(implementation: str | None = None, sieve: tuple[str, ...] = (), language: str | None = None, maxredirects: int | None = None, notify: tuple[str, ...] = (), owner: str = '', sasl: tuple[str, ...] = (), starttls: bool = False, unauthenticate: bool = False, version: str | None = None, notunderstood: dict = <factory>)[source]

Bases: object

Server capabilities.

classmethod fromlines(lines: Iterable[Line]) -> CapabilitiesT

Create a Capabilities object from a server response.

implementation: str | None = None

Server application (e.g. “Dovecot Pigeonhole”).

language: str | None = None

RFC 5646 tag of the natural language used for messages.

maxredirects: int | None = None

Maximum redirects per operation.

notify: tuple[str, ...] = ()

URI schemes for supported notification methods (e.g., “mailto”).

notunderstood: dict

Capabilities not understood by SieveManager.

owner: str = ''

Canonical name of the user whose scripts are managed.

sasl: tuple[str, ...] = ()

Supported authentication methods (e.g., “plain”, “login”).

sieve: tuple[str, ...] = ()

Supported Sieve modules (e.g., “fileinto”, “date”, …).

starttls: bool = False

Is “STARTTLS” available?

unauthenticate: bool = False

Is “UNAUTHENTICATE” available?

version: str | None = None

ManageSieve protocol version (e.g., “1.0”).

class sievemgr.LocalScripts(manager: SieveManager, scripts: Iterable[str], *args, create: bool = False, **kwargs)[source]

Bases: TemporaryDirectory

Temporarily download scripts for local editing.

For example:

>>> from subprocess import run
>>>
>>> def compare(manager: SieveManager, script1: str, script2: str) -> bool:
>>>     with LocalScripts(manager, (script1, script2)) as local:
>>>         cp = run(['cmp', '-s'] + local.scripts, check=False)
>>>     return cp.returncode == 0

reupload()

Re-upload scripts that have been changed.

Files are removed from scripts after having been re-uploaded.

For example:

>>> import sys
>>> from subprocess import run
>>>
>>> with LocalScripts(manager, ('foo', 'bar'), create=True) as local:
>>>     while local.scripts:
>>>         run(['vi'] + local.scripts, check=True)
>>>         try:
>>>             local.reupload()
>>>         except Exception as err:
>>>             print(err, file=sys.stderr)
>>>             if input('Retry? ').casefold() not in ('y', 'yes'):
>>>                 break

scripts: list[str] = []

Local filenames.

class sievemgr.Response(response: Atom, code: tuple[Word, ...] = (), message: str | None = None)[source]

Bases: object

Server response to a command.

See also

RFC 5804 (secs. 1.2, 1.3, 4, 6.4, and passim)

ManageSieve responses

__str__() -> str

message or, if no message was returned, a stub message.

classmethod fromline(line: Line) -> ResponseT

Create a Response object from a Line.

matches(*categories: str) -> bool

Check if code matches any of the given categories.

Returns False if code is empty. Matching is case-insensitive.

For example:

>>> with open('script.sieve', newline='') as script:
>>>     try:
>>>         mgr.putscript(script, script.name)
>>>     except SieveOperationError as err:
>>>         if err.matches('QUOTA'):
>>>             print('over quota')

Print more informative messages:

>>> with open('script.sieve', newline='') as script:
>>>     try:
>>>         mgr.putscript(script, script.name)
>>>     except SieveOperationError as err:
>>>         if err.matches('QUOTA/MAXSCRIPTS'):
>>>             print('too many scripts')
>>>         elif err.matches('QUOTA/MAXSIZE'):
>>>             print(f'{script.name} is too large')
>>>         elif err.matches('QUOTA'):
>>>             print('over quota')

toerror() -> SieveError

Convert a Response into an error.

code: tuple[Word, ...] = ()

Response code.

ManageSieve response codes are lists of categories, separated by slashes (“/”), where each category is the super-category of the next (e.g., “quota/maxsize”).

Some response codes carry data (e.g., TAG "SYNC-123").

See RFC 5804 (sec. 1.3) for a list of response codes.

Warning

Servers need not return response codes.
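
The category hierarchy suggests prefix matching: “quota” should match “quota/maxsize”, but not the other way round. The sketch below shows one way to express that, assuming the response code is available as a plain string; code_matches is a hypothetical stand-in, and the actual matches() method may work differently.

```python
def code_matches(code: str, *categories: str) -> bool:
    """Check whether code falls under any of the given categories.

    Hypothetical stand-in for Response.matches(); matching is
    case-insensitive and an empty code never matches.
    """
    if not code:
        return False
    parts = code.casefold().split('/')
    for category in categories:
        wanted = category.casefold().split('/')
        # A category matches if it is a prefix of the code's categories.
        if parts[:len(wanted)] == wanted:
            return True
    return False
```

Under this reading, testing for 'QUOTA/MAXSIZE' before 'QUOTA', as in the example for matches(), checks the most specific category first.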

message: str | None = None

Human-readable message.

Warning

Servers need not return a message.

response: Atom

‘OK’, ‘NO’, or ‘BYE’.

Response    Meaning
‘OK’        Success
‘NO’        Failure
‘BYE’       Connection closed by server

class sievemgr.URL(hostname: str, scheme: str = 'sieve', username: str | None = None, password: str | None = None, port: int | None = None, owner: str | None = None, scriptname: str | None = None)[source]

Bases: object

Sieve URL.

See also

RFC 5804 (sec. 3)

Sieve URL Scheme

__str__()[source]

String representation of the URL.

classmethod fromstr(url: str) -> URLT

Create a URL object from a URL string.

For example:

>>> URL.fromstr('sieve://user@imap.foo.example')
URL(hostname='imap.foo.example', scheme='sieve', username='user',
    password=None, port=None, owner=None, scriptname=None)

Raises:

ValueError – Not a valid Sieve URL.
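
For readers who want to see roughly what such parsing involves, the sketch below splits a Sieve URL with urllib.parse from the standard library. It is not how URL.fromstr() is implemented: parse_sieve_url is a hypothetical function that returns a plain dict and ignores the owner and script name components.

```python
from urllib.parse import urlsplit


def parse_sieve_url(url: str) -> dict:
    """Split a Sieve URL into scheme, host, user, password, and port."""
    parts = urlsplit(url)
    if parts.scheme != 'sieve' or not parts.hostname:
        raise ValueError(f'{url}: not a valid Sieve URL')
    return {
        'scheme': parts.scheme,
        'hostname': parts.hostname,
        'username': parts.username,
        'password': parts.password,
        'port': parts.port,
    }
```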

class sievemgr.Atom[source]

Bases: str

ManageSieve keyword (e.g., LISTSCRIPTS, OK).

sievemgr.Line

List of Words.

sievemgr.Word

Alias for Atom, None, int, str, and Line.

SASL

Note

You need not read this section unless you want to implement an authentication mechanism.

sequenceDiagram
    participant auth as authenticate()
    participant mech as :SASL mechanism
    participant conn as :SASL adapter
    auth ->> mech: .__init__(conn, authcid, authzid, ...)
    activate auth
    activate mech
    mech ->> mech: Prepare credentials
    break Encoding invalid
        mech --) auth: ValueError
    end
    deactivate auth
    deactivate mech
    auth ->> mech: .__call__()
    activate auth
    activate mech
    mech ->> conn: .begin(name, ...)
    activate conn
    deactivate conn
    loop SASL exchange
        mech ->> conn: .receive()
        activate conn
        conn --) mech: Message
        deactivate conn
        mech ->> conn: .send(message)
        activate conn
        deactivate conn
    end
    mech ->> conn: .end()
    activate conn
    break Authentication failed
        conn --) auth: OperationError
    end
    conn --) mech: Capabilities
    deactivate conn
    mech --) auth: Capabilities
    deactivate mech
    auth ->> mech: .authcid
    mech --) auth: Authentication ID
    auth ->> mech: .authzid
    mech --) auth: Authorization ID
    deactivate auth
    

Authentication architecture

class sievemgr.BaseAuth(adapter: BaseSASLAdapter, authcid: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: ABC

Base class for authentication mechanisms.

The ManageSieve “AUTHENTICATE” command performs a Simple Authentication and Security Layer (SASL) exchange. The SASL protocol defines multiple authentication mechanisms, and SieveManager.authenticate() delegates the SASL exchange to classes that implement such mechanisms. Such classes must subclass BaseAuth and have a name attribute that declares which mechanism they implement.

Credentials must be prepared in __init__. Subclasses should pass their connection, authcid, authzid, and prepare arguments on to BaseAuth.__init__() and then use BaseAuth.prepare() to prepare the remaining credentials.

For example:

def __init__(self, connection: BaseSASLAdapter,
             authcid: str, password: str, authzid: str = '',
             prepare: SASLPrep = SASLPrep.ALL):
    """Prepare authentication.

    `authcid`, `authzid`, and `password` are prepared according to
    :rfc:`3454` and :rfc:`4013` if requested in `prepare`.

    Arguments:
        connection: Connection over which to authenticate.
        authcid: Authentication ID (user to login as).
        password: Password.
        authzid: Authorization ID (user whose rights to acquire).
        prepare: Which credentials to prepare.

    Raises:
        ValueError: Bad characters in username or password.
    """
    super().__init__(connection, authcid, authzid, prepare)
    prepare &= SASLPrep.PASSWORDS   # type: ignore[assignment]
    self.password = self.prepare(password) if prepare else password

Starting and ending the SASL exchange should be left to BaseAuth(). The SASL exchange itself should be implemented in exchange() by using send() and receive(). For example:

def exchange(self):
    credentials = '\0'.join((self.authzid, self.authcid, self.password))
    self.send(credentials.encode('utf8'))
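
To make the message layout concrete, the standalone sketch below builds the same PLAIN credentials string outside of any class and shows a base64 encoding of it, since ManageSieve transmits SASL data base64-encoded (RFC 5804, sec. 2.1). The function plain_message is hypothetical; in sievemgr the encoding step is presumably handled by send().

```python
import base64


def plain_message(authcid: str, password: str, authzid: str = '') -> bytes:
    """Build a PLAIN message: authzid NUL authcid NUL password (RFC 4616)."""
    return '\0'.join((authzid, authcid, password)).encode('utf8')


message = plain_message('user', 'password')
# What would go over the wire after base64 encoding.
encoded = base64.b64encode(message).decode('ascii')
```

An empty authzid still leaves the leading NUL in place, so the server can tell the three fields apart.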

That said, BaseAuth’s methods can be overridden. For example:

class ExternalAuth(BaseAuth):
    """EXTERNAL authentication.

    .. seealso::
        :rfc:`4422` (App. A)
            Definition of the EXTERNAL mechanism.
    """

    def __call__(self):
        """Authenticate."""
        args = (self.authzid.encode('utf8'),) if self.authzid else ()
        self.begin(*args)
        self.receive()
        self.send(b'')
        self.end()

    def exchange(self):
        """No-op."""

    name = 'EXTERNAL'

See also

BaseSASLAdapter

Abstract base class for sending and receiving SASL messages.

RFC 4422

Simple Authentication and Security Layer (SASL)

RFC 5804 (sec. 2.1)

ManageSieve “AUTHENTICATE” command

__call__() -> Any | None

Authenticate as authcid.

authcid is authorized as authzid if authzid is set (proxy authentication).

Returns:

Data returned by the server, if any.

Note

Calls exchange() and end().

__init__(adapter: BaseSASLAdapter, authcid: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Prepare authentication.

authcid and authzid are prepared according to RFC 3454 and RFC 4013 if requested in prepare.

Parameters:
  • adapter – Connection over which to authenticate.

  • authcid – Authentication ID (user to login as).

  • authzid – Authorization ID (user whose rights to acquire).

  • prepare – Which credentials to prepare.

Raises:

ValueError – Bad characters in username.

abort()[source]

Abort authentication.

Raises:

ProtocolError – Protocol violation.

begin(data: bytes | None = None)[source]

Begin authentication.

Parameters:

data – Optional client-first message.

end()

Conclude authentication.

abstract exchange()

Exchange SASL messages.

classmethod getmechs(sort: bool = True, obsolete: bool = False) -> list[type[BaseAuthT]]

Get authentication classes that subclass this class.

Parameters:
  • sort – Sort mechanisms by order?

  • obsolete – Return obsolete mechanisms?
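
The interplay of subclass discovery, the obsolete flag, and the order attribute can be sketched with stand-in classes. Everything below is hypothetical (Mech, find_mechs, and the toy mechanisms are not sievemgr classes), and the sketch assumes that a lower order value means higher precedence, which is what the order values listed for the SCRAM mechanisms suggest.

```python
class Mech:
    """Stand-in for an authentication base class."""
    name = ''
    order = 0
    obsolete = False


class Plain(Mech):
    name = 'PLAIN'


class Login(Mech):
    name = 'LOGIN'
    obsolete = True


class ScramSHA1(Mech):
    name = 'SCRAM-SHA-1'
    order = -10


class ScramSHA1Plus(ScramSHA1):
    name = 'SCRAM-SHA-1-PLUS'
    order = -1000


def find_mechs(base, sort=True, obsolete=False):
    """Collect direct and indirect subclasses of base."""
    found = []
    stack = list(base.__subclasses__())
    while stack:
        cls = stack.pop()
        stack.extend(cls.__subclasses__())
        if cls not in found and (obsolete or not cls.obsolete):
            found.append(cls)
    if sort:
        found.sort(key=lambda cls: cls.order)   # lowest order first
    return found
```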

static prepare(string: str) -> str

Prepare string according to RFC 3454 and RFC 4013.

Returns:

Prepared string.

Raises:

ValueError – String is malformed.

See also

RFC 3454

Preparation of Internationalized Strings

RFC 4013

Stringprep Profile for User Names and Passwords
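
The main steps of the RFC 4013 profile (mapping, NFKC normalization, and the prohibited-character check) can be approximated with the standard library's stringprep module. The function saslprep_sketch below is an assumption-laden illustration, not the code behind prepare(): it omits the bidirectional-text check and the check for unassigned code points.

```python
import stringprep
import unicodedata

# Tables of prohibited output listed in RFC 4013, sec. 2.3.
PROHIBITED = (
    stringprep.in_table_c12,
    stringprep.in_table_c21_c22,
    stringprep.in_table_c3,
    stringprep.in_table_c4,
    stringprep.in_table_c5,
    stringprep.in_table_c6,
    stringprep.in_table_c7,
    stringprep.in_table_c8,
    stringprep.in_table_c9,
)


def saslprep_sketch(string: str) -> str:
    """Prepare string roughly as RFC 4013 describes (no bidi check)."""
    # Map non-ASCII spaces to SPACE; drop characters "mapped to nothing".
    mapped = ''.join(
        ' ' if stringprep.in_table_c12(char) else char
        for char in string
        if not stringprep.in_table_b1(char)
    )
    normalized = unicodedata.normalize('NFKC', mapped)
    for char in normalized:
        if any(check(char) for check in PROHIBITED):
            raise ValueError(f'prohibited character U+{ord(char):04X}')
    return normalized
```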

receive() -> bytes

Receive and decode an SASL message.

Note

Calls begin() if needed.

send(data: bytes)[source]

Encode and send an SASL message.

Note

Calls begin() if needed.

authcid: str

Authentication ID (user to login as).

authzid: str = ''

Authorization ID (user whose rights to acquire).

name: ClassVar[str]

Mechanism name.

obsolete: bool = False

Is this mechanism obsolete?

order: int = 0

Mechanism precedence.

property sock: socket | SSLSocket

Underlying socket.

class sievemgr.BaseSASLAdapter[source]

Bases: ABC

Abstract base class for sending and receiving SASL messages.

SASL messages must be translated to the underlying protocol. This class defines the types of messages that may occur in an SASL protocol exchange. Classes that translate between SASL and the underlying protocol must subclass this class.

See also

BaseAuth

Abstract base class for SASL mechanisms.

RFC 4422

Simple Authentication and Security Layer (SASL)

abstract abort()[source]

Abort authentication.

Raises:

ProtocolError – Protocol violation.

abstract begin(name: str, data: bytes | None = None)[source]

Begin authentication.

Parameters:
  • name – SASL mechanism name.

  • data – Optional client-first message.

abstract end()

Conclude authentication.

abstract receive() -> bytes

Receive and decode an SASL message.

abstract send(data: bytes)

Encode and send an SASL message.

abstract property sock: socket | SSLSocket

Underlying socket.

class sievemgr.BasePwdAuth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BaseAuth, ABC

Base class for password-based authentication mechanisms.

Prepares credentials, so that subclasses need only implement exchange().

For example:

class PlainAuth(BasePwdAuth):
    """PLAIN authentication.

    .. seealso::
        :rfc:`4616`
            PLAIN authentication mechanism.
    """

    def exchange(self):
        credentials = '\0'.join((self.authzid, self.authcid, self.password))
        self.send(credentials.encode('utf8'))

    name = 'PLAIN'

__init__(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)

Prepare authentication.

authcid, authzid, and password are prepared according to RFC 3454 and RFC 4013 if requested in prepare.

Parameters:
  • connection – Connection over which to authenticate.

  • authcid – Authentication ID (user to login as).

  • password – Password.

  • authzid – Authorization ID (user whose rights to acquire).

  • prepare – Which credentials to prepare.

Raises:

ValueError – Bad characters in username or password.

password: str

Password.

class sievemgr.BaseScramAuth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BasePwdAuth, ABC

Base class for SCRAM authentication mechanisms.

Implements exchange(), so that subclasses need only define a digest. For example:

class ScramSHA1Auth(BaseScramAuth):
    """SCRAM-SHA-1 authentication."""

    @property
    def digest(self) -> str:
        return 'sha1'

    name = 'SCRAM-SHA-1'
    order = -10

See also

RFC 5802

Salted Challenge Response Authentication Mechanism (SCRAM).

RFC 7677

SCRAM-SHA-256 and SCRAM-SHA-256-PLUS.

https://datatracker.ietf.org/doc/html/draft-melnikov-scram-bis

Updated recommendations for implementing SCRAM.

https://datatracker.ietf.org/doc/html/draft-melnikov-scram-sha-512

SCRAM-SHA-512 and SCRAM-SHA-512-PLUS.

https://datatracker.ietf.org/doc/html/draft-melnikov-scram-sha3-512

SCRAM-SHA3-512 and SCRAM-SHA3-512-PLUS.

https://csb.stevekerrison.com/post/2022-01-channel-binding

Discussion of TLS channel binding.

https://csb.stevekerrison.com/post/2022-05-scram-detail

Discussion of SCRAM.

exchange()[source]

Exchange SASL messages.

cbdata: bytes = b''

TLS channel-binding data.

cbtype: str = ''

TLS channel-binding type.

abstract property digest: str

Digest name as used by hashlib and hmac.
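
To show where such a digest name ends up, the sketch below computes a SCRAM client proof as defined in RFC 5802 (sec. 3) using only hashlib and hmac. The function client_proof and its parameters are illustrative assumptions; it is not the code behind exchange(), and it leaves out nonce handling, message parsing, and channel binding.

```python
import hashlib
import hmac


def client_proof(digest: str, password: bytes, salt: bytes,
                 iterations: int, auth_message: bytes) -> bytes:
    """Compute ClientProof = ClientKey XOR ClientSignature (RFC 5802)."""
    # SaltedPassword := Hi(password, salt, i), i.e. PBKDF2 with HMAC.
    salted = hashlib.pbkdf2_hmac(digest, password, salt, iterations)
    client_key = hmac.new(salted, b'Client Key', digest).digest()
    stored_key = hashlib.new(digest, client_key).digest()
    signature = hmac.new(stored_key, auth_message, digest).digest()
    return bytes(a ^ b for a, b in zip(client_key, signature))
```

Because the digest name is the only thing that varies, the same function serves 'sha1', 'sha256', and so on, which is presumably why subclasses need only define digest.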

noncelen: int = 18

Client nonce length in bytes.

class sievemgr.BaseScramPlusAuth(*args, **kwargs)[source]

Bases: BaseScramAuth, ABC

Base class for SCRAM mechanisms with channel binding.

For example:

class ScramSHA1PlusAuth(BaseScramPlusAuth, ScramSHA1Auth):
    """SCRAM-SHA-1-PLUS authentication."""
    name = 'SCRAM-SHA-1-PLUS'
    order = -1000

class sievemgr.AuthzUnsupportedMixin(*args, **kwargs)

Bases: object

Mixin for SASL mechanisms that do not support authorization.

Raises:

SASLCapabilityError – authzid is set.
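
The mixin pattern itself can be sketched with stand-in classes. None of the names below belong to sievemgr: CapabilityError stands in for SASLCapabilityError, Mechanism for BaseAuth, and NoAuthzMixin for the mixin documented here, whose actual implementation may differ.

```python
class CapabilityError(Exception):
    """Stand-in for SASLCapabilityError."""


class Mechanism:
    """Stand-in for BaseAuth that only stores the two IDs."""

    def __init__(self, authcid: str, authzid: str = ''):
        self.authcid = authcid
        self.authzid = authzid


class NoAuthzMixin:
    """Refuse to be constructed with an authorization ID."""

    def __init__(self, *args, **kwargs):
        # Let the mechanism store its credentials first, then check them.
        super().__init__(*args, **kwargs)
        if self.authzid:
            raise CapabilityError('authorization is not supported')


class Login(NoAuthzMixin, Mechanism):
    """Toy mechanism without proxy authentication."""
    name = 'LOGIN'
```

Listing the mixin first in the bases, as CramMD5Auth and LoginAuth do, places its __init__ ahead of the mechanism's in the method resolution order.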

class sievemgr.CramMD5Auth(*args, **kwargs)[source]

Bases: AuthzUnsupportedMixin, BasePwdAuth

CRAM-MD5 authentication.

See also

RFC 2195 (sec. 2)

Definition of CRAM-MD5.

name: ClassVar[str] = 'CRAM-MD5'

Mechanism name.

obsolete: bool = True

Is this mechanism obsolete?

class sievemgr.ExternalAuth(adapter: BaseSASLAdapter, authcid: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BaseAuth

EXTERNAL authentication.

See also

RFC 4422 (App. A)

Definition of the EXTERNAL mechanism.

name: ClassVar[str] = 'EXTERNAL'

Mechanism name.

class sievemgr.LoginAuth(*args, **kwargs)[source]

Bases: AuthzUnsupportedMixin, BasePwdAuth

LOGIN authentication.

Parameters:
  • conn – Connection over which to authenticate.

  • authcid – Authentication ID (user to login as).

  • password – Password.

  • authzid – Authorization ID (user whose rights to acquire).

  • prepare – Which credentials to prepare.

Raises:

ValueError – Password contains CR, LF, or NUL.

See also

https://datatracker.ietf.org/doc/draft-murchison-sasl-login

Definition of the LOGIN mechanism.

name: ClassVar[str] = 'LOGIN'

Mechanism name.

obsolete: bool = True

Is this mechanism obsolete?

class sievemgr.PlainAuth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BasePwdAuth

PLAIN authentication.

See also

RFC 4616

PLAIN authentication mechanism.

name: ClassVar[str] = 'PLAIN'

Mechanism name.

class sievemgr.ScramSHA1Auth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BaseScramAuth

SCRAM-SHA-1 authentication.

property digest: str

Digest name as used by hashlib and hmac.

name: ClassVar[str] = 'SCRAM-SHA-1'

Mechanism name.

order: int = -10

Mechanism precedence.

class sievemgr.ScramSHA1PlusAuth(*args, **kwargs)[source]

Bases: BaseScramPlusAuth, ScramSHA1Auth

SCRAM-SHA-1-PLUS authentication.

name: ClassVar[str] = 'SCRAM-SHA-1-PLUS'

Mechanism name.

order: int = -1000

Mechanism precedence.

class sievemgr.ScramSHA224Auth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BaseScramAuth

SCRAM-SHA-224 authentication.

property digest: str

Digest name as used by hashlib and hmac.

name: ClassVar[str] = 'SCRAM-SHA-224'

Mechanism name.

order: int = -20

Mechanism precedence.

class sievemgr.ScramSHA224PlusAuth(*args, **kwargs)[source]

Bases: BaseScramPlusAuth, ScramSHA224Auth

SCRAM-SHA-224-PLUS authentication.

name: ClassVar[str] = 'SCRAM-SHA-224-PLUS'

Mechanism name.

order: int = -2000

Mechanism precedence.

class sievemgr.ScramSHA256Auth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BaseScramAuth

SCRAM-SHA-256 authentication.

property digest: str

Digest name as used by hashlib and hmac.

name: ClassVar[str] = 'SCRAM-SHA-256'

Mechanism name.

order: int = -30

Mechanism precedence.

class sievemgr.ScramSHA256PlusAuth(*args, **kwargs)[source]

Bases: BaseScramPlusAuth, ScramSHA256Auth

SCRAM-SHA-256-PLUS authentication.

name: ClassVar[str] = 'SCRAM-SHA-256-PLUS'

Mechanism name.

order: int = -3000

Mechanism precedence.

class sievemgr.ScramSHA384Auth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BaseScramAuth

SCRAM-SHA-384 authentication.

property digest: str

Digest name as used by hashlib and hmac.

name: ClassVar[str] = 'SCRAM-SHA-384'

Mechanism name.

order: int = -40

Mechanism precedence.

class sievemgr.ScramSHA384PlusAuth(*args, **kwargs)[source]

Bases: BaseScramPlusAuth, ScramSHA384Auth

SCRAM-SHA-384-PLUS authentication.

name: ClassVar[str] = 'SCRAM-SHA-384-PLUS'

Mechanism name.

order: int = -4000

Mechanism precedence.

class sievemgr.ScramSHA512Auth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BaseScramAuth

SCRAM-SHA-512 authentication.

property digest: str

Digest name as used by hashlib and hmac.

name: ClassVar[str] = 'SCRAM-SHA-512'

Mechanism name.

order: int = -50

Mechanism precedence.

class sievemgr.ScramSHA512PlusAuth(*args, **kwargs)[source]

Bases: BaseScramPlusAuth, ScramSHA512Auth

SCRAM-SHA-512-PLUS authentication.

name: ClassVar[str] = 'SCRAM-SHA-512-PLUS'

Mechanism name.

order: int = -5000

Mechanism precedence.

class sievemgr.ScramSHA3_512Auth(connection: BaseSASLAdapter, authcid: str, password: str, authzid: str = '', prepare: SASLPrep = SASLPrep.ALL)[source]

Bases: BaseScramAuth

SCRAM-SHA3-512 authentication.

property digest: str

Digest name as used by hashlib and hmac.

name: ClassVar[str] = 'SCRAM-SHA3-512'

Mechanism name.

order: int = -60

Mechanism precedence.

class sievemgr.ScramSHA3_512PlusAuth(*args, **kwargs)[source]

Bases: BaseScramPlusAuth, ScramSHA3_512Auth

SCRAM-SHA3-512-PLUS authentication.

name: ClassVar[str] = 'SCRAM-SHA3-512-PLUS'

Mechanism name.

order: int = -6000

Mechanism precedence.
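
To make the precedence values concrete, the sketch below sorts mechanism names by the order values listed above. It assumes that a lower order means that a mechanism is tried earlier, which fits the description of authenticate() but is not stated in this section. The ORDER mapping and the function pick_mechanisms are hypothetical stand-ins, not sievemgr APIs.

```python
# Order values copied from the class descriptions above.
ORDER = {
    'SCRAM-SHA-256': -30,
    'SCRAM-SHA-256-PLUS': -3000,
    'SCRAM-SHA-384': -40,
    'SCRAM-SHA-384-PLUS': -4000,
    'SCRAM-SHA-512': -50,
    'SCRAM-SHA-512-PLUS': -5000,
    'SCRAM-SHA3-512': -60,
    'SCRAM-SHA3-512-PLUS': -6000,
}


def pick_mechanisms(offered):
    """Return the mechanisms listed in ORDER, lowest order first."""
    # Assumption: a lower order value means a higher precedence.
    known = [name for name in offered if name in ORDER]
    return sorted(known, key=ORDER.__getitem__)


ranked = pick_mechanisms(['SCRAM-SHA-256', 'SCRAM-SHA-512-PLUS',
                          'SCRAM-SHA-512'])
```

Under this assumption, every "-PLUS" variant ranks before every variant without channel binding, and stronger digests rank before weaker ones within each group.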

enum sievemgr.SASLPrep(value)[source]

Bases: IntEnum

Controls which strings are prepared for authentication.

See also

RFC 3454

Preparation of Internationalized Strings

RFC 4013

Stringprep Profile for User Names and Passwords

RFC 4422 (sec. 4)

SASL protocol requirements

Member Type:

int

Valid values are as follows:

NONE = <SASLPrep.NONE: 0>
USERNAMES = <SASLPrep.USERNAMES: 1>
PASSWORDS = <SASLPrep.PASSWORDS: 2>
ALL = <SASLPrep.ALL: 3>
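
The member values suggest that ALL combines USERNAMES and PASSWORDS bitwise (1 | 2 == 3). The sketch below tests this on a local stand-in enumeration with the same values, so that it runs without sievemgr. That the library itself checks members this way is an assumption, and the function prepares is hypothetical.

```python
from enum import IntEnum


class SASLPrep(IntEnum):
    """Stand-in that mirrors the values documented above."""
    NONE = 0
    USERNAMES = 1
    PASSWORDS = 2
    ALL = 3


def prepares(setting: SASLPrep, kind: SASLPrep) -> bool:
    """Check whether setting covers strings of the given kind."""
    # IntEnum members are integers, so bitwise AND works on them.
    return bool(setting & kind)


covers_passwords = prepares(SASLPrep.ALL, SASLPrep.PASSWORDS)
```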

ERRORS

exception sievemgr.Error[source]

Bases: Exception

Base class for errors.

exception sievemgr.CapabilityError[source]

Bases: Error

Base class for capability errors.

exception sievemgr.ConfigError[source]

Bases: Error

Base class for configuration errors.

exception sievemgr.DataError[source]

Bases: Error

Base class for data errors.

exception sievemgr.OperationError[source]

Bases: Error

Base class for operation errors.

exception sievemgr.ProtocolError[source]

Bases: Error

Base class for protocol errors.

Danger

Continuing after a ProtocolError leads to undefined behavior.

exception sievemgr.SecurityError[source]

Bases: Error

Base class for security errors.

Danger

Continuing after a SecurityError compromises transmitted data.

exception sievemgr.SoftwareError[source]

Bases: Error

Base class for software errors (i.e., bugs).

Danger

Continuing after a SoftwareError leads to undefined behavior.

exception sievemgr.UsageError[source]

Bases: Error

Base class for usage errors.

exception sievemgr.ClientError[source]

Bases: Error

Base class for client errors.

exception sievemgr.ClientConfigError[source]

Bases: ClientError, ConfigError

Client configuration error.

exception sievemgr.ClientConnectionError[source]

Bases: ClientError, ConnectionError

Client-side connection error.

exception sievemgr.ClientOperationError[source]

Bases: ClientError, OperationError

Client-side operation error.

exception sievemgr.ClientSecurityError[source]

Bases: ClientError, SecurityError

Client security error.

exception sievemgr.ClientSoftwareError[source]

Bases: ClientError, SoftwareError

Client software error (i.e., bug).

exception sievemgr.OCSPError[source]

Bases: Error

Base class for OCSP errors.

exception sievemgr.OCSPDataError[source]

Bases: OCSPError, DataError

OCSP data error.

exception sievemgr.OCSPOperationError[source]

Bases: OCSPError, OperationError

OCSP operation error.

exception sievemgr.SASLError[source]

Bases: Error

Base class for SASL errors.

exception sievemgr.SASLCapabilityError[source]

Bases: SASLError, CapabilityError

SASL capability error.

exception sievemgr.SASLProtocolError[source]

Bases: SASLError, ProtocolError

Server violated the SASL protocol.

exception sievemgr.SASLSecurityError[source]

Bases: SASLError, SecurityError

SASL security error.

exception sievemgr.SieveError[source]

Bases: Error

Base class for ManageSieve errors.

exception sievemgr.SieveCapabilityError[source]

Bases: SieveError, CapabilityError

Capability not supported by the server.

exception sievemgr.SieveConnectionError(response: Atom = Atom('BYE'), code: tuple[Word, ...] = (), message: str | None = None)[source]

Bases: Response, SieveError, ConnectionError

Server said “BYE”.

exception sievemgr.SieveOperationError(response: Atom = Atom('NO'), code: tuple[Word, ...] = (), message: str | None = None)[source]

Bases: Response, SieveError, OperationError

Server said “NO”.

exception sievemgr.SieveProtocolError[source]

Bases: SieveError, ProtocolError

Server violated the ManageSieve protocol.

exception sievemgr.TLSError[source]

Bases: Error

Base class for TLS errors.

exception sievemgr.TLSCapabilityError[source]

Bases: TLSError, CapabilityError

TLS capability error.

exception sievemgr.TLSSecurityError[source]

Bases: TLSError, SecurityError

TLS security error.
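
Most of the errors above inherit from two bases, one that names the component (e.g., SieveError) and one that names the kind of error (e.g., ProtocolError), so a handler can catch along either axis. The sketch below rebuilds a small part of the hierarchy with stand-in classes, so that it runs without sievemgr. The function classify and its policy for errors that are not marked "Danger" are hypothetical.

```python
# Stand-ins that mirror the bases documented above.
class Error(Exception): pass
class OperationError(Error): pass
class ProtocolError(Error): pass
class SecurityError(Error): pass
class SoftwareError(Error): pass
class ClientError(Error): pass
class SieveError(Error): pass
class ClientOperationError(ClientError, OperationError): pass
class ClientConnectionError(ClientError, ConnectionError): pass
class SieveProtocolError(SieveError, ProtocolError): pass

# Kinds of errors that are marked "Danger" above.
FATAL = (ProtocolError, SecurityError, SoftwareError)


def classify(err: Exception) -> str:
    """Decide how a caller might react to err (hypothetical policy)."""
    if isinstance(err, FATAL):
        return 'abort'          # continuing is unsafe
    if isinstance(err, Error):
        return 'recoverable'    # assumption: safe to report and go on
    return 'unknown'


verdict = classify(SieveProtocolError())
```

Because ClientConnectionError also derives from the built-in ConnectionError, code that already handles ConnectionError catches it without knowing about sievemgr.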

Example

Patch the active Sieve script of every user:

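import sys
from contextlib import suppress
from subprocess import run

from sievemgr import ExternalAuth, SieveManager, SieveOperationError

# users (an iterable of user names) and patchfile (a path) are
# assumed to be defined by the caller.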

with SieveManager('imap.host.example') as mgr:
    for user in users:
        try:
            mgr.authenticate('admin', owner=user, sasl=(ExternalAuth,))
            with mgr.localscripts((mgr.getactive(),)) as local:
                activescript, = local.scripts
                run(['patch', activescript, patchfile], check=True)
        except Exception as err:
            print(err, file=sys.stderr)
            continue
        finally:
            with suppress(SieveOperationError):
                mgr.unauthenticate()

Security

Connections are secured with Transport Layer Security (TLS) by default. TLS should not be disabled.

Credentials are stored in memory so that they need not be entered again in case of a referral. However, because locking memory pages is not feasible in Python, they may be swapped out to disk.

Privacy

Checking via OCSP whether a server’s certificate has been revoked enables the certificate issuer to infer that the server is being accessed from your internet address.