extends RespType<
+ RESP_TYPES['MAP'],
+ {
+ [P in T[number] as ExtractMapKey]: P[1];
+ },
+ Map, T[number][1]> | FlattenTuples
+> {}
+
+type FlattenTuples = (
+ T extends [] ? [] :
+ T extends [MapKeyValue] ? T[0] :
+ T extends [MapKeyValue, ...infer R] ? [
+ ...T[0],
+ ...FlattenTuples
+ ] :
+ never
+);
+
+export type ReplyUnion = (
+ NullReply |
+ BooleanReply |
+ NumberReply |
+ BigNumberReply |
+ DoubleReply |
+ SimpleStringReply |
+ BlobStringReply |
+ VerbatimStringReply |
+ SimpleErrorReply |
+ BlobErrorReply |
+ ArrayReply |
+ SetReply |
+ MapReply
+);
+
+export type MappedType = ((...args: any) => T) | (new (...args: any) => T);
+
+type InferTypeMapping = T extends RespType ? FLAG_TYPES : never;
+
+export type TypeMapping = {
+ [P in RespTypes]?: MappedType>>>;
+};
+
+type MapKey<
+ T,
+ TYPE_MAPPING extends TypeMapping
+> = ReplyWithTypeMapping;
+
+export type UnwrapReply> = REPLY['DEFAULT' | 'TYPES'];
+
+export type ReplyWithTypeMapping<
+ REPLY,
+ TYPE_MAPPING extends TypeMapping
+> = (
+ // if REPLY is a type, extract the coresponding type from TYPE_MAPPING or use the default type
+ REPLY extends RespType ?
+ TYPE_MAPPING[RESP_TYPE] extends MappedType ?
+ ReplyWithTypeMapping, TYPE_MAPPING> :
+ ReplyWithTypeMapping
+ : (
+ // if REPLY is a known generic type, convert its generic arguments
+ // TODO: tuples?
+ REPLY extends Array ? Array> :
+ REPLY extends Set ? Set> :
+ REPLY extends Map ? Map, ReplyWithTypeMapping> :
+ // `Date | Buffer | Error` are supersets of `Record`, so they need to be checked first
+ REPLY extends Date | Buffer | Error ? REPLY :
+ REPLY extends Record ? {
+ [P in keyof REPLY]: ReplyWithTypeMapping;
+ } :
+ // otherwise, just return the REPLY as is
+ REPLY
+ )
+);
+
+export type TransformReply = (this: void, reply: any, preserve?: any, typeMapping?: TypeMapping) => any; // TODO;
+
+export type RedisArgument = string | Buffer;
+
+export type CommandArguments = Array & { preserve?: unknown };
+
+// export const REQUEST_POLICIES = {
+// /**
+// * TODO
+// */
+// ALL_NODES: 'all_nodes',
+// /**
+// * TODO
+// */
+// ALL_SHARDS: 'all_shards',
+// /**
+// * TODO
+// */
+// SPECIAL: 'special'
+// } as const;
+
+// export type REQUEST_POLICIES = typeof REQUEST_POLICIES;
+
+// export type RequestPolicies = REQUEST_POLICIES[keyof REQUEST_POLICIES];
+
+// export const RESPONSE_POLICIES = {
+// /**
+// * TODO
+// */
+// ONE_SUCCEEDED: 'one_succeeded',
+// /**
+// * TODO
+// */
+// ALL_SUCCEEDED: 'all_succeeded',
+// /**
+// * TODO
+// */
+// LOGICAL_AND: 'agg_logical_and',
+// /**
+// * TODO
+// */
+// SPECIAL: 'special'
+// } as const;
+
+// export type RESPONSE_POLICIES = typeof RESPONSE_POLICIES;
+
+// export type ResponsePolicies = RESPONSE_POLICIES[keyof RESPONSE_POLICIES];
+
+// export type CommandPolicies = {
+// request?: RequestPolicies | null;
+// response?: ResponsePolicies | null;
+// };
+
+export type Command = {
+ CACHEABLE?: boolean;
+ IS_READ_ONLY?: boolean;
+ /**
+ * @internal
+ * TODO: remove once `POLICIES` is implemented
+ */
+ IS_FORWARD_COMMAND?: boolean;
+ NOT_KEYED_COMMAND?: true;
+ // POLICIES?: CommandPolicies;
+ parseCommand(this: void, parser: CommandParser, ...args: Array): void;
+ TRANSFORM_LEGACY_REPLY?: boolean;
+ transformReply: TransformReply | Record;
+ unstableResp3?: boolean;
+};
+
+export type RedisCommands = Record;
+
+export type RedisModules = Record;
+
+export interface RedisFunction extends Command {
+ NUMBER_OF_KEYS?: number;
+}
+
+export type RedisFunctions = Record>;
+
+export type RedisScript = RedisScriptConfig & SHA1;
+
+export type RedisScripts = Record;
+
+// TODO: move to Commander?
+export interface CommanderConfig<
+ M extends RedisModules,
+ F extends RedisFunctions,
+ S extends RedisScripts,
+ RESP extends RespVersions
+> {
+ modules?: M;
+ functions?: F;
+ scripts?: S;
+ /**
+ * TODO
+ */
+ RESP?: RESP;
+ /**
+ * TODO
+ */
+ unstableResp3?: boolean;
+}
+
+type Resp2Array = (
+ T extends [] ? [] :
+ T extends [infer ITEM] ? [Resp2Reply- ] :
+ T extends [infer ITEM, ...infer REST] ? [
+ Resp2Reply
- ,
+ ...Resp2Array
+ ] :
+ T extends Array ? Array> :
+ never
+);
+
+export type Resp2Reply = (
+ RESP3REPLY extends RespType ?
+ // TODO: RESP3 only scalar types
+ RESP_TYPE extends RESP_TYPES['DOUBLE'] ? BlobStringReply :
+ RESP_TYPE extends RESP_TYPES['ARRAY'] | RESP_TYPES['SET'] ? RespType<
+ RESP_TYPE,
+ Resp2Array
+ > :
+ RESP_TYPE extends RESP_TYPES['MAP'] ? RespType<
+ RESP_TYPES['ARRAY'],
+ Resp2Array>>
+ > :
+ RESP3REPLY :
+ RESP3REPLY
+);
+
+export type RespVersions = 2 | 3;
+
+export type CommandReply<
+ COMMAND extends Command,
+ RESP extends RespVersions
+> = (
+ // if transformReply is a function, use its return type
+ COMMAND['transformReply'] extends (...args: any) => infer T ? T :
+ // if transformReply[RESP] is a function, use its return type
+ COMMAND['transformReply'] extends Record infer T> ? T :
+ // otherwise use the generic reply type
+ ReplyUnion
+);
+
+export type CommandSignature<
+ COMMAND extends Command,
+ RESP extends RespVersions,
+ TYPE_MAPPING extends TypeMapping
+> = (...args: Tail>) => Promise, TYPE_MAPPING>>;
+
+// export type CommandWithPoliciesSignature<
+// COMMAND extends Command,
+// RESP extends RespVersions,
+// TYPE_MAPPING extends TypeMapping,
+// POLICIES extends CommandPolicies
+// > = (...args: Parameters) => Promise<
+// ReplyWithPolicy<
+// ReplyWithTypeMapping, TYPE_MAPPING>,
+// MergePolicies
+// >
+// >;
+
+// export type MergePolicies<
+// COMMAND extends Command,
+// POLICIES extends CommandPolicies
+// > = Omit & POLICIES;
+
+// type ReplyWithPolicy<
+// REPLY,
+// POLICIES extends CommandPolicies,
+// > = (
+// POLICIES['request'] extends REQUEST_POLICIES['SPECIAL'] ? never :
+// POLICIES['request'] extends null | undefined ? REPLY :
+// unknown extends POLICIES['request'] ? REPLY :
+// POLICIES['response'] extends RESPONSE_POLICIES['SPECIAL'] ? never :
+// POLICIES['response'] extends RESPONSE_POLICIES['ALL_SUCCEEDED' | 'ONE_SUCCEEDED' | 'LOGICAL_AND'] ? REPLY :
+// // otherwise, return array of replies
+// Array
+// );
diff --git a/packages/client/lib/RESP/verbatim-string.ts b/packages/client/lib/RESP/verbatim-string.ts
new file mode 100644
index 00000000000..92ff4fe3fb1
--- /dev/null
+++ b/packages/client/lib/RESP/verbatim-string.ts
@@ -0,0 +1,8 @@
+export class VerbatimString extends String {
+ constructor(
+ public format: string,
+ value: string
+ ) {
+ super(value);
+ }
+}
diff --git a/packages/client/lib/authx/credentials-provider.ts b/packages/client/lib/authx/credentials-provider.ts
new file mode 100644
index 00000000000..667795be9b3
--- /dev/null
+++ b/packages/client/lib/authx/credentials-provider.ts
@@ -0,0 +1,102 @@
+import { Disposable } from './disposable';
+/**
+ * Provides credentials asynchronously.
+ */
+export interface AsyncCredentialsProvider {
+ readonly type: 'async-credentials-provider';
+ credentials: () => Promise
+}
+
+/**
+ * Provides credentials asynchronously with support for continuous updates via a subscription model.
+ * This is useful for environments where credentials are frequently rotated or updated or can be revoked.
+ */
+export interface StreamingCredentialsProvider {
+ readonly type: 'streaming-credentials-provider';
+
+ /**
+ * Provides initial credentials and subscribes to subsequent updates. This is used internally by the node-redis client
+ * to handle credential rotation and re-authentication.
+ *
+ * Note: The node-redis client manages the subscription lifecycle automatically. Users only need to implement
+ * onReAuthenticationError if they want to be notified about authentication failures.
+ *
+ * Error handling:
+ * - Errors received via onError indicate a fatal issue with the credentials stream
+ * - The stream is automatically closed(disposed) when onError occurs
+ * - onError typically mean the provider failed to fetch new credentials after retrying
+ *
+ * @example
+ * ```ts
+ * const provider = getStreamingProvider();
+ * const [initialCredentials, disposable] = await provider.subscribe({
+ * onNext: (newCredentials) => {
+ * // Handle credential update
+ * },
+ * onError: (error) => {
+ * // Handle fatal stream error
+ * }
+ * });
+ *
+ * @param listener - Callbacks to handle credential updates and errors
+ * @returns A Promise resolving to [initial credentials, cleanup function]
+ */
+ subscribe: (listener: StreamingCredentialsListener) => Promise<[BasicAuth, Disposable]>
+
+ /**
+ * Called when authentication fails or credentials cannot be renewed in time.
+ * Implement this to handle authentication errors in your application.
+ *
+ * @param error - Either a CredentialsError (invalid/expired credentials) or
+ * UnableToObtainNewCredentialsError (failed to fetch new credentials on time)
+ */
+ onReAuthenticationError: (error: ReAuthenticationError) => void;
+
+}
+
+/**
+ * Type representing basic authentication credentials.
+ */
+export type BasicAuth = { username?: string, password?: string }
+
+/**
+ * Callback to handle credential updates and errors.
+ */
+export type StreamingCredentialsListener = {
+ onNext: (credentials: T) => void;
+ onError: (e: Error) => void;
+}
+
+
+/**
+ * Providers that can supply authentication credentials
+ */
+export type CredentialsProvider = AsyncCredentialsProvider | StreamingCredentialsProvider
+
+/**
+ * Errors that can occur during re-authentication.
+ */
+export type ReAuthenticationError = CredentialsError | UnableToObtainNewCredentialsError
+
+/**
+ * Thrown when re-authentication fails with provided credentials .
+ * e.g. when the credentials are invalid, expired or revoked.
+ *
+ */
+export class CredentialsError extends Error {
+ constructor(message: string) {
+ super(`Re-authentication with latest credentials failed: ${message}`);
+ this.name = 'CredentialsError';
+ }
+
+}
+
+/**
+ * Thrown when new credentials cannot be obtained before current ones expire
+ */
+export class UnableToObtainNewCredentialsError extends Error {
+ constructor(message: string) {
+ super(`Unable to obtain new credentials : ${message}`);
+ this.name = 'UnableToObtainNewCredentialsError';
+ }
+}
\ No newline at end of file
diff --git a/packages/client/lib/authx/disposable.ts b/packages/client/lib/authx/disposable.ts
new file mode 100644
index 00000000000..ee4526a37bd
--- /dev/null
+++ b/packages/client/lib/authx/disposable.ts
@@ -0,0 +1,6 @@
+/**
+ * Represents a resource that can be disposed.
+ */
+export interface Disposable {
+ dispose(): void;
+}
\ No newline at end of file
diff --git a/packages/client/lib/authx/identity-provider.ts b/packages/client/lib/authx/identity-provider.ts
new file mode 100644
index 00000000000..a2d25c8f9db
--- /dev/null
+++ b/packages/client/lib/authx/identity-provider.ts
@@ -0,0 +1,22 @@
+/**
+ * An identity provider is responsible for providing a token that can be used to authenticate with a service.
+ */
+
+/**
+ * The response from an identity provider when requesting a token.
+ *
+ * note: "native" refers to the type of the token that the actual identity provider library is using.
+ *
+ * @type T The type of the native idp token.
+ * @property token The token.
+ * @property ttlMs The time-to-live of the token in epoch milliseconds extracted from the native token in local time.
+ */
+export type TokenResponse = { token: T, ttlMs: number };
+
+export interface IdentityProvider {
+ /**
+ * Request a token from the identity provider.
+ * @returns A promise that resolves to an object containing the token and the time-to-live in epoch milliseconds.
+ */
+ requestToken(): Promise>;
+}
\ No newline at end of file
diff --git a/packages/client/lib/authx/index.ts b/packages/client/lib/authx/index.ts
new file mode 100644
index 00000000000..ce611e1497f
--- /dev/null
+++ b/packages/client/lib/authx/index.ts
@@ -0,0 +1,15 @@
+export { TokenManager, TokenManagerConfig, TokenStreamListener, RetryPolicy, IDPError } from './token-manager';
+export {
+ CredentialsProvider,
+ StreamingCredentialsProvider,
+ UnableToObtainNewCredentialsError,
+ CredentialsError,
+ StreamingCredentialsListener,
+ AsyncCredentialsProvider,
+ ReAuthenticationError,
+ BasicAuth
+} from './credentials-provider';
+export { Token } from './token';
+export { IdentityProvider, TokenResponse } from './identity-provider';
+
+export { Disposable } from './disposable'
\ No newline at end of file
diff --git a/packages/client/lib/authx/token-manager.spec.ts b/packages/client/lib/authx/token-manager.spec.ts
new file mode 100644
index 00000000000..1cc2a207edc
--- /dev/null
+++ b/packages/client/lib/authx/token-manager.spec.ts
@@ -0,0 +1,588 @@
+import { strict as assert } from 'node:assert';
+import { Token } from './token';
+import { IDPError, RetryPolicy, TokenManager, TokenManagerConfig, TokenStreamListener } from './token-manager';
+import { IdentityProvider, TokenResponse } from './identity-provider';
+import { setTimeout } from 'timers/promises';
+
+describe('TokenManager', () => {
+
+ /**
+ * Helper function to delay execution for a given number of milliseconds.
+ * @param ms
+ */
+ const delay = (ms: number) => {
+ return setTimeout(ms);
+ }
+
+ /**
+ * IdentityProvider that returns a fixed test token for testing and doesn't handle TTL.
+ */
+ class TestIdentityProvider implements IdentityProvider {
+ requestToken(): Promise> {
+ return Promise.resolve({ token: 'test-token 1', ttlMs: 1000 });
+ }
+ }
+
+ /**
+ * Helper function to create a test token with a given TTL .
+ * @param ttlMs Time-to-live in milliseconds
+ */
+ const createToken = (ttlMs: number): Token => {
+ return new Token('test-token', ttlMs, 0);
+ };
+
+ /**
+ * Listener that records received tokens and errors for testing.
+ */
+ class TestListener implements TokenStreamListener {
+
+ public readonly receivedTokens: Token[] = [];
+ public readonly errors: IDPError[] = [];
+
+ onNext(token: Token): void {
+ this.receivedTokens.push(token);
+ }
+
+ onError(error: IDPError): void {
+ this.errors.push(error);
+ }
+ }
+
+ /**
+ * IdentityProvider that returns a sequence of tokens with a fixed delay simulating network latency.
+ * Used for testing token refresh scenarios.
+ */
+ class ControlledIdentityProvider implements IdentityProvider {
+ private tokenIndex = 0;
+ private readonly delayMs: number;
+ private readonly ttlMs: number;
+
+ constructor(
+ private readonly tokens: string[],
+ delayMs: number = 0,
+ tokenTTlMs: number = 100
+ ) {
+ this.delayMs = delayMs;
+ this.ttlMs = tokenTTlMs;
+ }
+
+ async requestToken(): Promise> {
+
+ if (this.tokenIndex >= this.tokens.length) {
+ throw new Error('No more test tokens available');
+ }
+
+ if (this.delayMs > 0) {
+ await setTimeout(this.delayMs);
+ }
+
+ return { token: this.tokens[this.tokenIndex++], ttlMs: this.ttlMs };
+ }
+
+ }
+
+ /**
+ * IdentityProvider that simulates various error scenarios with configurable behavior
+ */
+ class ErrorSimulatingProvider implements IdentityProvider {
+ private requestCount = 0;
+
+ constructor(
+ private readonly errorSequence: Array,
+ private readonly delayMs: number = 0,
+ private readonly ttlMs: number = 100
+ ) {}
+
+ async requestToken(): Promise> {
+
+ if (this.delayMs > 0) {
+ await delay(this.delayMs);
+ }
+
+ const result = this.errorSequence[this.requestCount];
+ this.requestCount++;
+
+ if (result instanceof Error) {
+ throw result;
+ } else if (typeof result === 'string') {
+ return { token: result, ttlMs: this.ttlMs };
+ } else {
+ throw new Error('No more responses configured');
+ }
+ }
+
+ getRequestCount(): number {
+ return this.requestCount;
+ }
+ }
+
+ describe('constructor validation', () => {
+ it('should throw error if ratio is greater than 1', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 1.1
+ };
+
+ assert.throws(
+ () => new TokenManager(new TestIdentityProvider(), config),
+ /expirationRefreshRatio must be less than or equal to 1/
+ );
+ });
+
+ it('should throw error if ratio is negative', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: -0.1
+ };
+
+ assert.throws(
+ () => new TokenManager(new TestIdentityProvider(), config),
+ /expirationRefreshRatio must be greater or equal to 0/
+ );
+ });
+
+ it('should accept ratio of 1', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 1
+ };
+
+ assert.doesNotThrow(
+ () => new TokenManager(new TestIdentityProvider(), config)
+ );
+ });
+
+ it('should accept ratio of 0', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0
+ };
+
+ assert.doesNotThrow(
+ () => new TokenManager(new TestIdentityProvider(), config)
+ );
+ });
+ });
+
+ describe('calculateRefreshTime', () => {
+ it('should calculate correct refresh time with 0.8 ratio', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0.8
+ };
+
+ const manager = new TokenManager(new TestIdentityProvider(), config);
+ const token = createToken(1000);
+ const refreshTime = manager.calculateRefreshTime(token, 0);
+
+ // With 1000s TTL and 0.8 ratio, should refresh at 800s
+ assert.equal(refreshTime, 800);
+ });
+
+ it('should return 0 for ratio of 0', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0
+ };
+
+ const manager = new TokenManager(new TestIdentityProvider(), config);
+ const token = createToken(1000);
+ const refreshTime = manager.calculateRefreshTime(token, 0);
+
+ assert.equal(refreshTime, 0);
+ });
+
+ it('should refresh at expiration time with ratio of 1', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 1
+ };
+
+ const manager = new TokenManager(new TestIdentityProvider(), config);
+ const token = createToken(1000);
+ const refreshTime = manager.calculateRefreshTime(token, 0);
+
+ assert.equal(refreshTime, 1000);
+ });
+
+ it('should handle short TTL tokens', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0.8
+ };
+
+ const manager = new TokenManager(new TestIdentityProvider(), config);
+ const token = createToken(5);
+ const refreshTime = manager.calculateRefreshTime(token, 0);
+
+ assert.equal(refreshTime, 4);
+ });
+
+ it('should handle expired tokens', () => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0.8
+ };
+
+ const manager = new TokenManager(new TestIdentityProvider(), config);
+ // Create token that expired 100s ago
+ const token = createToken(-100);
+ const refreshTime = manager.calculateRefreshTime(token, 0);
+
+ // Should return refresh time of 0 for expired tokens
+ assert.equal(refreshTime, 0);
+ });
+ describe('token refresh scenarios', () => {
+
+ describe('token refresh', () => {
+ it('should handle token refresh', async () => {
+ const networkDelay = 20;
+ const tokenTtl = 100;
+
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0.8
+ };
+
+ const identityProvider = new ControlledIdentityProvider(['token1', 'token2', 'token3'], networkDelay, tokenTtl);
+ const manager = new TokenManager(identityProvider, config);
+ const listener = new TestListener();
+ const disposable = manager.start(listener);
+
+ assert.equal(manager.getCurrentToken(), null, 'Should not have token yet');
+ // Wait for the first token request to complete ( it should be immediate, and we should wait only for the network delay)
+ await delay(networkDelay)
+
+ assert.equal(listener.receivedTokens.length, 1, 'Should receive initial token');
+ assert.equal(listener.receivedTokens[0].value, 'token1', 'Should have correct token value');
+ assert.equal(listener.receivedTokens[0].expiresAtMs - listener.receivedTokens[0].receivedAtMs,
+ tokenTtl, 'Should have correct TTL');
+ assert.equal(listener.errors.length, 0, 'Should not have any errors: ' + listener.errors);
+ assert.equal(manager.getCurrentToken().value, 'token1', 'Should have current token');
+
+ await delay(80);
+
+ assert.equal(listener.receivedTokens.length, 1, 'Should not receive new token yet');
+ assert.equal(listener.errors.length, 0, 'Should not have any errors');
+
+ await delay(networkDelay);
+
+ assert.equal(listener.receivedTokens.length, 2, 'Should receive second token');
+ assert.equal(listener.receivedTokens[1].value, 'token2', 'Should have correct token value');
+ assert.equal(listener.receivedTokens[1].expiresAtMs - listener.receivedTokens[1].receivedAtMs,
+ tokenTtl, 'Should have correct TTL');
+ assert.equal(listener.errors.length, 0, 'Should not have any errors');
+ assert.equal(manager.getCurrentToken().value, 'token2', 'Should have current token');
+
+ await delay(80);
+
+ assert.equal(listener.receivedTokens.length, 2, 'Should not receive new token yet');
+ assert.equal(listener.errors.length, 0, 'Should not have any errors');
+
+ await delay(networkDelay);
+
+ assert.equal(listener.receivedTokens.length, 3, 'Should receive third token');
+ assert.equal(listener.receivedTokens[2].value, 'token3', 'Should have correct token value');
+ assert.equal(listener.receivedTokens[2].expiresAtMs - listener.receivedTokens[2].receivedAtMs,
+ tokenTtl, 'Should have correct TTL');
+ assert.equal(listener.errors.length, 0, 'Should not have any errors');
+ assert.equal(manager.getCurrentToken().value, 'token3', 'Should have current token');
+
+ disposable?.dispose();
+ });
+ });
+ });
+ });
+
+ describe('TokenManager error handling', () => {
+
+ describe('error scenarios', () => {
+ it('should not recover if retries are not enabled', async () => {
+
+ const networkDelay = 20;
+ const tokenTtl = 100;
+
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0.8
+ };
+
+ const identityProvider = new ErrorSimulatingProvider(
+ [
+ 'token1',
+ new Error('Fatal error'),
+ 'token3'
+ ],
+ networkDelay,
+ tokenTtl
+ );
+
+ const manager = new TokenManager(identityProvider, config);
+ const listener = new TestListener();
+ const disposable = manager.start(listener);
+
+ await delay(networkDelay);
+
+ assert.equal(listener.receivedTokens.length, 1, 'Should receive initial token');
+ assert.equal(listener.receivedTokens[0].value, 'token1', 'Should have correct initial token');
+ assert.equal(listener.receivedTokens[0].expiresAtMs - listener.receivedTokens[0].receivedAtMs,
+ tokenTtl, 'Should have correct TTL');
+ assert.equal(listener.errors.length, 0, 'Should not have errors yet');
+
+ await delay(80);
+
+ assert.equal(listener.receivedTokens.length, 1, 'Should not receive new token yet');
+ assert.equal(listener.errors.length, 0, 'Should not have any errors');
+
+ await delay(networkDelay);
+
+ assert.equal(listener.receivedTokens.length, 1, 'Should not receive new token after failure');
+ assert.equal(listener.errors.length, 1, 'Should receive error');
+ assert.equal(listener.errors[0].message, 'Fatal error', 'Should have correct error message');
+ assert.equal(listener.errors[0].isRetryable, false, 'Should be a fatal error');
+
+ // verify that the token manager is stopped and no more requests are made after the error and expected refresh time
+ await delay(80);
+
+ assert.equal(identityProvider.getRequestCount(), 2, 'Should not make more requests after error');
+ assert.equal(listener.receivedTokens.length, 1, 'Should not receive new token after error');
+ assert.equal(listener.errors.length, 1, 'Should not receive more errors after error');
+ assert.equal(manager.isRunning(), false, 'Should stop token manager after error');
+
+ disposable?.dispose();
+ });
+
+ it('should handle retries with exponential backoff', async () => {
+ const networkDelay = 20;
+ const tokenTtl = 100;
+
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0.8,
+ retry: {
+ maxAttempts: 3,
+ initialDelayMs: 100,
+ maxDelayMs: 1000,
+ backoffMultiplier: 2,
+ isRetryable: (error: unknown) => error instanceof Error && error.message === 'Temporary failure'
+ }
+ };
+
+ const identityProvider = new ErrorSimulatingProvider(
+ [
+ 'initial-token',
+ new Error('Temporary failure'), // First attempt fails
+ new Error('Temporary failure'), // First retry fails
+ 'recovery-token' // Second retry succeeds
+ ],
+ networkDelay,
+ tokenTtl
+ );
+
+ const manager = new TokenManager(identityProvider, config);
+ const listener = new TestListener();
+ const disposable = manager.start(listener);
+
+ // Wait for initial token
+ await delay(networkDelay);
+ assert.equal(listener.receivedTokens.length, 1, 'Should receive initial token');
+ assert.equal(listener.receivedTokens[0].value, 'initial-token', 'Should have correct initial token');
+ assert.equal(listener.receivedTokens[0].expiresAtMs - listener.receivedTokens[0].receivedAtMs,
+ tokenTtl, 'Should have correct TTL');
+ assert.equal(listener.errors.length, 0, 'Should not have errors yet');
+
+ await delay(80);
+
+ assert.equal(listener.receivedTokens.length, 1, 'Should not receive new token yet');
+ assert.equal(listener.errors.length, 0, 'Should not have any errors');
+
+ await delay(networkDelay);
+
+ // Should have first error but not stop due to retry config
+ assert.equal(listener.errors.length, 1, 'Should have first error');
+ assert.ok(listener.errors[0].message.includes('attempt 1'), 'Error should indicate first attempt');
+ assert.equal(listener.errors[0].isRetryable, true, 'Should not be a fatal error');
+ assert.equal(manager.isRunning(), true, 'Should continue running during retries');
+
+ // Advance past first retry (delay: 100ms due to backoff)
+ await delay(100);
+
+ assert.equal(listener.errors.length, 1, 'Should not have the second error yet');
+
+ await delay(networkDelay);
+
+ assert.equal(listener.errors.length, 2, 'Should have second error');
+ assert.ok(listener.errors[1].message.includes('attempt 2'), 'Error should indicate second attempt');
+ assert.equal(listener.errors[0].isRetryable, true, 'Should not be a fatal error');
+ assert.equal(manager.isRunning(), true, 'Should continue running during retries');
+
+ // Advance past second retry (delay: 200ms due to backoff)
+ await delay(200);
+
+ assert.equal(listener.errors.length, 2, 'Should not have another error');
+ assert.equal(listener.receivedTokens.length, 1, 'Should not receive new token yet');
+
+ await delay(networkDelay);
+
+ // Should have recovered
+ assert.equal(listener.receivedTokens.length, 2, 'Should receive recovery token');
+ assert.equal(listener.receivedTokens[1].value, 'recovery-token', 'Should have correct recovery token');
+ assert.equal(listener.receivedTokens[1].expiresAtMs - listener.receivedTokens[1].receivedAtMs,
+ tokenTtl, 'Should have correct TTL');
+ assert.equal(manager.isRunning(), true, 'Should continue running after recovery');
+ assert.equal(identityProvider.getRequestCount(), 4, 'Should have made exactly 4 requests');
+
+ disposable?.dispose();
+ });
+
+ it('should stop after max retries exceeded', async () => {
+ const networkDelay = 20;
+ const tokenTtl = 100;
+
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0.8,
+ retry: {
+ maxAttempts: 2, // Only allow 2 retries
+ initialDelayMs: 100,
+ maxDelayMs: 1000,
+ backoffMultiplier: 2,
+ jitterPercentage: 0,
+ isRetryable: (error: unknown) => error instanceof Error && error.message === 'Temporary failure'
+ }
+ };
+
+ // All attempts must fail
+ const identityProvider = new ErrorSimulatingProvider(
+ [
+ 'initial-token',
+ new Error('Temporary failure'),
+ new Error('Temporary failure'),
+ new Error('Temporary failure')
+ ],
+ networkDelay,
+ tokenTtl
+ );
+
+ const manager = new TokenManager(identityProvider, config);
+ const listener = new TestListener();
+ const disposable = manager.start(listener);
+
+ // Wait for initial token
+ await delay(networkDelay);
+ assert.equal(listener.receivedTokens.length, 1, 'Should receive initial token');
+
+ await delay(80);
+
+ assert.equal(listener.receivedTokens.length, 1, 'Should not receive new token yet');
+ assert.equal(listener.errors.length, 0, 'Should not have any errors');
+
+ //wait for the "network call" to complete
+ await delay(networkDelay);
+
+ // First error
+ assert.equal(listener.errors.length, 1, 'Should have first error');
+ assert.equal(manager.isRunning(), true, 'Should continue running after first error');
+ assert.equal(listener.errors[0].isRetryable, true, 'Should not be a fatal error');
+
+ // Advance past first retry
+ await delay(100);
+
+ assert.equal(listener.errors.length, 1, 'Should not have second error yet');
+
+ //wait for the "network call" to complete
+ await delay(networkDelay);
+
+ // Second error
+ assert.equal(listener.errors.length, 2, 'Should have second error');
+ assert.equal(manager.isRunning(), true, 'Should continue running after second error');
+ assert.equal(listener.errors[1].isRetryable, true, 'Should not be a fatal error');
+
+ // Advance past second retry
+ await delay(200);
+
+ assert.equal(listener.errors.length, 2, 'Should not have third error yet');
+
+ //wait for the "network call" to complete
+ await delay(networkDelay);
+
+ // Should stop after max retries
+ assert.equal(listener.errors.length, 3, 'Should have final error');
+ assert.equal(listener.errors[2].isRetryable, false, 'Should be a fatal error');
+ assert.equal(manager.isRunning(), false, 'Should stop after max retries exceeded');
+ assert.equal(identityProvider.getRequestCount(), 4, 'Should have made exactly 4 requests');
+
+ disposable?.dispose();
+
+ });
+ });
+ });
+
+ describe('TokenManager retry delay calculations', () => {
+ const createManager = (retryConfig: Partial) => {
+ const config: TokenManagerConfig = {
+ expirationRefreshRatio: 0.8,
+ retry: {
+ maxAttempts: 3,
+ initialDelayMs: 100,
+ maxDelayMs: 1000,
+ backoffMultiplier: 2,
+ ...retryConfig
+ }
+ };
+ return new TokenManager(new TestIdentityProvider(), config);
+ };
+
+ describe('calculateRetryDelay', () => {
+
+ it('should apply exponential backoff', () => {
+ const manager = createManager({
+ initialDelayMs: 100,
+ backoffMultiplier: 2,
+ jitterPercentage: 0
+ });
+
+ // Test multiple retry attempts
+ const expectedDelays = [
+ [1, 100], // First attempt: initialDelay * (2^0) = 100
+ [2, 200], // Second attempt: initialDelay * (2^1) = 200
+ [3, 400], // Third attempt: initialDelay * (2^2) = 400
+ [4, 800], // Fourth attempt: initialDelay * (2^3) = 800
+ [5, 1000] // Fifth attempt: would be 1600, but capped at maxDelay (1000)
+ ];
+
+ for (const [attempt, expectedDelay] of expectedDelays) {
+ manager['retryAttempt'] = attempt;
+ assert.equal(
+ manager.calculateRetryDelay(),
+ expectedDelay,
+ `Incorrect delay for attempt ${attempt}`
+ );
+ }
+ });
+
+ it('should respect maxDelayMs', () => {
+ const manager = createManager({
+ initialDelayMs: 100,
+ maxDelayMs: 300,
+ backoffMultiplier: 2,
+ jitterPercentage: 0
+ });
+
+ // Test that delays are capped at maxDelayMs
+ const expectedDelays = [
+ [1, 100], // First attempt: 100
+ [2, 200], // Second attempt: 200
+ [3, 300], // Third attempt: would be 400, capped at 300
+ [4, 300], // Fourth attempt: would be 800, capped at 300
+ [5, 300] // Fifth attempt: would be 1600, capped at 300
+ ];
+
+ for (const [attempt, expectedDelay] of expectedDelays) {
+ manager['retryAttempt'] = attempt;
+ assert.equal(
+ manager.calculateRetryDelay(),
+ expectedDelay,
+ `Incorrect delay for attempt ${attempt}`
+ );
+ }
+ });
+
+ it('should return 0 when no retry config is present', () => {
+ const manager = new TokenManager(new TestIdentityProvider(), {
+ expirationRefreshRatio: 0.8
+ });
+ manager['retryAttempt'] = 1;
+ assert.equal(manager.calculateRetryDelay(), 0);
+ });
+ });
+ });
+});
+
diff --git a/packages/client/lib/authx/token-manager.ts b/packages/client/lib/authx/token-manager.ts
new file mode 100644
index 00000000000..6532d88317b
--- /dev/null
+++ b/packages/client/lib/authx/token-manager.ts
@@ -0,0 +1,318 @@
+import { IdentityProvider, TokenResponse } from './identity-provider';
+import { Token } from './token';
+import {Disposable} from './disposable';
+
+/**
+ * The configuration for retrying token refreshes.
+ */
+export interface RetryPolicy {
+ /**
+ * The maximum number of attempts to retry token refreshes.
+ */
+ maxAttempts: number;
+
+ /**
+ * The initial delay in milliseconds before the first retry.
+ */
+ initialDelayMs: number;
+
+ /**
+ * The maximum delay in milliseconds between retries.
+ * The calculated delay will be capped at this value.
+ */
+ maxDelayMs: number;
+
+ /**
+ * The multiplier for exponential backoff between retries.
+ * @example
+ * A value of 2 will double the delay each time:
+ * - 1st retry: initialDelayMs
+ * - 2nd retry: initialDelayMs * 2
+ * - 3rd retry: initialDelayMs * 4
+ */
+ backoffMultiplier: number;
+
+ /**
+ * The percentage of jitter to apply to the delay.
+ * @example
+ * A value of 0.1 will add or subtract up to 10% of the delay.
+ */
+ jitterPercentage?: number;
+
+ /**
+ * Function to classify errors from the identity provider as retryable or non-retryable.
+ * Used to determine if a token refresh failure should be retried based on the type of error.
+ *
+ * The default behavior is to retry all types of errors if no function is provided.
+ *
+ * Common use cases:
+ * - Network errors that may be transient (should retry)
+ * - Invalid credentials (should not retry)
+ * - Rate limiting responses (should retry)
+ *
+ * @param error - The error from the identity provider3
+ * @param attempt - Current retry attempt (0-based)
+ * @returns `true` if the error is considered transient and the operation should be retried
+ *
+ * @example
+ * ```typescript
+ * const retryPolicy: RetryPolicy = {
+ * maxAttempts: 3,
+ * initialDelayMs: 1000,
+ * maxDelayMs: 5000,
+ * backoffMultiplier: 2,
+ * isRetryable: (error) => {
+ * // Retry on network errors or rate limiting
+ * return error instanceof NetworkError ||
+ * error instanceof RateLimitError;
+ * }
+ * };
+ * ```
+ */
+ isRetryable?: (error: unknown, attempt: number) => boolean;
+}
+
+/**
+ * the configuration for the TokenManager.
+ */
+export interface TokenManagerConfig {
+
+ /**
+ * Represents the ratio of a token's lifetime at which a refresh should be triggered.
+ * For example, a value of 0.75 means the token should be refreshed when 75% of its lifetime has elapsed (or when
+ * 25% of its lifetime remains).
+ */
+ expirationRefreshRatio: number;
+
+ // The retry policy for token refreshes. If not provided, no retries will be attempted.
+ retry?: RetryPolicy;
+}
+
+/**
+ * IDPError indicates a failure from the identity provider.
+ *
+ * The `isRetryable` flag is determined by the RetryPolicy's error classification function - if an error is
+ * classified as retryable, it will be marked as transient and the token manager will attempt to recover.
+ */
+export class IDPError extends Error {
+ constructor(public readonly message: string, public readonly isRetryable: boolean) {
+ super(message);
+ this.name = 'IDPError';
+ }
+}
+
+/**
+ * TokenStreamListener is an interface for objects that listen to token changes.
+ */
+export type TokenStreamListener = {
+ /**
+ * Called each time a new token is received.
+ * @param token
+ */
+ onNext: (token: Token) => void;
+
+ /**
+ * Called when an error occurs while calling the underlying IdentityProvider. The error can be
+ * transient and the token manager will attempt to obtain a token again if retry policy is configured.
+ *
+ * Only fatal errors will terminate the stream and stop the token manager.
+ *
+ * @param error
+ */
+ onError: (error: IDPError) => void;
+
+}
+
+/**
+ * TokenManager is responsible for obtaining/refreshing tokens and notifying listeners about token changes.
+ * It uses an IdentityProvider to request tokens. The token refresh is scheduled based on the token's TTL and
+ * the expirationRefreshRatio configuration.
+ *
+ * The TokenManager should be disposed when it is no longer needed by calling the dispose method on the Disposable
+ * returned by start.
+ */
+export class TokenManager {
+ private currentToken: Token | null = null;
+ private refreshTimeout: NodeJS.Timeout | null = null;
+ private listener: TokenStreamListener | null = null;
+ private retryAttempt: number = 0;
+
+ constructor(
+ private readonly identityProvider: IdentityProvider,
+ private readonly config: TokenManagerConfig
+ ) {
+ if (this.config.expirationRefreshRatio > 1) {
+ throw new Error('expirationRefreshRatio must be less than or equal to 1');
+ }
+ if (this.config.expirationRefreshRatio < 0) {
+ throw new Error('expirationRefreshRatio must be greater or equal to 0');
+ }
+ }
+
+ /**
+ * Starts the token manager and returns a Disposable that can be used to stop the token manager.
+ *
+ * @param listener The listener that will receive token updates.
+ * @param initialDelayMs The initial delay in milliseconds before the first token refresh.
+ */
+ public start(listener: TokenStreamListener, initialDelayMs: number = 0): Disposable {
+ if (this.listener) {
+ this.stop();
+ }
+
+ this.listener = listener;
+ this.retryAttempt = 0;
+
+ this.scheduleNextRefresh(initialDelayMs);
+
+ return {
+ dispose: () => this.stop()
+ };
+ }
+
+ public calculateRetryDelay(): number {
+ if (!this.config.retry) return 0;
+
+ const { initialDelayMs, maxDelayMs, backoffMultiplier, jitterPercentage } = this.config.retry;
+
+ let delay = initialDelayMs * Math.pow(backoffMultiplier, this.retryAttempt - 1);
+
+ delay = Math.min(delay, maxDelayMs);
+
+ if (jitterPercentage) {
+ const jitterRange = delay * (jitterPercentage / 100);
+ const jitterAmount = Math.random() * jitterRange - (jitterRange / 2);
+ delay += jitterAmount;
+ }
+
+ let result = Math.max(0, Math.floor(delay));
+
+ return result;
+ }
+
+ private shouldRetry(error: unknown): boolean {
+ if (!this.config.retry) return false;
+
+ const { maxAttempts, isRetryable } = this.config.retry;
+
+ if (this.retryAttempt >= maxAttempts) {
+ return false;
+ }
+
+ if (isRetryable) {
+ return isRetryable(error, this.retryAttempt);
+ }
+
+ return false;
+ }
+
+ public isRunning(): boolean {
+ return this.listener !== null;
+ }
+
+ private async refresh(): Promise {
+ if (!this.listener) {
+ throw new Error('TokenManager is not running, but refresh was called');
+ }
+
+ try {
+ await this.identityProvider.requestToken().then(this.handleNewToken);
+ this.retryAttempt = 0;
+ } catch (error) {
+
+ if (this.shouldRetry(error)) {
+ this.retryAttempt++;
+ const retryDelay = this.calculateRetryDelay();
+ this.notifyError(`Token refresh failed (attempt ${this.retryAttempt}), retrying in ${retryDelay}ms: ${error}`, true)
+ this.scheduleNextRefresh(retryDelay);
+ } else {
+ this.notifyError(error, false);
+ this.stop();
+ }
+ }
+ }
+
+ private handleNewToken = async ({ token: nativeToken, ttlMs }: TokenResponse): Promise