#![cfg_attr(not(feature = "std"), no_std)]
use core::fmt::Debug;
use cfg_traits::liquidity_pools::{MessageProcessor, MessageQueue as MessageQueueT};
use frame_support::{dispatch::PostDispatchInfo, pallet_prelude::*};
use frame_system::pallet_prelude::*;
pub use pallet::*;
use parity_scale_codec::FullCodec;
use scale_info::TypeInfo;
use sp_arithmetic::traits::BaseArithmetic;
use sp_runtime::traits::{EnsureAddAssign, One};
use sp_std::vec::Vec;
#[cfg(test)]
mod mock;
#[cfg(test)]
mod tests;
#[frame_support::pallet]
pub mod pallet {
use super::*;
#[pallet::config]
pub trait Config: frame_system::Config {
type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
type Message: Clone + Debug + PartialEq + MaxEncodedLen + TypeInfo + FullCodec;
type MessageNonce: Parameter
+ Member
+ BaseArithmetic
+ Default
+ Copy
+ MaybeSerializeDeserialize
+ TypeInfo
+ MaxEncodedLen;
type MessageProcessor: MessageProcessor<Message = Self::Message>;
}
#[pallet::pallet]
pub struct Pallet<T>(_);
#[pallet::storage]
#[pallet::getter(fn message_nonce_store)]
pub type MessageNonceStore<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;
#[pallet::storage]
#[pallet::getter(fn message_queue)]
pub type MessageQueue<T: Config> = StorageMap<_, Blake2_128Concat, T::MessageNonce, T::Message>;
#[pallet::storage]
#[pallet::getter(fn failed_message_queue)]
pub type FailedMessageQueue<T: Config> =
StorageMap<_, Blake2_128Concat, T::MessageNonce, (T::Message, DispatchError)>;
#[pallet::event]
#[pallet::generate_deposit(pub (super) fn deposit_event)]
pub enum Event<T: Config> {
MessageSubmitted {
nonce: T::MessageNonce,
message: T::Message,
},
MessageExecutionSuccess {
nonce: T::MessageNonce,
message: T::Message,
},
MessageExecutionFailure {
nonce: T::MessageNonce,
message: T::Message,
error: DispatchError,
},
}
#[pallet::error]
pub enum Error<T> {
MessageNotFound,
}
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn on_idle(_now: BlockNumberFor<T>, max_weight: Weight) -> Weight {
Self::service_message_queue(max_weight)
}
}
#[pallet::call]
impl<T: Config> Pallet<T> {
#[pallet::weight(MessageQueue::<T>::get(nonce)
.map(|msg| T::MessageProcessor::max_processing_weight(&msg))
.unwrap_or(T::DbWeight::get().reads(1)))]
#[pallet::call_index(0)]
pub fn process_message(
origin: OriginFor<T>,
nonce: T::MessageNonce,
) -> DispatchResultWithPostInfo {
ensure_signed(origin)?;
let message = MessageQueue::<T>::take(nonce).ok_or(Error::<T>::MessageNotFound)?;
let (result, weight) = Self::process_message_and_deposit_event(nonce, message.clone());
if let Err(e) = result {
FailedMessageQueue::<T>::insert(nonce, (message, e));
}
Ok(PostDispatchInfo::from(Some(weight)))
}
#[pallet::weight(FailedMessageQueue::<T>::get(nonce)
.map(|(msg, _)| T::MessageProcessor::max_processing_weight(&msg))
.unwrap_or(T::DbWeight::get().reads(1)))]
#[pallet::call_index(1)]
pub fn process_failed_message(
origin: OriginFor<T>,
nonce: T::MessageNonce,
) -> DispatchResultWithPostInfo {
ensure_signed(origin)?;
let (message, _) =
FailedMessageQueue::<T>::get(nonce).ok_or(Error::<T>::MessageNotFound)?;
let (result, weight) = Self::process_message_and_deposit_event(nonce, message);
if result.is_ok() {
FailedMessageQueue::<T>::remove(nonce);
}
Ok(PostDispatchInfo::from(Some(weight)))
}
}
impl<T: Config> Pallet<T> {
fn process_message_and_deposit_event(
nonce: T::MessageNonce,
message: T::Message,
) -> (DispatchResult, Weight) {
match T::MessageProcessor::process(message.clone()) {
(Ok(()), weight) => {
Self::deposit_event(Event::<T>::MessageExecutionSuccess { nonce, message });
(Ok(()), weight)
}
(Err(error), weight) => {
Self::deposit_event(Event::<T>::MessageExecutionFailure {
nonce,
message,
error,
});
(Err(error), weight)
}
}
}
fn service_message_queue(max_weight: Weight) -> Weight {
let mut weight_used = Weight::zero();
let mut processed_entries = Vec::new();
for (nonce, message) in MessageQueue::<T>::iter() {
let remaining_weight = max_weight.saturating_sub(weight_used);
let next_weight = T::MessageProcessor::max_processing_weight(&message);
if remaining_weight.any_lt(next_weight) {
break;
}
let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) {
(Ok(()), weight) => {
weight.saturating_add(T::DbWeight::get().reads_writes(1, 1))
}
(Err(e), weight) => {
FailedMessageQueue::<T>::insert(nonce, (message, e));
weight.saturating_add(T::DbWeight::get().reads_writes(1, 2))
}
};
processed_entries.push(nonce);
weight_used = weight_used.saturating_add(weight);
if weight_used.all_gte(max_weight) {
break;
}
}
for entry in processed_entries {
MessageQueue::<T>::remove(entry);
}
weight_used
}
}
impl<T: Config> MessageQueueT for Pallet<T> {
type Message = T::Message;
fn queue(message: Self::Message) -> DispatchResult {
let nonce = <MessageNonceStore<T>>::try_mutate(|n| {
n.ensure_add_assign(T::MessageNonce::one())?;
Ok::<T::MessageNonce, DispatchError>(*n)
})?;
MessageQueue::<T>::insert(nonce, message.clone());
Self::deposit_event(Event::MessageSubmitted { nonce, message });
Ok(())
}
}
}