Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

### Added

- Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395)
- Introduced dynamic block building and JIT transaction fetching [#394](https://github.com/proto-kit/framework/pull/394)
- Introduced block explorer [#381](https://github.com/proto-kit/framework/pull/381)
- Added CircuitAnalysisModule for easy analysis of protocol circuits [#379](https://github.com/proto-kit/framework/pull/379)
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/src/tasks/IndexPendingTxTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class IndexPendingTxTask

public async compute(input: PendingTransaction): Promise<string | void> {
try {
await this.transactionStorage.pushUserTransaction(input);
await this.transactionStorage.pushUserTransaction(input, 0);
return "";
} catch (err) {
log.error("Failed to process pending tx task", err);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- CreateTable
CREATE TABLE "TransactionPriority" (
"transactionHash" TEXT NOT NULL,
"priority" BIGINT NOT NULL,

CONSTRAINT "TransactionPriority_pkey" PRIMARY KEY ("transactionHash")
);

-- AddForeignKey
ALTER TABLE "TransactionPriority" ADD CONSTRAINT "TransactionPriority_transactionHash_fkey" FOREIGN KEY ("transactionHash") REFERENCES "Transaction"("hash") ON DELETE RESTRICT ON UPDATE CASCADE;
12 changes: 12 additions & 0 deletions packages/persistance/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ model Transaction {
executionResult TransactionExecutionResult?

IncomingMessageBatchTransaction IncomingMessageBatchTransaction[]

priority TransactionPriority?
}

model TransactionPriority {
transactionHash String

priority BigInt

Transaction Transaction @relation(fields: [transactionHash], references: [hash])

@@id([transactionHash])
}

model TransactionExecutionResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ export class PrismaTransactionStorage implements TransactionStorage {
equals: false,
},
},
orderBy: {
priority: {
priority: "desc",
},
},
skip: offset,
take: limit,
});
Expand All @@ -56,13 +61,27 @@ export class PrismaTransactionStorage implements TransactionStorage {
}
}

public async pushUserTransaction(tx: PendingTransaction): Promise<boolean> {
public async pushUserTransaction(
tx: PendingTransaction,
priority: number
): Promise<boolean> {
const { prismaClient } = this.connection;

const result = await prismaClient.transaction.createMany({
data: [this.transactionMapper.mapOut(tx)],
skipDuplicates: true,
});
const transactionData = this.transactionMapper.mapOut(tx);

const [result] = await prismaClient.$transaction([
prismaClient.transaction.createMany({
data: [transactionData],
skipDuplicates: true,
}),

prismaClient.transactionPriority.create({
data: {
priority,
transactionHash: transactionData.hash,
},
}),
]);

return result.count === 1;
}
Expand Down
4 changes: 1 addition & 3 deletions packages/sdk/test/fees-multi-zkprograms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ describe("check fee analyzer", () => {
},
},
Sequencer: {
Mempool: {
validationEnabled: true,
},
Mempool: {},
},
});

Expand Down
2 changes: 2 additions & 0 deletions packages/sequencer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export * from "./mempool/Mempool";
export * from "./mempool/PendingTransaction";
export * from "./mempool/CompressedSignature";
export * from "./mempool/private/PrivateMempool";
export * from "./mempool/sorting/MempoolSorting";
export * from "./mempool/sorting/DefaultMempoolSorting";
export * from "./sequencer/executor/Sequencer";
export * from "./sequencer/executor/Sequenceable";
export * from "./sequencer/SequencerIdProvider";
Expand Down
44 changes: 40 additions & 4 deletions packages/sequencer/src/mempool/private/PrivateMempool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,38 @@ import { TransactionValidator } from "../verification/TransactionValidator";
import { Tracer } from "../../logging/Tracer";
import { trace } from "../../logging/trace";
import { IncomingMessagesService } from "../../settlement/messages/IncomingMessagesService";
import { MempoolSorting } from "../sorting/MempoolSorting";
import { DefaultMempoolSorting } from "../sorting/DefaultMempoolSorting";

type PrivateMempoolConfig = {
type?: "hybrid" | "private" | "based";
};

@sequencerModule()
export class PrivateMempool extends SequencerModule implements Mempool {
export class PrivateMempool
extends SequencerModule<PrivateMempoolConfig>
implements Mempool
{
public readonly events = new EventEmitter<MempoolEvents>();

private readonly mempoolSorting: MempoolSorting;

public constructor(
private readonly transactionValidator: TransactionValidator,
@inject("TransactionStorage")
private readonly transactionStorage: TransactionStorage,
@inject("IncomingMessagesService", { isOptional: true })
private readonly messageService: IncomingMessagesService | undefined,
@inject("Tracer") public readonly tracer: Tracer
@inject("Tracer") public readonly tracer: Tracer,
@inject("MempoolSorting", { isOptional: true })
mempoolSorting: MempoolSorting | undefined
) {
super();
this.mempoolSorting = mempoolSorting ?? new DefaultMempoolSorting();
}

private type() {
return this.config.type ?? "hybrid";
}

public async length(): Promise<number> {
Expand All @@ -36,7 +54,12 @@ export class PrivateMempool extends SequencerModule implements Mempool {
public async add(tx: PendingTransaction): Promise<boolean> {
const [txValid, error] = this.transactionValidator.validateTx(tx);
if (txValid) {
const success = await this.transactionStorage.pushUserTransaction(tx);
const sortingValue = this.mempoolSorting!.presortingPriority(tx);

const success = await this.transactionStorage.pushUserTransaction(
tx,
sortingValue
);
if (success) {
this.events.emit("mempool-transaction-added", tx);
log.trace(`Transaction added to mempool: ${tx.hash().toString()}`);
Expand Down Expand Up @@ -69,14 +92,27 @@ export class PrivateMempool extends SequencerModule implements Mempool {
offset?: number,
limit?: number
): Promise<PendingTransaction[]> {
return await this.transactionStorage.getPendingUserTransactions(
if (this.type() === "based") {
return [];
}

let txs = await this.transactionStorage.getPendingUserTransactions(
offset ?? 0,
limit
);

if (this.mempoolSorting.enablePostSorting()) {
txs = this.mempoolSorting.postSorting(txs);
}

return txs;
}

@trace("mempool.get_mandatory_txs")
public async getMandatoryTxs(): Promise<PendingTransaction[]> {
if (this.type() === "private") {
return [];
}
return (await this.messageService?.getPendingMessages()) ?? [];
}

Expand Down
32 changes: 32 additions & 0 deletions packages/sequencer/src/mempool/sorting/DefaultMempoolSorting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { noop } from "@proto-kit/common";

import { PendingTransaction } from "../PendingTransaction";
import {
SequencerModule,
sequencerModule,
} from "../../sequencer/builder/SequencerModule";

import { MempoolSorting } from "./MempoolSorting";

@sequencerModule()
export class DefaultMempoolSorting
extends SequencerModule
implements MempoolSorting
{
public async start() {
noop();
}

public enablePostSorting(): boolean {
return false;
}

public postSorting(transactions: PendingTransaction[]): PendingTransaction[] {
return transactions;
}

public presortingPriority(tx: PendingTransaction): number {
// This means we order by first in, first out in the db
return Date.UTC(2500, 0) - Date.now();
}
}
26 changes: 26 additions & 0 deletions packages/sequencer/src/mempool/sorting/MempoolSorting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { PendingTransaction } from "../PendingTransaction";

export interface MempoolSorting {
/**
* Presorting happens on the backend (i.e. the DB), before the data travels to the sequencer.
* It's very fast, but limited to only integer sorting.
* The value returned here has to be static per transaction, since it will be sorted and
* compared on the DB-side.
*
* @param tx
* @returns Priority of the transaction - larger is better (therefore will be
* put in the block first)
*/
presortingPriority(tx: PendingTransaction): number;

/**
* Indicate whether to do pre-sorting (as it's expensive depending on your block size)
*/
enablePostSorting(): boolean;

/**
* Postsorting happens on the sequencer-side. It's less fast but can take in any two
* transactions and directly compare them based on arbitrary logic
*/
postSorting(transactions: PendingTransaction[]): PendingTransaction[];
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { InMemoryBatchStorage } from "./InMemoryBatchStorage";

@injectable()
export class InMemoryTransactionStorage implements TransactionStorage {
private queue: PendingTransaction[] = [];
private queue: { tx: PendingTransaction; sortingValue: number }[] = [];

private latestScannedBlock = -1;

Expand All @@ -21,12 +21,17 @@ export class InMemoryTransactionStorage implements TransactionStorage {

public async removeTx(hashes: string[]) {
const hashSet = new Set(hashes);
this.queue = this.queue.filter((tx) => {
this.queue = this.queue.filter(({ tx }) => {
const hash = tx.hash().toString();
return !hashSet.has(hash);
});
}

private sortQueue() {
// Sort in-place and descending
this.queue.sort(({ sortingValue: a }, { sortingValue: b }) => b - a);
}

public async getPendingUserTransactions(
offset: number,
limit?: number
Expand All @@ -42,25 +47,30 @@ export class InMemoryTransactionStorage implements TransactionStorage {
if (block !== undefined) {
const hashes = block.transactions.map((tx) => tx.tx.hash().toString());
this.queue = this.queue.filter(
(tx) => !hashes.includes(tx.hash().toString())
({ tx }) => !hashes.includes(tx.hash().toString())
);
}
}
this.latestScannedBlock = nextHeight - 1;

this.sortQueue();

const from = offset ?? 0;
const to = limit !== undefined ? from + limit : undefined;

return this.queue.slice(from, to);
return this.queue.slice(from, to).map(({ tx }) => tx);
}

public async pushUserTransaction(tx: PendingTransaction): Promise<boolean> {
public async pushUserTransaction(
tx: PendingTransaction,
priority: number
): Promise<boolean> {
const notInQueue =
this.queue.find(
(tx2) => tx2.hash().toString() === tx.hash().toString()
({ tx: tx2 }) => tx2.hash().toString() === tx.hash().toString()
) === undefined;
if (notInQueue) {
this.queue.push(tx);
this.queue.push({ tx, sortingValue: priority });
}
return notInQueue;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { PendingTransaction } from "../../mempool/PendingTransaction";

export interface TransactionStorage {
pushUserTransaction: (tx: PendingTransaction) => Promise<boolean>;
pushUserTransaction: (
tx: PendingTransaction,
priority: number
) => Promise<boolean>;

getPendingUserTransactions: (
offset: number,
Expand Down
4 changes: 1 addition & 3 deletions packages/sequencer/test-integration/benchmarks/tps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ export async function createAppChain() {
maximumBlockSize: 100,
},
BlockTrigger: {},
Mempool: {
validationEnabled: false,
},
Mempool: {},
},
Signer: {
signer: PrivateKey.random(),
Expand Down
Loading