Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/remote/query-optimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,22 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
return disabled;
}

async createIndex(
table: string,
columns: { name: string; order: "asc" | "desc" }[],
): Promise<void> {
const columnDefs = columns
.map((c) => {
const quoted = PgIdentifier.fromString(c.name);
return c.order === "desc" ? `${quoted} DESC` : `${quoted}`;
})
.join(", ");
const quotedTable = PgIdentifier.fromString(table);
const pg = this.manager.getOrCreateConnection(this.connectable);
await pg.exec(`CREATE INDEX ON ${quotedTable}(${columnDefs})`);
this.restart();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can kind of see how things went wrong here


/**
* Insert new queries to be processed. The {@link start} method must
* have been called previously for this to take effect
Expand Down
16 changes: 16 additions & 0 deletions src/remote/remote-controller.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,19 @@ export const ToggleIndexDto = z.object({
});

export type ToggleIndexDto = z.infer<typeof ToggleIndexDto>;

export const CreateIndexDto = z.object({
connectionString: z.string().min(1),
table: z.string().min(1),
columns: z
.array(
z.object({
name: z.string().min(1),
order: z.enum(["asc", "desc"]),
}),
)
.min(1),
});

export type CreateIndexDto = z.infer<typeof CreateIndexDto>;

91 changes: 91 additions & 0 deletions src/remote/remote-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,94 @@ Deno.test({
}
},
});

Deno.test({
name: "creating an index via endpoint adds it to the optimizing db",
sanitizeOps: false,
sanitizeResources: false,
fn: async () => {
const [sourceDb, targetDb] = await Promise.all([
new PostgreSqlContainer("postgres:17")
.withCopyContentToContainer([
{
content: `
create table testing(a int, b text);
insert into testing values (1, 'hello');
create extension pg_stat_statements;
`,
target: "/docker-entrypoint-initdb.d/init.sql",
},
])
.withCommand(["-c", "shared_preload_libraries=pg_stat_statements"])
.start(),
new PostgreSqlContainer("postgres:17").start(),
]);
const controller = new AbortController();

const target = Connectable.fromString(targetDb.getConnectionUri());
const source = Connectable.fromString(sourceDb.getConnectionUri());
const sourceOptimizer = ConnectionManager.forLocalDatabase();

const remote = new RemoteController(
new Remote(target, sourceOptimizer),
);

const server = Deno.serve(
{ port: 0, signal: controller.signal },
async (req: Request): Promise<Response> => {
const result = await remote.execute(req);
if (!result) {
return new Response("Not found", { status: 404 });
}
return result;
},
);

try {
// First sync the database
const syncResponse = await fetch(
`http://localhost:${server.addr.port}/postgres`,
{
method: "POST",
body: RemoteSyncRequest.encode({ db: source }),
},
);
assertEquals(syncResponse.status, 200);

const sql = postgres(
target.withDatabaseName(Remote.optimizingDbName).toString(),
);

// Verify no indexes exist initially
const indexesBefore =
await sql`select * from pg_indexes where schemaname = 'public'`;
assertEquals(indexesBefore.count, 0);

// Create an index via the endpoint
const createResponse = await fetch(
`http://localhost:${server.addr.port}/postgres/indexes`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
connectionString: sourceDb.getConnectionUri(),
table: "testing",
columns: [{ name: "a", order: "asc" }],
}),
},
);

assertEquals(createResponse.status, 200);
const body = await createResponse.json();
assertEquals(body.success, true);

// Verify the index was created on the optimizing db
const indexesAfter =
await sql`select * from pg_indexes where schemaname = 'public'`;
assertEquals(indexesAfter.count, 1);
assertEquals(indexesAfter[0].tablename, "testing");
} finally {
await Promise.all([sourceDb.stop(), targetDb.stop(), server.shutdown()]);
}
},
});
35 changes: 33 additions & 2 deletions src/remote/remote-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import { RemoteSyncRequest } from "./remote.dto.ts";
import { Remote } from "./remote.ts";
import * as errors from "../sync/errors.ts";
import type { OptimizedQuery } from "../sql/recent-query.ts";
import { ToggleIndexDto } from "./remote-controller.dto.ts";
import {
CreateIndexDto,
ToggleIndexDto,
} from "./remote-controller.dto.ts";
import { ZodError } from "zod";
import { PgIdentifier } from "@query-doctor/core";

const SyncStatus = {
NOT_STARTED: "notStarted",
Expand Down Expand Up @@ -62,6 +64,14 @@ export class RemoteController {
}
return methodNotAllowed();
}

if (url.pathname === "/postgres/indexes") {
if (request.method === "POST") {
return await this.createIndex(request);
}
return methodNotAllowed();
}

}

private hookUpWebsockets(remote: Remote) {
Expand Down Expand Up @@ -181,6 +191,27 @@ export class RemoteController {
}
}

private async createIndex(request: Request): Promise<Response> {
try {
const data = await request.json();
const body = CreateIndexDto.parse(data);
await this.remote.optimizer.createIndex(body.table, body.columns);
return Response.json({ success: true });
} catch (error) {
if (error instanceof ZodError) {
return Response.json({
type: "error",
error: "invalid_body",
message: error.message,
}, { status: 400 });
}
console.error("Failed to create index:", error);
return Response.json({
error: error instanceof Error ? error.message : "Failed to create index",
}, { status: 500 });
}
}

private onWebsocketRequest(request: Request): Response {
const { socket, response } = Deno.upgradeWebSocket(request);
this.socket = socket;
Expand Down