Skip to content

Commit ab7232c

Browse files
committed
fix tx_global_index consistency with transactions table
1 parent 11039a0 commit ab7232c

File tree

8 files changed

+159
-171
lines changed

8 files changed

+159
-171
lines changed

api-server/api-server-common/src/storage/impls/in_memory/mod.rs

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515

1616
pub mod transactional;
1717

18-
use crate::storage::storage_api::{
19-
block_aux_data::{BlockAuxData, BlockWithExtraData},
20-
AmountWithDecimals, ApiServerStorageError, BlockInfo, CoinOrTokenStatistic, Delegation,
21-
FungibleTokenData, LockedUtxo, NftWithOwner, Order, PoolBlockStats, PoolDataWithExtraInfo,
22-
TokenTransaction, TransactionInfo, TransactionWithBlockInfo, Utxo, UtxoLock, UtxoWithExtraInfo,
18+
use std::{
19+
cmp::{Ordering, Reverse},
20+
collections::{BTreeMap, BTreeSet},
21+
ops::Bound::{Excluded, Unbounded},
22+
sync::Arc,
2323
};
24+
2425
use common::{
2526
address::Address,
2627
chain::{
@@ -31,24 +32,42 @@ use common::{
3132
},
3233
primitives::{id::WithId, Amount, BlockHeight, CoinOrTokenId, Id, Idable},
3334
};
34-
use itertools::Itertools as _;
35-
use std::{
36-
cmp::Reverse,
37-
collections::{BTreeMap, BTreeSet},
38-
ops::Bound::{Excluded, Unbounded},
39-
sync::Arc,
35+
36+
use crate::storage::storage_api::{
37+
block_aux_data::{BlockAuxData, BlockWithExtraData},
38+
AmountWithDecimals, ApiServerStorageError, BlockInfo, CoinOrTokenStatistic, Delegation,
39+
FungibleTokenData, LockedUtxo, NftWithOwner, Order, PoolBlockStats, PoolDataWithExtraInfo,
40+
TokenTransaction, TransactionInfo, TransactionWithBlockInfo, Utxo, UtxoLock, UtxoWithExtraInfo,
4041
};
4142

43+
use itertools::Itertools as _;
44+
4245
use super::CURRENT_STORAGE_VERSION;
4346

47+
#[derive(Debug, Clone, PartialEq, Eq)]
48+
struct TokenTransactionOrderedByTxId(TokenTransaction);
49+
50+
impl PartialOrd for TokenTransactionOrderedByTxId {
51+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
52+
Some(self.cmp(other))
53+
}
54+
}
55+
56+
impl Ord for TokenTransactionOrderedByTxId {
57+
fn cmp(&self, other: &Self) -> Ordering {
58+
self.0.tx_id.cmp(&other.0.tx_id)
59+
}
60+
}
61+
4462
#[derive(Debug, Clone)]
4563
struct ApiServerInMemoryStorage {
4664
block_table: BTreeMap<Id<Block>, BlockWithExtraData>,
4765
block_aux_data_table: BTreeMap<Id<Block>, BlockAuxData>,
4866
address_balance_table: BTreeMap<String, BTreeMap<CoinOrTokenId, BTreeMap<BlockHeight, Amount>>>,
4967
address_locked_balance_table: BTreeMap<String, BTreeMap<(CoinOrTokenId, BlockHeight), Amount>>,
5068
address_transactions_table: BTreeMap<String, BTreeMap<BlockHeight, Vec<Id<Transaction>>>>,
51-
token_transactions_table: BTreeMap<TokenId, BTreeMap<BlockHeight, BTreeSet<TokenTransaction>>>,
69+
token_transactions_table:
70+
BTreeMap<TokenId, BTreeMap<BlockHeight, BTreeSet<TokenTransactionOrderedByTxId>>>,
5271
delegation_table: BTreeMap<DelegationId, BTreeMap<BlockHeight, Delegation>>,
5372
main_chain_blocks_table: BTreeMap<BlockHeight, Id<Block>>,
5473
pool_data_table: BTreeMap<PoolId, BTreeMap<BlockHeight, PoolDataWithExtraInfo>>,
@@ -188,9 +207,9 @@ impl ApiServerInMemoryStorage {
188207
transactions
189208
.iter()
190209
.rev()
191-
.flat_map(|(_, txs)| txs.iter())
192-
.cloned()
210+
.flat_map(|(_, txs)| txs.iter().map(|tx| &tx.0))
193211
.flat_map(|tx| (tx.tx_global_index < tx_global_index).then_some(tx))
212+
.cloned()
194213
.take(len as usize)
195214
.collect()
196215
}))
@@ -979,36 +998,22 @@ impl ApiServerInMemoryStorage {
979998
Ok(())
980999
}
9811000

982-
fn set_token_transactions_at_height(
1001+
fn set_token_transaction_at_height(
9831002
&mut self,
9841003
token_id: TokenId,
985-
transaction_ids: BTreeSet<Id<Transaction>>,
1004+
tx_id: Id<Transaction>,
9861005
block_height: BlockHeight,
1006+
tx_global_index: u64,
9871007
) -> Result<(), ApiServerStorageError> {
988-
if transaction_ids.is_empty() {
989-
return Ok(());
990-
}
991-
992-
let next_tx_idx = self
993-
.token_transactions_table
994-
.values()
995-
.flat_map(|by_height| by_height.values())
996-
.flat_map(|tx_set| tx_set.iter())
997-
.map(|tx| tx.tx_global_index + 1)
998-
.max()
999-
.unwrap_or(0);
1000-
10011008
self.token_transactions_table
10021009
.entry(token_id)
10031010
.or_default()
10041011
.entry(block_height)
10051012
.or_default()
1006-
.extend(
1007-
transaction_ids.into_iter().enumerate().map(|(idx, tx_id)| TokenTransaction {
1008-
tx_global_index: next_tx_idx + idx as u64,
1009-
tx_id,
1010-
}),
1011-
);
1013+
.replace(TokenTransactionOrderedByTxId(TokenTransaction {
1014+
tx_global_index,
1015+
tx_id,
1016+
}));
10121017

10131018
Ok(())
10141019
}

api-server/api-server-common/src/storage/impls/in_memory/transactional/write.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,19 @@ impl ApiServerStorageWrite for ApiServerInMemoryStorageTransactionalRw<'_> {
111111
.set_address_transactions_at_height(address, transactions, block_height)
112112
}
113113

114-
async fn set_token_transactions_at_height(
114+
async fn set_token_transaction_at_height(
115115
&mut self,
116116
token_id: TokenId,
117-
transactions: BTreeSet<Id<Transaction>>,
117+
tx_id: Id<Transaction>,
118118
block_height: BlockHeight,
119+
tx_global_index: u64,
119120
) -> Result<(), ApiServerStorageError> {
120-
self.transaction
121-
.set_token_transactions_at_height(token_id, transactions, block_height)
121+
self.transaction.set_token_transaction_at_height(
122+
token_id,
123+
tx_id,
124+
block_height,
125+
tx_global_index,
126+
)
122127
}
123128

124129
async fn set_mainchain_block(

api-server/api-server-common/src/storage/impls/postgres/queries.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -604,25 +604,25 @@ impl<'a, 'b> QueryFromConnection<'a, 'b> {
604604
pub async fn set_token_transactions_at_height(
605605
&mut self,
606606
token_id: TokenId,
607-
transaction_ids: BTreeSet<Id<Transaction>>,
607+
transaction_id: Id<Transaction>,
608608
block_height: BlockHeight,
609+
tx_global_index: u64,
609610
) -> Result<(), ApiServerStorageError> {
610611
let height = Self::block_height_to_postgres_friendly(block_height);
612+
let tx_global_index = Self::tx_global_index_to_postgres_friendly(tx_global_index)?;
611613

612-
for transaction_id in transaction_ids {
613-
self.tx
614-
.execute(
615-
r#"
616-
INSERT INTO ml.token_transactions (token_id, block_height, transaction_id)
617-
VALUES ($1, $2, $3)
618-
ON CONFLICT (token_id, block_height, transaction_id)
619-
DO NOTHING;
620-
"#,
621-
&[&token_id.encode(), &height, &transaction_id.encode()],
622-
)
623-
.await
624-
.map_err(|e| ApiServerStorageError::LowLevelStorageError(e.to_string()))?;
625-
}
614+
self.tx
615+
.execute(
616+
r#"
617+
INSERT INTO ml.token_transactions (token_id, block_height, transaction_id, tx_global_index)
618+
VALUES ($1, $2, $3, $4)
619+
ON CONFLICT (token_id, transaction_id, block_height)
620+
DO NOTHING;
621+
"#,
622+
&[&token_id.encode(), &height, &transaction_id.encode(), &tx_global_index],
623+
)
624+
.await
625+
.map_err(|e| ApiServerStorageError::LowLevelStorageError(e.to_string()))?;
626626

627627
Ok(())
628628
}
@@ -823,11 +823,11 @@ impl<'a, 'b> QueryFromConnection<'a, 'b> {
823823

824824
self.just_execute(
825825
"CREATE TABLE ml.token_transactions (
826-
tx_global_index bigint PRIMARY KEY GENERATED ALWAYS AS IDENTITY (START WITH 0 MINVALUE 0),
826+
tx_global_index bigint PRIMARY KEY,
827827
token_id bytea NOT NULL,
828828
block_height bigint NOT NULL,
829829
transaction_id bytea NOT NULL,
830-
UNIQUE (token_id, block_height, transaction_id)
830+
UNIQUE (token_id, transaction_id, block_height)
831831
);",
832832
)
833833
.await?;

api-server/api-server-common/src/storage/impls/postgres/transactional/write.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,21 @@ impl ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'_> {
131131
Ok(())
132132
}
133133

134-
async fn set_token_transactions_at_height(
134+
async fn set_token_transaction_at_height(
135135
&mut self,
136136
token_id: TokenId,
137-
transaction_ids: BTreeSet<Id<Transaction>>,
137+
transaction_id: Id<Transaction>,
138138
block_height: BlockHeight,
139+
tx_global_index: u64,
139140
) -> Result<(), ApiServerStorageError> {
140141
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
141-
conn.set_token_transactions_at_height(token_id, transaction_ids, block_height)
142-
.await?;
142+
conn.set_token_transactions_at_height(
143+
token_id,
144+
transaction_id,
145+
block_height,
146+
tx_global_index,
147+
)
148+
.await?;
143149

144150
Ok(())
145151
}

api-server/api-server-common/src/storage/storage_api/mod.rs

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
// limitations under the License.
1515

1616
use std::{
17-
cmp::Ordering,
1817
collections::{BTreeMap, BTreeSet},
1918
fmt::Display,
2019
str::FromStr,
@@ -69,7 +68,7 @@ pub enum ApiServerStorageError {
6968
AddressableError,
7069
#[error("Block timestamp too high: {0}")]
7170
TimestampTooHigh(BlockTimestamp),
72-
#[error("Tx global index to hight: {0}")]
71+
#[error("Tx global index too hight: {0}")]
7372
TxGlobalIndexTooHigh(u64),
7473
#[error("Id creation error: {0}")]
7574
IdCreationError(#[from] IdCreationError),
@@ -590,18 +589,6 @@ pub struct TokenTransaction {
590589
pub tx_id: Id<Transaction>,
591590
}
592591

593-
impl PartialOrd for TokenTransaction {
594-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
595-
Some(self.cmp(other))
596-
}
597-
}
598-
599-
impl Ord for TokenTransaction {
600-
fn cmp(&self, other: &Self) -> Ordering {
601-
self.tx_id.cmp(&other.tx_id)
602-
}
603-
}
604-
605592
#[async_trait::async_trait]
606593
pub trait ApiServerStorageRead: Sync {
607594
async fn is_initialized(&self) -> Result<bool, ApiServerStorageError>;
@@ -630,10 +617,9 @@ pub trait ApiServerStorageRead: Sync {
630617
address: &str,
631618
) -> Result<Vec<Id<Transaction>>, ApiServerStorageError>;
632619

633-
/// Return a page of TX IDs that reference this token_id, with a limit of len and older
634-
/// tx_global_index than the specified.
635-
/// The tx_global_index is only ordered by block height and are not continuous for a specific
636-
/// token_id.
620+
/// Returns a page of transaction IDs that reference this `token_id`, limited to `len` entries
621+
/// and with a `tx_global_index` older than the specified value.
622+
/// The `tx_global_index` and is not continuous for a specific `token_id`.
637623
async fn get_token_transactions(
638624
&self,
639625
token_id: TokenId,
@@ -866,12 +852,15 @@ pub trait ApiServerStorageWrite: ApiServerStorageRead {
866852
block_height: BlockHeight,
867853
) -> Result<(), ApiServerStorageError>;
868854

869-
/// Append new token transactions with increasing tx_global_index at this block height
870-
async fn set_token_transactions_at_height(
855+
/// Sets the `token_id`–`transaction_id` pair at the specified `block_height` along with the
856+
/// `tx_global_index`.
857+
/// If the pair already exists at that `block_height`, the `tx_global_index` is updated.
858+
async fn set_token_transaction_at_height(
871859
&mut self,
872860
token_id: TokenId,
873-
transaction_ids: BTreeSet<Id<Transaction>>,
861+
transaction_id: Id<Transaction>,
874862
block_height: BlockHeight,
863+
tx_global_index: u64,
875864
) -> Result<(), ApiServerStorageError>;
876865

877866
async fn set_mainchain_block(

0 commit comments

Comments
 (0)