diff --git a/internal-packages/replication/src/client.ts b/internal-packages/replication/src/client.ts index 1a7ddb27236..aa4284d0417 100644 --- a/internal-packages/replication/src/client.ts +++ b/internal-packages/replication/src/client.ts @@ -9,6 +9,14 @@ import { LogicalReplicationClientError } from "./errors.js"; import { PgoutputMessage, PgoutputParser, getPgoutputStartReplicationSQL } from "./pgoutput.js"; import { startSpan, trace, Tracer } from "@internal/tracing"; +/** + * Milliseconds between the Unix epoch (1970-01-01) and the Postgres epoch + * (2000-01-01), used to convert between Postgres replication timestamps and + * `Date.now()`. Mirrors `(POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY` + * in pgoutput.ts (946684800000000 micros). + */ +const POSTGRES_EPOCH_MS = 946684800000; + export interface LogicalReplicationClientOptions { /** * The pg client config. @@ -363,7 +371,7 @@ export class LogicalReplicationClient { } else if (buffer[0] === 0x6b) { // Primary keepalive message const timestamp = Math.floor( - buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + 946080000000 + buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + POSTGRES_EPOCH_MS ); const shouldRespond = !!buffer.readInt8(17); this.events.emit("heartbeat", { lsn, timestamp, shouldRespond }); @@ -658,7 +666,7 @@ export class LogicalReplicationClient { const slice = lsn.split("/"); let [upperWAL, lowerWAL]: [number, number] = [parseInt(slice[0], 16), parseInt(slice[1], 16)]; // Timestamp as microseconds since midnight 2000-01-01 - const now = Date.now() - 946080000000; + const now = Date.now() - POSTGRES_EPOCH_MS; const upperTimestamp = Math.floor(now / 4294967.296); const lowerTimestamp = Math.floor(now - upperTimestamp * 4294967.296); if (lowerWAL === 4294967295) {