-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostgresjs.ts
More file actions
202 lines (191 loc) · 6.21 KB
/
Copy pathpostgresjs.ts
File metadata and controls
202 lines (191 loc) · 6.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import { Pool, type PoolConfig } from "pg";
import Cursor from "pg-cursor";
import {
type Postgres,
type PostgresTransaction,
PostgresVersion,
} from "@query-doctor/core";
import { Connectable } from "../sync/connectable.ts";
import { log } from "../log.ts";
// Number of rows fetched per cursor read when the caller does not
// provide an explicit batch size.
const DEFAULT_ITEMS_PER_PAGE = 20;
// Idle timeout (15 seconds) for pooled connections. Kept deliberately low
// so idle connections are dropped quickly instead of clogging up the
// database's connection slots.
const DEFAULT_IDLE_TIMEOUT_MS = 15_000;
// Maximum connection lifetime (5 minutes): it's ok to recycle connections
// frequently if needed.
// NOTE(review): not referenced anywhere in the visible part of this file —
// confirm whether it is still needed or should be wired into a pool config.
const DEFAULT_MAX_LIFETIME_MS = 60 * 5 * 1000;
/**
 * Opens a pool (up to 100 connections) against the local optimizer database.
 *
 * When the connectable's URL carries a `host` query parameter, the pool is
 * built from discrete settings: that host, the database name taken from the
 * URL path, and the `postgres` user. Otherwise the connectable is used as a
 * regular connection string.
 *
 * @param connectable - Location of the optimizer database.
 * @returns The pool wrapped in the `Postgres` interface.
 */
export function connectToOptimizer(connectable: Connectable) {
  const poolDefaults: PoolConfig = {
    max: 100,
  };
  const hostOverride = connectable.url.searchParams.get("host");

  // No explicit host: fall back to the generic connection-string path.
  if (!hostOverride) {
    log.info(
      `Connecting to optimizing db ${connectable} using custom POSTGRES_URL`,
      "postgres",
    );
    return connect(connectable, poolDefaults);
  }

  return wrapPgPool(
    new Pool({
      ...poolDefaults,
      user: "postgres",
      // The URL path is "/<database>"; drop the leading slash.
      database: connectable.url.pathname.slice(1),
      host: hostOverride,
    }),
  );
}
/**
 * Opens a pool against the source database that data is pulled from.
 *
 * This pool is intentionally conservative: it is capped at 20 connections,
 * idle connections are dropped after `DEFAULT_IDLE_TIMEOUT_MS`, and the pool
 * is allowed to let the process exit while idle, so the source database's
 * connection slots are not exhausted.
 *
 * @param connectable - Location of the source database.
 * @returns The pool wrapped in the `Postgres` interface.
 */
export function connectToSource(connectable: Connectable) {
  return connect(connectable, {
    max: 20,
    idleTimeoutMillis: DEFAULT_IDLE_TIMEOUT_MS,
    allowExitOnIdle: true,
  });
}
/**
 * Translates the URL's `sslmode` query parameter into a node-pg `ssl` option.
 *
 * node-pg treats `sslmode=require` as `rejectUnauthorized: true`, whereas
 * PostgreSQL's own meaning of "require" is only "encrypt the connection",
 * without verifying the server certificate. Following node-pg's reading
 * breaks servers that use self-signed certificates, so the mapping is done
 * here instead:
 *
 * - missing, empty or `disable`  -> `undefined` (no ssl option)
 * - `verify-full` / `verify-ca`  -> `true` (certificate is verified)
 * - anything else (`require`, `prefer`, `allow`, ...) ->
 *   `{ rejectUnauthorized: false }` (encrypted, self-signed accepted)
 *
 * @param connectable - Connection target whose URL is inspected.
 * @returns The value to use for `PoolConfig.ssl`.
 */
export function getSslConfig(connectable: Connectable): PoolConfig["ssl"] {
  const mode = connectable.url.searchParams.get("sslmode");

  switch (mode) {
    case null:
    case "":
    case "disable":
      return undefined;
    case "verify-full":
    case "verify-ca":
      return true;
    default:
      // require, prefer, allow — encrypt but accept self-signed certificates
      return { rejectUnauthorized: false };
  }
}
/**
 * Builds a pool from a connection string plus extra pool settings and wraps
 * it in the `Postgres` interface.
 *
 * @param connectable - Connection target; its string form is used as the
 *   connection string.
 * @param config - Pool settings merged underneath the connection string.
 */
function connect(connectable: Connectable, config: PoolConfig) {
  const ssl = getSslConfig(connectable);

  // Strip sslmode from the connection string so pg-connection-string
  // doesn't override our explicit ssl config (it treats require as verify-full)
  const sanitizedUrl = new URL(connectable.toString());
  sanitizedUrl.searchParams.delete("sslmode");

  const poolConfig: PoolConfig = {
    ...config,
    connectionString: sanitizedUrl.toString(),
  };
  // Only set `ssl` when we derived a value, so any `ssl` already present in
  // `config` is left alone otherwise.
  if (ssl !== undefined) {
    poolConfig.ssl = ssl;
  }

  return wrapPgPool(new Pool(poolConfig));
}
/**
 * Prepares an array query parameter.
 *
 * Arrays are passed through untouched only when every element is a string,
 * number or boolean. As soon as any element is something else (null, object,
 * nested array, ...) the whole array is serialized to a JSON string instead.
 *
 * @param arr - The array parameter to prepare.
 * @returns The same array instance, or its JSON text.
 */
function serializeArray(arr: unknown[]): unknown[] | string {
  const isPlainScalar = (value: unknown): boolean => {
    switch (typeof value) {
      case "string":
      case "number":
      case "boolean":
        return true;
      default:
        return false;
    }
  };

  const containsComplexValue = arr.some((value) => !isPlainScalar(value));
  return containsComplexValue ? JSON.stringify(arr) : arr;
}
/**
 * Prepares query parameters before they are handed to node-postgres, which
 * does not serialize jsonb values in the way callers expect.
 *
 * Per parameter:
 * - `null` / `undefined` are passed through.
 * - Non-empty arrays go through `serializeArray` (kept as arrays when they
 *   only hold primitives, JSON text otherwise).
 * - `Buffer` and `Date` instances are passed through untouched so that
 *   node-postgres applies its native binary / timestamp serialization.
 *   JSON-stringifying a `Date` would produce a double-quoted ISO string,
 *   which is not valid input for timestamp or date parameters.
 * - Every other object — including empty arrays, which become `"[]"` — is
 *   sent as JSON text.
 * - Remaining primitives are passed through.
 *
 * @param params - Positional query parameters, if any.
 * @returns A new array of prepared parameters, or the input unchanged when
 *   no parameters were given.
 */
function serializeParams(params?: unknown[]): unknown[] | undefined {
  if (!params) return params;
  return params.map((p) => {
    if (p === null || p === undefined) {
      return p;
    }
    if (Array.isArray(p) && p.length > 0) {
      return serializeArray(p);
    }
    // Values node-postgres already knows how to serialize must not be
    // turned into JSON text.
    if (p instanceof Buffer || p instanceof Date) {
      return p;
    }
    if (typeof p === "object") {
      return JSON.stringify(p);
    }
    return p;
  });
}
/**
 * Adapts a node-postgres `Pool` to the `Postgres` interface.
 *
 * The returned object offers:
 * - `exec`: run one query on any pooled connection and return its rows.
 * - `serverNum`: read and parse `server_version_num`.
 * - `transaction`: run a callback inside BEGIN/COMMIT on a dedicated
 *   connection. Every `exec` inside the transaction is wrapped in its own
 *   savepoint so a failed statement does not abort the whole transaction,
 *   and calls are serialized so savepoints never interleave.
 * - `cursor`: async generator that streams rows in batches.
 * - `close`: end the pool.
 *
 * @param pool - The pool to wrap. An `error` listener is attached to it.
 * @returns The `Postgres` facade over the pool.
 */
export function wrapPgPool(pool: Pool): Postgres {
  const describeError = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  // Handle idle client errors to prevent process crashes.
  // Expected during DROP DATABASE ... WITH (FORCE) which terminates
  // all connections to the target database.
  pool.on("error", (err) => {
    log.warn(`Pool idle client error: ${err.message}`, "postgres");
  });

  return {
    exec: async (query, params) => {
      const result = await pool.query(query, serializeParams(params) as any[]);
      return result.rows;
    },
    serverNum: async () => {
      const result = await pool.query("show server_version_num");
      return PostgresVersion.parse(result.rows[0].server_version_num);
    },
    transaction: async <T>(
      callback: (tx: PostgresTransaction) => Promise<T>,
    ) => {
      const client = await pool.connect();
      // Set when the connection's transaction state can no longer be
      // trusted; such a client is destroyed instead of being reused.
      let discardClient = false;
      let savepointCounter = 0;
      // Once true, queued queries that have not started yet are rejected
      // instead of running after COMMIT/ROLLBACK.
      let finished = false;
      // Serialize exec calls to prevent savepoint interleaving
      // when callers use Promise.all (matches postgres.js behavior)
      let queue: Promise<void> = Promise.resolve();

      // Stops new work and waits for the query that is currently in flight,
      // so COMMIT/ROLLBACK is never issued in the middle of an exec and the
      // client is never released while still in use. Safe to call twice.
      const drainQueue = async () => {
        finished = true;
        await queue;
      };

      const transaction: PostgresTransaction = {
        exec: (query, params) => {
          const doExec = async () => {
            if (finished) {
              throw new Error(
                "Transaction has already finished; query was not executed",
              );
            }
            const sp = "sp_" + savepointCounter++;
            await client.query("SAVEPOINT " + sp);
            try {
              const result = await client.query(query, serializeParams(params) as any[]);
              await client.query("RELEASE SAVEPOINT " + sp);
              return result.rows;
            } catch (error) {
              try {
                await client.query("ROLLBACK TO SAVEPOINT " + sp);
              } catch (rollbackError) {
                // Keep the original failure as the reported error; the
                // enclosing transaction will be rolled back as a whole.
                log.warn(
                  `Failed to roll back to savepoint ${sp}: ${describeError(rollbackError)}`,
                  "postgres",
                );
              }
              throw error;
            }
          };
          // Run after the previous exec settles, whether it succeeded or not.
          const result = queue.then(doExec, doExec);
          queue = result.then(() => { }, () => { });
          return result;
        },
      };

      try {
        await client.query("BEGIN");
        const result = await callback(transaction);
        await drainQueue();
        await client.query("COMMIT");
        return result;
      } catch (error) {
        await drainQueue();
        try {
          await client.query("ROLLBACK");
        } catch (rollbackError) {
          // The connection may be broken or stuck inside a transaction:
          // do not hand it back to the pool, and do not mask the real error.
          discardClient = true;
          log.warn(
            `Failed to roll back transaction: ${describeError(rollbackError)}`,
            "postgres",
          );
        }
        throw error;
      } finally {
        // A truthy argument makes the pool destroy the client.
        client.release(discardClient);
      }
    },
    async *cursor<T>(
      query: string,
      params?: unknown[],
      options?: { size?: number },
    ) {
      const client = await pool.connect();
      // Set when the cursor could not be shut down cleanly; the client is
      // then destroyed rather than returned to the pool.
      let discardClient = false;
      try {
        const cursor = client.query(new Cursor(query, serializeParams(params) as any[]));
        try {
          const batchSize = options?.size ?? DEFAULT_ITEMS_PER_PAGE;
          let rows = await cursor.read(batchSize);
          while (rows.length > 0) {
            yield* rows as T[];
            rows = await cursor.read(batchSize);
          }
        } catch (error) {
          discardClient = true;
          throw error;
        } finally {
          // Runs on exhaustion AND when the consumer stops iterating early
          // (break / return), so the cursor is never left open on a client
          // that goes back to the pool. Skipped after a failure, where the
          // client is discarded instead.
          if (!discardClient) {
            await cursor.close();
          }
        }
      } catch (error) {
        discardClient = true;
        throw error;
      } finally {
        client.release(discardClient);
      }
    },
    // @ts-expect-error | this will be added to the pg interface later
    close() {
      return pool.end();
    },
  };
}