-
Notifications
You must be signed in to change notification settings - Fork 2
Refactor transaction timestamp management for cross shard data #313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
dc280eb
fda8142
35f7783
b7e219a
daa1385
bf234fd
e2a20b0
ff9b016
e25b944
e480042
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package adapter | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
|
|
||
| "github.com/bootjp/elastickv/kv" | ||
|
|
@@ -29,13 +30,19 @@ var _ pb.InternalServer = (*Internal)(nil) | |
|
|
||
| var ErrNotLeader = errors.New("not leader") | ||
| var ErrLeaderNotFound = errors.New("leader not found") | ||
| var ErrTxnTimestampOverflow = errors.New("txn timestamp overflow") | ||
|
|
||
| func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.ForwardResponse, error) { | ||
| if i.raft.State() != raft.Leader { | ||
| return nil, errors.WithStack(ErrNotLeader) | ||
| } | ||
|
|
||
| i.stampTimestamps(req) | ||
| if err := i.stampTimestamps(req); err != nil { | ||
| return &pb.ForwardResponse{ | ||
| Success: false, | ||
| CommitIndex: 0, | ||
| }, errors.WithStack(err) | ||
| } | ||
|
|
||
| r, err := i.transactionManager.Commit(req.Requests) | ||
| if err != nil { | ||
|
|
@@ -51,35 +58,125 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa | |
| }, nil | ||
| } | ||
|
|
||
| func (i *Internal) stampTimestamps(req *pb.ForwardRequest) { | ||
| func (i *Internal) stampTimestamps(req *pb.ForwardRequest) error { | ||
| if req == nil { | ||
| return | ||
| return nil | ||
| } | ||
| if req.IsTxn { | ||
| var startTs uint64 | ||
| // All requests in a transaction must have the same timestamp. | ||
| // Find a timestamp from the requests, or generate a new one if none exist. | ||
| for _, r := range req.Requests { | ||
| if r.Ts != 0 { | ||
| startTs = r.Ts | ||
| break | ||
| } | ||
| return i.stampTxnTimestamps(req.Requests) | ||
| } | ||
|
|
||
| i.stampRawTimestamps(req.Requests) | ||
| return nil | ||
| } | ||
|
|
||
| func (i *Internal) stampRawTimestamps(reqs []*pb.Request) { | ||
| for _, r := range reqs { | ||
| if r == nil { | ||
| continue | ||
| } | ||
| if r.Ts != 0 { | ||
| continue | ||
| } | ||
| if i.clock == nil { | ||
| r.Ts = 1 | ||
| continue | ||
| } | ||
| r.Ts = i.clock.Next() | ||
| } | ||
| } | ||
|
|
||
| if startTs == 0 && len(req.Requests) > 0 { | ||
| startTs = i.clock.Next() | ||
| func (i *Internal) stampTxnTimestamps(reqs []*pb.Request) error { | ||
| startTS := forwardedTxnStartTS(reqs) | ||
| if startTS == 0 { | ||
| if i.clock == nil { | ||
| startTS = 1 | ||
| } else { | ||
| startTS = i.clock.Next() | ||
| } | ||
| } | ||
| if startTS == ^uint64(0) { | ||
| return errors.WithStack(ErrTxnTimestampOverflow) | ||
| } | ||
|
|
||
| // Assign the unified timestamp to all requests in the transaction. | ||
| for _, r := range reqs { | ||
| if r != nil { | ||
| r.Ts = startTS | ||
| } | ||
| } | ||
|
|
||
| return i.fillForwardedTxnCommitTS(reqs, startTS) | ||
| } | ||
|
|
||
| // Assign the unified timestamp to all requests in the transaction. | ||
| for _, r := range req.Requests { | ||
| r.Ts = startTs | ||
| func forwardedTxnStartTS(reqs []*pb.Request) uint64 { | ||
| for _, r := range reqs { | ||
| if r != nil && r.Ts != 0 { | ||
| return r.Ts | ||
| } | ||
| return | ||
| } | ||
| return 0 | ||
| } | ||
|
|
||
| func forwardedTxnMetaMutation(r *pb.Request, metaPrefix []byte) (*pb.Mutation, bool) { | ||
| if r == nil { | ||
| return nil, false | ||
| } | ||
| if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT { | ||
| return nil, false | ||
| } | ||
| if len(r.Mutations) == 0 || r.Mutations[0] == nil { | ||
| return nil, false | ||
| } | ||
| if !bytes.HasPrefix(r.Mutations[0].Key, metaPrefix) { | ||
| return nil, false | ||
| } | ||
| return r.Mutations[0], true | ||
| } | ||
|
|
||
| func (i *Internal) fillForwardedTxnCommitTS(reqs []*pb.Request, startTS uint64) error { | ||
| type metaToUpdate struct { | ||
| m *pb.Mutation | ||
| meta kv.TxnMeta | ||
| } | ||
|
|
||
| for _, r := range req.Requests { | ||
| if r.Ts == 0 { | ||
| r.Ts = i.clock.Next() | ||
| metaMutations := make([]metaToUpdate, 0, len(reqs)) | ||
| prefix := []byte(kv.TxnMetaPrefix) | ||
| for _, r := range reqs { | ||
| m, ok := forwardedTxnMetaMutation(r, prefix) | ||
| if !ok { | ||
| continue | ||
| } | ||
| meta, err := kv.DecodeTxnMeta(m.Value) | ||
| if err != nil { | ||
| continue | ||
| } | ||
| if meta.CommitTS != 0 { | ||
| continue | ||
| } | ||
| metaMutations = append(metaMutations, metaToUpdate{m: m, meta: meta}) | ||
| } | ||
| if len(metaMutations) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| commitTS := startTS + 1 | ||
| if commitTS == 0 { | ||
| // Overflow: can't choose a commit timestamp strictly greater than startTS. | ||
| return errors.WithStack(ErrTxnTimestampOverflow) | ||
| } | ||
| if i.clock != nil { | ||
| i.clock.Observe(startTS) | ||
| commitTS = i.clock.Next() | ||
| } | ||
| if commitTS <= startTS { | ||
| // Defensive: avoid writing an invalid CommitTS. | ||
| return errors.WithStack(ErrTxnTimestampOverflow) | ||
| } | ||
|
Comment on lines
163
to
175
|
||
|
|
||
| for _, item := range metaMutations { | ||
| item.meta.CommitTS = commitTS | ||
| item.m.Value = kv.EncodeTxnMeta(item.meta) | ||
| } | ||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| package adapter | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/bootjp/elastickv/kv" | ||
| pb "github.com/bootjp/elastickv/proto" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestStampTxnTimestamps_RejectsMaxStartTS(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| i := &Internal{} | ||
| reqs := []*pb.Request{ | ||
| { | ||
| IsTxn: true, | ||
| Phase: pb.Phase_COMMIT, | ||
| Ts: ^uint64(0), | ||
| Mutations: []*pb.Mutation{ | ||
| { | ||
| Op: pb.Op_PUT, | ||
| Key: []byte(kv.TxnMetaPrefix), | ||
| Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}), | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| err := i.stampTxnTimestamps(reqs) | ||
| require.ErrorIs(t, err, ErrTxnTimestampOverflow) | ||
| } | ||
|
|
||
| func TestFillForwardedTxnCommitTS_RejectsOverflow(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| i := &Internal{} | ||
| reqs := []*pb.Request{ | ||
| { | ||
| IsTxn: true, | ||
| Phase: pb.Phase_COMMIT, | ||
| Mutations: []*pb.Mutation{ | ||
| { | ||
| Op: pb.Op_PUT, | ||
| Key: []byte(kv.TxnMetaPrefix), | ||
| Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}), | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| err := i.fillForwardedTxnCommitTS(reqs, ^uint64(0)) | ||
| require.ErrorIs(t, err, ErrTxnTimestampOverflow) | ||
| } | ||
|
|
||
| func TestFillForwardedTxnCommitTS_AssignsCommitTS(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| i := &Internal{} | ||
| startTS := uint64(10) | ||
| reqs := []*pb.Request{ | ||
| { | ||
| IsTxn: true, | ||
| Phase: pb.Phase_COMMIT, | ||
| Mutations: []*pb.Mutation{ | ||
| { | ||
| Op: pb.Op_PUT, | ||
| Key: []byte(kv.TxnMetaPrefix), | ||
| Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}), | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| require.NoError(t, i.fillForwardedTxnCommitTS(reqs, startTS)) | ||
|
|
||
| meta, err := kv.DecodeTxnMeta(reqs[0].Mutations[0].Value) | ||
| require.NoError(t, err) | ||
| require.Equal(t, startTS+1, meta.CommitTS) | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.