diff --git a/api/api.go b/api/api.go index 41c63e33..9371d3d6 100644 --- a/api/api.go +++ b/api/api.go @@ -344,6 +344,7 @@ func (s *Server) setupRoutes(e *echo.Echo) { // Piece e.GET("/api/preparation/:id/piece", s.toEchoHandler(s.dataprepHandler.ListPiecesHandler)) e.POST("/api/preparation/:id/piece", s.toEchoHandler(s.dataprepHandler.AddPieceHandler)) + e.DELETE("/api/preparation/:id/piece/:piece_cid", s.toEchoHandler(s.dataprepHandler.DeletePieceHandler)) // Wallet e.POST("/api/wallet", s.toEchoHandler(s.walletHandler.ImportHandler)) diff --git a/cmd/app.go b/cmd/app.go index 2d21eab7..8c437b77 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -203,6 +203,7 @@ Upgrading: dataprep.PauseDagGenCmd, dataprep.ListPiecesCmd, dataprep.AddPieceCmd, + dataprep.DeletePieceCmd, dataprep.ExploreCmd, dataprep.AttachWalletCmd, dataprep.ListWalletsCmd, diff --git a/cmd/dataprep/piece.go b/cmd/dataprep/piece.go index 48d59321..d2550640 100644 --- a/cmd/dataprep/piece.go +++ b/cmd/dataprep/piece.go @@ -82,3 +82,38 @@ var AddPieceCmd = &cli.Command{ return nil }, } + +var DeletePieceCmd = &cli.Command{ + Name: "delete-piece", + Usage: "Delete a piece from a preparation", + Category: "Piece Management", + ArgsUsage: " ", + Before: cliutil.CheckNArgs, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "delete-car", + Usage: "Delete the physical CAR file from storage", + Value: true, + }, + &cli.BoolFlag{ + Name: "force", + Usage: "Delete even if deals reference this piece", + }, + }, + Action: func(c *cli.Context) error { + db, closer, err := database.OpenFromCLI(c) + if err != nil { + return errors.WithStack(err) + } + defer closer.Close() + + return dataprep.Default.DeletePieceHandler( + c.Context, db, + c.Args().Get(0), + c.Args().Get(1), + dataprep.DeletePieceRequest{ + DeleteCar: c.Bool("delete-car"), + Force: c.Bool("force"), + }) + }, +} diff --git a/handler/dataprep/delete_piece.go b/handler/dataprep/delete_piece.go new file mode 100644 index 00000000..dcf1490c --- /dev/null +++ b/handler/dataprep/delete_piece.go @@ -0,0 +1,192 @@ +package dataprep + +import ( + "context" + + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/handler/handlererror" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/storagesystem" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-log/v2" + "github.com/rclone/rclone/fs" + "gorm.io/gorm" +) + +var logger = log.Logger("dataprep") + +// DeletePieceRequest contains options for deleting a piece from a preparation. +type DeletePieceRequest struct { + DeleteCar bool `json:"deleteCar"` // Delete the physical CAR file from storage (default: true) + Force bool `json:"force"` // Delete even if deals reference this piece +} + +// DeletePieceHandler deletes a piece (CAR) from a preparation. +// +// This function handles deletion of both data pieces and DAG pieces, with appropriate +// cleanup for each type: +// - Data pieces: resets file_ranges.job_id to NULL so ranges can be re-packed +// - DAG pieces: resets directories.exported to false so DAG can be re-generated +// +// For non-inline preparations with physical CAR files stored in output storage, +// the CAR file can optionally be deleted from storage (default behavior). +// +// Parameters: +// - ctx: The context for database transactions and other operations. +// - db: A pointer to the gorm.DB instance representing the database connection. +// - prepName: The ID or name for the desired Preparation record. +// - pieceCIDStr: The piece CID (CommP) identifying the piece to delete. +// - request: Options controlling deletion behavior. +// +// Returns: +// - An error if the piece doesn't exist, belongs to a different preparation, +// has active deals (without Force flag), or storage deletion fails. +func (DefaultHandler) DeletePieceHandler( + ctx context.Context, + db *gorm.DB, + prepName string, + pieceCIDStr string, + request DeletePieceRequest, +) error { + db = db.WithContext(ctx) + + // 1. Parse and validate piece CID + pieceCID, err := cid.Parse(pieceCIDStr) + if err != nil { + return errors.Join(handlererror.ErrInvalidParameter, errors.Wrapf(err, "invalid piece CID %s", pieceCIDStr)) + } + if pieceCID.Type() != cid.FilCommitmentUnsealed { + return errors.Wrap(handlererror.ErrInvalidParameter, "piece CID must be commp") + } + + // 2. Find preparation + var preparation model.Preparation + err = preparation.FindByIDOrName(db, prepName) + if errors.Is(err, gorm.ErrRecordNotFound) { + return errors.Wrapf(handlererror.ErrNotFound, "preparation '%s' does not exist", prepName) + } + if err != nil { + return errors.WithStack(err) + } + + // 3. Find car by piece_cid AND preparation_id + var car model.Car + err = db.Preload("Storage"). + Where("piece_cid = ? AND preparation_id = ?", model.CID(pieceCID), preparation.ID). + First(&car).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return errors.Wrapf(handlererror.ErrNotFound, "piece '%s' not found in preparation '%s'", pieceCIDStr, prepName) + } + if err != nil { + return errors.WithStack(err) + } + + // 4. Check for active deals (block unless Force) + var dealCount int64 + err = db.Model(&model.Deal{}).Where("piece_cid = ?", car.PieceCID).Count(&dealCount).Error + if err != nil { + return errors.WithStack(err) + } + if dealCount > 0 && !request.Force { + return errors.Wrapf(handlererror.ErrInvalidParameter, + "piece has %d deals; use --force to delete anyway", dealCount) + } + + // 5. Handle piece type-specific cleanup + if car.PieceType == model.DataPiece { + // For data pieces: reset file_ranges.job_id to allow re-packing + if car.JobID != nil { + err = db.Model(&model.FileRange{}). + Where("job_id = ?", car.JobID). + Update("job_id", nil).Error + if err != nil { + return errors.Wrap(err, "failed to reset file ranges") + } + logger.Infow("reset file ranges for re-packing", "job_id", *car.JobID) + } + } else if car.PieceType == model.DagPiece { + // For DAG pieces: reset directories.exported to allow re-generation + if car.AttachmentID != nil { + err = db.Model(&model.Directory{}). + Where("attachment_id = ?", car.AttachmentID). + Update("exported", false).Error + if err != nil { + return errors.Wrap(err, "failed to reset directory export flags") + } + logger.Infow("reset directories for DAG re-generation", "attachment_id", *car.AttachmentID) + } + } + + // 6. Delete physical CAR file from storage (if requested and applicable) + if request.DeleteCar && car.StorageID != nil && car.StoragePath != "" { + handler, err := storagesystem.NewRCloneHandler(ctx, *car.Storage) + if err != nil { + return errors.Wrap(err, "failed to connect to storage") + } + + entry, err := handler.Check(ctx, car.StoragePath) + if err != nil { + // File might already be deleted - warn but continue with DB cleanup + logger.Warnw("CAR file not found in storage, continuing with DB cleanup", + "path", car.StoragePath, "storage_id", *car.StorageID, "err", err) + } else { + obj, ok := entry.(fs.Object) + if !ok { + return errors.Errorf("%s is not a file object", car.StoragePath) + } + err = handler.Remove(ctx, obj) + if err != nil { + return errors.Wrapf(err, "failed to delete CAR file %s", car.StoragePath) + } + logger.Infow("deleted CAR file from storage", "path", car.StoragePath, "storage_id", *car.StorageID) + } + } + + // 7. Delete car_blocks and car record in a transaction + err = database.DoRetry(ctx, func() error { + return db.Transaction(func(tx *gorm.DB) error { + // Delete car_blocks first + result := tx.Where("car_id = ?", car.ID).Delete(&model.CarBlock{}) + if result.Error != nil { + return errors.Wrap(result.Error, "failed to delete car blocks") + } + logger.Infow("deleted car blocks", "car_id", car.ID, "count", result.RowsAffected) + + // Delete the car record + err := tx.Delete(&car).Error + if err != nil { + return errors.Wrap(err, "failed to delete car record") + } + return nil + }) + }) + if err != nil { + return errors.WithStack(err) + } + + logger.Infow("successfully deleted piece", + "piece_cid", pieceCIDStr, + "preparation", prepName, + "piece_type", car.PieceType, + "deals_existed", dealCount) + + return nil +} + +// @ID DeletePiece +// @Summary Delete a piece from a preparation +// @Description Deletes a piece (CAR) and its associated records. For data pieces, resets file ranges +// @Description to allow re-packing. For DAG pieces, resets directory export flags for re-generation. +// @Tags Piece +// @Accept json +// @Produce json +// @Param id path string true "Preparation ID or name" +// @Param piece_cid path string true "Piece CID" +// @Param request body DeletePieceRequest true "Delete options" +// @Success 204 "No Content" +// @Failure 400 {object} api.HTTPError +// @Failure 404 {object} api.HTTPError +// @Failure 500 {object} api.HTTPError +// @Router /preparation/{id}/piece/{piece_cid} [delete] +func _() {} diff --git a/handler/dataprep/delete_piece_test.go b/handler/dataprep/delete_piece_test.go new file mode 100644 index 00000000..72d16bbe --- /dev/null +++ b/handler/dataprep/delete_piece_test.go @@ -0,0 +1,342 @@ +package dataprep + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/data-preservation-programs/singularity/handler/handlererror" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/testutil" + "github.com/gotidy/ptr" + "github.com/ipfs/boxo/util" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +// Helper to create a valid piece CID for testing +func testPieceCID(seed string) cid.Cid { + return cid.NewCidV1(cid.FilCommitmentUnsealed, util.Hash([]byte(seed))) +} + +func TestDeletePieceHandler_PrepNotFound(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + pieceCID := testPieceCID("test1") + err := Default.DeletePieceHandler(ctx, db, "nonexistent", pieceCID.String(), DeletePieceRequest{}) + require.ErrorIs(t, err, handlererror.ErrNotFound) + require.ErrorContains(t, err, "preparation") + }) +} + +func TestDeletePieceHandler_InvalidPieceCID(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep := model.Preparation{Name: "test-prep"} + require.NoError(t, db.Create(&prep).Error) + + err := Default.DeletePieceHandler(ctx, db, "test-prep", "invalid-cid", DeletePieceRequest{}) + require.ErrorIs(t, err, handlererror.ErrInvalidParameter) + }) +} + +func TestDeletePieceHandler_PieceNotFound(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep := model.Preparation{Name: "test-prep"} + require.NoError(t, db.Create(&prep).Error) + + pieceCID := testPieceCID("nonexistent") + err := Default.DeletePieceHandler(ctx, db, "test-prep", pieceCID.String(), DeletePieceRequest{}) + require.ErrorIs(t, err, handlererror.ErrNotFound) + require.ErrorContains(t, err, "piece") + }) +} + +func TestDeletePieceHandler_DealExistsWithoutForce(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep := model.Preparation{Name: "test-prep"} + require.NoError(t, db.Create(&prep).Error) + + pieceCID := testPieceCID("test-piece") + car := model.Car{ + PieceCID: model.CID(pieceCID), + PreparationID: &prep.ID, + PieceType: model.DataPiece, + } + require.NoError(t, db.Create(&car).Error) + + // Create a wallet first to satisfy FK constraint + wallet := model.Wallet{ID: "f01234", Address: "f01234"} + require.NoError(t, db.Create(&wallet).Error) + + deal := model.Deal{ + PieceCID: model.CID(pieceCID), + Provider: "f05678", + ClientID: "f01234", + } + require.NoError(t, db.Create(&deal).Error) + + err := Default.DeletePieceHandler(ctx, db, "test-prep", pieceCID.String(), DeletePieceRequest{Force: false}) + require.ErrorIs(t, err, handlererror.ErrInvalidParameter) + require.ErrorContains(t, err, "deals") + }) +} + +func TestDeletePieceHandler_DealExistsWithForce(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep := model.Preparation{Name: "test-prep"} + require.NoError(t, db.Create(&prep).Error) + + pieceCID := testPieceCID("test-piece") + car := model.Car{ + PieceCID: model.CID(pieceCID), + PreparationID: &prep.ID, + PieceType: model.DataPiece, + } + require.NoError(t, db.Create(&car).Error) + + // Create a wallet first to satisfy FK constraint + wallet := model.Wallet{ID: "f01234", Address: "f01234"} + require.NoError(t, db.Create(&wallet).Error) + + deal := model.Deal{ + PieceCID: model.CID(pieceCID), + Provider: "f05678", + ClientID: "f01234", + } + require.NoError(t, db.Create(&deal).Error) + + err := Default.DeletePieceHandler(ctx, db, "test-prep", pieceCID.String(), DeletePieceRequest{Force: true}) + require.NoError(t, err) + + // Verify car is deleted + var count int64 + db.Model(&model.Car{}).Count(&count) + require.Zero(t, count) + }) +} + +func TestDeletePieceHandler_DataPiece_ResetsFileRanges(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep := model.Preparation{ + Name: "test-prep", + SourceStorages: []model.Storage{{}}, + } + require.NoError(t, db.Create(&prep).Error) + + job := model.Job{ + AttachmentID: ptr.Of(model.SourceAttachmentID(1)), + State: model.Complete, + } + require.NoError(t, db.Create(&job).Error) + + pieceCID := testPieceCID("data-piece") + car := model.Car{ + PieceCID: model.CID(pieceCID), + PreparationID: &prep.ID, + JobID: &job.ID, + PieceType: model.DataPiece, + } + require.NoError(t, db.Create(&car).Error) + + file := model.File{ + AttachmentID: ptr.Of(model.SourceAttachmentID(1)), + Path: "test.txt", + Size: 100, + } + require.NoError(t, db.Create(&file).Error) + + fileRange := model.FileRange{ + FileID: file.ID, + JobID: &job.ID, + Offset: 0, + Length: 100, + } + require.NoError(t, db.Create(&fileRange).Error) + + err := Default.DeletePieceHandler(ctx, db, "test-prep", pieceCID.String(), DeletePieceRequest{DeleteCar: false}) + require.NoError(t, err) + + // Verify file_range.job_id is reset to NULL + var fr model.FileRange + require.NoError(t, db.First(&fr, fileRange.ID).Error) + require.Nil(t, fr.JobID, "file_range.job_id should be reset to NULL") + + // Verify car is deleted + var count int64 + db.Model(&model.Car{}).Count(&count) + require.Zero(t, count) + }) +} + +func TestDeletePieceHandler_DagPiece_ResetsDirectories(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep := model.Preparation{ + Name: "test-prep", + SourceStorages: []model.Storage{{}}, + } + require.NoError(t, db.Create(&prep).Error) + + pieceCID := testPieceCID("dag-piece") + car := model.Car{ + PieceCID: model.CID(pieceCID), + PreparationID: &prep.ID, + AttachmentID: ptr.Of(model.SourceAttachmentID(1)), + PieceType: model.DagPiece, + } + require.NoError(t, db.Create(&car).Error) + + dir := model.Directory{ + AttachmentID: ptr.Of(model.SourceAttachmentID(1)), + Name: "test-dir", + Exported: true, + } + require.NoError(t, db.Create(&dir).Error) + + err := Default.DeletePieceHandler(ctx, db, "test-prep", pieceCID.String(), DeletePieceRequest{DeleteCar: false}) + require.NoError(t, err) + + // Verify directory.exported is reset to false + var d model.Directory + require.NoError(t, db.First(&d, dir.ID).Error) + require.False(t, d.Exported, "directory.exported should be reset to false") + + // Verify car is deleted + var count int64 + db.Model(&model.Car{}).Count(&count) + require.Zero(t, count) + }) +} + +func TestDeletePieceHandler_NonInline_DeletesCarFile(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + tmp := t.TempDir() + carFile := filepath.Join(tmp, "test.car") + require.NoError(t, os.WriteFile(carFile, []byte("car data"), 0o644)) + + storage := model.Storage{ + Name: "output", + Type: "local", + Path: tmp, + } + require.NoError(t, db.Create(&storage).Error) + + prep := model.Preparation{ + Name: "test-prep", + OutputStorages: []model.Storage{storage}, + } + require.NoError(t, db.Create(&prep).Error) + + pieceCID := testPieceCID("noninline-piece") + car := model.Car{ + PieceCID: model.CID(pieceCID), + PreparationID: &prep.ID, + StorageID: &storage.ID, + StoragePath: "test.car", + PieceType: model.DataPiece, + } + require.NoError(t, db.Create(&car).Error) + + err := Default.DeletePieceHandler(ctx, db, "test-prep", pieceCID.String(), DeletePieceRequest{DeleteCar: true}) + require.NoError(t, err) + + // Verify file is deleted + _, err = os.Stat(carFile) + require.True(t, os.IsNotExist(err), "CAR file should be deleted") + + // Verify car record is deleted + var count int64 + db.Model(&model.Car{}).Count(&count) + require.Zero(t, count) + }) +} + +func TestDeletePieceHandler_Inline_NoStorageDeletion(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep := model.Preparation{Name: "test-prep"} + require.NoError(t, db.Create(&prep).Error) + + pieceCID := testPieceCID("inline-piece") + car := model.Car{ + PieceCID: model.CID(pieceCID), + PreparationID: &prep.ID, + StorageID: nil, // Inline - no storage + PieceType: model.DataPiece, + MinPieceSizePadding: 100, // Indicates inline + } + require.NoError(t, db.Create(&car).Error) + + carBlock := model.CarBlock{ + CarID: &car.ID, + } + require.NoError(t, db.Create(&carBlock).Error) + + err := Default.DeletePieceHandler(ctx, db, "test-prep", pieceCID.String(), DeletePieceRequest{DeleteCar: true}) + require.NoError(t, err) + + // Verify car and car_blocks are deleted + var carCount, blockCount int64 + db.Model(&model.Car{}).Count(&carCount) + db.Model(&model.CarBlock{}).Count(&blockCount) + require.Zero(t, carCount) + require.Zero(t, blockCount) + }) +} + +func TestDeletePieceHandler_DeletesCarBlocks(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep := model.Preparation{Name: "test-prep"} + require.NoError(t, db.Create(&prep).Error) + + pieceCID := testPieceCID("blocks-piece") + car := model.Car{ + PieceCID: model.CID(pieceCID), + PreparationID: &prep.ID, + PieceType: model.DataPiece, + } + require.NoError(t, db.Create(&car).Error) + + // Create multiple car blocks + for i := 0; i < 5; i++ { + carBlock := model.CarBlock{ + CarID: &car.ID, + CarOffset: int64(i * 1000), + } + require.NoError(t, db.Create(&carBlock).Error) + } + + var blockCount int64 + db.Model(&model.CarBlock{}).Count(&blockCount) + require.Equal(t, int64(5), blockCount) + + err := Default.DeletePieceHandler(ctx, db, "test-prep", pieceCID.String(), DeletePieceRequest{DeleteCar: false}) + require.NoError(t, err) + + // Verify all car_blocks are deleted + db.Model(&model.CarBlock{}).Count(&blockCount) + require.Zero(t, blockCount) + }) +} + +func TestDeletePieceHandler_PieceBelongsToDifferentPrep(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + prep1 := model.Preparation{Name: "prep1"} + prep2 := model.Preparation{Name: "prep2"} + require.NoError(t, db.Create(&prep1).Error) + require.NoError(t, db.Create(&prep2).Error) + + pieceCID := testPieceCID("other-prep-piece") + // Create car belonging to prep1 + car := model.Car{ + PieceCID: model.CID(pieceCID), + PreparationID: &prep1.ID, + PieceType: model.DataPiece, + } + require.NoError(t, db.Create(&car).Error) + + // Try to delete from prep2 - should fail + err := Default.DeletePieceHandler(ctx, db, "prep2", pieceCID.String(), DeletePieceRequest{}) + require.ErrorIs(t, err, handlererror.ErrNotFound) + require.ErrorContains(t, err, "piece") + }) +} diff --git a/handler/dataprep/interface.go b/handler/dataprep/interface.go index d4e2f8dc..87b11671 100644 --- a/handler/dataprep/interface.go +++ b/handler/dataprep/interface.go @@ -43,6 +43,14 @@ type Handler interface { request AddPieceRequest, ) (*model.Car, error) + DeletePieceHandler( + ctx context.Context, + db *gorm.DB, + prepName string, + pieceCID string, + request DeletePieceRequest, + ) error + AddSourceStorageHandler(ctx context.Context, db *gorm.DB, id string, source string) (*model.Preparation, error) ListSchedulesHandler( ctx context.Context, @@ -120,6 +128,11 @@ func (m *MockDataPrep) AddPieceHandler(ctx context.Context, db *gorm.DB, id stri return args.Get(0).(*model.Car), args.Error(1) } +func (m *MockDataPrep) DeletePieceHandler(ctx context.Context, db *gorm.DB, prepName string, pieceCID string, request DeletePieceRequest) error { + args := m.Called(ctx, db, prepName, pieceCID, request) + return args.Error(0) +} + func (m *MockDataPrep) AddSourceStorageHandler(ctx context.Context, db *gorm.DB, id string, source string) (*model.Preparation, error) { args := m.Called(ctx, db, id, source) return args.Get(0).(*model.Preparation), args.Error(1)