"""
Copyright 2025 Biglup Labs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import annotations
import json
from typing import List, Optional, Union, TYPE_CHECKING
from .._aiken_ffi import aiken_ffi, aiken_lib
from ..errors import CardanoError
from ..cbor import CborReader, CborWriter
from ..common.ex_units import ExUnits
from ..common.slot_config import SlotConfig
if TYPE_CHECKING:
from ..transaction import Transaction
from ..common.utxo_list import UtxoList
from ..common.utxo import Utxo
from ..witness_set import Redeemer
from ..protocol_params import Costmdls
from ..transaction_body import (
TransactionInput,
TransactionInputSet,
TransactionOutputList,
TransactionOutput,
)
DEFAULT_MAX_TX_EX_UNITS_MEMORY = 14_000_000
DEFAULT_MAX_TX_EX_UNITS_CPU = 10_000_000_000
# Aiken seems to slightly underestimate the required resources,
# so we apply a safety factor to the results.
SAFETY_MARGIN = 1.025
class TxEvaluationError(CardanoError):
"""Exception raised when transaction evaluation fails."""
[docs]
class AikenTxEvaluator:
"""
Transaction evaluator using the Aiken UPLC evaluator.
This evaluator calculates execution units for Plutus scripts using
the Aiken library's phase-two validation. It implements the TxEvaluatorProtocol.
Example:
>>> from cometa.aiken import AikenTxEvaluator
>>> from cometa import ExUnits, SlotConfig
>>>
>>> # Create evaluator with cost models (required for evaluation)
>>> evaluator = AikenTxEvaluator(cost_models=cost_models)
>>> redeemers = evaluator.evaluate(transaction, utxos)
>>>
>>> # For testnets or custom networks with custom budget
>>> evaluator = AikenTxEvaluator(
... cost_models=cost_models,
... slot_config=SlotConfig.preview(),
... max_tx_ex_units=ExUnits.new(28000000, 20000000000)
... )
>>>
>>> # Using protocol parameters for budget
>>> evaluator = AikenTxEvaluator(
... cost_models=protocol_params.cost_models,
... slot_config=SlotConfig.preprod(),
... max_tx_ex_units=protocol_params.max_tx_ex_units
... )
"""
def __init__(
self,
cost_models: "Costmdls",
slot_config: Optional[SlotConfig] = None,
max_tx_ex_units: Optional[ExUnits] = None,
) -> None:
"""
Creates a new AikenTxEvaluator.
Args:
cost_models: The cost models for script evaluation (required).
slot_config: Slot configuration for the network. Defaults to mainnet.
max_tx_ex_units: Maximum execution units for the transaction.
Defaults to mainnet values (14M memory, 10B CPU steps).
Can be obtained from protocol_params.max_tx_ex_units.
"""
self._cost_models = cost_models
self._slot_config = slot_config or SlotConfig.mainnet()
self._max_tx_ex_units = max_tx_ex_units or ExUnits.new(
DEFAULT_MAX_TX_EX_UNITS_MEMORY,
DEFAULT_MAX_TX_EX_UNITS_CPU
)
[docs]
def get_name(self) -> str:
"""
Get the human-readable name of this evaluator.
Returns:
The evaluator name.
"""
return "Aiken"
@property
def slot_config(self) -> SlotConfig:
"""The slot configuration used by this evaluator."""
return self._slot_config
@property
def max_tx_ex_units(self) -> ExUnits:
"""The maximum transaction execution units used by this evaluator."""
return self._max_tx_ex_units
@property
def cost_models(self) -> "Costmdls":
"""The cost models used by this evaluator."""
return self._cost_models
[docs]
def evaluate(
self,
transaction: "Transaction",
additional_utxos: Union["UtxoList", List["Utxo"], None],
) -> List["Redeemer"]:
"""
Evaluate the execution units required for a transaction.
This method calculates the execution units needed for each Plutus
script in the transaction using the Aiken UPLC evaluator.
Args:
transaction: The transaction to evaluate.
additional_utxos: UTXOs referenced by the transaction (inputs + reference inputs).
Returns:
A list of Redeemer objects with computed execution units.
Raises:
TxEvaluationError: If evaluation fails.
"""
utxo_list = _prepare_utxos(additional_utxos)
all_inputs = _collect_all_inputs(transaction)
resolved_outputs = _resolve_outputs(all_inputs, utxo_list)
result = self._call_eval_phase_two(
transaction.serialize_to_cbor(),
_serialize_inputs(all_inputs),
_serialize_outputs(resolved_outputs),
_serialize_cost_models(self._cost_models),
)
return _parse_result(result)
def _call_eval_phase_two(
self,
tx_hex: str,
inputs_hex: str,
outputs_hex: str,
cost_models_hex: str,
) -> dict:
"""Calls the native eval_phase_two function."""
slot_config = aiken_ffi.new("SlotConfig*")
slot_config.slot_length = self._slot_config.slot_length
slot_config.zero_slot = self._slot_config.zero_slot
slot_config.zero_time = self._slot_config.zero_time
initial_budget = aiken_ffi.new("InitialBudget*")
initial_budget.mem = int(self._max_tx_ex_units.memory / SAFETY_MARGIN)
initial_budget.cpu = int(self._max_tx_ex_units.cpu_steps / SAFETY_MARGIN)
# DEBUG
result_ptr = aiken_lib.eval_phase_two(
tx_hex.encode("utf-8") + b'\0',
inputs_hex.encode("utf-8") + b'\0',
outputs_hex.encode("utf-8") + b'\0',
cost_models_hex.encode("utf-8") + b'\0',
initial_budget,
slot_config,
)
try:
result_str = aiken_ffi.string(result_ptr)
return json.loads(result_str)
finally:
aiken_lib.drop_char_pointer(result_ptr)
def _prepare_utxos(
additional_utxos: Union["UtxoList", List["Utxo"], None]
) -> "UtxoList":
"""Converts additional_utxos to a UtxoList."""
from ..common.utxo_list import UtxoList
if additional_utxos is None:
return UtxoList()
if isinstance(additional_utxos, list):
return UtxoList.from_list(additional_utxos)
return additional_utxos
def _collect_all_inputs(transaction: "Transaction") -> "TransactionInputSet":
"""Collects all inputs including reference inputs."""
from ..transaction_body import TransactionInputSet
all_inputs = TransactionInputSet()
for tx_input in transaction.body.inputs:
all_inputs.add(tx_input)
ref_inputs = transaction.body.reference_inputs
if ref_inputs:
for ref_input in ref_inputs:
all_inputs.add(ref_input)
return all_inputs
def _resolve_outputs(
inputs: "TransactionInputSet",
utxos: "UtxoList",
) -> "TransactionOutputList":
"""Resolves transaction inputs to their corresponding outputs."""
from ..transaction_body import TransactionOutputList
outputs = TransactionOutputList()
for tx_input in inputs:
output = _find_utxo_output(tx_input, utxos)
outputs.add(output)
return outputs
def _find_utxo_output(tx_input: "TransactionInput", utxos: "UtxoList") -> "TransactionOutput":
"""Finds the output for a given input from the UTXO set."""
for utxo in utxos:
if utxo.input.transaction_id == tx_input.transaction_id and utxo.input.index == tx_input.index:
return utxo.output
raise TxEvaluationError(
f"Could not find UTXO for input: {tx_input.transaction_id.hex()}#{tx_input.index}"
)
def _serialize_inputs(inputs: "TransactionInputSet") -> str:
"""Serializes inputs to CBOR hex as an array (Aiken expects array, not set)."""
writer = CborWriter()
writer.write_start_array(len(inputs))
for tx_input in inputs:
tx_input.to_cbor(writer)
return writer.to_hex()
def _serialize_outputs(outputs: "TransactionOutputList") -> str:
"""Serializes outputs to CBOR hex."""
writer = CborWriter()
outputs.to_cbor(writer)
return writer.to_hex()
def _serialize_cost_models(cost_models: "Costmdls") -> str:
"""Serializes cost models to CBOR hex."""
writer = CborWriter()
cost_models.to_cbor(writer)
return writer.to_hex()
def _parse_result(result: dict) -> List["Redeemer"]:
"""Parses the evaluation result and returns redeemers."""
from ..witness_set import RedeemerList
status = result.get("status", "").upper()
if status != "SUCCESS":
error_msg = result.get("error", "Unknown evaluation error")
raise TxEvaluationError(f"Transaction evaluation failed: {error_msg}")
redeemer_cbor = result.get("redeemer_cbor", "")
if not redeemer_cbor:
return []
reader = CborReader.from_hex(redeemer_cbor)
redeemer_list = RedeemerList.from_cbor(reader)
for redeemer in redeemer_list:
if not redeemer.ex_units:
continue
redeemer.ex_units.memory = int(redeemer.ex_units.memory * SAFETY_MARGIN)
redeemer.ex_units.cpu_steps = int(redeemer.ex_units.cpu_steps * SAFETY_MARGIN)
return list(redeemer_list)