Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions internal-packages/replication/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import { LogicalReplicationClientError } from "./errors.js";
import { PgoutputMessage, PgoutputParser, getPgoutputStartReplicationSQL } from "./pgoutput.js";
import { startSpan, trace, Tracer } from "@internal/tracing";

/**
* Milliseconds between the Unix epoch (1970-01-01) and the Postgres epoch
* (2000-01-01), used to convert between Postgres replication timestamps and
* `Date.now()`. Mirrors `(POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY`
* in pgoutput.ts (946684800000000 micros).
*/
const POSTGRES_EPOCH_MS = 946684800000;

export interface LogicalReplicationClientOptions {
/**
* The pg client config.
Expand Down Expand Up @@ -363,7 +371,7 @@ export class LogicalReplicationClient {
} else if (buffer[0] === 0x6b) {
// Primary keepalive message
const timestamp = Math.floor(
buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + 946080000000
buffer.readUInt32BE(9) * 4294967.296 + buffer.readUInt32BE(13) / 1000 + POSTGRES_EPOCH_MS
);
const shouldRespond = !!buffer.readInt8(17);
this.events.emit("heartbeat", { lsn, timestamp, shouldRespond });
Expand Down Expand Up @@ -658,7 +666,7 @@ export class LogicalReplicationClient {
const slice = lsn.split("/");
let [upperWAL, lowerWAL]: [number, number] = [parseInt(slice[0], 16), parseInt(slice[1], 16)];
// Timestamp as microseconds since midnight 2000-01-01
const now = Date.now() - 946080000000;
const now = Date.now() - POSTGRES_EPOCH_MS;
const upperTimestamp = Math.floor(now / 4294967.296);
const lowerTimestamp = Math.floor(now - upperTimestamp * 4294967.296);
Comment thread
matt-aitken marked this conversation as resolved.
if (lowerWAL === 4294967295) {
Expand Down