Gateway Class

Overview

The Gateway class is the main interface for all Keychain operations in Python. It wraps the underlying C library and exposes high-level methods for persona management, encryption, signing, and verification.

Package: keychain.gateway

from keychain.gateway import Gateway

Class Definition

class Gateway:
    """Main interface for keychain operations."""

Constructor

Gateway(settings)

def __init__(self, settings: Settings) -> None

Initialize a Gateway instance with the provided settings.

Parameters:

  • settings (Settings) - Settings object obtained from Gateway.init()

Example:

from keychain.gateway import Gateway
from keychain.utils.settings import Settings

# Initialize keychain
settings = Gateway.init("config.json")

# Create gateway instance
gateway = Gateway(settings)

Static Methods

init(config_path)

@staticmethod
def init(config_path: str) -> Settings

Initialize the Keychain library and return a settings object.

Parameters:

  • config_path (str) - Path to the configuration file

Returns: Settings object for gateway construction

Example:

settings = Gateway.init("/path/to/keychain.config")
gateway = Gateway(settings)

close()

@staticmethod
def close() -> None

Clean up static library resources. Call at application shutdown.

Example:

# At application shutdown
Gateway.close()

hash(clear_text)

@staticmethod
def hash(clear_text: Union[str, bytes]) -> str

Calculate SHA-256 hash of input data.

Parameters:

  • clear_text (str or bytes) - Data to hash

Returns: Hexadecimal hash string

Example:

hash_result = Gateway.hash("Hello, World!")
print(f"Hash: {hash_result}")

generate_signature_by_private_key(cleartext, private_key_hex, algorithm)

@staticmethod
def generate_signature_by_private_key(
    cleartext: Union[str, bytes],
    private_key_hex: str,
    algorithm: SignatureScheme
) -> str

Generate a signature using an external private key.

Parameters:

  • cleartext (str or bytes) - Data to sign

  • private_key_hex (str) - Private key in hexadecimal format

  • algorithm (SignatureScheme) - Signature algorithm to use

Returns: Signature in hexadecimal format

verify_raw_data(cleartext, public_key_hex, signature_hex, algorithm)

@staticmethod
def verify_raw_data(
    cleartext: Union[str, bytes],
    public_key_hex: str,
    signature_hex: str,
    algorithm: SignatureScheme
) -> bool

Verify a signature against a public key without persona context.

Parameters:

  • cleartext (str or bytes) - Original signed data

  • public_key_hex (str) - Public key in hexadecimal format

  • signature_hex (str) - Signature in hexadecimal format

  • algorithm (SignatureScheme) - Signature algorithm used

Returns: True if signature is valid, False otherwise

Persona Management Methods

create_persona(name, subname, security_level, auto_renew)

def create_persona(
    self,
    name: str,
    subname: str,
    security_level: SecurityLevel,
    auto_renew: bool
) -> Persona

Create a new persona with default cryptographic parameters.

Parameters:

  • name (str) - Primary name for the persona

  • subname (str) - Secondary name for the persona

  • security_level (SecurityLevel) - Security level (LIGHTWEIGHT, LOW, MEDIUM, HIGH, ULTRA)

  • auto_renew (bool) - Whether to automatically renew certificates

Returns: New Persona object

Example:

from keychain.constants import SecurityLevel

persona = gateway.create_persona(
    name="alice",
    subname="company.com",
    security_level=SecurityLevel.HIGH,
    auto_renew=True
)

create_persona_from_algorithm_classes(name, subname, security_level, auto_renew, encryption_algorithm, signature_algorithm, cipher)

def create_persona_from_algorithm_classes(
    self,
    name: str,
    subname: str,
    security_level: SecurityLevel,
    auto_renew: bool,
    encryption_algorithm: EncryptionScheme,
    signature_algorithm: SignatureScheme,
    cipher: Cipher
) -> Persona

Create a new persona with specified algorithm classes.

Parameters:

  • name (str) - Primary name for the persona

  • subname (str) - Secondary name for the persona

  • security_level (SecurityLevel) - Security level

  • auto_renew (bool) - Whether to automatically renew certificates

  • encryption_algorithm (EncryptionScheme) - Encryption algorithm

  • signature_algorithm (SignatureScheme) - Signature algorithm

  • cipher (Cipher) - Symmetric cipher algorithm

Returns: New Persona object

find_persona(name, subname)

def find_persona(self, name: str, subname: str) -> Persona

Find an existing persona by name and subname.

Parameters:

  • name (str) - Primary name to search for

  • subname (str) - Secondary name to search for

Returns: Persona object if found

Raises: KeychainNotFoundError if persona not found

Example:

from keychain.exceptions import KeychainNotFoundError

try:
    persona = gateway.find_persona("alice", "company.com")
    print(f"Found persona: {persona.name}.{persona.subname}")
except KeychainNotFoundError:
    print("Persona not found")

retrieve_personas()

def retrieve_personas(self) -> List[Persona]

Retrieve all personas from the local cache.

Returns: List of Persona objects

Example:

personas = gateway.retrieve_personas()
print(f"Found {len(personas)} personas")

for persona in personas:
    print(f"  {persona.name}.{persona.subname}")

add_contact(persona, name, subname, did)

def add_contact(
    self,
    persona: Persona,
    name: str,
    subname: str,
    did: PersonaDID
) -> Contact

Add a new contact to a persona.

Parameters:

  • persona (Persona) - Persona to add contact to

  • name (str) - Contact’s primary name

  • subname (str) - Contact’s secondary name

  • did (PersonaDID) - Contact’s decentralized identifier

Returns: New Contact object

Example:

# Assuming bob_did is a PersonaDID object
contact = gateway.add_contact(
    persona=alice_persona,
    name="bob",
    subname="company.com",
    did=bob_did
)

refresh_persona(persona)

def refresh_persona(self, persona: Persona) -> None

Refresh persona data from the database.

Parameters:

  • persona (Persona) - Persona to refresh

set_auto_renew(persona, auto_renew)

def set_auto_renew(self, persona: Persona, auto_renew: bool) -> None

Configure automatic certificate renewal for a persona.

Parameters:

  • persona (Persona) - Persona to configure

  • auto_renew (bool) - Whether to enable auto renewal

renew_certificate(persona)

def renew_certificate(self, persona: Persona) -> None

Manually initiate certificate renewal for a persona.

Parameters:

  • persona (Persona) - Persona to renew

mature_persona_exists()

def mature_persona_exists(self) -> bool

Check if any mature persona exists in the gateway.

Returns: True if at least one mature persona exists

Cryptographic Operations

encrypt(persona, clear_text, recipients)

def encrypt(
    self,
    persona: Persona,
    clear_text: Union[str, bytes],
    recipients: List[Contact]
) -> EncryptedData

Encrypt data for specified recipients using envelope encryption.

Parameters:

  • persona (Persona) - Encrypting persona

  • clear_text (str or bytes) - Data to encrypt

  • recipients (List[Contact]) - List of recipient contacts

Returns: EncryptedData object

Example:

# Encrypt message for multiple recipients
encrypted = gateway.encrypt(
    persona=alice_persona,
    clear_text="Confidential message",
    recipients=[bob_contact, charlie_contact]
)

decrypt(persona, ciphertext)

def decrypt(
    self,
    persona: Persona,
    ciphertext: EncryptedData
) -> Tuple[Union[str, bytes], CharEncoding]

Decrypt encrypted data using the persona’s private key.

Parameters:

  • persona (Persona) - Decrypting persona

  • ciphertext (EncryptedData) - Encrypted data to decrypt

Returns: Tuple of (decrypted data, character encoding)

Example:

decrypted_data, encoding = gateway.decrypt(bob_persona, encrypted)
print(f"Decrypted: {decrypted_data}")

add_decrypt_access(encrypted_data, persona, new_recipient)

def add_decrypt_access(
    self,
    encrypted_data: EncryptedData,
    persona: Persona,
    new_recipient: Contact
) -> EncryptedData

Grant decrypt access to an additional recipient.

Parameters:

  • encrypted_data (EncryptedData) - Existing encrypted data

  • persona (Persona) - Persona granting access (must have decrypt access)

  • new_recipient (Contact) - New recipient to grant access to

Returns: Updated EncryptedData object

sign(persona, cleartext, approval, tags, variables)

def sign(
    self,
    persona: Persona,
    cleartext: SerializedData,
    approval: bool = False,
    tags: Optional[TagSet] = None,
    variables: Optional[TagSet] = None
) -> VerifiableData

Create verifiable (signed) data using the persona’s private key.

Parameters:

  • persona (Persona) - Signing persona

  • cleartext (SerializedData) - Data to sign

  • approval (bool, optional) - Whether this is an approval signature (default: False)

  • tags (TagSet, optional) - Metadata tags

  • variables (TagSet, optional) - Variable metadata

Returns: VerifiableData object

Example:

from keychain.core.serialized_data import SerializedData
from keychain.constants import DataType

# Create serialized data
data = SerializedData.from_string("Document content", DataType.UTF8_STRING)

# Sign the data
signed_data = gateway.sign(
    persona=alice_persona,
    cleartext=data,
    approval=True
)

generate_signature_by_persona(persona, cleartext)

def generate_signature_by_persona(
    self,
    persona: Persona,
    cleartext: Union[str, bytes]
) -> str

Generate a raw signature using a persona’s private key.

Parameters:

  • persona (Persona) - Signing persona

  • cleartext (str or bytes) - Data to sign

Returns: Signature in hexadecimal format

create_credential(persona, credential_id, credential_type, subject_id, start_timestamp, end_timestamp, claims, tags, variables)

def create_credential(
    self,
    persona: Persona,
    credential_id: str,
    credential_type: str,
    subject_id: str,
    start_timestamp: int,
    end_timestamp: int,
    claims: TagSet,
    tags: Optional[TagSet] = None,
    variables: Optional[TagSet] = None
) -> Credential

Create and sign a W3C verifiable credential.

Parameters:

  • persona (Persona) - Issuing persona

  • credential_id (str) - Unique credential identifier

  • credential_type (str) - Type of credential

  • subject_id (str) - Subject’s identifier

  • start_timestamp (int) - Validity start time (milliseconds since epoch)

  • end_timestamp (int) - Validity end time (milliseconds since epoch)

  • claims (TagSet) - Credential claims

  • tags (TagSet, optional) - Metadata tags

  • variables (TagSet, optional) - Variable metadata

Returns: Credential object

Example:

import time
from keychain.core.tag_set import TagSet

# Create claims
claims = TagSet()
claims.set_tag_value("", "name", "Alice Smith")
claims.set_tag_value("", "role", "Senior Developer")
claims.set_tag_value("", "department", "Engineering")

# Create credential (valid for 1 year)
now = int(time.time() * 1000)
one_year = now + (365 * 24 * 60 * 60 * 1000)

credential = gateway.create_credential(
    persona=hr_persona,
    credential_id="emp-12345",
    credential_type="EmployeeCredential",
    subject_id="did:keychain:alice.company.com",
    start_timestamp=now,
    end_timestamp=one_year,
    claims=claims
)

create_transaction(persona, consensus_algorithm, quorum, tags, base_variables)

def create_transaction(
    self,
    persona: Persona,
    consensus_algorithm: ConsensusAlgorithm,
    quorum: TagSet,
    tags: Optional[TagSet] = None,
    base_variables: Optional[TagSet] = None
) -> Transaction

Create and sign a consensus transaction.

Parameters:

  • persona (Persona) - Creating persona

  • consensus_algorithm (ConsensusAlgorithm) - Consensus algorithm to use

  • quorum (TagSet) - Participant definitions

  • tags (TagSet, optional) - Metadata tags

  • base_variables (TagSet, optional) - Initial variables

Returns: Transaction object

Verification Operations

verify(persona, data)

def verify(
    self,
    persona: Persona,
    data: Union[VerifiableData, Credential, Transaction]
) -> VerificationResult

Verify signed data using contact information and PKI data.

Parameters:

  • persona (Persona) - Verifying persona

  • data (VerifiableData, Credential, or Transaction) - Data to verify

Returns: VerificationResult object

Example:

# Verify signed data
result = gateway.verify(bob_persona, signed_data)

if result.is_verified():
    print("Signature is valid")
    signer = result.signer()
    print(f"Signed by: {signer.name}.{signer.subname}")
else:
    print("Signature verification failed")

verify_verifiable_data(persona, verifiable_data)

def verify_verifiable_data(
    self,
    persona: Persona,
    verifiable_data: VerifiableData
) -> VerificationResult

Verify verifiable data signatures.

Parameters:

  • persona (Persona) - Verifying persona

  • verifiable_data (VerifiableData) - Data to verify

Returns: VerificationResult object

verify_credential(persona, credential)

def verify_credential(
    self,
    persona: Persona,
    credential: Credential
) -> VerificationResult

Verify credential signatures.

Parameters:

  • persona (Persona) - Verifying persona

  • credential (Credential) - Credential to verify

Returns: VerificationResult object

verify_transaction(persona, transaction)

def verify_transaction(
    self,
    persona: Persona,
    transaction: Transaction
) -> VerificationResult

Verify transaction signatures.

Parameters:

  • persona (Persona) - Verifying persona

  • transaction (Transaction) - Transaction to verify

Returns: VerificationResult object

Monitor Operations

on_start()

def on_start(self) -> None

Start PKI monitoring services.

on_resume()

def on_resume(self) -> None

Resume PKI monitoring services.

on_pause()

def on_pause(self) -> None

Pause PKI monitoring services.

on_stop()

def on_stop(self) -> None

Stop PKI monitoring services.

force_update_cache()

def force_update_cache(self) -> None

Force an update of the PKI cache state.

await_new_block_height()

def await_new_block_height(self) -> int

Wait for PKI block notifications and return the new block height.

Returns: New block height

await_persona_maturity(persona)

def await_persona_maturity(self, persona: Persona) -> None

Block until the specified persona becomes mature.

Parameters:

  • persona (Persona) - Persona to wait for

Example: Complete Workflow

from keychain.gateway import Gateway
from keychain.constants import SecurityLevel, DataType
from keychain.core.serialized_data import SerializedData
from keychain.core.tag_set import TagSet
from keychain.exceptions import KeychainNotFoundError

# Initialize gateway
settings = Gateway.init("keychain.config")
gateway = Gateway(settings)

try:
    # Create or find personas
    try:
        alice = gateway.find_persona("alice", "company.com")
    except KeychainNotFoundError:
        alice = gateway.create_persona(
            name="alice",
            subname="company.com",
            security_level=SecurityLevel.HIGH,
            auto_renew=True
        )

    try:
        bob = gateway.find_persona("bob", "company.com")
    except KeychainNotFoundError:
        bob = gateway.create_persona(
            name="bob",
            subname="company.com",
            security_level=SecurityLevel.HIGH,
            auto_renew=True
        )

    # Add Bob as a contact for Alice
    bob_contact = gateway.add_contact(
        persona=alice,
        name="bob",
        subname="company.com",
        did=bob.did
    )

    # Encrypt a message from Alice to Bob
    message = "Confidential project update"
    encrypted = gateway.encrypt(
        persona=alice,
        clear_text=message,
        recipients=[bob_contact]
    )

    print("Message encrypted successfully")

    # Bob decrypts the message
    decrypted_data, encoding = gateway.decrypt(bob, encrypted)
    print(f"Decrypted message: {decrypted_data}")

    # Alice signs a document
    document = SerializedData.from_string(
        "Important contract terms",
        DataType.UTF8_STRING
    )

    signed_document = gateway.sign(
        persona=alice,
        cleartext=document,
        approval=True
    )

    print("Document signed successfully")

    # Bob verifies Alice's signature
    verification_result = gateway.verify(bob, signed_document)

    if verification_result.is_verified():
        signer = verification_result.signer()
        print(f"Signature verified! Signed by: {signer.name}.{signer.subname}")
    else:
        print("Signature verification failed")

finally:
    # Cleanup
    Gateway.close()

Error Handling

The Gateway class methods can raise various exceptions from the keychain.exceptions module:

  • KeychainInitializationError - Library initialization failures

  • KeychainValidationError - Input validation errors

  • KeychainNotFoundError - Resource not found

  • KeychainSecurityError - Cryptographic operation failures

  • KeychainMemoryError - Memory allocation failures

Always wrap Gateway operations in appropriate try/except blocks for robust error handling.

See Also