diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 290ad02..0bc3b42 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -6,3 +6,6 @@ updates: interval: daily time: "10:00" open-pull-requests-limit: 10 + commit-message: + prefix: "deps" + prefix-development: "deps(dev)" diff --git a/.github/workflows/js-test-and-release.yml b/.github/workflows/js-test-and-release.yml index 65d30bd..c6897e3 100644 --- a/.github/workflows/js-test-and-release.yml +++ b/.github/workflows/js-test-and-release.yml @@ -7,8 +7,6 @@ on: branches: - master # with #262 - ${{{ github.default_branch }}} pull_request: - branches: - - master # with #262 - ${{{ github.default_branch }}} jobs: @@ -29,7 +27,7 @@ jobs: strategy: matrix: os: [windows-latest, ubuntu-latest, macos-latest] - node: [16] + node: [lts/*] fail-fast: true steps: - uses: actions/checkout@v3 diff --git a/.gitignore b/.gitignore index 2df8b47..f7c8504 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,5 @@ node_modules package-lock.json coverage .nyc_output -docs - +.docs dist diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e2e2eb..2ce27af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,132 @@ +## [6.0.8](https://github.com/libp2p/js-libp2p-tcp/compare/v6.0.7...v6.0.8) (2022-12-16) + + +### Documentation + +* fix build badge ([#238](https://github.com/libp2p/js-libp2p-tcp/issues/238)) ([8a94ced](https://github.com/libp2p/js-libp2p-tcp/commit/8a94cedc6e8806b597c650209b76b5ce38231146)) + +## [6.0.7](https://github.com/libp2p/js-libp2p-tcp/compare/v6.0.6...v6.0.7) (2022-12-15) + + +### Bug Fixes + +* publish tsdocs for this module ([#236](https://github.com/libp2p/js-libp2p-tcp/issues/236)) ([b4f88e7](https://github.com/libp2p/js-libp2p-tcp/commit/b4f88e7bfbe865eb00cfb1d99a4231b072b458a5)) + +## [6.0.6](https://github.com/libp2p/js-libp2p-tcp/compare/v6.0.5...v6.0.6) (2022-12-13) + + +### Bug Fixes + +* remove abortable-iterator and close socket directly on abort ([#220](https://github.com/libp2p/js-libp2p-tcp/issues/220)) ([28fe750](https://github.com/libp2p/js-libp2p-tcp/commit/28fe7500fa99c91f4f81d73671e885955b5d7e4a)) + +## [6.0.5](https://github.com/libp2p/js-libp2p-tcp/compare/v6.0.4...v6.0.5) (2022-12-06) + + +### Dependencies + +* **dev:** bump sinon from 14.0.2 to 15.0.0 ([#233](https://github.com/libp2p/js-libp2p-tcp/issues/233)) ([72a79ab](https://github.com/libp2p/js-libp2p-tcp/commit/72a79ab81d79daaeb8a77656e98a19b70f132595)) + +## [6.0.4](https://github.com/libp2p/js-libp2p-tcp/compare/v6.0.3...v6.0.4) (2022-11-22) + + +### Bug Fixes + +* use labels to differentiate interfaces for metrics ([#230](https://github.com/libp2p/js-libp2p-tcp/issues/230)) ([6c4c316](https://github.com/libp2p/js-libp2p-tcp/commit/6c4c316d080cde679c11a784c22284d6e1912b94)) + +## [6.0.3](https://github.com/libp2p/js-libp2p-tcp/compare/v6.0.2...v6.0.3) (2022-11-22) + + +### Bug Fixes + +* make metrics interface a dep instead of a dev dep ([#231](https://github.com/libp2p/js-libp2p-tcp/issues/231)) ([876ca13](https://github.com/libp2p/js-libp2p-tcp/commit/876ca132aa2b307315148628681cddfa0828b3ac)) + +## [6.0.2](https://github.com/libp2p/js-libp2p-tcp/compare/v6.0.1...v6.0.2) (2022-11-17) + + +### Bug Fixes + +* update metric names to follow prometheus naming guide ([#228](https://github.com/libp2p/js-libp2p-tcp/issues/228)) ([24c5b37](https://github.com/libp2p/js-libp2p-tcp/commit/24c5b37ab64429972f29af6ae4516c18232d1ff3)) + + +### Trivial Changes + +* add test for filtering unix socket address ([#229](https://github.com/libp2p/js-libp2p-tcp/issues/229)) ([efcfbb2](https://github.com/libp2p/js-libp2p-tcp/commit/efcfbb28a77192a489834c8b8ad832337539d62b)), closes [#132](https://github.com/libp2p/js-libp2p-tcp/issues/132) + +## [6.0.1](https://github.com/libp2p/js-libp2p-tcp/compare/v6.0.0...v6.0.1) (2022-11-16) + + +### Trivial Changes + +* **deps-dev:** bump @libp2p/interface-mocks from 7.1.0 to 8.0.1 ([#225](https://github.com/libp2p/js-libp2p-tcp/issues/225)) ([a271056](https://github.com/libp2p/js-libp2p-tcp/commit/a271056c8d8d179dd95399f9621d790a0f18b84a)) + +## [6.0.0](https://github.com/libp2p/js-libp2p-tcp/compare/v5.0.2...v6.0.0) (2022-11-05) + + +### ⚠ BREAKING CHANGES + +* requires metrics interface v4 + +### Features + +* add metrics ([#223](https://github.com/libp2p/js-libp2p-tcp/issues/223)) ([c004357](https://github.com/libp2p/js-libp2p-tcp/commit/c0043577777181545eef925b50e28743cfd7a29d)), closes [#217](https://github.com/libp2p/js-libp2p-tcp/issues/217) + +## [5.0.2](https://github.com/libp2p/js-libp2p-tcp/compare/v5.0.1...v5.0.2) (2022-11-05) + + +### Bug Fixes + +* handle listen error ([#224](https://github.com/libp2p/js-libp2p-tcp/issues/224)) ([4125e9e](https://github.com/libp2p/js-libp2p-tcp/commit/4125e9eaa4d531dbcb0f2777149d1ca8fa9460a5)) + +## [5.0.1](https://github.com/libp2p/js-libp2p-tcp/compare/v5.0.0...v5.0.1) (2022-10-17) + + +### Trivial Changes + +* **deps-dev:** bump it-all from 1.0.6 to 2.0.0 ([#222](https://github.com/libp2p/js-libp2p-tcp/issues/222)) ([fddebdf](https://github.com/libp2p/js-libp2p-tcp/commit/fddebdff3ab2056da78f9ec665e5005a659e9045)) + +## [5.0.0](https://github.com/libp2p/js-libp2p-tcp/compare/v4.1.0...v5.0.0) (2022-10-12) + + +### ⚠ BREAKING CHANGES + +* modules no longer implement `Initializable` instead switching to constructor injection + +### Bug Fixes + +* remove @libp2p/components ([#219](https://github.com/libp2p/js-libp2p-tcp/issues/219)) ([be2dbc3](https://github.com/libp2p/js-libp2p-tcp/commit/be2dbc3f674e9bce534dc92d93ad2739ed6d2bef)) + +## [4.1.0](https://github.com/libp2p/js-libp2p-tcp/compare/v4.0.2...v4.1.0) (2022-10-11) + + +### Features + +* add server.maxConnections option ([#213](https://github.com/libp2p/js-libp2p-tcp/issues/213)) ([99e88a4](https://github.com/libp2p/js-libp2p-tcp/commit/99e88a4d3122c46f06f69cfbe3f72a2279e2329f)) + +## [4.0.2](https://github.com/libp2p/js-libp2p-tcp/compare/v4.0.1...v4.0.2) (2022-10-11) + + +### Bug Fixes + +* port listener to ES6 class syntax ([#214](https://github.com/libp2p/js-libp2p-tcp/issues/214)) ([af7b8e2](https://github.com/libp2p/js-libp2p-tcp/commit/af7b8e2bf48ec0c9f01e087e76bc9570dca05783)) + +## [4.0.1](https://github.com/libp2p/js-libp2p-tcp/compare/v4.0.0...v4.0.1) (2022-10-07) + + +### Trivial Changes + +* **deps-dev:** bump @libp2p/interface-mocks from 4.0.3 to 6.0.0 ([#216](https://github.com/libp2p/js-libp2p-tcp/issues/216)) ([f224a5a](https://github.com/libp2p/js-libp2p-tcp/commit/f224a5a0e8b497317b2b9410bb60433647a3185a)) + +## [4.0.0](https://github.com/libp2p/js-libp2p-tcp/compare/v3.1.2...v4.0.0) (2022-10-07) + + +### ⚠ BREAKING CHANGES + +* **deps:** bump @libp2p/interface-transport from 1.0.4 to 2.0.0 (#215) + +### Trivial Changes + +* **deps:** bump @libp2p/interface-transport from 1.0.4 to 2.0.0 ([#215](https://github.com/libp2p/js-libp2p-tcp/issues/215)) ([1adf73d](https://github.com/libp2p/js-libp2p-tcp/commit/1adf73db4e88e0c196766588a2972a3a6e28e69a)) + ## [3.1.2](https://github.com/libp2p/js-libp2p-tcp/compare/v3.1.1...v3.1.2) (2022-09-24) diff --git a/README.md b/README.md index ecb13ae..3e1673f 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,19 @@ # @libp2p/tcp [![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) -[![IRC](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p) [![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) [![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-tcp.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-tcp) -[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-tcp/actions/workflows/js-test-and-release.yml) +[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p-tcp/js-test-and-release.yml?branch=master\&style=flat-square)](https://github.com/libp2p/js-libp2p-tcp/actions/workflows/js-test-and-release.yml?query=branch%3Amaster) -> Node.js implementation of the TCP module that libp2p uses, which implements the interface-connection and interface-transport interfaces +> A TCP transport for libp2p ## Table of contents - [Install](#install) - [Usage](#usage) -- [API](#api) - - [Transport](#transport) - - [Connection](#connection) -- [Contribute](#contribute) -- [Contribute](#contribute-1) +- [API Docs](#api-docs) - [License](#license) -- [Contribution](#contribution) +- [Contribute](#contribute) ## Install @@ -26,19 +21,12 @@ $ npm i @libp2p/tcp ``` -[![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/src/transport/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/src/transport) -[![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/src/connection/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/src/connection) - -```sh -> npm install @libp2p/tcp -``` - ## Usage ```js -import { TCP } from '@libp2p/tcp' +import { tcp } from '@libp2p/tcp' import { multiaddr } from '@multiformats/multiaddr' -import {pipe} from 'it-pipe' +import { pipe } from 'it-pipe' import all from 'it-all' // A simple upgrader that just returns the MultiaddrConnection @@ -47,9 +35,9 @@ const upgrader = { upgradeOutbound: async maConn => maConn } -const tcp = new TCP() +const transport = tcp()() -const listener = tcp.createListener({ +const listener = transport.createListener({ upgrader, handler: (socket) => { console.log('new connection opened') @@ -64,7 +52,7 @@ const addr = multiaddr('/ip4/127.0.0.1/tcp/9090') await listener.listen(addr) console.log('listening') -const socket = await tcp.dial(addr, { upgrader }) +const socket = await transport.dial(addr, { upgrader }) const values = await pipe( socket, all @@ -83,41 +71,9 @@ new connection opened Value: hello World! ``` -## API - -### Transport - -[![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/packages/libp2p-interfaces/src/transport/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/packages/libp2p-interfaces/src/transport) - -`libp2p-tcp` accepts TCP addresses as both IPFS and non IPFS encapsulated addresses, i.e: - -`/ip4/127.0.0.1/tcp/4001` -`/ip4/127.0.0.1/tcp/4001/ipfs/QmHash` - -(both for dialing and listening) - -### Connection +## API Docs -[![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/packages/libp2p-interfaces/src/connection/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/packages/libp2p-interfaces/src/connection) - -## Contribute - -Contributions are welcome! The libp2p implementation in JavaScript is a work in progress. As such, there's a few things you can do right now to help out: - -- [Check out the existing issues](//github.com/libp2p/js-libp2p-tcp/issues). -- **Perform code reviews**. -- **Add tests**. There can never be enough tests. - -Please be aware that all interactions related to libp2p are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md). - -Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification. - -## Contribute - -The libp2p implementation in JavaScript is a work in progress. As such, there are a few things you can do right now to help out: - -- Go through the modules and **check out existing issues**. This is especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically. -- **Perform code reviews**. More eyes will help a) speed the project along b) ensure quality and c) reduce possible future bugs. +- ## License @@ -126,6 +82,6 @@ Licensed under either of - Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / ) - MIT ([LICENSE-MIT](LICENSE-MIT) / ) -## Contribution +## Contribute Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/package.json b/package.json index b82570b..4a33564 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@libp2p/tcp", - "version": "3.1.2", - "description": "Node.js implementation of the TCP module that libp2p uses, which implements the interface-connection and interface-transport interfaces", + "version": "6.0.8", + "description": "A TCP transport for libp2p", "license": "Apache-2.0 OR MIT", "homepage": "https://github.com/libp2p/js-libp2p-tcp#readme", "repository": { @@ -28,7 +28,7 @@ "types": "./dist/src/index.d.ts", "files": [ "src", - "dist/src", + "dist", "!dist/test", "!**/*.tsbuildinfo" ], @@ -71,15 +71,15 @@ "release": "patch" }, { - "type": "chore", + "type": "docs", "release": "patch" }, { - "type": "docs", + "type": "test", "release": "patch" }, { - "type": "test", + "type": "deps", "release": "patch" }, { @@ -109,7 +109,11 @@ }, { "type": "docs", - "section": "Trivial Changes" + "section": "Documentation" + }, + { + "type": "deps", + "section": "Dependencies" }, { "type": "test", @@ -130,6 +134,7 @@ "lint": "aegir lint", "dep-check": "aegir dep-check", "build": "aegir build", + "docs": "aegir docs", "test": "aegir test -t node -t electron-main", "test:node": "aegir test -t node --cov", "test:electron-main": "aegir test -t electron-main", @@ -137,24 +142,24 @@ }, "dependencies": { "@libp2p/interface-connection": "^3.0.2", - "@libp2p/interface-transport": "^1.0.4", + "@libp2p/interface-metrics": "^4.0.0", + "@libp2p/interface-transport": "^2.0.0", "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", "@libp2p/utils": "^3.0.2", "@multiformats/mafmt": "^11.0.3", "@multiformats/multiaddr": "^11.0.0", - "abortable-iterator": "^4.0.2", "err-code": "^3.0.1", "stream-to-it": "^0.2.2" }, "devDependencies": { - "@libp2p/interface-mocks": "^4.0.3", - "@libp2p/interface-transport-compliance-tests": "^2.0.6", - "aegir": "^37.5.3", - "it-all": "^1.0.6", + "@libp2p/interface-mocks": "^8.0.1", + "@libp2p/interface-transport-compliance-tests": "^3.0.0", + "aegir": "^37.7.3", + "it-all": "^2.0.0", "it-pipe": "^2.0.3", "p-defer": "^4.0.0", - "sinon": "^14.0.0", - "uint8arrays": "^3.0.0" + "sinon": "^15.0.0", + "uint8arrays": "^4.0.2" } } diff --git a/src/index.ts b/src/index.ts index f790055..b3e19ff 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,14 +3,15 @@ import * as mafmt from '@multiformats/mafmt' import errCode from 'err-code' import { logger } from '@libp2p/logger' import { toMultiaddrConnection } from './socket-to-conn.js' -import { createListener } from './listener.js' +import { TCPListener } from './listener.js' import { multiaddrToNetConfig } from './utils.js' import { AbortError } from '@libp2p/interfaces/errors' import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js' -import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/interface-transport' +import { CreateListenerOptions, DialOptions, Listener, symbol, Transport } from '@libp2p/interface-transport' import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr' import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net' import type { Connection } from '@libp2p/interface-connection' +import type { CounterGroup, Metrics } from '@libp2p/interface-metrics' const log = logger('libp2p:tcp') @@ -29,6 +30,12 @@ export interface TCPOptions { * When closing a socket, wait this long for it to close gracefully before it is closed more forcibly */ socketCloseTimeout?: number + + /** + * Set this property to reject connections when the server's connection count gets high. + * https://nodejs.org/api/net.html#servermaxconnections + */ + maxConnections?: number } /** @@ -49,11 +56,31 @@ export interface TCPCreateListenerOptions extends CreateListenerOptions, TCPSock } -export class TCP implements Transport { +export interface TCPComponents { + metrics?: Metrics +} + +export interface TCPMetrics { + dialerEvents: CounterGroup +} + +class TCP implements Transport { private readonly opts: TCPOptions + private readonly metrics?: TCPMetrics + private readonly components: TCPComponents - constructor (options: TCPOptions = {}) { + constructor (components: TCPComponents, options: TCPOptions = {}) { this.opts = options + this.components = components + + if (components.metrics != null) { + this.metrics = { + dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_events_total', { + label: 'event', + help: 'Total count of TCP dialer events by type' + }) + } + } } get [symbol] (): true { @@ -67,6 +94,7 @@ export class TCP implements Transport { async dial (ma: Multiaddr, options: TCPDialOptions): Promise { options.keepAlive = options.keepAlive ?? true + // options.signal destroys the socket before 'connect' event const socket = await this._connect(ma, options) // Avoid uncaught errors caused by unstable connections @@ -76,13 +104,32 @@ export class TCP implements Transport { const maConn = toMultiaddrConnection(socket, { remoteAddr: ma, - signal: options.signal, socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout, - socketCloseTimeout: this.opts.socketCloseTimeout + socketCloseTimeout: this.opts.socketCloseTimeout, + metrics: this.metrics?.dialerEvents }) + + const onAbort = () => { + maConn.close().catch(err => { + log.error('Error closing maConn after abort', err) + }) + } + options.signal?.addEventListener('abort', onAbort, { once: true }) + log('new outbound connection %s', maConn.remoteAddr) const conn = await options.upgrader.upgradeOutbound(maConn) log('outbound connection %s upgraded', maConn.remoteAddr) + + options.signal?.removeEventListener('abort', onAbort) + + if (options.signal?.aborted === true) { + conn.close().catch(err => { + log.error('Error closing conn after abort', err) + }) + + throw new AbortError() + } + return conn } @@ -101,12 +148,14 @@ export class TCP implements Transport { const onError = (err: Error) => { err.message = `connection error ${cOptsStr}: ${err.message}` + this.metrics?.dialerEvents.increment({ error: true }) done(err) } const onTimeout = () => { log('connection timeout %s', cOptsStr) + this.metrics?.dialerEvents.increment({ timeout: true }) const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT') // Note: this will result in onError() being called @@ -115,11 +164,13 @@ export class TCP implements Transport { const onConnect = () => { log('connection opened %j', cOpts) + this.metrics?.dialerEvents.increment({ connect: true }) done() } const onAbort = () => { log('connection aborted %j', cOpts) + this.metrics?.dialerEvents.increment({ abort: true }) rawSocket.destroy() done(new AbortError()) } @@ -155,18 +206,20 @@ export class TCP implements Transport { * anytime a new incoming Connection has been successfully upgraded via * `upgrader.upgradeInbound`. */ - createListener (options: TCPCreateListenerOptions) { - return createListener({ + createListener (options: TCPCreateListenerOptions): Listener { + return new TCPListener({ ...options, + maxConnections: this.opts.maxConnections, socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout, - socketCloseTimeout: this.opts.socketCloseTimeout + socketCloseTimeout: this.opts.socketCloseTimeout, + metrics: this.components.metrics }) } /** * Takes a list of `Multiaddr`s and returns only valid TCP addresses */ - filter (multiaddrs: Multiaddr[]) { + filter (multiaddrs: Multiaddr[]): Multiaddr[] { multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter(ma => { @@ -182,3 +235,9 @@ export class TCP implements Transport { }) } } + +export function tcp (init: TCPOptions = {}): (components?: TCPComponents) => Transport { + return (components: TCPComponents = {}) => { + return new TCP(components, init) + } +} diff --git a/src/listener.ts b/src/listener.ts index d850016..1b13385 100644 --- a/src/listener.ts +++ b/src/listener.ts @@ -8,17 +8,13 @@ import { } from './utils.js' import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events' import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection' -import type { Upgrader, Listener } from '@libp2p/interface-transport' -import type { Server } from 'net' +import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport' import type { Multiaddr } from '@multiformats/multiaddr' import type { TCPCreateListenerOptions } from './index.js' +import type { CounterGroup, MetricGroup, Metrics } from '@libp2p/interface-metrics' const log = logger('libp2p:tcp:listener') -interface ServerWithMultiaddrConnections extends Server { - __connections: MultiaddrConnection[] -} - /** * Attempts to close the given maConn. If a failure occurs, it will be logged */ @@ -35,54 +31,145 @@ interface Context extends TCPCreateListenerOptions { upgrader: Upgrader socketInactivityTimeout?: number socketCloseTimeout?: number + maxConnections?: number + metrics?: Metrics } -/** - * Create listener - */ -export function createListener (context: Context) { - const { - handler, upgrader, socketInactivityTimeout, socketCloseTimeout - } = context +const SERVER_STATUS_UP = 1 +const SERVER_STATUS_DOWN = 0 + +export interface TCPListenerMetrics { + status: MetricGroup + errors: CounterGroup + events: CounterGroup +} + +type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null } + +export class TCPListener extends EventEmitter implements Listener { + private readonly server: net.Server + /** Keep track of open connections to destroy in case of timeout */ + private readonly connections = new Set() + private status: Status = { started: false } + private metrics?: TCPListenerMetrics + private addr: string + + constructor (private readonly context: Context) { + super() + + context.keepAlive = context.keepAlive ?? true + + this.addr = 'unknown' + this.server = net.createServer(context, this.onSocket.bind(this)) + + // https://nodejs.org/api/net.html#servermaxconnections + // If set reject connections when the server's connection count gets high + // Useful to prevent too resource exhaustion via many open connections on high bursts of activity + if (context.maxConnections !== undefined) { + this.server.maxConnections = context.maxConnections + } + + this.server + .on('listening', () => { + if (context.metrics != null) { + // we are listening, register metrics for our port + const address = this.server.address() + + if (address == null) { + this.addr = 'unknown' + } else if (typeof address === 'string') { + // unix socket + this.addr = address + } else { + this.addr = `${address.address}:${address.port}` + } - context.keepAlive = context.keepAlive ?? true + context.metrics?.registerMetricGroup('libp2p_tcp_inbound_connections_total', { + label: 'address', + help: 'Current active connections in TCP listener', + calculate: () => { + return { + [this.addr]: this.connections.size + } + } + }) + + this.metrics = { + status: context.metrics.registerMetricGroup('libp2p_tcp_listener_status_info', { + label: 'address', + help: 'Current status of the TCP listener socket' + }), + errors: context.metrics.registerMetricGroup('libp2p_tcp_listener_errors_total', { + label: 'address', + help: 'Total count of TCP listener errors by type' + }), + events: context.metrics.registerMetricGroup('libp2p_tcp_listener_events_total', { + label: 'address', + help: 'Total count of TCP listener events by type' + }) + } + + this.metrics?.status.update({ + [this.addr]: SERVER_STATUS_UP + }) + } - let peerId: string | null - let listeningAddr: Multiaddr + this.dispatchEvent(new CustomEvent('listening')) + }) + .on('error', err => { + this.metrics?.errors.increment({ [`${this.addr} listen_error`]: true }) + this.dispatchEvent(new CustomEvent('error', { detail: err })) + }) + .on('close', () => { + this.metrics?.status.update({ + [this.addr]: SERVER_STATUS_DOWN + }) + this.dispatchEvent(new CustomEvent('close')) + }) + } - const server: ServerWithMultiaddrConnections = Object.assign(net.createServer(context, socket => { + private onSocket (socket: net.Socket) { // Avoid uncaught errors caused by unstable connections socket.on('error', err => { log('socket error', err) + this.metrics?.events.increment({ [`${this.addr} error`]: true }) }) let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { - listeningAddr, - socketInactivityTimeout, - socketCloseTimeout + listeningAddr: this.status.started ? this.status.listeningAddr : undefined, + socketInactivityTimeout: this.context.socketInactivityTimeout, + socketCloseTimeout: this.context.socketCloseTimeout, + metrics: this.metrics?.events, + metricPrefix: `${this.addr} ` }) } catch (err) { log.error('inbound connection failed', err) + this.metrics?.errors.increment({ [`${this.addr} inbound_to_connection`]: true }) return } log('new inbound connection %s', maConn.remoteAddr) try { - upgrader.upgradeInbound(maConn) + this.context.upgrader.upgradeInbound(maConn) .then((conn) => { - log('inbound connection %s upgraded', maConn.remoteAddr) - trackConn(server, maConn, socket) + log('inbound connection upgraded %s', maConn.remoteAddr) + this.connections.add(maConn) + + socket.once('close', () => { + this.connections.delete(maConn) + }) - if (handler != null) { - handler(conn) + if (this.context.handler != null) { + this.context.handler(conn) } - listener.dispatchEvent(new CustomEvent('connection', { detail: conn })) + this.dispatchEvent(new CustomEvent('connection', { detail: conn })) }) .catch(async err => { log.error('inbound connection failed', err) + this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) await attemptClose(maConn) }) @@ -95,87 +182,72 @@ export function createListener (context: Context) { attemptClose(maConn) .catch(err => { log.error('closing inbound connection failed', err) + this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true }) }) } - }), - // Keep track of open connections to destroy in case of timeout - { __connections: [] }) + } - const listener: Listener = Object.assign(new EventEmitter(), { - getAddrs: () => { - let addrs: Multiaddr[] = [] - const address = server.address() + getAddrs () { + if (!this.status.started) { + return [] + } - if (address == null) { - return [] - } + let addrs: Multiaddr[] = [] + const address = this.server.address() + const { listeningAddr, peerId } = this.status - if (typeof address === 'string') { - addrs = [listeningAddr] - } else { - try { - // Because TCP will only return the IPv6 version - // we need to capture from the passed multiaddr - if (listeningAddr.toString().startsWith('/ip4')) { - addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port)) - } else if (address.family === 'IPv6') { - addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port)) - } - } catch (err) { - log.error('could not turn %s:%s into multiaddr', address.address, address.port, err) + if (address == null) { + return [] + } + + if (typeof address === 'string') { + addrs = [listeningAddr] + } else { + try { + // Because TCP will only return the IPv6 version + // we need to capture from the passed multiaddr + if (listeningAddr.toString().startsWith('/ip4')) { + addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port)) + } else if (address.family === 'IPv6') { + addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port)) } + } catch (err) { + log.error('could not turn %s:%s into multiaddr', address.address, address.port, err) } + } - return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma) - }, - listen: async (ma: Multiaddr) => { - listeningAddr = ma - peerId = ma.getPeerId() - - if (peerId == null) { - listeningAddr = ma.decapsulateCode(CODE_P2P) - } + return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma) + } - return await new Promise((resolve, reject) => { - const options = multiaddrToNetConfig(listeningAddr) - server.listen(options, (err?: any) => { - if (err != null) { - return reject(err) - } - log('Listening on %s', server.address()) - resolve() - }) - }) - }, - close: async () => { - if (!server.listening) { - return - } + async listen (ma: Multiaddr) { + const peerId = ma.getPeerId() + const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma - await Promise.all( - server.__connections.map(async maConn => await attemptClose(maConn)) - ) + this.status = { started: true, listeningAddr, peerId } - await new Promise((resolve, reject) => { - server.close(err => (err != null) ? reject(err) : resolve()) + return await new Promise((resolve, reject) => { + const options = multiaddrToNetConfig(listeningAddr) + this.server.on('error', (err) => { + reject(err) }) - } - }) - - server - .on('listening', () => listener.dispatchEvent(new CustomEvent('listening'))) - .on('error', err => listener.dispatchEvent(new CustomEvent('error', { detail: err }))) - .on('close', () => listener.dispatchEvent(new CustomEvent('close'))) + this.server.listen(options, () => { + log('Listening on %s', this.server.address()) + resolve() + }) + }) + } - return listener -} + async close () { + if (!this.server.listening) { + return + } -function trackConn (server: ServerWithMultiaddrConnections, maConn: MultiaddrConnection, socket: net.Socket) { - server.__connections.push(maConn) + await Promise.all( + Array.from(this.connections.values()).map(async maConn => await attemptClose(maConn)) + ) - const untrackConn = () => { - server.__connections = server.__connections.filter(c => c !== maConn) + await new Promise((resolve, reject) => { + this.server.close(err => (err != null) ? reject(err) : resolve()) + }) } - - socket.once('close', untrackConn) } diff --git a/src/socket-to-conn.ts b/src/socket-to-conn.ts index c390f46..88f99cb 100644 --- a/src/socket-to-conn.ts +++ b/src/socket-to-conn.ts @@ -1,4 +1,3 @@ -import { abortableSource } from 'abortable-iterator' import { logger } from '@libp2p/logger' // @ts-expect-error no types import toIterable from 'stream-to-it' @@ -9,6 +8,7 @@ import errCode from 'err-code' import type { Socket } from 'net' import type { Multiaddr } from '@multiformats/multiaddr' import type { MultiaddrConnection } from '@libp2p/interface-connection' +import type { CounterGroup } from '@libp2p/interface-metrics' const log = logger('libp2p:tcp:socket') @@ -16,17 +16,19 @@ interface ToConnectionOptions { listeningAddr?: Multiaddr remoteAddr?: Multiaddr localAddr?: Multiaddr - signal?: AbortSignal socketInactivityTimeout?: number socketCloseTimeout?: number + metrics?: CounterGroup + metricPrefix?: string } /** * Convert a socket into a MultiaddrConnection * https://github.com/libp2p/interface-transport#multiaddrconnection */ -export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => { - options = options ?? {} +export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions) => { + const metrics = options.metrics + const metricPrefix = options.metricPrefix ?? '' const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT @@ -61,6 +63,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback socket.setTimeout(inactivityTimeout, () => { log('%s socket read timeout', lOptsStr) + metrics?.increment({ [`${metricPrefix}timeout`]: true }) // only destroy with an error if the remote has not sent the FIN message let err: Error | undefined @@ -75,6 +78,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti socket.once('close', () => { log('%s socket read timeout', lOptsStr) + metrics?.increment({ [`${metricPrefix}close`]: true }) // In instances where `close` was not explicitly called, // such as an iterable stream ending, ensure we have set the close @@ -88,14 +92,11 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti // the remote sent a FIN packet which means no more data will be sent // https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end log('socket ended', maConn.remoteAddr.toString()) + metrics?.increment({ [`${metricPrefix}end`]: true }) }) const maConn: MultiaddrConnection = { async sink (source) { - if ((options?.signal) != null) { - source = abortableSource(source, options.signal) - } - try { await sink(source) } catch (err: any) { @@ -112,7 +113,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti socket.end() }, - source: (options.signal != null) ? abortableSource(source, options.signal) : source, + source, // If the remote address was passed, use it - it may have the peer ID encapsulated remoteAddr, diff --git a/test/compliance.spec.ts b/test/compliance.spec.ts index 674e9ce..ed50c9c 100644 --- a/test/compliance.spec.ts +++ b/test/compliance.spec.ts @@ -2,12 +2,12 @@ import sinon from 'sinon' import tests from '@libp2p/interface-transport-compliance-tests' import { multiaddr } from '@multiformats/multiaddr' import net from 'net' -import { TCP } from '../src/index.js' +import { tcp } from '../src/index.js' describe('interface-transport compliance', () => { tests({ async setup () { - const tcp = new TCP() + const transport = tcp()() const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/9091'), multiaddr('/ip4/127.0.0.1/tcp/9092'), @@ -35,7 +35,7 @@ describe('interface-transport compliance', () => { } } - return { transport: tcp, addrs, connector } + return { transport, addrs, connector } }, async teardown () {} }) diff --git a/test/connection.spec.ts b/test/connection.spec.ts index dafaac0..254826a 100644 --- a/test/connection.spec.ts +++ b/test/connection.spec.ts @@ -1,16 +1,16 @@ import { expect } from 'aegir/chai' -import { TCP } from '../src/index.js' +import { tcp } from '../src/index.js' import { multiaddr } from '@multiformats/multiaddr' import { mockUpgrader } from '@libp2p/interface-mocks' import type { Connection } from '@libp2p/interface-connection' -import type { Upgrader } from '@libp2p/interface-transport' +import type { Transport, Upgrader } from '@libp2p/interface-transport' describe('valid localAddr and remoteAddr', () => { - let tcp: TCP + let transport: Transport let upgrader: Upgrader beforeEach(() => { - tcp = new TCP() + transport = tcp()() upgrader = mockUpgrader() }) @@ -24,7 +24,7 @@ describe('valid localAddr and remoteAddr', () => { const handler = (conn: Connection) => handled(conn) // Create a listener with the handler - const listener = tcp.createListener({ + const listener = transport.createListener({ handler, upgrader }) @@ -36,7 +36,7 @@ describe('valid localAddr and remoteAddr', () => { expect(localAddrs.length).to.equal(1) // Dial to that address - await tcp.dial(localAddrs[0], { + await transport.dial(localAddrs[0], { upgrader }) @@ -55,7 +55,7 @@ describe('valid localAddr and remoteAddr', () => { const handler = (conn: Connection) => handled(conn) // Create a listener with the handler - const listener = tcp.createListener({ + const listener = transport.createListener({ handler, upgrader }) @@ -67,7 +67,7 @@ describe('valid localAddr and remoteAddr', () => { expect(localAddrs.length).to.equal(1) // Dial to that address - const dialerConn = await tcp.dial(localAddrs[0], { + const dialerConn = await transport.dial(localAddrs[0], { upgrader }) diff --git a/test/filter.spec.ts b/test/filter.spec.ts index 4d3feec..23b8877 100644 --- a/test/filter.spec.ts +++ b/test/filter.spec.ts @@ -1,15 +1,17 @@ import { expect } from 'aegir/chai' -import { TCP } from '../src/index.js' +import { tcp } from '../src/index.js' import { multiaddr } from '@multiformats/multiaddr' +import type { Transport } from '@libp2p/interface-transport' describe('filter addrs', () => { const base = '/ip4/127.0.0.1' const ipfs = '/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw' + const unix = '/tmp/some/file.sock' - let tcp: TCP + let transport: Transport before(() => { - tcp = new TCP() + transport = tcp()() }) it('filter valid addrs for this transport', () => { @@ -21,17 +23,19 @@ describe('filter addrs', () => { const ma6 = multiaddr('/ip4/127.0.0.1/tcp/9090/p2p-circuit' + ipfs) const ma7 = multiaddr('/dns4/libp2p.io/tcp/9090') const ma8 = multiaddr('/dnsaddr/libp2p.io/tcp/9090') + const ma9 = multiaddr('/unix' + unix) - const valid = tcp.filter([ma1, ma2, ma3, ma4, ma5, ma6, ma7, ma8]) - expect(valid.length).to.equal(4) + const valid = transport.filter([ma1, ma2, ma3, ma4, ma5, ma6, ma7, ma8, ma9]) + expect(valid.length).to.equal(5) expect(valid[0]).to.deep.equal(ma1) expect(valid[1]).to.deep.equal(ma4) + expect(valid[4]).to.deep.equal(ma9) }) it('filter a single addr for this transport', () => { const ma1 = multiaddr(base + '/tcp/9090') - const valid = tcp.filter([ma1]) + const valid = transport.filter([ma1]) expect(valid.length).to.equal(1) expect(valid[0]).to.eql(ma1) }) diff --git a/test/listen-dial.spec.ts b/test/listen-dial.spec.ts index d3e7491..ded447d 100644 --- a/test/listen-dial.spec.ts +++ b/test/listen-dial.spec.ts @@ -1,5 +1,5 @@ import { expect } from 'aegir/chai' -import { TCP } from '../src/index.js' +import { tcp } from '../src/index.js' import os from 'os' import path from 'path' import { multiaddr } from '@multiformats/multiaddr' @@ -7,19 +7,22 @@ import { pipe } from 'it-pipe' import all from 'it-all' import { mockRegistrar, mockUpgrader } from '@libp2p/interface-mocks' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import type { Upgrader } from '@libp2p/interface-transport' +import type { Transport, Upgrader } from '@libp2p/interface-transport' +import pDefer from 'p-defer' +import type { MultiaddrConnection } from '@libp2p/interface-connection' const isCI = process.env.CI describe('listen', () => { - let tcp: TCP + let transport: Transport let listener: any let upgrader: Upgrader beforeEach(() => { - tcp = new TCP() + transport = tcp()() upgrader = mockUpgrader() }) + afterEach(async () => { try { if (listener != null) { @@ -33,7 +36,7 @@ describe('listen', () => { it('listen on path', async () => { const mh = multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`) - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -41,18 +44,34 @@ describe('listen', () => { it('listen on port 0', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/0') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) }) + it('errors when listening on busy port', async () => { + const mh = multiaddr('/ip4/127.0.0.1/tcp/0') + listener = transport.createListener({ + upgrader + }) + await listener.listen(mh) + + const listener2 = transport.createListener({ + upgrader + }) + + const mh2 = listener.getAddrs()[0] + await expect(listener2.listen(mh2)).to.eventually.be.rejected() + .with.property('code', 'EADDRINUSE') + }) + it('listen on IPv6 addr', async () => { if (isCI != null) { return } const mh = multiaddr('/ip6/::/tcp/9090') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -60,7 +79,7 @@ describe('listen', () => { it('listen on any Interface', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -68,7 +87,7 @@ describe('listen', () => { it('getAddrs', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -80,7 +99,7 @@ describe('listen', () => { it('getAddrs on port 0 listen', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/0') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -91,7 +110,7 @@ describe('listen', () => { it('getAddrs from listening on 0.0.0.0', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/9090') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -103,7 +122,7 @@ describe('listen', () => { it('getAddrs from listening on 0.0.0.0 and port 0', async () => { const mh = multiaddr('/ip4/0.0.0.0/tcp/0') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -115,7 +134,7 @@ describe('listen', () => { it('getAddrs from listening on ip6 \'::\'', async () => { const mh = multiaddr('/ip6/::/tcp/9090') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -127,7 +146,7 @@ describe('listen', () => { it('getAddrs preserves IPFS Id', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - listener = tcp.createListener({ + listener = transport.createListener({ upgrader }) await listener.listen(mh) @@ -140,7 +159,7 @@ describe('listen', () => { describe('dial', () => { const protocol = '/echo/1.0.0' - let tcp: TCP + let transport: Transport let upgrader: Upgrader beforeEach(async () => { @@ -155,17 +174,17 @@ describe('dial', () => { registrar }) - tcp = new TCP() + transport = tcp()() }) it('dial on IPv4', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090') - const listener = tcp.createListener({ + const listener = transport.createListener({ upgrader }) await listener.listen(ma) - const conn = await tcp.dial(ma, { + const conn = await transport.dial(ma, { upgrader }) const stream = await conn.newStream([protocol]) @@ -187,11 +206,11 @@ describe('dial', () => { } const ma = multiaddr('/ip6/::/tcp/9090') - const listener = tcp.createListener({ + const listener = transport.createListener({ upgrader }) await listener.listen(ma) - const conn = await tcp.dial(ma, { + const conn = await transport.dial(ma, { upgrader }) const stream = await conn.newStream([protocol]) @@ -209,11 +228,11 @@ describe('dial', () => { it('dial on path', async () => { const ma = multiaddr(`/unix/${path.resolve(os.tmpdir(), `/tmp/p2pd-${Date.now()}.sock`)}`) - const listener = tcp.createListener({ + const listener = transport.createListener({ upgrader }) await listener.listen(ma) - const conn = await tcp.dial(ma, { + const conn = await transport.dial(ma, { upgrader }) const stream = await conn.newStream([protocol]) @@ -235,7 +254,7 @@ describe('dial', () => { const ma = multiaddr('/ip6/::/tcp/9090') - const listener = tcp.createListener({ + const listener = transport.createListener({ handler: (conn) => { // let multistream select finish before closing setTimeout(() => { @@ -249,7 +268,7 @@ describe('dial', () => { await listener.listen(ma) const addrs = listener.getAddrs() - const conn = await tcp.dial(addrs[0], { + const conn = await transport.dial(addrs[0], { upgrader }) const stream = await conn.newStream([protocol]) @@ -270,7 +289,7 @@ describe('dial', () => { const ma = multiaddr('/ip6/::/tcp/9090') - const listener = tcp.createListener({ + const listener = transport.createListener({ handler: () => { handled() }, @@ -279,7 +298,7 @@ describe('dial', () => { await listener.listen(ma) const addrs = listener.getAddrs() - const conn = await tcp.dial(addrs[0], { + const conn = await transport.dial(addrs[0], { upgrader }) @@ -290,12 +309,12 @@ describe('dial', () => { it('dials on IPv4 with IPFS Id', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = tcp.createListener({ + const listener = transport.createListener({ upgrader }) await listener.listen(ma) - const conn = await tcp.dial(ma, { + const conn = await transport.dial(ma, { upgrader }) const stream = await conn.newStream([protocol]) @@ -310,4 +329,57 @@ describe('dial', () => { await conn.close() await listener.close() }) + + it('aborts during dial', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const maConnPromise = pDefer() + + // @ts-expect-error missing return value + upgrader.upgradeOutbound = async (maConn) => { + maConnPromise.resolve(maConn) + + // take a long time to give us time to abort the dial + await new Promise((resolve) => { + setTimeout(() => resolve(), 100) + }) + } + + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + const abortController = new AbortController() + + // abort once the upgrade process has started + void maConnPromise.promise.then(() => abortController.abort()) + + await expect(transport.dial(ma, { + upgrader, + signal: abortController.signal + })).to.eventually.be.rejected('The operation was aborted') + + await expect(maConnPromise.promise).to.eventually.have.nested.property('timeline.close') + .that.is.ok('did not gracefully close maConn') + + await listener.close() + }) + + it('aborts before dial', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + const abortController = new AbortController() + abortController.abort() + + await expect(transport.dial(ma, { + upgrader, + signal: abortController.signal + })).to.eventually.be.rejected('The operation was aborted') + + await listener.close() + }) }) diff --git a/test/max-connections.spec.ts b/test/max-connections.spec.ts new file mode 100644 index 0000000..24551a0 --- /dev/null +++ b/test/max-connections.spec.ts @@ -0,0 +1,79 @@ +import { expect } from 'aegir/chai' +import net from 'node:net' +import { promisify } from 'node:util' +import { mockUpgrader } from '@libp2p/interface-mocks' +import { multiaddr } from '@multiformats/multiaddr' +import { tcp } from '../src/index.js' + +describe('maxConnections', () => { + const afterEachCallbacks: Array<() => Promise | any> = [] + afterEach(async () => { + await Promise.all(afterEachCallbacks.map(fn => fn())) + afterEachCallbacks.length = 0 + }) + + it('reject dial of connection above maxConnections', async () => { + const maxConnections = 2 + const socketCount = 4 + const port = 9900 + + const seenRemoteConnections = new Set() + const transport = tcp({ maxConnections })() + + const upgrader = mockUpgrader() + const listener = transport.createListener({ upgrader }) + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + + listener.addEventListener('connection', (conn) => { + seenRemoteConnections.add(conn.detail.remoteAddr.toString()) + }) + + const sockets: net.Socket[] = [] + + for (let i = 0; i < socketCount; i++) { + const socket = net.connect({ host: '127.0.0.1', port }) + sockets.push(socket) + + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.unshift(async () => { + if (!socket.destroyed) { + socket.destroy() + await new Promise((resolve) => socket.on('close', resolve)) + } + }) + + // Wait for connection so the order of sockets is stable, sockets expected to be alive are always [0,1] + await new Promise((resolve, reject) => { + socket.on('connect', () => { + resolve() + }) + socket.on('error', (err) => { + reject(err) + }) + }) + } + + // With server.maxConnections the TCP socket is created and the initial handshake is completed + // Then in the server handler NodeJS javascript code will call socket.emit('drop') if over the limit + // https://github.com/nodejs/node/blob/fddc701d3c0eb4520f2af570876cc987ae6b4ba2/lib/net.js#L1706 + + // Wait for some time for server to drop all sockets above limit + await promisify(setTimeout)(250) + + expect(seenRemoteConnections.size).equals(maxConnections, 'wrong serverConnections') + + for (let i = 0; i < socketCount; i++) { + const socket = sockets[i] + + if (i < maxConnections) { + // Assert socket connected + expect(socket.destroyed).equals(false, `socket ${i} under limit must not be destroyed`) + } else { + // Assert socket ended + expect(socket.destroyed).equals(true, `socket ${i} above limit must be destroyed`) + } + } + }) +})