Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export class ChannelError extends Error implements grpc.ServiceError {
interface StreamTracked {
stream?: PullStream;
receivedStatus?: boolean;
pingTimeout?: NodeJS.Timeout;
}

/**
Expand Down Expand Up @@ -227,6 +228,9 @@ export class MessageStream extends PassThrough {

for (let i = 0; i < this._streams.length; i++) {
const tracker = this._streams[i];
if (tracker.pingTimeout) {
clearTimeout(tracker.pingTimeout);
}
if (tracker.stream) {
this._removeStream(i, 'overall message stream destroyed', 'n/a');
}
Expand Down Expand Up @@ -254,6 +258,8 @@ export class MessageStream extends PassThrough {
tracker.stream = stream;
tracker.receivedStatus = false;

this._resetPingTimer(index);

stream
.on('error', err => this._onError(index, err))
.once('status', status => this._onStatus(index, status))
Expand All @@ -264,10 +270,30 @@ export class MessageStream extends PassThrough {
// Mark this stream as alive again. (reset backoff)
const tracker = this._streams[index];
this._retrier.reset(tracker);
this._resetPingTimer(index);

this.emit('data', data);
}

private _resetPingTimer(index: number): void {
const tracker = this._streams[index];
if (tracker.pingTimeout) {
clearTimeout(tracker.pingTimeout);
}
// We expect a packet from the server at least once every 30 seconds.
// Give it a 1-second grace period.
tracker.pingTimeout = setTimeout(() => {
this._removeStream(
index,
'stream inactive for longer than 30 seconds',
'will be retried',
);
this._retrier.retryLater(tracker, () =>
this._fillOne(index, undefined, 'retry'),
);
}, 31000);
}

/**
* Attempts to create and cache the desired number of StreamingPull requests.
* gRPC does not supply a way to confirm that a stream is connected, so our
Expand Down Expand Up @@ -347,6 +373,8 @@ export class MessageStream extends PassThrough {
maxOutstandingBytes: this._subscriber.useLegacyFlowControl
? 0
: this._subscriber.maxBytes,
clientId: 'node-pubsub',
protocolVersion: 1, // Set protocol version to fulfill keepalive capabilities
};
const otherArgs = {
headers: {
Expand Down Expand Up @@ -511,6 +539,10 @@ export class MessageStream extends PassThrough {
whatNext?: string,
): void {
const tracker = this._streams[index];
if (tracker.pingTimeout) {
clearTimeout(tracker.pingTimeout);
tracker.pingTimeout = undefined;
}
if (tracker.stream) {
logs.subscriberStreams.info(
'closing stream %i; why: %s; next: %s',
Expand Down
Loading