#![cfg_attr(not(feature = "std"), no_std)]
use core::fmt::Debug;
use cfg_primitives::LP_DEFENSIVE_WEIGHT;
use cfg_traits::liquidity_pools::{
InboundMessageHandler, LpMessageBatch, LpMessageProof, LpMessageRecovery, LpMessageSerializer,
MessageHash, MessageProcessor, MessageQueue, MessageReceiver, MessageSender,
OutboundMessageHandler, RouterProvider,
};
use cfg_types::domain_address::{Domain, DomainAddress};
use frame_support::{
dispatch::DispatchResult,
pallet_prelude::*,
storage::{with_transaction, TransactionOutcome},
};
use frame_system::pallet_prelude::{ensure_signed, OriginFor};
use message::GatewayMessage;
use orml_traits::GetByKey;
pub use pallet::*;
use parity_scale_codec::FullCodec;
use sp_arithmetic::traits::{BaseArithmetic, EnsureAddAssign, One};
use sp_std::convert::TryInto;
use crate::{
message_processing::{InboundEntry, ProofEntry},
weights::WeightInfo,
};
pub mod message;
pub mod weights;
mod message_processing;
#[cfg(test)]
mod mock;
#[cfg(test)]
mod tests;
#[frame_support::pallet]
pub mod pallet {
use super::*;
const STORAGE_VERSION: StorageVersion = StorageVersion::new(3);
#[pallet::pallet]
#[pallet::storage_version(STORAGE_VERSION)]
pub struct Pallet<T>(_);
#[pallet::config]
pub trait Config: frame_system::Config {
type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
type AdminOrigin: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;
type Message: LpMessageSerializer
+ LpMessageBatch
+ LpMessageProof
+ LpMessageRecovery
+ Clone
+ Debug
+ PartialEq
+ Eq
+ MaxEncodedLen
+ TypeInfo
+ FullCodec;
type MessageSender: MessageSender<
Middleware = Self::RouterId,
Origin = DomainAddress,
Message = Self::Message,
>;
type RouterId: Parameter + MaxEncodedLen + Into<Domain>;
type RouterProvider: RouterProvider<Domain, RouterId = Self::RouterId>;
type InboundMessageHandler: InboundMessageHandler<
Sender = DomainAddress,
Message = Self::Message,
>;
type WeightInfo: WeightInfo;
#[pallet::constant]
type MaxIncomingMessageSize: Get<u32>;
#[pallet::constant]
type Sender: Get<DomainAddress>;
type MessageQueue: MessageQueue<Message = GatewayMessage<Self::Message, Self::RouterId>>;
#[pallet::constant]
type MaxRouterCount: Get<u32>;
type SessionId: Parameter + Member + BaseArithmetic + Default + Copy + MaxEncodedLen;
}
#[pallet::event]
#[pallet::generate_deposit(pub (super) fn deposit_event)]
pub enum Event<T: Config> {
RoutersSet {
router_ids: BoundedVec<T::RouterId, T::MaxRouterCount>,
session_id: T::SessionId,
},
InstanceAdded { instance: DomainAddress },
InstanceRemoved { instance: DomainAddress },
DomainHookAddressSet {
domain: Domain,
hook_address: [u8; 20],
},
InboundMessageProcessed {
domain_address: DomainAddress,
message_hash: MessageHash,
router_id: T::RouterId,
},
InboundProofProcessed {
domain_address: DomainAddress,
message_hash: MessageHash,
router_id: T::RouterId,
},
InboundMessageExecuted {
domain_address: DomainAddress,
message_hash: MessageHash,
},
OutboundMessageSent {
domain_address: DomainAddress,
message_hash: MessageHash,
router_id: T::RouterId,
},
MessageRecoveryExecuted {
message_hash: MessageHash,
router_id: T::RouterId,
},
MessageRecoveryInitiated {
domain: Domain,
message_hash: MessageHash,
recovery_router: [u8; 32],
messaging_router: T::RouterId,
},
MessageRecoveryDisputed {
domain: Domain,
message_hash: MessageHash,
recovery_router: [u8; 32],
messaging_router: T::RouterId,
},
}
#[pallet::storage]
#[pallet::getter(fn routers)]
pub type Routers<T: Config> =
StorageValue<_, BoundedVec<T::RouterId, T::MaxRouterCount>, ValueQuery>;
#[pallet::storage]
#[pallet::getter(fn allowlist)]
pub type Allowlist<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, Domain, Blake2_128Concat, DomainAddress, ()>;
#[pallet::storage]
pub type DomainHookAddress<T: Config> =
StorageMap<_, Blake2_128Concat, Domain, [u8; 20], OptionQuery>;
#[pallet::storage]
pub(crate) type PackedMessage<T: Config> =
StorageMap<_, Blake2_128Concat, (T::AccountId, Domain), T::Message>;
#[pallet::storage]
#[pallet::getter(fn pending_inbound_entries)]
pub type PendingInboundEntries<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
MessageHash,
Blake2_128Concat,
T::RouterId,
InboundEntry<T>,
>;
#[pallet::storage]
pub type SessionIdStore<T: Config> = StorageValue<_, T::SessionId, ValueQuery>;
#[pallet::error]
pub enum Error<T> {
InvalidMessageOrigin,
DomainNotSupported,
InstanceAlreadyAdded,
MaxDomainInstances,
UnknownInstance,
RoutersNotFound,
MessagePackingAlreadyStarted,
MessagePackingNotStarted,
UnknownRouter,
MessagingRouterNotFound,
MessageExpectedFromFirstRouter,
ProofNotExpectedFromFirstRouter,
ExpectedMessageType,
ExpectedMessageProofType,
PendingInboundEntryNotFound,
MessageProofRetrieval,
RecoveryMessageNotFound,
NotEnoughRoutersForDomain,
InboundEntryMessageMismatch,
InboundEntryDomainAddressMismatch,
}
#[pallet::call]
impl<T: Config> Pallet<T> {
#[pallet::weight(T::WeightInfo::set_routers())]
#[pallet::call_index(0)]
pub fn set_routers(
origin: OriginFor<T>,
router_ids: BoundedVec<T::RouterId, T::MaxRouterCount>,
) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
<Routers<T>>::set(router_ids.clone());
let new_session_id = SessionIdStore::<T>::try_mutate(|n| {
n.ensure_add_assign(One::one())?;
Ok::<T::SessionId, DispatchError>(*n)
})?;
Self::deposit_event(Event::RoutersSet {
router_ids,
session_id: new_session_id,
});
Ok(())
}
#[pallet::weight(T::WeightInfo::add_instance())]
#[pallet::call_index(1)]
pub fn add_instance(origin: OriginFor<T>, instance: DomainAddress) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
ensure!(
instance.domain() != Domain::Centrifuge,
Error::<T>::DomainNotSupported
);
ensure!(
!Allowlist::<T>::contains_key(instance.domain(), instance.clone()),
Error::<T>::InstanceAlreadyAdded,
);
Allowlist::<T>::insert(instance.domain(), instance.clone(), ());
Self::deposit_event(Event::InstanceAdded { instance });
Ok(())
}
#[pallet::weight(T::WeightInfo::remove_instance())]
#[pallet::call_index(2)]
pub fn remove_instance(origin: OriginFor<T>, instance: DomainAddress) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin.clone())?;
ensure!(
Allowlist::<T>::contains_key(instance.domain(), instance.clone()),
Error::<T>::UnknownInstance,
);
Allowlist::<T>::remove(instance.domain(), instance.clone());
Self::deposit_event(Event::InstanceRemoved { instance });
Ok(())
}
#[pallet::weight(T::WeightInfo::set_domain_hook_address())]
#[pallet::call_index(8)]
pub fn set_domain_hook_address(
origin: OriginFor<T>,
domain: Domain,
hook_address: [u8; 20],
) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
ensure!(domain != Domain::Centrifuge, Error::<T>::DomainNotSupported);
DomainHookAddress::<T>::insert(domain, hook_address);
Self::deposit_event(Event::DomainHookAddressSet {
domain,
hook_address,
});
Ok(())
}
#[pallet::weight(T::WeightInfo::start_batch_message())]
#[pallet::call_index(9)]
pub fn start_batch_message(origin: OriginFor<T>, destination: Domain) -> DispatchResult {
let sender = ensure_signed(origin)?;
PackedMessage::<T>::mutate((&sender, &destination), |msg| match msg {
Some(_) => Err(Error::<T>::MessagePackingAlreadyStarted.into()),
None => {
*msg = Some(T::Message::empty());
Ok(())
}
})
}
#[pallet::weight(T::WeightInfo::end_batch_message())]
#[pallet::call_index(10)]
pub fn end_batch_message(origin: OriginFor<T>, destination: Domain) -> DispatchResult {
let sender = ensure_signed(origin)?;
match PackedMessage::<T>::take((&sender, &destination)) {
Some(msg) if msg.submessages().is_empty() => Ok(()), Some(message) => Self::queue_outbound_message(destination, message),
None => Err(Error::<T>::MessagePackingNotStarted.into()),
}
}
#[pallet::weight(T::WeightInfo::execute_message_recovery())]
#[pallet::call_index(11)]
pub fn execute_message_recovery(
origin: OriginFor<T>,
domain_address: DomainAddress,
message_hash: MessageHash,
router_id: T::RouterId,
) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
let router_ids = Self::get_router_ids_for_domain(domain_address.domain())?;
ensure!(
router_ids.iter().any(|x| x == &router_id),
Error::<T>::UnknownRouter
);
ensure!(router_ids.len() > 1, Error::<T>::NotEnoughRoutersForDomain);
let session_id = SessionIdStore::<T>::get();
PendingInboundEntries::<T>::try_mutate(
message_hash,
router_id.clone(),
|storage_entry| match storage_entry {
Some(stored_inbound_entry) => {
stored_inbound_entry.increment_proof_count(session_id)
}
None => {
*storage_entry = Some(InboundEntry::<T>::Proof(ProofEntry {
session_id,
current_count: 1,
}));
Ok::<(), DispatchError>(())
}
},
)?;
let expected_proof_count = Self::get_expected_proof_count(&router_ids)?;
Self::execute_if_requirements_are_met(
message_hash,
&router_ids,
session_id,
expected_proof_count,
domain_address,
)?;
Self::deposit_event(Event::<T>::MessageRecoveryExecuted {
message_hash,
router_id,
});
Ok(())
}
#[pallet::weight(T::WeightInfo::initiate_message_recovery())]
#[pallet::call_index(12)]
pub fn initiate_message_recovery(
origin: OriginFor<T>,
message_hash: MessageHash,
recovery_router: [u8; 32],
messaging_router: T::RouterId,
) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
let domain = messaging_router.clone().into();
let message = T::Message::initiate_recovery_message(message_hash, recovery_router);
Self::send_recovery_message(domain, message, messaging_router.clone())?;
Self::deposit_event(Event::<T>::MessageRecoveryInitiated {
domain,
message_hash,
recovery_router,
messaging_router,
});
Ok(())
}
#[pallet::weight(T::WeightInfo::dispute_message_recovery())]
#[pallet::call_index(13)]
pub fn dispute_message_recovery(
origin: OriginFor<T>,
message_hash: MessageHash,
recovery_router: [u8; 32],
messaging_router: T::RouterId,
) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
let domain = messaging_router.clone().into();
let message = T::Message::dispute_recovery_message(message_hash, recovery_router);
Self::send_recovery_message(domain, message, messaging_router.clone())?;
Self::deposit_event(Event::<T>::MessageRecoveryDisputed {
domain,
message_hash,
recovery_router,
messaging_router,
});
Ok(())
}
}
impl<T: Config> Pallet<T> {
fn send_recovery_message(
domain: Domain,
message: T::Message,
messaging_router: T::RouterId,
) -> DispatchResult {
let router_ids = Self::get_router_ids_for_domain(domain)?;
ensure!(
router_ids.iter().any(|x| x == &messaging_router),
Error::<T>::MessagingRouterNotFound
);
T::MessageSender::send(messaging_router, T::Sender::get(), message)
}
}
impl<T: Config> OutboundMessageHandler for Pallet<T> {
type Destination = Domain;
type Message = T::Message;
type Sender = T::AccountId;
fn handle(
from: Self::Sender,
destination: Self::Destination,
message: Self::Message,
) -> DispatchResult {
ensure!(
destination != Domain::Centrifuge,
Error::<T>::DomainNotSupported
);
PackedMessage::<T>::mutate((&from, destination), |batch| match batch {
Some(batch) => batch.pack_with(message),
None => Self::queue_outbound_message(destination, message),
})
}
}
impl<T: Config> GetByKey<Domain, Option<[u8; 20]>> for Pallet<T> {
fn get(domain: &Domain) -> Option<[u8; 20]> {
DomainHookAddress::<T>::get(domain)
}
}
impl<T: Config> MessageProcessor for Pallet<T> {
type Message = GatewayMessage<T::Message, T::RouterId>;
fn process(msg: Self::Message) -> (DispatchResult, Weight) {
let res = with_transaction(|| {
let res = match msg {
GatewayMessage::Inbound {
domain_address,
message,
router_id,
} => Self::process_inbound_message(domain_address, message, router_id),
GatewayMessage::Outbound { message, router_id } => {
T::MessageSender::send(router_id, T::Sender::get(), message)
}
};
if res.is_ok() {
TransactionOutcome::Commit(res)
} else {
TransactionOutcome::Rollback(res)
}
});
(res, LP_DEFENSIVE_WEIGHT)
}
fn max_processing_weight(_: &Self::Message) -> Weight {
LP_DEFENSIVE_WEIGHT
}
}
impl<T: Config> MessageReceiver for Pallet<T> {
type Message = T::Message;
type Middleware = T::RouterId;
type Origin = DomainAddress;
fn receive(
router_id: T::RouterId,
origin_address: DomainAddress,
message: T::Message,
) -> DispatchResult {
ensure!(
Allowlist::<T>::contains_key(origin_address.domain(), origin_address.clone()),
Error::<T>::UnknownInstance,
);
let gateway_message = GatewayMessage::<T::Message, T::RouterId>::Inbound {
domain_address: origin_address,
message,
router_id,
};
T::MessageQueue::queue(gateway_message)
}
}
}