diff --git a/CHANGELOG.md b/CHANGELOG.md index 141361958..dbe8b9c75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,11 +22,24 @@ The following emojis are used to highlight certain changes: * `gateway` now sets the [`Content-Location`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Location) header for requests with non-default content format, as a result of content negotiation. This allows generic and misconfigured HTTP caches to store Deserialized, CAR and Block responses separately, under distinct cache keys. * `gateway` now supports `car-dups`, `car-order` and `car-version` as query parameters in addition to the `application/vnd.ipld.car` parameters sent via `Accept` header. The parameters in the `Accept` header have always priority, but including them in URL simplifies HTTP caching and allows use in `Content-Location` header on CAR responses to maximize interoperability with wide array of HTTP caches. * `bitswap/server` now allows to override the default peer ledger with `WithPeerLedger`. +* ✨ `routing/http`: delegated content and peer routing ([IPIP-378](https://github.com/ipfs/specs/pull/378)) has been implemented. This includes the following additions: + - `client`: now includes `Provide` and `ProvidePeer` methods, which can be used to provide signed records for content routing and peer routing. + - `types`: types related to the Announcement schema record have been added. A `types/iter.Filter` utility has also been added if you want to filter on top of an iterator. ### Changed +* ✨🛠 `routing/http`: delegated content and peer routing ([IPIP-378](https://github.com/ipfs/specs/pull/378)) has been implemented. This includes the following changes: + - `client`: `WithProviderInfo` now accepts a third parameter, `protocols`, whose value is used when providing the peer. + - `contentrouter`: the `Client` interface has been updated to reflect the changes made to the client, that is, replacing the `ProvideBitswap` method by the generic `Provide` method. + - `server`: the `ContentRouter` interface now includes a `Provide` and a `ProvidePeer` functions. + ### Removed +- ✨🛠 `routing/http`: delegated content and peer routing ([IPIP-378](https://github.com/ipfs/specs/pull/378)) has been implemented. This includes the following removals: + - Deprecated Bitswap-schema related types and functions have been removed (e.g. `ProvideBitswap`, `BitswapRecord`, `SchemaBitswap`, `WriteProvidersRequest`, `WriteProvidersResponse`). + - `server`: `ContentRouter` no longer includes `ProvideBitswap` + - `contentrouter`: the content router adaptor no longer supports the previously deprecated Bitswap schema. + ### Fixed * `routing/http/server` now returns 404 Status Not Found when no records can be found. diff --git a/routing/http/client/client.go b/routing/http/client/client.go index 16840cab5..151a8d3d8 100644 --- a/routing/http/client/client.go +++ b/routing/http/client/client.go @@ -25,6 +25,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multibase" ) var ( @@ -44,14 +45,14 @@ type Client struct { clock clock.Clock accepts string - peerID peer.ID - addrs []types.Multiaddr - identity crypto.PrivKey + identity crypto.PrivKey + peerID peer.ID + addrs []types.Multiaddr + protocols []string - // Called immediately after signing a provide request. It is used + // Called immediately after signing a provide (peer) request. It is used // for testing, e.g., testing the server with a mangled signature. - //lint:ignore SA1019 // ignore staticcheck - afterSignCallback func(req *types.WriteBitswapRecord) + afterSignCallback func(req *types.AnnouncementRecord) } // defaultUserAgent is used as a fallback to inform HTTP server which library @@ -76,14 +77,6 @@ type httpClient interface { type Option func(*Client) error -func WithIdentity(identity crypto.PrivKey) Option { - return func(c *Client) error { - c.identity = identity - return nil - } -} - -// WithHTTPClient sets a custom HTTP Client to be used with [Client]. func WithHTTPClient(h httpClient) Option { return func(c *Client) error { c.httpClient = h @@ -115,9 +108,17 @@ func WithUserAgent(ua string) Option { } } -func WithProviderInfo(peerID peer.ID, addrs []multiaddr.Multiaddr) Option { +// WithProviderInfo configures the [Client] with the given provider information. +// This is used by the methods [Client.Provide] and [Client.ProvidePeer] in order +// to create and sign announcement records. +// +// You can still use [Client.ProvideRecords] and [Client.ProvidePeerRecords] +// without this configuration. Then, you must provide already signed-records. +func WithProviderInfo(identity crypto.PrivKey, peerID peer.ID, addrs []multiaddr.Multiaddr, protocols []string) Option { return func(c *Client) error { + c.identity = identity c.peerID = peerID + c.protocols = protocols for _, a := range addrs { c.addrs = append(c.addrs, types.Multiaddr{Multiaddr: a}) } @@ -254,102 +255,140 @@ func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil } -// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: -// -// [IPIP-378]: https://github.com/ipfs/specs/pull/378 -func (c *Client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) { - if c.identity == nil { - return 0, errors.New("cannot provide Bitswap records without an identity") - } - if c.peerID.Size() == 0 { - return 0, errors.New("cannot provide Bitswap records without a peer ID") - } - - ks := make([]types.CID, len(keys)) - for i, c := range keys { - ks[i] = types.CID{Cid: c} +// Provide publishes [types.AnnouncementRecord]s based on the given [types.AnnouncementRequests]. +// This records will be signed by your provided. Therefore, the [Client] must have been configured +// with [WithProviderInfo]. +func (c *Client) Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + if err := c.canProvide(); err != nil { + return nil, err } now := c.clock.Now() + records := make([]*types.AnnouncementRecord, len(announcements)) + + for i, announcement := range announcements { + record := &types.AnnouncementRecord{ + Schema: types.SchemaAnnouncement, + Payload: types.AnnouncementPayload{ + CID: announcement.CID, + Scope: announcement.Scope, + Timestamp: now, + TTL: announcement.TTL, + ID: &c.peerID, + Addrs: c.addrs, + Protocols: c.protocols, + }, + } - req := types.WriteBitswapRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - Payload: types.BitswapPayload{ - Keys: ks, - AdvisoryTTL: &types.Duration{Duration: ttl}, - Timestamp: &types.Time{Time: now}, - ID: &c.peerID, - Addrs: c.addrs, - }, - } - err := req.Sign(c.peerID, c.identity) - if err != nil { - return 0, err - } + if len(announcement.Metadata) != 0 { + var err error + record.Payload.Metadata, err = multibase.Encode(multibase.Base64, announcement.Metadata) + if err != nil { + return nil, fmt.Errorf("multibase-encoding metadata: %w", err) + } + } - if c.afterSignCallback != nil { - c.afterSignCallback(&req) - } + err := record.Sign(c.peerID, c.identity) + if err != nil { + return nil, err + } - advisoryTTL, err := c.provideSignedBitswapRecord(ctx, &req) - if err != nil { - return 0, err + if c.afterSignCallback != nil { + c.afterSignCallback(record) + } + + records[i] = record } - return advisoryTTL, err + url := c.baseURL + "/routing/v1/providers" + req := jsontypes.AnnounceProvidersRequest{ + Providers: records, + } + return c.provide(ctx, url, req) } -// ProvideAsync makes a provide request to a delegated router -// -//lint:ignore SA1019 // ignore staticcheck -func (c *Client) provideSignedBitswapRecord(ctx context.Context, bswp *types.WriteBitswapRecord) (time.Duration, error) { - //lint:ignore SA1019 // ignore staticcheck - req := jsontypes.WriteProvidersRequest{Providers: []types.Record{bswp}} +// ProvideRecords publishes the given [types.AnnouncementRecord]. An error will +// be returned if the records aren't signed or valid. +func (c *Client) ProvideRecords(ctx context.Context, records ...*types.AnnouncementRecord) (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + providerRecords := make([]*types.AnnouncementRecord, len(records)) + for i, record := range records { + if err := record.Verify(); err != nil { + return nil, err + } + providerRecords[i] = records[i] + } - url := c.baseURL + "/routing/v1/providers/" + url := c.baseURL + "/routing/v1/providers" + req := jsontypes.AnnounceProvidersRequest{ + Providers: providerRecords, + } + return c.provide(ctx, url, req) +} +func (c *Client) provide(ctx context.Context, url string, req interface{}) (iter.ResultIter[*types.AnnouncementResponseRecord], error) { b, err := drjson.MarshalJSONBytes(req) if err != nil { - return 0, err + return nil, err } - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewBuffer(b)) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(b)) if err != nil { - return 0, err + return nil, err } resp, err := c.httpClient.Do(httpReq) if err != nil { - return 0, fmt.Errorf("making HTTP req to provide a signed record: %w", err) + return nil, fmt.Errorf("making HTTP req to provide a signed peer record: %w", err) } - defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return 0, httpError(resp.StatusCode, resp.Body) + resp.Body.Close() + return nil, httpError(resp.StatusCode, resp.Body) } - //lint:ignore SA1019 // ignore staticcheck - var provideResult jsontypes.WriteProvidersResponse - err = json.NewDecoder(resp.Body).Decode(&provideResult) + respContentType := resp.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(respContentType) if err != nil { - return 0, err - } - if len(provideResult.ProvideResults) != 1 { - return 0, fmt.Errorf("expected 1 result but got %d", len(provideResult.ProvideResults)) + resp.Body.Close() + return nil, fmt.Errorf("parsing Content-Type: %w", err) } - //lint:ignore SA1019 // ignore staticcheck - v, ok := provideResult.ProvideResults[0].(*types.WriteBitswapRecordResponse) - if !ok { - return 0, errors.New("expected AdvisoryTTL field") - } + var skipBodyClose bool + defer func() { + if !skipBodyClose { + resp.Body.Close() + } + }() - if v.AdvisoryTTL != nil { - return v.AdvisoryTTL.Duration, nil + var it iter.ResultIter[*types.AnnouncementResponseRecord] + switch mediaType { + case mediaTypeJSON: + parsedResp := &jsontypes.AnnouncePeersResponse{} + err = json.NewDecoder(resp.Body).Decode(parsedResp) + if err != nil { + return nil, err + } + var sliceIt iter.Iter[*types.AnnouncementResponseRecord] = iter.FromSlice(parsedResp.ProvideResults) + it = iter.ToResultIter(sliceIt) + case mediaTypeNDJSON: + skipBodyClose = true + it = ndjson.NewAnnouncementResponseRecordsIter(resp.Body) + default: + logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType) + return nil, errors.New("unknown content type") } - return 0, nil + return it, nil +} + +func (c *Client) canProvide() error { + if c.identity == nil { + return errors.New("cannot provide without identity") + } + if c.peerID.Size() == 0 { + return errors.New("cannot provide without peer ID") + } + return nil } // FindPeers searches for information for the given [peer.ID]. @@ -413,6 +452,9 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI case mediaTypeJSON: parsedResp := &jsontypes.PeersResponse{} err = json.NewDecoder(resp.Body).Decode(parsedResp) + if err != nil { + return nil, err + } var sliceIt iter.Iter[*types.PeerRecord] = iter.FromSlice(parsedResp.Peers) it = iter.ToResultIter(sliceIt) case mediaTypeNDJSON: @@ -426,6 +468,67 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI return &measuringIter[iter.Result[*types.PeerRecord]]{Iter: it, ctx: ctx, m: m}, nil } +// ProvidePeer publishes an [types.AnnouncementRecord] with the provider +// information from your peer, configured with [WithProviderInfo]. +func (c *Client) ProvidePeer(ctx context.Context, ttl time.Duration, metadata []byte) (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + if err := c.canProvide(); err != nil { + return nil, err + } + + record := &types.AnnouncementRecord{ + Schema: types.SchemaAnnouncement, + Payload: types.AnnouncementPayload{ + Timestamp: time.Now(), + TTL: ttl, + ID: &c.peerID, + Addrs: c.addrs, + Protocols: c.protocols, + }, + } + + if len(metadata) != 0 { + var err error + record.Payload.Metadata, err = multibase.Encode(multibase.Base64, metadata) + if err != nil { + return nil, fmt.Errorf("multibase-encoding metadata: %w", err) + } + } + + err := record.Sign(c.peerID, c.identity) + if err != nil { + return nil, err + } + + if c.afterSignCallback != nil { + c.afterSignCallback(record) + } + + url := c.baseURL + "/routing/v1/peers" + req := jsontypes.AnnouncePeersRequest{ + Peers: []*types.AnnouncementRecord{record}, + } + + return c.provide(ctx, url, req) +} + +// ProvidePeerRecords publishes the given [types.AnnouncementRecord]. An error will +// be returned if the records aren't signed or valid. +func (c *Client) ProvidePeerRecords(ctx context.Context, records ...*types.AnnouncementRecord) (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + providerRecords := make([]*types.AnnouncementRecord, len(records)) + for i, record := range records { + if err := record.Verify(); err != nil { + return nil, err + } + providerRecords[i] = records[i] + } + + url := c.baseURL + "/routing/v1/peers" + req := jsontypes.AnnouncePeersRequest{ + Peers: providerRecords, + } + return c.provide(ctx, url, req) +} + // GetIPNS tries to retrieve the [ipns.Record] for the given [ipns.Name]. The record is // validated against the given name. If validation fails, an error is returned, but no // record. diff --git a/routing/http/client/client_test.go b/routing/http/client/client_test.go index 590deed11..02ba691fa 100644 --- a/routing/http/client/client_test.go +++ b/routing/http/client/client_test.go @@ -36,8 +36,17 @@ func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limi return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } -//lint:ignore SA1019 // ignore staticcheck -func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { +func (m *mockContentRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + // Ensure timestamps within tests are within the millisecond. + req.Payload.Timestamp = req.Payload.Timestamp.Truncate(time.Millisecond) + + // Signature must be always present, but not possible to test with the mock. + // We test it here and then empty it so that it matches what the mock expects. + if req.Signature == "" { + return 0, errors.New("signature not present") + } + req.Signature = "" + args := m.Called(ctx, req) return args.Get(0).(time.Duration), args.Error(1) } @@ -47,6 +56,21 @@ func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit in return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) } +func (m *mockContentRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + // Ensure timestamps within tests are within the millisecond. + req.Payload.Timestamp = req.Payload.Timestamp.Truncate(time.Second) + + // Signature must be always present, but not possible to test with the mock. + // We test it here and then empty it so that it matches what the mock expects. + if req.Signature == "" { + return 0, errors.New("signature not present") + } + req.Signature = "" + + args := m.Called(ctx, req) + return args.Get(0).(time.Duration), args.Error(1) +} + func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) return args.Get(0).(*ipns.Record), args.Error(1) @@ -111,8 +135,7 @@ func makeTestDeps(t *testing.T, clientsOpts []Option, serverOpts []server.Option serverAddr := "http://" + server.Listener.Addr().String() recordingHTTPClient := &recordingHTTPClient{httpClient: newDefaultHTTPClient(testUserAgent)} defaultClientOpts := []Option{ - WithProviderInfo(peerID, addrs), - WithIdentity(identity), + WithProviderInfo(identity, peerID, addrs, nil), WithHTTPClient(recordingHTTPClient), } c, err := New(serverAddr, append(defaultClientOpts, clientsOpts...)...) @@ -144,13 +167,6 @@ func makeCID() cid.Cid { return c } -func drAddrsToAddrs(drmas []types.Multiaddr) (addrs []multiaddr.Multiaddr) { - for _, a := range drmas { - addrs = append(addrs, a.Multiaddr) - } - return -} - func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) { for _, a := range addrs { drmas = append(drmas, types.Multiaddr{Multiaddr: a}) @@ -169,19 +185,6 @@ func makePeerRecord() types.PeerRecord { } } -//lint:ignore SA1019 // ignore staticcheck -func makeBitswapRecord() types.BitswapRecord { - peerID, addrs, _ := makeProviderAndIdentity() - //lint:ignore SA1019 // ignore staticcheck - return types.BitswapRecord{ - //lint:ignore SA1019 // ignore staticcheck - Schema: types.SchemaBitswap, - ID: &peerID, - Protocol: "transport-bitswap", - Addrs: addrsToDRAddrs(addrs), - } -} - func makeProviderAndIdentity() (peer.ID, []multiaddr.Multiaddr, crypto.PrivKey) { priv, _, err := crypto.GenerateEd25519Key(rand.Reader) if err != nil { @@ -227,11 +230,6 @@ func TestClient_FindProviders(t *testing.T) { {Val: &peerRecord}, } - bitswapRecord := makeBitswapRecord() - bitswapProviders := []iter.Result[types.Record]{ - {Val: &bitswapRecord}, - } - cases := []struct { name string httpStatusCode int @@ -252,12 +250,6 @@ func TestClient_FindProviders(t *testing.T) { expResult: peerProviders, expStreamingResponse: true, }, - { - name: "happy case (with deprecated bitswap schema)", - routerResult: bitswapProviders, - expResult: bitswapProviders, - expStreamingResponse: true, - }, { name: "server doesn't support streaming", routerResult: peerProviders, @@ -375,36 +367,37 @@ func TestClient_Provide(t *testing.T) { expErrContains string expWinErrContains string - expAdvisoryTTL time.Duration + expResponseErrorContains string + expAdvisoryTTL time.Duration }{ { name: "happy case", cids: []cid.Cid{makeCID()}, ttl: 1 * time.Hour, - routerAdvisoryTTL: 1 * time.Minute, + routerAdvisoryTTL: 2 * time.Minute, - expAdvisoryTTL: 1 * time.Minute, + expAdvisoryTTL: 2 * time.Minute, }, { - name: "should return a 403 if the payload signature verification fails", - cids: []cid.Cid{}, - mangleSignature: true, - expErrContains: "HTTP error with StatusCode=403", + name: "should return an error if the payload signature verification fails", + cids: []cid.Cid{makeCID()}, + mangleSignature: true, + expResponseErrorContains: "signature verification failed", }, { name: "should return error if identity is not provided", noIdentity: true, - expErrContains: "cannot provide Bitswap records without an identity", + expErrContains: "cannot provide without identity", }, { name: "should return error if provider is not provided", noProviderInfo: true, - expErrContains: "cannot provide Bitswap records without a peer ID", + expErrContains: "cannot provide without peer ID", }, { name: "returns an error if there's a non-200 response", manglePath: true, - expErrContains: "HTTP error with StatusCode=404: 404 page not found", + expErrContains: "HTTP error with StatusCode=404", }, { name: "returns an error if the HTTP client returns a non-HTTP error", @@ -422,6 +415,7 @@ func TestClient_Provide(t *testing.T) { if c.noIdentity { client.identity = nil } + if c.noProviderInfo { client.peerID = "" client.addrs = nil @@ -440,8 +434,7 @@ func TestClient_Provide(t *testing.T) { deps.server.Close() } if c.mangleSignature { - //lint:ignore SA1019 // ignore staticcheck - client.afterSignCallback = func(req *types.WriteBitswapRecord) { + client.afterSignCallback = func(req *types.AnnouncementRecord) { mh, err := multihash.Encode([]byte("boom"), multihash.SHA2_256) require.NoError(t, err) mb, err := multibase.Encode(multibase.Base64, mh) @@ -451,19 +444,20 @@ func TestClient_Provide(t *testing.T) { } } - //lint:ignore SA1019 // ignore staticcheck - expectedProvReq := &server.BitswapWriteProvideRequest{ - Keys: c.cids, - Timestamp: clock.Now().Truncate(time.Millisecond), - AdvisoryTTL: c.ttl, - Addrs: drAddrsToAddrs(client.addrs), - ID: client.peerID, + for _, cid := range c.cids { + router.On("Provide", mock.Anything, &types.AnnouncementRecord{ + Schema: types.SchemaAnnouncement, + Payload: types.AnnouncementPayload{ + CID: cid, + Timestamp: clock.Now().UTC().Truncate(time.Millisecond), + TTL: c.ttl, + Addrs: client.addrs, + ID: &client.peerID, + }, + }).Return(c.routerAdvisoryTTL, c.routerErr) } - router.On("ProvideBitswap", mock.Anything, expectedProvReq). - Return(c.routerAdvisoryTTL, c.routerErr) - - advisoryTTL, err := client.ProvideBitswap(ctx, c.cids, c.ttl) + recs, err := client.Provide(ctx, makeBatchAnnouncements(c.cids, c.ttl)...) var errorString string if runtime.GOOS == "windows" && c.expWinErrContains != "" { @@ -474,15 +468,35 @@ func TestClient_Provide(t *testing.T) { if errorString != "" { require.ErrorContains(t, err, errorString) + return } else { require.NoError(t, err) } - assert.Equal(t, c.expAdvisoryTTL, advisoryTTL) + results, err := iter.ReadAllResults(recs) + require.NoError(t, err) + + require.Len(t, results, 1) + + if c.expResponseErrorContains != "" { + require.Contains(t, results[0].Error, c.expResponseErrorContains) + } else { + require.Empty(t, results[0].Error) + } + + assert.Equal(t, c.expAdvisoryTTL, results[0].TTL) }) } } +func makeBatchAnnouncements(keys []cid.Cid, ttl time.Duration) []types.AnnouncementRequest { + reqs := make([]types.AnnouncementRequest, len(keys)) + for i, key := range keys { + reqs[i] = types.AnnouncementRequest{CID: key, TTL: ttl} + } + return reqs +} + func TestClient_FindPeers(t *testing.T) { peerRecord := makePeerRecord() peerRecords := []iter.Result[*types.PeerRecord]{ @@ -611,6 +625,139 @@ func TestClient_FindPeers(t *testing.T) { } } +func TestClient_ProvidePeer(t *testing.T) { + cases := []struct { + name string + manglePath bool + mangleSignature bool + stopServer bool + noProviderInfo bool + noIdentity bool + + ttl time.Duration + + routerAdvisoryTTL time.Duration + routerErr error + + expErrContains string + expWinErrContains string + + expResponseErrorContains string + expAdvisoryTTL time.Duration + }{ + { + name: "happy case", + ttl: 1 * time.Hour, + routerAdvisoryTTL: 2 * time.Minute, + expAdvisoryTTL: 2 * time.Minute, + }, + { + name: "should return an error if the payload signature verification fails", + mangleSignature: true, + expResponseErrorContains: "signature verification failed", + }, + { + name: "should return error if identity is not provided", + noIdentity: true, + expErrContains: "cannot provide without identity", + }, + { + name: "should return error if provider is not provided", + noProviderInfo: true, + expErrContains: "cannot provide without peer ID", + }, + { + name: "returns an error if there's a non-200 response", + manglePath: true, + expErrContains: "HTTP error with StatusCode=404", + }, + { + name: "returns an error if the HTTP client returns a non-HTTP error", + stopServer: true, + expErrContains: "connect: connection refused", + expWinErrContains: "connectex: No connection could be made because the target machine actively refused it.", + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + deps := makeTestDeps(t, nil, nil) + client := deps.client + router := deps.router + + if c.noIdentity { + client.identity = nil + } + + if c.noProviderInfo { + client.peerID = "" + client.addrs = nil + } + + clock := clock.NewMock() + clock.Set(time.Now()) + client.clock = clock + + ctx := context.Background() + + if c.manglePath { + client.baseURL += "/foo" + } + if c.stopServer { + deps.server.Close() + } + if c.mangleSignature { + client.afterSignCallback = func(req *types.AnnouncementRecord) { + mh, err := multihash.Encode([]byte("boom"), multihash.SHA2_256) + require.NoError(t, err) + mb, err := multibase.Encode(multibase.Base64, mh) + require.NoError(t, err) + + req.Signature = mb + } + } + + router.On("ProvidePeer", mock.Anything, &types.AnnouncementRecord{ + Schema: types.SchemaAnnouncement, + Payload: types.AnnouncementPayload{ + Timestamp: clock.Now().UTC().Truncate(time.Second), + TTL: c.ttl, + Addrs: client.addrs, + ID: &client.peerID, + }, + }).Return(c.routerAdvisoryTTL, c.routerErr) + + recs, err := client.ProvidePeer(ctx, c.ttl, nil) + + var errorString string + if runtime.GOOS == "windows" && c.expWinErrContains != "" { + errorString = c.expWinErrContains + } else { + errorString = c.expErrContains + } + + if errorString != "" { + require.ErrorContains(t, err, errorString) + return + } else { + require.NoError(t, err) + } + + results, err := iter.ReadAllResults(recs) + require.NoError(t, err) + + require.Len(t, results, 1) + + if c.expResponseErrorContains != "" { + require.Contains(t, results[0].Error, c.expResponseErrorContains) + } else { + require.Empty(t, results[0].Error) + } + + assert.Equal(t, c.expAdvisoryTTL, results[0].TTL) + }) + } +} + func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { sk, _, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(t, err) diff --git a/routing/http/contentrouter/contentrouter.go b/routing/http/contentrouter/contentrouter.go index 9115ef154..5e6685c10 100644 --- a/routing/http/contentrouter/contentrouter.go +++ b/routing/http/contentrouter/contentrouter.go @@ -25,7 +25,7 @@ const ttl = 24 * time.Hour type Client interface { FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) - ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) + Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementResponseRecord], error) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error @@ -76,7 +76,7 @@ func (c *contentRouter) Provide(ctx context.Context, key cid.Cid, announce bool) return nil } - _, err := c.client.ProvideBitswap(ctx, []cid.Cid{key}, ttl) + _, err := c.client.Provide(ctx, types.AnnouncementRequest{CID: key, TTL: ttl}) return err } @@ -90,7 +90,7 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult } if len(keys) <= c.maxProvideBatchSize { - _, err := c.client.ProvideBitswap(ctx, keys, ttl) + _, err := c.client.Provide(ctx, makeBatchAnnouncements(keys, ttl)...) return err } @@ -100,7 +100,7 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult c.maxProvideConcurrency, keys, func(ctx context.Context, batch []cid.Cid) error { - _, err := c.client.ProvideBitswap(ctx, batch, ttl) + _, err := c.client.Provide(ctx, makeBatchAnnouncements(batch, ttl)...) return err }, ) @@ -111,8 +111,7 @@ func (c *contentRouter) Ready() bool { return true } -// readProviderResponses reads peer records (and bitswap records for legacy -// compatibility) from the iterator into the given channel. +// readProviderResponses reads peer records from the iterator into the given channel. func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.AddrInfo) { defer close(ch) defer iter.Close() @@ -140,28 +139,6 @@ func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.Ad addrs = append(addrs, a.Multiaddr) } - ch <- peer.AddrInfo{ - ID: *result.ID, - Addrs: addrs, - } - //lint:ignore SA1019 // ignore staticcheck - case types.SchemaBitswap: - //lint:ignore SA1019 // ignore staticcheck - result, ok := v.(*types.BitswapRecord) - if !ok { - logger.Errorw( - "problem casting find providers result", - "Schema", v.GetSchema(), - "Type", reflect.TypeOf(v).String(), - ) - continue - } - - var addrs []multiaddr.Multiaddr - for _, a := range result.Addrs { - addrs = append(addrs, a.Multiaddr) - } - ch <- peer.AddrInfo{ ID: *result.ID, Addrs: addrs, @@ -170,6 +147,14 @@ func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.Ad } } +func makeBatchAnnouncements(keys []cid.Cid, ttl time.Duration) []types.AnnouncementRequest { + reqs := make([]types.AnnouncementRequest, len(keys)) + for i, key := range keys { + reqs[i] = types.AnnouncementRequest{CID: key, TTL: ttl} + } + return reqs +} + func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo { resultsIter, err := c.client.FindProviders(ctx, key) if err != nil { diff --git a/routing/http/contentrouter/contentrouter_test.go b/routing/http/contentrouter/contentrouter_test.go index 1c47850b9..918de035e 100644 --- a/routing/http/contentrouter/contentrouter_test.go +++ b/routing/http/contentrouter/contentrouter_test.go @@ -22,16 +22,16 @@ import ( type mockClient struct{ mock.Mock } -func (m *mockClient) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) { - args := m.Called(ctx, keys, ttl) - return args.Get(0).(time.Duration), args.Error(1) -} - func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) { args := m.Called(ctx, key) return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } +func (m *mockClient) Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + args := m.Called(ctx, announcements) + return args.Get(0).(iter.ResultIter[*types.AnnouncementResponseRecord]), args.Error(1) +} + func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[*types.PeerRecord], error) { args := m.Called(ctx, pid) return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) @@ -76,14 +76,17 @@ func TestProvide(t *testing.T) { crc := NewContentRoutingClient(client) if !c.expNotProvided { - client.On("ProvideBitswap", ctx, []cid.Cid{key}, ttl).Return(time.Minute, nil) + res := []*types.AnnouncementResponseRecord{ + {TTL: time.Minute}, + } + client.On("Provide", ctx, []types.AnnouncementRequest{{CID: key, TTL: ttl}}).Return(iter.ToResultIter[*types.AnnouncementResponseRecord](iter.FromSlice(res)), nil) } err := crc.Provide(ctx, key, c.announce) assert.NoError(t, err) if c.expNotProvided { - client.AssertNumberOfCalls(t, "ProvideBitswap", 0) + client.AssertNumberOfCalls(t, "Provide", 0) } }) } @@ -98,9 +101,10 @@ func TestProvideMany(t *testing.T) { ctx := context.Background() client := &mockClient{} crc := NewContentRoutingClient(client) - - client.On("ProvideBitswap", ctx, cids, ttl).Return(time.Minute, nil) - + res := []*types.AnnouncementResponseRecord{ + {TTL: time.Minute}, + } + client.On("Provide", ctx, makeBatchAnnouncements(cids, ttl)).Return(iter.ToResultIter[*types.AnnouncementResponseRecord](iter.FromSlice(res)), nil) err := crc.ProvideMany(ctx, mhs) require.NoError(t, err) } @@ -135,12 +139,10 @@ func TestFindProvidersAsync(t *testing.T) { ID: &p1, Protocols: []string{"transport-bitswap"}, }, - //lint:ignore SA1019 // ignore staticcheck - &types.BitswapRecord{ - //lint:ignore SA1019 // ignore staticcheck - Schema: types.SchemaBitswap, - ID: &p2, - Protocol: "transport-bitswap", + &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p2, + Protocols: []string{"transport-bitswap"}, }, &types.PeerRecord{ Schema: types.SchemaPeer, diff --git a/routing/http/server/server.go b/routing/http/server/server.go index deb7a0e18..221faa3a4 100644 --- a/routing/http/server/server.go +++ b/routing/http/server/server.go @@ -19,10 +19,10 @@ import ( "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" jsontypes "github.com/ipfs/boxo/routing/http/types/json" + "github.com/ipfs/boxo/routing/http/types/ndjson" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" - "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multibase" logging "github.com/ipfs/go-log/v2" @@ -41,10 +41,11 @@ const ( var logger = logging.Logger("routing/http/server") const ( - providePath = "/routing/v1/providers/" + providePath = "/routing/v1/providers" findProvidersPath = "/routing/v1/providers/{cid}" + providePeersPath = "/routing/v1/peers" findPeersPath = "/routing/v1/peers/{peer-id}" - GetIPNSPath = "/routing/v1/ipns/{cid}" + getIPNSPath = "/routing/v1/ipns/{cid}" ) type FindProvidersAsyncResponse struct { @@ -57,15 +58,18 @@ type ContentRouter interface { // Limit indicates the maximum amount of results to return; 0 means unbounded. FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) - // Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: - // - // [IPIP-378]: https://github.com/ipfs/specs/pull/378 - ProvideBitswap(ctx context.Context, req *BitswapWriteProvideRequest) (time.Duration, error) + // Provide stores the provided [types.AnnouncementRecord] record for CIDs. Can return + // a different TTL than the provided. + Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) // FindPeers searches for peers who have the provided [peer.ID]. // Limit indicates the maximum amount of results to return; 0 means unbounded. FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) + // ProvidePeer stores the provided [types.AnnouncementRecord] record for peers. Can + // return a different TTL than the provided. + ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) + // GetIPNS searches for an [ipns.Record] for the given [ipns.Name]. GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) @@ -74,26 +78,6 @@ type ContentRouter interface { PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error } -// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: -// -// [IPIP-378]: https://github.com/ipfs/specs/pull/378 -type BitswapWriteProvideRequest struct { - Keys []cid.Cid - Timestamp time.Time - AdvisoryTTL time.Duration - ID peer.ID - Addrs []multiaddr.Multiaddr -} - -// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: -// -// [IPIP-378]: https://github.com/ipfs/specs/pull/378 -type WriteProvideRequest struct { - Protocol string - Schema string - Bytes []byte -} - type Option func(s *server) // WithStreamingResultsDisabled disables ndjson responses, so that the server only supports JSON responses. @@ -134,10 +118,11 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler { r := mux.NewRouter() r.HandleFunc(findProvidersPath, server.findProviders).Methods(http.MethodGet) - r.HandleFunc(providePath, server.provide).Methods(http.MethodPut) + r.HandleFunc(providePath, server.provide).Methods(http.MethodPost) r.HandleFunc(findPeersPath, server.findPeers).Methods(http.MethodGet) - r.HandleFunc(GetIPNSPath, server.GetIPNS).Methods(http.MethodGet) - r.HandleFunc(GetIPNSPath, server.PutIPNS).Methods(http.MethodPut) + r.HandleFunc(providePeersPath, server.providePeers).Methods(http.MethodPost) + r.HandleFunc(getIPNSPath, server.GetIPNS).Methods(http.MethodGet) + r.HandleFunc(getIPNSPath, server.PutIPNS).Methods(http.MethodPut) return r } @@ -304,63 +289,116 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) { handlerFunc(w, provIter) } -func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) { - //lint:ignore SA1019 // ignore staticcheck - req := jsontypes.WriteProvidersRequest{} - err := json.NewDecoder(httpReq.Body).Decode(&req) - _ = httpReq.Body.Close() +func (s *server) providePeers(w http.ResponseWriter, r *http.Request) { + var requestIter iter.ResultIter[*types.AnnouncementRecord] + if r.Header.Get("Content-Type") == mediaTypeNDJSON { + requestIter = ndjson.NewAnnouncementRecordsIter(r.Body) + } else { + req := jsontypes.AnnouncePeersRequest{} + err := json.NewDecoder(r.Body).Decode(&req) + _ = r.Body.Close() + if err != nil { + writeErr(w, "Provide", http.StatusBadRequest, fmt.Errorf("invalid request: %w", err)) + return + } + requestIter = iter.ToResultIter(iter.FromSlice(req.Peers)) + } + + responseIter := iter.Map(requestIter, func(t iter.Result[*types.AnnouncementRecord]) *types.AnnouncementResponseRecord { + resRecord := &types.AnnouncementResponseRecord{ + Schema: types.SchemaAnnouncementResponse, + } + + if t.Err != nil { + resRecord.Error = t.Err.Error() + return resRecord + } + + err := t.Val.Verify() + if err != nil { + resRecord.Error = fmt.Sprintf("Provide: signature verification failed: %s", err) + return resRecord + } + + ttl, err := s.svc.ProvidePeer(r.Context(), t.Val) + if err != nil { + resRecord.Error = err.Error() + return resRecord + } + + resRecord.TTL = ttl + return resRecord + }) + + mediaType, err := s.detectResponseType(r) if err != nil { - writeErr(w, "Provide", http.StatusBadRequest, fmt.Errorf("invalid request: %w", err)) + writeErr(w, "FindPeers", http.StatusBadRequest, err) return } - //lint:ignore SA1019 // ignore staticcheck - resp := jsontypes.WriteProvidersResponse{} - - for i, prov := range req.Providers { - switch v := prov.(type) { - //lint:ignore SA1019 // ignore staticcheck - case *types.WriteBitswapRecord: - err := v.Verify() - if err != nil { - logErr("Provide", "signature verification failed", err) - writeErr(w, "Provide", http.StatusForbidden, errors.New("signature verification failed")) - return - } + if mediaType == mediaTypeNDJSON { + writeResultsIterNDJSON(w, iter.ToResultIter(responseIter)) + } else { + writeJSONResult(w, "ProvidePeers", jsontypes.AnnouncePeersResponse{ + ProvideResults: iter.ReadAll(responseIter), + }) + } +} - keys := make([]cid.Cid, len(v.Payload.Keys)) - for i, k := range v.Payload.Keys { - keys[i] = k.Cid - } - addrs := make([]multiaddr.Multiaddr, len(v.Payload.Addrs)) - for i, a := range v.Payload.Addrs { - addrs[i] = a.Multiaddr - } - advisoryTTL, err := s.svc.ProvideBitswap(httpReq.Context(), &BitswapWriteProvideRequest{ - Keys: keys, - Timestamp: v.Payload.Timestamp.Time, - AdvisoryTTL: v.Payload.AdvisoryTTL.Duration, - ID: *v.Payload.ID, - Addrs: addrs, - }) - if err != nil { - writeErr(w, "Provide", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) - return - } - resp.ProvideResults = append(resp.ProvideResults, - //lint:ignore SA1019 // ignore staticcheck - &types.WriteBitswapRecordResponse{ - Protocol: v.Protocol, - Schema: v.Schema, - AdvisoryTTL: &types.Duration{Duration: advisoryTTL}, - }, - ) - default: - writeErr(w, "Provide", http.StatusBadRequest, fmt.Errorf("provider record %d is not bitswap", i)) +func (s *server) provide(w http.ResponseWriter, r *http.Request) { + var requestIter iter.ResultIter[*types.AnnouncementRecord] + if r.Header.Get("Content-Type") == mediaTypeNDJSON { + requestIter = ndjson.NewAnnouncementRecordsIter(r.Body) + } else { + req := jsontypes.AnnounceProvidersRequest{} + err := json.NewDecoder(r.Body).Decode(&req) + _ = r.Body.Close() + if err != nil { + writeErr(w, "Provide", http.StatusBadRequest, fmt.Errorf("invalid request: %w", err)) return } + requestIter = iter.ToResultIter(iter.FromSlice(req.Providers)) + } + + responseIter := iter.Map(requestIter, func(t iter.Result[*types.AnnouncementRecord]) *types.AnnouncementResponseRecord { + resRecord := &types.AnnouncementResponseRecord{ + Schema: types.SchemaAnnouncementResponse, + } + + if t.Err != nil { + resRecord.Error = t.Err.Error() + return resRecord + } + + err := t.Val.Verify() + if err != nil { + resRecord.Error = fmt.Sprintf("Provide: signature verification failed: %s", err) + return resRecord + } + + ttl, err := s.svc.Provide(r.Context(), t.Val) + if err != nil { + resRecord.Error = err.Error() + return resRecord + } + + resRecord.TTL = ttl + return resRecord + }) + + mediaType, err := s.detectResponseType(r) + if err != nil { + writeErr(w, "FindPeers", http.StatusBadRequest, err) + return + } + + if mediaType == mediaTypeNDJSON { + writeResultsIterNDJSON(w, iter.ToResultIter(responseIter)) + } else { + writeJSONResult(w, "Provide", jsontypes.AnnounceProvidersResponse{ + ProvideResults: iter.ReadAll(responseIter), + }) } - writeJSONResult(w, "Provide", resp) } func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord]) { diff --git a/routing/http/server/server_test.go b/routing/http/server/server_test.go index 5f2f6b7fe..83856f38c 100644 --- a/routing/http/server/server_test.go +++ b/routing/http/server/server_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "encoding/json" "io" "net/http" "net/http/httptest" @@ -17,11 +18,13 @@ import ( "github.com/ipfs/boxo/path" "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" + tjson "github.com/ipfs/boxo/routing/http/types/json" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" b58 "github.com/mr-tron/base58/base58" + "github.com/multiformats/go-multihash" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -71,6 +74,16 @@ func makePeerID(t *testing.T) (crypto.PrivKey, peer.ID) { return sk, pid } +func makeCID(t *testing.T) cid.Cid { + buf := make([]byte, 63) + _, err := rand.Read(buf) + require.NoError(t, err) + mh, err := multihash.Encode(buf, multihash.SHA2_256) + require.NoError(t, err) + c := cid.NewCidV1(0, mh) + return c +} + func requireCloseToNow(t *testing.T, lastModified string) { // inspecting fields like 'Last-Modified' is prone to one-off errors, we test with 1m buffer lastModifiedTime, err := time.Parse(http.TimeFormat, lastModified) @@ -79,19 +92,17 @@ func requireCloseToNow(t *testing.T, lastModified string) { } func TestProviders(t *testing.T) { - pidStr := "12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn" - pid2Str := "12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz" - cidStr := "bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4" + // Prepare some variables common to all tests. + sk1, pid1 := makePeerID(t) + pid1Str := pid1.String() - pid, err := peer.Decode(pidStr) - require.NoError(t, err) - pid2, err := peer.Decode(pid2Str) - require.NoError(t, err) + sk2, pid2 := makePeerID(t) + pid2Str := pid2.String() - cid, err := cid.Decode(cidStr) - require.NoError(t, err) + cid1 := makeCID(t) + cid1Str := cid1.String() - runTest := func(t *testing.T, contentType string, empty bool, expectedStream bool, expectedBody string) { + runGetTest := func(t *testing.T, contentType string, empty bool, expectedStream bool, expectedBody string) { t.Parallel() var results *iter.SliceIter[iter.Result[types.Record]] @@ -102,17 +113,15 @@ func TestProviders(t *testing.T) { results = iter.FromSlice([]iter.Result[types.Record]{ {Val: &types.PeerRecord{ Schema: types.SchemaPeer, - ID: &pid, + ID: &pid1, Protocols: []string{"transport-bitswap"}, Addrs: []types.Multiaddr{}, }}, - //lint:ignore SA1019 // ignore staticcheck - {Val: &types.BitswapRecord{ - //lint:ignore SA1019 // ignore staticcheck - Schema: types.SchemaBitswap, - ID: &pid2, - Protocol: "transport-bitswap", - Addrs: []types.Multiaddr{}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid2, + Protocols: []string{"transport-bitswap"}, + Addrs: []types.Multiaddr{}, }}}, ) } @@ -125,8 +134,8 @@ func TestProviders(t *testing.T) { if expectedStream { limit = DefaultStreamingRecordsLimit } - router.On("FindProviders", mock.Anything, cid, limit).Return(results, nil) - urlStr := serverAddr + "/routing/v1/providers/" + cidStr + router.On("FindProviders", mock.Anything, cid1, limit).Return(results, nil) + urlStr := serverAddr + "/routing/v1/providers/" + cid1Str req, err := http.NewRequest(http.MethodGet, urlStr, nil) require.NoError(t, err) @@ -163,30 +172,99 @@ func TestProviders(t *testing.T) { require.Equal(t, expectedBody, string(body)) } - t.Run("JSON Response", func(t *testing.T) { - runTest(t, mediaTypeJSON, false, false, `{"Providers":[{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"},{"Schema":"bitswap","Protocol":"transport-bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz"}]}`) + t.Run("GET /routing/v1/peers/{cid} (JSON Response)", func(t *testing.T) { + runGetTest(t, mediaTypeJSON, false, false, `{"Providers":[{"Addrs":[],"ID":"`+pid1Str+`","Protocols":["transport-bitswap"],"Schema":"peer"},{"Addrs":[],"ID":"`+pid2Str+`","Protocols":["transport-bitswap"],"Schema":"peer"}]}`) }) - t.Run("Empty JSON Response", func(t *testing.T) { - runTest(t, mediaTypeJSON, true, false, `{"Providers":null}`) + t.Run("GET /routing/v1/peers/{cid} (Empty JSON Response)", func(t *testing.T) { + runGetTest(t, mediaTypeJSON, true, false, `{"Providers":null}`) }) - t.Run("Wildcard Accept header defaults to JSON Response", func(t *testing.T) { + t.Run("GET /routing/v1/peers/{cid} (Wildcard Accept header defaults to JSON Response)", func(t *testing.T) { accept := "text/html,*/*" - runTest(t, accept, true, false, `{"Providers":null}`) + runGetTest(t, accept, true, false, `{"Providers":null}`) }) - t.Run("Missing Accept header defaults to JSON Response", func(t *testing.T) { + t.Run("GET /routing/v1/peers/{cid} (Missing Accept header defaults to JSON Response)", func(t *testing.T) { accept := "" - runTest(t, accept, true, false, `{"Providers":null}`) + runGetTest(t, accept, true, false, `{"Providers":null}`) }) - t.Run("NDJSON Response", func(t *testing.T) { - runTest(t, mediaTypeNDJSON, false, true, `{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Schema":"bitswap","Protocol":"transport-bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz"}`+"\n") + t.Run("GET /routing/v1/peers/{cid} (NDJSON Response)", func(t *testing.T) { + runGetTest(t, mediaTypeNDJSON, false, true, `{"Addrs":[],"ID":"`+pid1Str+`","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Addrs":[],"ID":"`+pid2Str+`","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n") }) - t.Run("Empty NDJSON Response", func(t *testing.T) { - runTest(t, mediaTypeNDJSON, true, true, "") + t.Run("GET /routing/v1/peers/{cid} (Empty NDJSON Response)", func(t *testing.T) { + runGetTest(t, mediaTypeNDJSON, true, true, "") + }) + + runPostTest := func(t *testing.T, contentType string, expectedBody string) { + t.Parallel() + + rec1 := &types.AnnouncementRecord{ + Schema: types.SchemaAnnouncement, + Payload: types.AnnouncementPayload{ + CID: cid1, + Timestamp: time.Now().UTC(), + TTL: time.Hour, + ID: &pid1, + Protocols: []string{"transport-🌈"}, + }, + } + err := rec1.Sign(pid1, sk1) + require.NoError(t, err) + + rec2 := &types.AnnouncementRecord{ + Schema: types.SchemaAnnouncement, + Payload: types.AnnouncementPayload{ + CID: cid1, + Timestamp: time.Now().UTC(), + TTL: time.Hour, + ID: &pid2, + Protocols: []string{"transport-🌈"}, + }, + } + err = rec2.Sign(pid2, sk2) + require.NoError(t, err) + + req := tjson.AnnounceProvidersRequest{Providers: []*types.AnnouncementRecord{rec1, rec2}} + body, err := json.Marshal(req) + require.NoError(t, err) + + router := &mockContentRouter{} + server := httptest.NewServer(Handler(router)) + t.Cleanup(server.Close) + + serverAddr := "http://" + server.Listener.Addr().String() + + router.On("Provide", mock.Anything, rec1).Return(time.Hour, nil) + + router.On("Provide", mock.Anything, rec2).Return(time.Minute, nil) + + urlStr := serverAddr + "/routing/v1/providers" + + httpReq, err := http.NewRequest(http.MethodPost, urlStr, bytes.NewReader(body)) + require.NoError(t, err) + httpReq.Header.Set("Accept", contentType) + + resp, err := http.DefaultClient.Do(httpReq) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + header := resp.Header.Get("Content-Type") + require.Equal(t, contentType, header) + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + + require.Equal(t, expectedBody, string(body)) + } + + t.Run("POST /routing/v1/providers (JSON Response)", func(t *testing.T) { + runPostTest(t, mediaTypeJSON, `{"ProvideResults":[{"Schema":"announcement-response","TTL":3600000},{"Schema":"announcement-response","TTL":60000}]}`) + }) + + t.Run("POST /routing/v1/providers (NDJSON Response)", func(t *testing.T) { + runPostTest(t, mediaTypeNDJSON, `{"Schema":"announcement-response","TTL":3600000}`+"\n"+`{"Schema":"announcement-response","TTL":60000}`+"\n") }) t.Run("404 when router returns routing.ErrNotFound", func(t *testing.T) { @@ -195,9 +273,9 @@ func TestProviders(t *testing.T) { server := httptest.NewServer(Handler(router)) t.Cleanup(server.Close) serverAddr := "http://" + server.Listener.Addr().String() - router.On("FindProviders", mock.Anything, cid, DefaultRecordsLimit).Return(nil, routing.ErrNotFound) + router.On("FindProviders", mock.Anything, cid1, DefaultRecordsLimit).Return(nil, routing.ErrNotFound) - req, err := http.NewRequest(http.MethodGet, serverAddr+"/routing/v1/providers/"+cidStr, nil) + req, err := http.NewRequest(http.MethodGet, serverAddr+"/routing/v1/providers/"+cid1Str, nil) require.NoError(t, err) resp, err := http.DefaultClient.Do(req) @@ -207,7 +285,7 @@ func TestProviders(t *testing.T) { } func TestPeers(t *testing.T) { - makeRequest := func(t *testing.T, router *mockContentRouter, contentType, arg string) *http.Response { + makeGetRequest := func(t *testing.T, router *mockContentRouter, contentType, arg string) *http.Response { server := httptest.NewServer(Handler(router)) t.Cleanup(server.Close) req, err := http.NewRequest(http.MethodGet, "http://"+server.Listener.Addr().String()+"/routing/v1/peers/"+arg, nil) @@ -224,7 +302,7 @@ func TestPeers(t *testing.T) { t.Parallel() router := &mockContentRouter{} - resp := makeRequest(t, router, mediaTypeJSON, "bafkqaaa") + resp := makeGetRequest(t, router, mediaTypeJSON, "bafkqaaa") require.Equal(t, 400, resp.StatusCode) }) @@ -237,7 +315,7 @@ func TestPeers(t *testing.T) { router := &mockContentRouter{} router.On("FindPeers", mock.Anything, pid, 20).Return(results, nil) - resp := makeRequest(t, router, mediaTypeJSON, peer.ToCid(pid).String()) + resp := makeGetRequest(t, router, mediaTypeJSON, peer.ToCid(pid).String()) require.Equal(t, 404, resp.StatusCode) require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) @@ -257,7 +335,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, 20).Return(results, nil) // Simulate request with Accept header that includes wildcard match - resp := makeRequest(t, router, "text/html,*/*", peer.ToCid(pid).String()) + resp := makeGetRequest(t, router, "text/html,*/*", peer.ToCid(pid).String()) // Expect response to default to application/json require.Equal(t, 404, resp.StatusCode) @@ -275,7 +353,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, 20).Return(results, nil) // Simulate request without Accept header - resp := makeRequest(t, router, "", peer.ToCid(pid).String()) + resp := makeGetRequest(t, router, "", peer.ToCid(pid).String()) // Expect response to default to application/json require.Equal(t, 404, resp.StatusCode) @@ -291,7 +369,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, 20).Return(nil, routing.ErrNotFound) // Simulate request without Accept header - resp := makeRequest(t, router, "", peer.ToCid(pid).String()) + resp := makeGetRequest(t, router, "", peer.ToCid(pid).String()) // Expect response to default to application/json require.Equal(t, 404, resp.StatusCode) @@ -321,7 +399,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, 20).Return(results, nil) libp2pKeyCID := peer.ToCid(pid).String() - resp := makeRequest(t, router, mediaTypeJSON, libp2pKeyCID) + resp := makeGetRequest(t, router, mediaTypeJSON, libp2pKeyCID) require.Equal(t, 200, resp.StatusCode) require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) @@ -346,7 +424,7 @@ func TestPeers(t *testing.T) { router := &mockContentRouter{} router.On("FindPeers", mock.Anything, pid, 0).Return(results, nil) - resp := makeRequest(t, router, mediaTypeNDJSON, peer.ToCid(pid).String()) + resp := makeGetRequest(t, router, mediaTypeNDJSON, peer.ToCid(pid).String()) require.Equal(t, 404, resp.StatusCode) require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) @@ -379,7 +457,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, 0).Return(results, nil) libp2pKeyCID := peer.ToCid(pid).String() - resp := makeRequest(t, router, mediaTypeNDJSON, libp2pKeyCID) + resp := makeGetRequest(t, router, mediaTypeNDJSON, libp2pKeyCID) require.Equal(t, 200, resp.StatusCode) require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) @@ -416,7 +494,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, 20).Return(results, nil) legacyPeerID := b58.Encode([]byte(pid)) - resp := makeRequest(t, router, mediaTypeJSON, legacyPeerID) + resp := makeGetRequest(t, router, mediaTypeJSON, legacyPeerID) require.Equal(t, 200, resp.StatusCode) header := resp.Header.Get("Content-Type") @@ -453,7 +531,7 @@ func TestPeers(t *testing.T) { router.On("FindPeers", mock.Anything, pid, 0).Return(results, nil) legacyPeerID := b58.Encode([]byte(pid)) - resp := makeRequest(t, router, mediaTypeNDJSON, legacyPeerID) + resp := makeGetRequest(t, router, mediaTypeNDJSON, legacyPeerID) require.Equal(t, 200, resp.StatusCode) header := resp.Header.Get("Content-Type") @@ -467,6 +545,75 @@ func TestPeers(t *testing.T) { require.Equal(t, expectedBody, string(body)) }) + sk1, pid1 := makePeerID(t) + sk2, pid2 := makePeerID(t) + + runPostTest := func(t *testing.T, contentType string, expectedBody string) { + t.Parallel() + + rec1 := &types.AnnouncementRecord{ + Schema: types.SchemaAnnouncement, + Payload: types.AnnouncementPayload{ + Timestamp: time.Now().UTC(), + TTL: time.Hour, + ID: &pid1, + Protocols: []string{"transport-🌈"}, + }, + } + err := rec1.Sign(pid1, sk1) + require.NoError(t, err) + + rec2 := &types.AnnouncementRecord{ + Schema: types.SchemaAnnouncement, + Payload: types.AnnouncementPayload{ + Timestamp: time.Now().UTC(), + TTL: time.Hour, + ID: &pid2, + Protocols: []string{"transport-🌈"}, + }, + } + err = rec2.Sign(pid2, sk2) + require.NoError(t, err) + + req := tjson.AnnouncePeersRequest{Peers: []*types.AnnouncementRecord{rec1, rec2}} + body, err := json.Marshal(req) + require.NoError(t, err) + + router := &mockContentRouter{} + server := httptest.NewServer(Handler(router)) + t.Cleanup(server.Close) + + serverAddr := "http://" + server.Listener.Addr().String() + + router.On("ProvidePeer", mock.Anything, rec1).Return(time.Hour, nil) + + router.On("ProvidePeer", mock.Anything, rec2).Return(time.Minute, nil) + + urlStr := serverAddr + "/routing/v1/peers" + + httpReq, err := http.NewRequest(http.MethodPost, urlStr, bytes.NewReader(body)) + require.NoError(t, err) + httpReq.Header.Set("Accept", contentType) + + resp, err := http.DefaultClient.Do(httpReq) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + header := resp.Header.Get("Content-Type") + require.Equal(t, contentType, header) + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + + require.Equal(t, expectedBody, string(body)) + } + + t.Run("POST /routing/v1/peers (JSON Response)", func(t *testing.T) { + runPostTest(t, mediaTypeJSON, `{"ProvideResults":[{"Schema":"announcement-response","TTL":3600000},{"Schema":"announcement-response","TTL":60000}]}`) + }) + + t.Run("POST /routing/v1/peers (NDJSON Response)", func(t *testing.T) { + runPostTest(t, mediaTypeNDJSON, `{"Schema":"announcement-response","TTL":3600000}`+"\n"+`{"Schema":"announcement-response","TTL":60000}`+"\n") + }) } func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { @@ -697,7 +844,7 @@ func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limi return a.(iter.ResultIter[types.Record]), args.Error(1) } -func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *BitswapWriteProvideRequest) (time.Duration, error) { +func (m *mockContentRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { args := m.Called(ctx, req) return args.Get(0).(time.Duration), args.Error(1) } @@ -711,6 +858,11 @@ func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit in return a.(iter.ResultIter[*types.PeerRecord]), args.Error(1) } +func (m *mockContentRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + args := m.Called(ctx, req) + return args.Get(0).(time.Duration), args.Error(1) +} + func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) a := args.Get(0) diff --git a/routing/http/types/ipfs.go b/routing/http/types/ipfs.go index 1e139fa22..2df613e5a 100644 --- a/routing/http/types/ipfs.go +++ b/routing/http/types/ipfs.go @@ -3,28 +3,9 @@ package types import ( "encoding/json" - "github.com/ipfs/boxo/routing/http/internal/drjson" - "github.com/ipfs/go-cid" "github.com/multiformats/go-multiaddr" ) -type CID struct{ cid.Cid } - -func (c *CID) MarshalJSON() ([]byte, error) { return drjson.MarshalJSONBytes(c.String()) } -func (c *CID) UnmarshalJSON(b []byte) error { - var s string - err := json.Unmarshal(b, &s) - if err != nil { - return err - } - decodedCID, err := cid.Decode(s) - if err != nil { - return err - } - c.Cid = decodedCID - return nil -} - type Multiaddr struct{ multiaddr.Multiaddr } func (m *Multiaddr) UnmarshalJSON(b []byte) error { diff --git a/routing/http/types/iter/filter.go b/routing/http/types/iter/filter.go new file mode 100644 index 000000000..86bf3d7d0 --- /dev/null +++ b/routing/http/types/iter/filter.go @@ -0,0 +1,42 @@ +package iter + +// Filter invokes f on each element of iter, filtering the results. +func Filter[T any](iter Iter[T], f func(t T) bool) *FilterIter[T] { + return &FilterIter[T]{iter: iter, f: f} +} + +type FilterIter[T any] struct { + iter Iter[T] + f func(T) bool + + done bool + val T +} + +func (f *FilterIter[T]) Next() bool { + if f.done { + return false + } + + ok := f.iter.Next() + if !ok { + f.done = true + return false + } + + val := f.iter.Val() + if !f.f(val) { + return f.Next() + } + + f.val = val + return true +} + +func (f *FilterIter[T]) Val() T { + return f.val +} + +func (f *FilterIter[T]) Close() error { + return f.iter.Close() +} diff --git a/routing/http/types/iter/filter_test.go b/routing/http/types/iter/filter_test.go new file mode 100644 index 000000000..49d643c80 --- /dev/null +++ b/routing/http/types/iter/filter_test.go @@ -0,0 +1,46 @@ +package iter + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFilter(t *testing.T) { + for _, c := range []struct { + input Iter[int] + f func(int) bool + expResults []int + }{ + { + input: FromSlice([]int{1, 2, 3}), + f: func(i int) bool { return i != 2 }, + expResults: []int{1, 3}, + }, + { + input: FromSlice([]int{}), + f: func(i int) bool { return true }, + expResults: nil, + }, + { + input: FromSlice([]int{-3, -2, 1, -5, 2}), + f: func(i int) bool { return i > 0 }, + expResults: []int{1, 2}, + }, + { + input: FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}), + f: func(i int) bool { return i%2 == 0 }, + expResults: []int{2, 4, 6, 8}, + }, + } { + t.Run(fmt.Sprintf("%v", c.input), func(t *testing.T) { + iter := Filter(c.input, c.f) + var res []int + for iter.Next() { + res = append(res, iter.Val()) + } + assert.Equal(t, c.expResults, res) + }) + } +} diff --git a/routing/http/types/iter/iter.go b/routing/http/types/iter/iter.go index 2e9801d46..a4fc683d0 100644 --- a/routing/http/types/iter/iter.go +++ b/routing/http/types/iter/iter.go @@ -29,7 +29,7 @@ type Result[T any] struct { } // ToResultIter returns an iterator that wraps each value in a Result. -func ToResultIter[T any](iter Iter[T]) Iter[Result[T]] { +func ToResultIter[T any](iter Iter[T]) ResultIter[T] { return Map(iter, func(t T) Result[T] { return Result[T]{Val: t} }) diff --git a/routing/http/types/json/requests.go b/routing/http/types/json/requests.go index 4b582c3ba..24414ade4 100644 --- a/routing/http/types/json/requests.go +++ b/routing/http/types/json/requests.go @@ -1,51 +1,13 @@ package json -import ( - "encoding/json" +import "github.com/ipfs/boxo/routing/http/types" - "github.com/ipfs/boxo/routing/http/types" -) - -// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: -// -// [IPIP-378]: https://github.com/ipfs/specs/pull/378 -type WriteProvidersRequest struct { - Providers []types.Record +// AnnounceProvidersRequest is the content of a POST Providers request. +type AnnounceProvidersRequest struct { + Providers []*types.AnnouncementRecord } -func (r *WriteProvidersRequest) UnmarshalJSON(b []byte) error { - type wpr struct{ Providers []json.RawMessage } - var tempWPR wpr - err := json.Unmarshal(b, &tempWPR) - if err != nil { - return err - } - - for _, provBytes := range tempWPR.Providers { - var rawProv types.UnknownRecord - err := json.Unmarshal(provBytes, &rawProv) - if err != nil { - return err - } - - switch rawProv.Schema { - //lint:ignore SA1019 // ignore staticcheck - case types.SchemaBitswap: - //lint:ignore SA1019 // ignore staticcheck - var prov types.WriteBitswapRecord - err := json.Unmarshal(rawProv.Bytes, &prov) - if err != nil { - return err - } - r.Providers = append(r.Providers, &prov) - default: - var prov types.UnknownRecord - err := json.Unmarshal(b, &prov) - if err != nil { - return err - } - r.Providers = append(r.Providers, &prov) - } - } - return nil +// AnnouncePeersRequest is the content of a POST Peers request. +type AnnouncePeersRequest struct { + Peers []*types.AnnouncementRecord } diff --git a/routing/http/types/json/responses.go b/routing/http/types/json/responses.go index d8f659ac5..10ab0b0ca 100644 --- a/routing/http/types/json/responses.go +++ b/routing/http/types/json/responses.go @@ -49,10 +49,15 @@ func (r *RecordsArray) UnmarshalJSON(b []byte) error { return err } *r = append(*r, &prov) - //lint:ignore SA1019 // ignore staticcheck - case types.SchemaBitswap: - //lint:ignore SA1019 // ignore staticcheck - var prov types.BitswapRecord + case types.SchemaAnnouncement: + var prov types.AnnouncementRecord + err := json.Unmarshal(provBytes, &prov) + if err != nil { + return err + } + *r = append(*r, &prov) + case types.SchemaAnnouncementResponse: + var prov types.AnnouncementResponseRecord err := json.Unmarshal(provBytes, &prov) if err != nil { return err @@ -66,45 +71,14 @@ func (r *RecordsArray) UnmarshalJSON(b []byte) error { return nil } -// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: -// -// [IPIP-378]: https://github.com/ipfs/specs/pull/378 -type WriteProvidersResponse struct { - ProvideResults []types.Record +// AnnounceProvidersResponse is the result of a POST Providers request. +type AnnounceProvidersResponse struct { + ProvideResults []*types.AnnouncementResponseRecord } -func (r WriteProvidersResponse) Length() int { +func (r AnnounceProvidersResponse) Length() int { return len(r.ProvideResults) } -func (r *WriteProvidersResponse) UnmarshalJSON(b []byte) error { - var tempWPR struct{ ProvideResults []json.RawMessage } - err := json.Unmarshal(b, &tempWPR) - if err != nil { - return err - } - - for _, provBytes := range tempWPR.ProvideResults { - var rawProv types.UnknownRecord - err := json.Unmarshal(provBytes, &rawProv) - if err != nil { - return err - } - - switch rawProv.Schema { - //lint:ignore SA1019 // ignore staticcheck - case types.SchemaBitswap: - //lint:ignore SA1019 // ignore staticcheck - var prov types.WriteBitswapRecordResponse - err := json.Unmarshal(rawProv.Bytes, &prov) - if err != nil { - return err - } - r.ProvideResults = append(r.ProvideResults, &prov) - default: - r.ProvideResults = append(r.ProvideResults, &rawProv) - } - } - - return nil -} +// AnnouncePeersResponse is the result of a POST Peers request. +type AnnouncePeersResponse = AnnounceProvidersResponse diff --git a/routing/http/types/ndjson/records.go b/routing/http/types/ndjson/records.go index 819cd521d..16bcd3ea7 100644 --- a/routing/http/types/ndjson/records.go +++ b/routing/http/types/ndjson/records.go @@ -2,6 +2,7 @@ package ndjson import ( "encoding/json" + "fmt" "io" "github.com/ipfs/boxo/routing/http/types" @@ -9,7 +10,7 @@ import ( ) // NewRecordsIter returns an iterator that reads [types.Record] from the given [io.Reader]. -func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] { +func NewRecordsIter(r io.Reader) iter.ResultIter[types.Record] { jsonIter := iter.FromReaderJSON[types.UnknownRecord](r) mapFn := func(upr iter.Result[types.UnknownRecord]) iter.Result[types.Record] { var result iter.Result[types.Record] @@ -26,10 +27,8 @@ func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] { return result } result.Val = &prov - //lint:ignore SA1019 // ignore staticcheck - case types.SchemaBitswap: - //lint:ignore SA1019 // ignore staticcheck - var prov types.BitswapRecord + case types.SchemaAnnouncement: + var prov types.AnnouncementRecord err := json.Unmarshal(upr.Val.Bytes, &prov) if err != nil { result.Err = err @@ -42,32 +41,54 @@ func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] { return result } - return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn) + return iter.Map(jsonIter, mapFn) } -// NewPeerRecordsIter returns an iterator that reads [types.PeerRecord] from the given [io.Reader]. -// Records with a different schema are safely ignored. If you want to read all records, use +// NewAnnouncementRecordsIter returns an iterator that reads [types.AnnouncementRecord] +// from the given [io.Reader]. Incompatible records result in an error within the iterator. +// To read all records, use [NewRecordsIter] instead. +func NewAnnouncementRecordsIter(r io.Reader) iter.ResultIter[*types.AnnouncementRecord] { + return newTypedRecords[*types.AnnouncementRecord](r, types.SchemaAnnouncement) +} + +// NewAnnouncementResponseRecordsIter returns an iterator that reads +// [types.AnnouncementResponseRecord] from the given [io.Reader]. Incompatible +// records result in an error within the iterator. To read all records, use // [NewRecordsIter] instead. -func NewPeerRecordsIter(r io.Reader) iter.Iter[iter.Result[*types.PeerRecord]] { - jsonIter := iter.FromReaderJSON[types.UnknownRecord](r) - mapFn := func(upr iter.Result[types.UnknownRecord]) iter.Result[*types.PeerRecord] { - var result iter.Result[*types.PeerRecord] - if upr.Err != nil { - result.Err = upr.Err - return result - } - switch upr.Val.Schema { - case types.SchemaPeer: - var prov types.PeerRecord - err := json.Unmarshal(upr.Val.Bytes, &prov) - if err != nil { - result.Err = err +func NewAnnouncementResponseRecordsIter(r io.Reader) iter.ResultIter[*types.AnnouncementResponseRecord] { + return newTypedRecords[*types.AnnouncementResponseRecord](r, types.SchemaAnnouncementResponse) +} + +// NewPeerRecordsIter returns an iterator that reads [types.PeerRecord] from the given +// [io.Reader]. Incompatible records result in an error within the iterator. To read all records, +// use [NewRecordsIter] instead. +func NewPeerRecordsIter(r io.Reader) iter.ResultIter[*types.PeerRecord] { + return newTypedRecords[*types.PeerRecord](r, types.SchemaPeer) +} + +func newTypedRecords[T any](r io.Reader, schema string) iter.ResultIter[T] { + return iter.Map( + NewRecordsIter(r), + func(upr iter.Result[types.Record]) iter.Result[T] { + var result iter.Result[T] + if upr.Err != nil { + result.Err = upr.Err return result } - result.Val = &prov - } - return result - } - return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn) + if upr.Val.GetSchema() != schema { + result.Err = fmt.Errorf("unexpected schema %s, expected %s", upr.Val.GetSchema(), schema) + return result + } + + val, ok := upr.Val.(T) + if !ok { + result.Err = fmt.Errorf("type incompatible with schema %s", schema) + return result + } + + result.Val = val + return result + }, + ) } diff --git a/routing/http/types/record_announcement.go b/routing/http/types/record_announcement.go new file mode 100644 index 000000000..e11766037 --- /dev/null +++ b/routing/http/types/record_announcement.go @@ -0,0 +1,376 @@ +package types + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/ipfs/boxo/routing/http/internal/drjson" + "github.com/ipfs/boxo/util" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multibase" +) + +const ( + SchemaAnnouncement = "announcement" + SchemaAnnouncementResponse = "announcement-response" + + announcementSignaturePrefix = "routing-record:" +) + +var _ Record = &AnnouncementRecord{} + +type AnnouncementScope string + +const ( + AnnouncementBlock AnnouncementScope = "block" + AnnouncementEntity AnnouncementScope = "entity" + AnnouncementRecursive AnnouncementScope = "recursive" +) + +// AnnouncementRequest represents a request to announce +type AnnouncementRequest struct { + CID cid.Cid + Scope AnnouncementScope + TTL time.Duration + Metadata []byte +} + +// AnnouncementPayload is the [AnnouncementRecord] payload. +type AnnouncementPayload struct { + CID cid.Cid + Scope AnnouncementScope + Timestamp time.Time + TTL time.Duration + ID *peer.ID + Addrs []Multiaddr + Protocols []string + Metadata string +} + +func (ap AnnouncementPayload) asDagCbor() ([]byte, error) { + m := make(map[string]ipld.Node) + var err error + + if ap.CID.Defined() { + m["CID"] = basicnode.NewString(ap.CID.String()) + } + + if ap.Scope != "" { + m["Scope"] = basicnode.NewString(string(ap.Scope)) + } + + if !ap.Timestamp.IsZero() { + m["Timestamp"] = basicnode.NewString(util.FormatRFC3339(ap.Timestamp)) + } + + if ap.TTL != 0 { + m["TTL"] = basicnode.NewInt(ap.TTL.Milliseconds()) + } + + if ap.ID != nil { + m["ID"] = basicnode.NewString(ap.ID.String()) + } + + if ap.Metadata != "" { + m["Metadata"] = basicnode.NewString(ap.Metadata) + } + + if len(ap.Addrs) != 0 { + addrs := []ipld.Node{} + for _, addr := range ap.Addrs { + addrs = append(addrs, basicnode.NewString(addr.String())) + } + m["Addrs"], err = makeIPLDList(addrs) + if err != nil { + return nil, err + } + } + + if len(ap.Protocols) != 0 { + protocols := []ipld.Node{} + for _, protocol := range ap.Protocols { + protocols = append(protocols, basicnode.NewString(protocol)) + } + m["Protocols"], err = makeIPLDList(protocols) + if err != nil { + return nil, err + } + } + + // Make final IPLD node. + nd, err := makeIPLDMap(m) + if err != nil { + return nil, err + } + + // Encode it with the DAG-JSON encoder, which automatically sorts the fields + // in a deterministic manner. + var b bytes.Buffer + err = dagcbor.Encode(nd, &b) + if err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +type announcementPayloadHelper struct { + CID string `json:",omitempty"` + ID *peer.ID `json:",omitempty"` + Scope AnnouncementScope `json:",omitempty"` + Timestamp string `json:",omitempty"` + TTL int64 `json:",omitempty"` + Addrs []Multiaddr `json:",omitempty"` + Protocols []string `json:",omitempty"` + Metadata string `json:",omitempty"` +} + +func (ap AnnouncementPayload) MarshalJSON() ([]byte, error) { + v := announcementPayloadHelper{ + ID: ap.ID, + Scope: ap.Scope, + Addrs: ap.Addrs, + Protocols: ap.Protocols, + Metadata: ap.Metadata, + } + + if ap.CID.Defined() { + v.CID = ap.CID.String() + } + + if !ap.Timestamp.IsZero() { + v.Timestamp = util.FormatRFC3339(ap.Timestamp) + } + + if ap.TTL != 0 { + v.TTL = ap.TTL.Milliseconds() + } + + return drjson.MarshalJSONBytes(v) +} + +func (ap *AnnouncementPayload) UnmarshalJSON(b []byte) error { + v := announcementPayloadHelper{} + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + + if v.CID != "" { + ap.CID, err = cid.Decode(v.CID) + if err != nil { + return err + } + } + + if v.Scope != "" { + switch v.Scope { + case AnnouncementBlock, AnnouncementEntity, AnnouncementRecursive, "": + ap.Scope = v.Scope + default: + return fmt.Errorf("invalid announcement scope %q", ap.Scope) + } + } + + if v.Timestamp != "" { + ap.Timestamp, err = util.ParseRFC3339(v.Timestamp) + if err != nil { + return err + } + } + + ap.TTL = time.Duration(v.TTL) * time.Millisecond + ap.ID = v.ID + ap.Addrs = v.Addrs + ap.Protocols = v.Protocols + ap.Metadata = v.Metadata + return nil +} + +// AnnouncementRecord is a [Record] of [SchemaAnnouncement]. +type AnnouncementRecord struct { + Schema string + Payload AnnouncementPayload + Signature string `json:",omitempty"` +} + +func (ar *AnnouncementRecord) GetSchema() string { + return ar.Schema +} + +func (ar AnnouncementRecord) IsSigned() bool { + return ar.Signature != "" +} + +func (ar *AnnouncementRecord) Sign(peerID peer.ID, key crypto.PrivKey) error { + if ar.IsSigned() { + return ar.Verify() + } + + if key == nil { + return errors.New("no key provided") + } + + sid, err := peer.IDFromPrivateKey(key) + if err != nil { + return err + } + if sid != peerID { + return errors.New("not the correct signing key") + } + + data, err := ar.Payload.asDagCbor() + if err != nil { + return err + } + + dataForSig := []byte(announcementSignaturePrefix) + dataForSig = append(dataForSig, data...) + + rawSignature, err := key.Sign(dataForSig) + if err != nil { + return err + } + + signature, err := multibase.Encode(multibase.Base64, rawSignature) + if err != nil { + return err + } + + ar.Signature = signature + return nil +} + +func (ar *AnnouncementRecord) Verify() error { + if !ar.IsSigned() { + return errors.New("not signed") + } + + if ar.Payload.ID == nil { + return errors.New("peer ID must be specified") + } + + data, err := ar.Payload.asDagCbor() + if err != nil { + return err + } + + pk, err := ar.Payload.ID.ExtractPublicKey() + if err != nil { + return fmt.Errorf("extracing public key from peer ID: %w", err) + } + + _, sigBytes, err := multibase.Decode(ar.Signature) + if err != nil { + return fmt.Errorf("multibase-decoding signature to verify: %w", err) + } + + dataForSig := []byte(announcementSignaturePrefix) + dataForSig = append(dataForSig, data...) + + ok, err := pk.Verify(dataForSig, sigBytes) + if err != nil { + return fmt.Errorf("verifying hash with signature: %w", err) + } + if !ok { + return errors.New("signature failed to verify") + } + + return nil +} + +func makeIPLDList(ls []ipld.Node) (datamodel.Node, error) { + nd := basicnode.Prototype__List{}.NewBuilder() + la, err := nd.BeginList(int64(len(ls))) + if err != nil { + return nil, err + } + + for _, item := range ls { + if err := la.AssembleValue().AssignNode(item); err != nil { + return nil, err + } + } + + if err := la.Finish(); err != nil { + return nil, err + } + + return nd.Build(), nil +} + +func makeIPLDMap(mp map[string]ipld.Node) (datamodel.Node, error) { + nd := basicnode.Prototype__Map{}.NewBuilder() + ma, err := nd.BeginMap(int64(len(mp))) + if err != nil { + return nil, err + } + + for k, v := range mp { + if err := ma.AssembleKey().AssignString(k); err != nil { + return nil, err + } + if err := ma.AssembleValue().AssignNode(v); err != nil { + return nil, err + } + } + + if err := ma.Finish(); err != nil { + return nil, err + } + + return nd.Build(), nil +} + +var _ Record = &AnnouncementResponseRecord{} + +// AnnouncementRecord is a [Record] of [SchemaAnnouncementResponse]. +type AnnouncementResponseRecord struct { + Schema string + Error string + TTL time.Duration +} + +func (ar *AnnouncementResponseRecord) GetSchema() string { + return ar.Schema +} + +func (ar AnnouncementResponseRecord) MarshalJSON() ([]byte, error) { + v := struct { + Schema string + Error string `json:",omitempty"` + TTL int64 `json:",omitempty"` + }{ + Schema: ar.Schema, + Error: ar.Error, + TTL: ar.TTL.Milliseconds(), + } + + return drjson.MarshalJSONBytes(v) +} + +func (ar *AnnouncementResponseRecord) UnmarshalJSON(b []byte) error { + v := struct { + Schema string + Error string + TTL int64 + }{} + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + + ar.Schema = v.Schema + ar.Error = v.Error + ar.TTL = time.Duration(v.TTL) * time.Millisecond + return nil +} diff --git a/routing/http/types/record_bitswap.go b/routing/http/types/record_bitswap.go deleted file mode 100644 index 0780fc3eb..000000000 --- a/routing/http/types/record_bitswap.go +++ /dev/null @@ -1,186 +0,0 @@ -package types - -import ( - "crypto/sha256" - "encoding/json" - "errors" - "fmt" - - "github.com/ipfs/boxo/routing/http/internal/drjson" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multibase" -) - -// Deprecated: use the more versatile [SchemaPeer] instead. For more information, read [IPIP-417]. -// -// [IPIP-417]: https://github.com/ipfs/specs/pull/417 -const SchemaBitswap = "bitswap" - -var ( - _ Record = &BitswapRecord{} -) - -// Deprecated: use the more versatile [PeerRecord] instead. For more information, read [IPIP-417]. -// -// [IPIP-417]: https://github.com/ipfs/specs/pull/417 -type BitswapRecord struct { - Schema string - Protocol string - ID *peer.ID - Addrs []Multiaddr `json:",omitempty"` -} - -func (br *BitswapRecord) GetSchema() string { - return br.Schema -} - -var _ Record = &WriteBitswapRecord{} - -// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: -// -// [IPIP-378]: https://github.com/ipfs/specs/pull/378 -type WriteBitswapRecord struct { - Schema string - Protocol string - Signature string - - // this content must be untouched because it is signed and we need to verify it - RawPayload json.RawMessage `json:"Payload"` - Payload BitswapPayload `json:"-"` -} - -type BitswapPayload struct { - Keys []CID - Timestamp *Time - AdvisoryTTL *Duration - ID *peer.ID - Addrs []Multiaddr -} - -func (wr *WriteBitswapRecord) GetSchema() string { - return wr.Schema -} - -type tmpBWPR WriteBitswapRecord - -func (p *WriteBitswapRecord) UnmarshalJSON(b []byte) error { - var bwp tmpBWPR - err := json.Unmarshal(b, &bwp) - if err != nil { - return err - } - - p.Protocol = bwp.Protocol - p.Schema = bwp.Schema - p.Signature = bwp.Signature - p.RawPayload = bwp.RawPayload - - return json.Unmarshal(bwp.RawPayload, &p.Payload) -} - -func (p *WriteBitswapRecord) IsSigned() bool { - return p.Signature != "" -} - -func (p *WriteBitswapRecord) setRawPayload() error { - payloadBytes, err := drjson.MarshalJSONBytes(p.Payload) - if err != nil { - return fmt.Errorf("marshaling bitswap write provider payload: %w", err) - } - - p.RawPayload = payloadBytes - - return nil -} - -func (p *WriteBitswapRecord) Sign(peerID peer.ID, key crypto.PrivKey) error { - if p.IsSigned() { - return errors.New("already signed") - } - - if key == nil { - return errors.New("no key provided") - } - - sid, err := peer.IDFromPrivateKey(key) - if err != nil { - return err - } - if sid != peerID { - return errors.New("not the correct signing key") - } - - err = p.setRawPayload() - if err != nil { - return err - } - hash := sha256.Sum256([]byte(p.RawPayload)) - sig, err := key.Sign(hash[:]) - if err != nil { - return err - } - - sigStr, err := multibase.Encode(multibase.Base64, sig) - if err != nil { - return fmt.Errorf("multibase-encoding signature: %w", err) - } - - p.Signature = sigStr - return nil -} - -func (p *WriteBitswapRecord) Verify() error { - if !p.IsSigned() { - return errors.New("not signed") - } - - if p.Payload.ID == nil { - return errors.New("peer ID must be specified") - } - - // note that we only generate and set the payload if it hasn't already been set - // to allow for passing through the payload untouched if it is already provided - if p.RawPayload == nil { - err := p.setRawPayload() - if err != nil { - return err - } - } - - pk, err := p.Payload.ID.ExtractPublicKey() - if err != nil { - return fmt.Errorf("extracing public key from peer ID: %w", err) - } - - _, sigBytes, err := multibase.Decode(p.Signature) - if err != nil { - return fmt.Errorf("multibase-decoding signature to verify: %w", err) - } - - hash := sha256.Sum256([]byte(p.RawPayload)) - ok, err := pk.Verify(hash[:], sigBytes) - if err != nil { - return fmt.Errorf("verifying hash with signature: %w", err) - } - if !ok { - return errors.New("signature failed to verify") - } - - return nil -} - -var _ Record = &WriteBitswapRecordResponse{} - -// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: -// -// [IPIP-378]: https://github.com/ipfs/specs/pull/378 -type WriteBitswapRecordResponse struct { - Schema string - Protocol string - AdvisoryTTL *Duration -} - -func (r *WriteBitswapRecordResponse) GetSchema() string { - return r.Schema -} diff --git a/routing/http/types/time.go b/routing/http/types/time.go deleted file mode 100644 index 4b08f9bed..000000000 --- a/routing/http/types/time.go +++ /dev/null @@ -1,37 +0,0 @@ -package types - -import ( - "encoding/json" - "time" - - "github.com/ipfs/boxo/routing/http/internal/drjson" -) - -type Time struct{ time.Time } - -func (t *Time) MarshalJSON() ([]byte, error) { - return drjson.MarshalJSONBytes(t.Time.UnixMilli()) -} - -func (t *Time) UnmarshalJSON(b []byte) error { - var timestamp int64 - err := json.Unmarshal(b, ×tamp) - if err != nil { - return err - } - t.Time = time.UnixMilli(timestamp) - return nil -} - -type Duration struct{ time.Duration } - -func (d *Duration) MarshalJSON() ([]byte, error) { return drjson.MarshalJSONBytes(d.Duration) } -func (d *Duration) UnmarshalJSON(b []byte) error { - var dur int64 - err := json.Unmarshal(b, &dur) - if err != nil { - return err - } - d.Duration = time.Duration(dur) - return nil -}