Gateway Class

Overview

The Gateway class is the main interface for all Keychain operations in Python. It wraps the underlying C library and exposes high-level methods for persona management, encryption, signing, and verification.

Package: keychain.gateway

from keychain.gateway import Gateway

Class Definition

class Gateway:
    """Main interface for keychain operations."""

Constructor

Gateway(settings)

def __init__(self, settings: Settings) -> None

Initialize a Gateway instance with the provided settings.

Parameters:

  • settings (Settings) - Settings object obtained from Gateway.init()

Example:

from keychain.gateway import Gateway
from keychain.utils.settings import Settings

# Initialize keychain
settings = Gateway.init("config.json")

# Create gateway instance
gateway = Gateway(settings)

Static Methods

init(config_path)

@staticmethod
def init(config_path: str) -> Settings

Initialize the Keychain library and return a settings object.

Parameters:

  • config_path (str) - Path to the configuration file

Returns: Settings object for gateway construction

Example:

settings = Gateway.init("/path/to/keychain.config")
gateway = Gateway(settings)

close()

@staticmethod
def close() -> None

Clean up static library resources. Call at application shutdown.

Example:

# At application shutdown
Gateway.close()
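
One way to make sure close() always runs is to wrap init(), construction, and close() in a context manager. The sketch below is a hypothetical helper, not part of the library: keychain_session is an invented name, and the Gateway class is passed in as gateway_cls so the helper does not depend on the import. It assumes close() should only be called after init() has succeeded.

```python
from contextlib import contextmanager


@contextmanager
def keychain_session(gateway_cls, config_path):
    """Yield a gateway instance and release library resources afterwards.

    gateway_cls is expected to be keychain.gateway.Gateway (hypothetical
    helper; the class is injected rather than imported here).
    """
    settings = gateway_cls.init(config_path)
    try:
        yield gateway_cls(settings)
    finally:
        # Runs on normal exit and when the body raises
        gateway_cls.close()
```

With the real class this would be used as: with keychain_session(Gateway, "keychain.config") as gateway: ...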

hash(clear_text)

@staticmethod
def hash(clear_text: Union[str, bytes]) -> str

Calculate the SHA-256 hash of the input data.

Parameters:

  • clear_text (str or bytes) - Data to hash

Returns: Hexadecimal hash string

Example:

hash_result = Gateway.hash("Hello, World!")
print(f"Hash: {hash_result}")
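
For comparison, a SHA-256 digest can also be computed with Python's standard hashlib module. This sketch assumes that Gateway.hash encodes str input as UTF-8 and returns lowercase hexadecimal; neither detail is stated above, so treat any equality between the two results as something to check rather than a guarantee. sha256_hex is a hypothetical helper name.

```python
import hashlib
from typing import Union


def sha256_hex(clear_text: Union[str, bytes]) -> str:
    """Return the SHA-256 digest of clear_text as a hexadecimal string."""
    # Assumption: text input is encoded as UTF-8 before hashing
    data = clear_text.encode("utf-8") if isinstance(clear_text, str) else clear_text
    return hashlib.sha256(data).hexdigest()


digest = sha256_hex("Hello, World!")
print(len(digest))  # 64 hexadecimal characters (256 bits)
```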

generate_signature_by_private_key(cleartext, private_key_hex, algorithm)

@staticmethod
def generate_signature_by_private_key(
    cleartext: Union[str, bytes],
    private_key_hex: str,
    algorithm: SignatureScheme
) -> str

Generate a signature using an external private key.

Parameters:

  • cleartext (str or bytes) - Data to sign

  • private_key_hex (str) - Private key in hexadecimal format

  • algorithm (SignatureScheme) - Signature algorithm to use

Returns: Signature in hexadecimal format

verify_raw_data(cleartext, public_key_hex, signature_hex, algorithm)

@staticmethod
def verify_raw_data(
    cleartext: Union[str, bytes],
    public_key_hex: str,
    signature_hex: str,
    algorithm: SignatureScheme
) -> bool

Verify a signature against a public key without persona context.

Parameters:

  • cleartext (str or bytes) - Original signed data

  • public_key_hex (str) - Public key in hexadecimal format

  • signature_hex (str) - Signature in hexadecimal format

  • algorithm (SignatureScheme) - Signature algorithm used

Returns: True if signature is valid, False otherwise
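
The two raw-key static methods are natural counterparts: one produces a hexadecimal signature, the other checks it. The sketch below shows a round trip under stated assumptions. sign_and_verify is a hypothetical helper, the Gateway class is passed in as gateway_cls, and the algorithm is left as a parameter because the members of SignatureScheme are not listed in this section.

```python
def sign_and_verify(gateway_cls, message, private_key_hex, public_key_hex, algorithm):
    """Sign message with a raw private key, then verify with the public key.

    Returns (signature_hex, is_valid). The same algorithm value must be
    used for both calls.
    """
    signature_hex = gateway_cls.generate_signature_by_private_key(
        message, private_key_hex, algorithm
    )
    is_valid = gateway_cls.verify_raw_data(
        message, public_key_hex, signature_hex, algorithm
    )
    return signature_hex, is_valid
```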

Persona Management Methods

create_persona(name, subname, security_level, auto_renew)

def create_persona(
    self,
    name: str,
    subname: str,
    security_level: SecurityLevel,
    auto_renew: bool
) -> Persona

Create a new persona with default cryptographic parameters.

Parameters:

  • name (str) - Primary name for the persona

  • subname (str) - Secondary name for the persona

  • security_level (SecurityLevel) - Security level (LIGHTWEIGHT, LOW, MEDIUM, HIGH, ULTRA)

  • auto_renew (bool) - Whether to automatically renew certificates

Returns: New Persona object

Example:

from keychain.constants import SecurityLevel

persona = gateway.create_persona(
    name="alice",
    subname="company.com",
    security_level=SecurityLevel.HIGH,
    auto_renew=True
)

create_persona_from_algorithm_classes(name, subname, security_level, auto_renew, encryption_algorithm, signature_algorithm, cipher)

def create_persona_from_algorithm_classes(
    self,
    name: str,
    subname: str,
    security_level: SecurityLevel,
    auto_renew: bool,
    encryption_algorithm: EncryptionScheme,
    signature_algorithm: SignatureScheme,
    cipher: Cipher
) -> Persona

Create a new persona with specified algorithm classes.

Parameters:

  • name (str) - Primary name for the persona

  • subname (str) - Secondary name for the persona

  • security_level (SecurityLevel) - Security level

  • auto_renew (bool) - Whether to automatically renew certificates

  • encryption_algorithm (EncryptionScheme) - Encryption algorithm

  • signature_algorithm (SignatureScheme) - Signature algorithm

  • cipher (Cipher) - Symmetric cipher algorithm

Returns: New Persona object

find_persona(name, subname)

def find_persona(self, name: str, subname: str) -> Persona

Find an existing persona by name and subname.

Parameters:

  • name (str) - Primary name to search for

  • subname (str) - Secondary name to search for

Returns: Persona object if found

Raises: KeychainNotFoundError if persona not found

Example:

from keychain.exceptions import KeychainNotFoundError

try:
    persona = gateway.find_persona("alice", "company.com")
    print(f"Found persona: {persona.name}.{persona.subname}")
except KeychainNotFoundError:
    print("Persona not found")
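
Because find_persona raises instead of returning None, a common pattern is to fall back to create_persona when the lookup fails. The helper below is a minimal sketch of that pattern. find_or_create_persona is a hypothetical name, and the exception class is passed in as not_found_error (expected to be KeychainNotFoundError) so the function has no import dependency.

```python
def find_or_create_persona(gateway, not_found_error, name, subname,
                           security_level, auto_renew=True):
    """Return the existing persona, or create it if the lookup fails."""
    try:
        return gateway.find_persona(name, subname)
    except not_found_error:
        # Only the "not found" case triggers creation; other errors propagate
        return gateway.create_persona(
            name=name,
            subname=subname,
            security_level=security_level,
            auto_renew=auto_renew,
        )
```

Catching only the not-found exception keeps validation or security errors visible to the caller instead of silently creating a new persona.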

retrieve_personas()

def retrieve_personas(self) -> List[Persona]

Retrieve all personas from the local cache.

Returns: List of Persona objects

Example:

personas = gateway.retrieve_personas()
print(f"Found {len(personas)} personas")

for persona in personas:
    print(f"  {persona.name}.{persona.subname}")
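
When several personas need to be looked up repeatedly, the returned list can be indexed once by qualified name. This is a sketch that relies only on the name and subname attributes used in the example above; index_personas is a hypothetical helper, and it assumes the name.subname pair is unique.

```python
def index_personas(personas):
    """Map "name.subname" to the corresponding persona object."""
    # Later entries overwrite earlier ones if a qualified name repeats
    return {f"{p.name}.{p.subname}": p for p in personas}
```

For example, index_personas(gateway.retrieve_personas()).get("alice.company.com") returns the persona or None.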

add_contact(persona, name, subname, did)

def add_contact(
    self,
    persona: Persona,
    name: str,
    subname: str,
    did: PersonaDID
) -> Contact

Add a new contact to a persona.

Parameters:

  • persona (Persona) - Persona to add contact to

  • name (str) - Contact’s primary name

  • subname (str) - Contact’s secondary name

  • did (PersonaDID) - Contact’s decentralized identifier

Returns: New Contact object

Example:

# Assuming bob_did is a PersonaDID object
contact = gateway.add_contact(
    persona=alice_persona,
    name="bob",
    subname="company.com",
    did=bob_did
)

refresh_persona(persona)

def refresh_persona(self, persona: Persona) -> None

Refresh persona data from the database.

Parameters:

  • persona (Persona) - Persona to refresh

set_auto_renew(persona, auto_renew)

def set_auto_renew(self, persona: Persona, auto_renew: bool) -> None

Configure automatic certificate renewal for a persona.

Parameters:

  • persona (Persona) - Persona to configure

  • auto_renew (bool) - Whether to enable auto renewal

renew_certificate(persona)

def renew_certificate(self, persona: Persona) -> None

Manually initiate certificate renewal for a persona.

Parameters:

  • persona (Persona) - Persona to renew

mature_persona_exists()

def mature_persona_exists(self) -> bool

Check if any mature persona exists in the gateway.

Returns: True if at least one mature persona exists

Cryptographic Operations

encrypt(persona, clear_text, recipients)

def encrypt(
    self,
    persona: Persona,
    clear_text: Union[str, bytes],
    recipients: List[Contact]
) -> EncryptedData

Encrypt data for specified recipients using envelope encryption.

Parameters:

  • persona (Persona) - Encrypting persona

  • clear_text (str or bytes) - Data to encrypt

  • recipients (List[Contact]) - List of recipient contacts

Returns: EncryptedData object

Example:

# Encrypt message for multiple recipients
encrypted = gateway.encrypt(
    persona=alice_persona,
    clear_text="Confidential message",
    recipients=[bob_contact, charlie_contact]
)

decrypt(persona, ciphertext)

def decrypt(
    self,
    persona: Persona,
    ciphertext: EncryptedData
) -> Tuple[Union[str, bytes], CharEncoding]

Decrypt encrypted data using the persona’s private key.

Parameters:

  • persona (Persona) - Decrypting persona

  • ciphertext (EncryptedData) - Encrypted data to decrypt

Returns: Tuple of (decrypted data, character encoding)

Example:

decrypted_data, encoding = gateway.decrypt(bob_persona, encrypted)
print(f"Decrypted: {decrypted_data}")
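
Since the decrypted payload may be either str or bytes, callers that want text usually normalize it first. The helper below is a sketch under an explicit assumption: it ignores the returned CharEncoding value, whose members are not described in this section, and decodes bytes with a caller-supplied fallback (UTF-8 by default). as_text is a hypothetical name.

```python
def as_text(decrypted_data, fallback_encoding="utf-8"):
    """Return decrypted_data as str, decoding bytes if necessary."""
    if isinstance(decrypted_data, bytes):
        # Assumption: the payload is text in fallback_encoding
        return decrypted_data.decode(fallback_encoding)
    return decrypted_data
```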

add_decrypt_access(encrypted_data, persona, new_recipient)

def add_decrypt_access(
    self,
    encrypted_data: EncryptedData,
    persona: Persona,
    new_recipient: Contact
) -> EncryptedData

Grant decrypt access to an additional recipient.

Parameters:

  • encrypted_data (EncryptedData) - Existing encrypted data

  • persona (Persona) - Persona granting access (must have decrypt access)

  • new_recipient (Contact) - New recipient to grant access to

Returns: Updated EncryptedData object
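
Because each call returns an updated EncryptedData object, granting access to several contacts means carrying the result forward from one call to the next. A minimal sketch, with grant_access_to_all as a hypothetical helper name:

```python
def grant_access_to_all(gateway, encrypted_data, persona, new_recipients):
    """Grant decrypt access to every contact in new_recipients."""
    for contact in new_recipients:
        # Use the object returned by the previous call as the next input
        encrypted_data = gateway.add_decrypt_access(encrypted_data, persona, contact)
    return encrypted_data
```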

sign(persona, cleartext, approval, tags, variables)

def sign(
    self,
    persona: Persona,
    cleartext: SerializedData,
    approval: bool = False,
    tags: Optional[TagSet] = None,
    variables: Optional[TagSet] = None
) -> VerifiableData

Create verifiable (signed) data using the persona’s private key.

Parameters:

  • persona (Persona) - Signing persona

  • cleartext (SerializedData) - Data to sign

  • approval (bool) - Whether this is an approval signature

  • tags (TagSet, optional) - Metadata tags

  • variables (TagSet, optional) - Variable metadata

Returns: VerifiableData object

Example:

from keychain.core.serialized_data import SerializedData
from keychain.constants import DataType

# Create serialized data
data = SerializedData.from_string("Document content", DataType.UTF8_STRING)

# Sign the data
signed_data = gateway.sign(
    persona=alice_persona,
    cleartext=data,
    approval=True
)

generate_signature_by_persona(persona, cleartext)

def generate_signature_by_persona(
    self,
    persona: Persona,
    cleartext: Union[str, bytes]
) -> str

Generate a raw signature using a persona’s private key.

Parameters:

  • persona (Persona) - Signing persona

  • cleartext (str or bytes) - Data to sign

Returns: Signature in hexadecimal format

create_credential(persona, credential_id, credential_type, subject_id, start_timestamp, end_timestamp, claims, tags, variables)

def create_credential(
    self,
    persona: Persona,
    credential_id: str,
    credential_type: str,
    subject_id: str,
    start_timestamp: int,
    end_timestamp: int,
    claims: TagSet,
    tags: Optional[TagSet] = None,
    variables: Optional[TagSet] = None
) -> Credential

Create and sign a W3C verifiable credential.

Parameters:

  • persona (Persona) - Issuing persona

  • credential_id (str) - Unique credential identifier

  • credential_type (str) - Type of credential

  • subject_id (str) - Subject’s identifier

  • start_timestamp (int) - Validity start time (milliseconds since epoch)

  • end_timestamp (int) - Validity end time (milliseconds since epoch)

  • claims (TagSet) - Credential claims

  • tags (TagSet, optional) - Metadata tags

  • variables (TagSet, optional) - Variable metadata

Returns: Credential object

Example:

import time
from keychain.core.tag_set import TagSet

# Create claims
claims = TagSet()
claims.set_tag_value("", "name", "Alice Smith")
claims.set_tag_value("", "role", "Senior Developer")
claims.set_tag_value("", "department", "Engineering")

# Create a credential that is valid for one year
now = int(time.time() * 1000)
expires_at = now + (365 * 24 * 60 * 60 * 1000)

credential = gateway.create_credential(
    persona=hr_persona,
    credential_id="emp-12345",
    credential_type="EmployeeCredential",
    subject_id="did:keychain:alice.company.com",
    start_timestamp=now,
    end_timestamp=expires_at,
    claims=claims
)
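
Both timestamps are expressed in milliseconds since the Unix epoch, which is easy to get wrong when starting from datetime objects. The helpers below are a sketch using only the standard library; to_epoch_ms and validity_window are hypothetical names. Integer timedelta division is used to avoid floating-point rounding.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the epoch."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return (moment - EPOCH) // timedelta(milliseconds=1)


def validity_window(start: datetime, days: int):
    """Return (start_timestamp, end_timestamp) in milliseconds."""
    return to_epoch_ms(start), to_epoch_ms(start + timedelta(days=days))
```

The resulting pair can be passed as start_timestamp and end_timestamp.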

create_transaction(persona, consensus_algorithm, quorum, tags, base_variables)

def create_transaction(
    self,
    persona: Persona,
    consensus_algorithm: ConsensusAlgorithm,
    quorum: TagSet,
    tags: Optional[TagSet] = None,
    base_variables: Optional[TagSet] = None
) -> Transaction

Create and sign a consensus transaction.

Parameters:

  • persona (Persona) - Creating persona

  • consensus_algorithm (ConsensusAlgorithm) - Consensus algorithm to use

  • quorum (TagSet) - Participant definitions

  • tags (TagSet, optional) - Metadata tags

  • base_variables (TagSet, optional) - Initial variables

Returns: Transaction object

Verification Operations

verify(persona, data)

def verify(
    self,
    persona: Persona,
    data: Union[VerifiableData, Credential, Transaction]
) -> VerificationResult

Verify signed data using contact information and PKI data.

Parameters:

  • persona (Persona) - Verifying persona

  • data (VerifiableData, Credential, or Transaction) - Data to verify

Returns: VerificationResult object

Example:

# Verify signed data
result = gateway.verify(bob_persona, signed_data)

if result.is_verified():
    print("Signature is valid")
    signer = result.signer()
    print(f"Signed by: {signer.name}.{signer.subname}")
else:
    print("Signature verification failed")

verify_verifiable_data(persona, verifiable_data)

def verify_verifiable_data(
    self,
    persona: Persona,
    verifiable_data: VerifiableData
) -> VerificationResult

Verify verifiable data signatures.

Parameters:

  • persona (Persona) - Verifying persona

  • verifiable_data (VerifiableData) - Data to verify

Returns: VerificationResult object

verify_credential(persona, credential)

def verify_credential(
    self,
    persona: Persona,
    credential: Credential
) -> VerificationResult

Verify credential signatures.

Parameters:

  • persona (Persona) - Verifying persona

  • credential (Credential) - Credential to verify

Returns: VerificationResult object

verify_transaction(persona, transaction)

def verify_transaction(
    self,
    persona: Persona,
    transaction: Transaction
) -> VerificationResult

Verify transaction signatures.

Parameters:

  • persona (Persona) - Verifying persona

  • transaction (Transaction) - Transaction to verify

Returns: VerificationResult object

Monitor Operations

on_start()

def on_start(self) -> None

Start PKI monitoring services.

on_resume()

def on_resume(self) -> None

Resume PKI monitoring services.

on_pause()

def on_pause(self) -> None

Pause PKI monitoring services.

on_stop()

def on_stop(self) -> None

Stop PKI monitoring services.
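
The start and stop calls can be paired with a context manager so that monitoring is stopped even if the enclosed code raises. This is a sketch that assumes on_start() and on_stop() are meant to bracket a period of use; the section does not state ordering requirements relative to on_pause() and on_resume(). pki_monitoring is a hypothetical helper name.

```python
from contextlib import contextmanager


@contextmanager
def pki_monitoring(gateway):
    """Run the enclosed block with PKI monitoring services started."""
    gateway.on_start()
    try:
        yield gateway
    finally:
        # Stop monitoring on normal exit and on error
        gateway.on_stop()
```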

force_update_cache()

def force_update_cache(self) -> None

Force an update of the PKI cache state.

await_new_block_height()

def await_new_block_height(self) -> int

Wait for PKI block notifications and return the new block height.

Returns: New block height

await_persona_maturity(persona)

def await_persona_maturity(self, persona: Persona) -> None

Block until the specified persona becomes mature.

Parameters:

  • persona (Persona) - Persona to wait for
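
A typical use is to wait for maturity immediately after creating a persona. The sketch below combines the two calls. It assumes that a newly created persona may not be mature right away and that any monitoring services the wait depends on are already running; create_mature_persona is a hypothetical helper name.

```python
def create_mature_persona(gateway, name, subname, security_level, auto_renew=True):
    """Create a persona and block until it is reported as mature."""
    persona = gateway.create_persona(
        name=name,
        subname=subname,
        security_level=security_level,
        auto_renew=auto_renew,
    )
    # Blocks the calling thread until the persona becomes mature
    gateway.await_persona_maturity(persona)
    return persona
```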

Example: Complete Workflow

from keychain.gateway import Gateway
from keychain.constants import SecurityLevel, DataType
from keychain.core.serialized_data import SerializedData
from keychain.exceptions import KeychainNotFoundError

# Initialize gateway
settings = Gateway.init("keychain.config")
gateway = Gateway(settings)

try:
    # Create or find personas
    try:
        alice = gateway.find_persona("alice", "company.com")
    except KeychainNotFoundError:
        alice = gateway.create_persona(
            name="alice",
            subname="company.com",
            security_level=SecurityLevel.HIGH,
            auto_renew=True
        )

    try:
        bob = gateway.find_persona("bob", "company.com")
    except KeychainNotFoundError:
        bob = gateway.create_persona(
            name="bob",
            subname="company.com",
            security_level=SecurityLevel.HIGH,
            auto_renew=True
        )

    # Add Bob as a contact for Alice
    bob_contact = gateway.add_contact(
        persona=alice,
        name="bob",
        subname="company.com",
        did=bob.did
    )

    # Encrypt a message from Alice to Bob
    message = "Confidential project update"
    encrypted = gateway.encrypt(
        persona=alice,
        clear_text=message,
        recipients=[bob_contact]
    )

    print("Message encrypted successfully")

    # Bob decrypts the message
    decrypted_data, encoding = gateway.decrypt(bob, encrypted)
    print(f"Decrypted message: {decrypted_data}")

    # Alice signs a document
    document = SerializedData.from_string(
        "Important contract terms",
        DataType.UTF8_STRING
    )

    signed_document = gateway.sign(
        persona=alice,
        cleartext=document,
        approval=True
    )

    print("Document signed successfully")

    # Bob verifies Alice's signature
    verification_result = gateway.verify(bob, signed_document)

    if verification_result.is_verified():
        signer = verification_result.signer()
        print(f"Signature verified! Signed by: {signer.name}.{signer.subname}")
    else:
        print("Signature verification failed")

finally:
    # Cleanup
    Gateway.close()

Error Handling

The Gateway class methods can raise various exceptions from the keychain.exceptions module:

  • KeychainInitializationError - Library initialization failures

  • KeychainValidationError - Input validation errors

  • KeychainNotFoundError - Resource not found

  • KeychainSecurityError - Cryptographic operation failures

  • KeychainMemoryError - Memory allocation failures

Wrap Gateway operations in try/except blocks that catch the specific exceptions you can handle.

Feature Availability Notes

Storage Operations (Not Available in Python)

The following storage operations are available in the C++ API but are not currently implemented in the Python bindings:

  • persist() - Store verifiable data, credentials, transactions, or encrypted data objects in the database

  • purge() - Delete objects from the database

  • retrieve_verifiable_data() - Retrieve verifiable data by timestamp and hash

  • retrieve_credential() - Retrieve credential by timestamp and hash

  • retrieve_transaction_history() - Retrieve transaction history by timestamp and hash

  • retrieve_encrypted_data() - Retrieve encrypted data by ciphertext hash

These operations are planned for future releases of the Python bindings.

Ledger Consensus Operations (Not Available in Python)

The following ledger consensus operations are available in the C++ API but are not currently implemented in the Python bindings:

  • ledger_start() - Create the first transaction of a ledger consensus process

  • ledger_receive() - Verify and store a ledger response from a counterparty

  • ledger_check_state_transition() - Progress the consensus stage based on received signatures

  • ledger_sign() - Prepare a signed response to a transaction from a counterparty

These multi-party consensus operations are planned for future releases of the Python bindings.

See Also