fix: add optimistic lock retry for wallet topup and withdraw

This commit is contained in:
zetaloop
2026-04-22 20:29:35 +08:00
parent 17daff03bd
commit 59256897e9
2 changed files with 93 additions and 85 deletions
@@ -7,6 +7,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"time" "time"
"juwan-backend/app/wallet/rpc/pb" "juwan-backend/app/wallet/rpc/pb"
@@ -53,9 +54,6 @@ func (l *TopupLogic) Topup(req *types.TopupReq) (resp *types.EmptyResp, err erro
return nil, err return nil, err
} }
currentBalance := decimal.Zero
frozenBalance := decimal.Zero
expectedVersion := int64(0)
if walletResp.GetWallets() == nil { if walletResp.GetWallets() == nil {
zeroAmount := "0" zeroAmount := "0"
_, err = l.svcCtx.WalletRpc.AddWallets(l.ctx, &pb.AddWalletsReq{ _, err = l.svcCtx.WalletRpc.AddWallets(l.ctx, &pb.AddWalletsReq{
@@ -74,54 +72,54 @@ func (l *TopupLogic) Topup(req *types.TopupReq) (resp *types.EmptyResp, err erro
if walletResp.GetWallets() == nil { if walletResp.GetWallets() == nil {
return nil, errors.New("wallet not found") return nil, errors.New("wallet not found")
} }
currentBalance, err = decimal.NewFromString(walletResp.Wallets.Balance) }
if err != nil {
const maxRetries = 3
for i := 0; i < maxRetries; i++ {
currentBalance, perr := decimal.NewFromString(walletResp.Wallets.Balance)
if perr != nil {
return nil, errors.New("invalid wallet balance") return nil, errors.New("invalid wallet balance")
} }
frozenBalance, err = decimal.NewFromString(walletResp.Wallets.FrozenBalance) frozenBalance, perr := decimal.NewFromString(walletResp.Wallets.FrozenBalance)
if err != nil { if perr != nil {
return nil, errors.New("invalid wallet frozen balance") return nil, errors.New("invalid wallet frozen balance")
} }
expectedVersion = walletResp.Wallets.Version expectedVersion := walletResp.Wallets.Version
} else {
currentBalance, err = decimal.NewFromString(walletResp.Wallets.Balance) newBalance := currentBalance.Add(amount)
if err != nil { newBalanceStr := newBalance.String()
return nil, errors.New("invalid wallet balance") frozenBalanceStr := frozenBalance.String()
updatedAt := time.Now().Unix()
_, uerr := l.svcCtx.WalletRpc.UpdateWallets(l.ctx, &pb.UpdateWalletsReq{
UserId: userID,
Balance: &newBalanceStr,
FrozenBalance: &frozenBalanceStr,
UpdatedAt: &updatedAt,
Version: &expectedVersion,
})
if uerr == nil {
desc := fmt.Sprintf("topup via %s", req.Method)
_, err = l.svcCtx.WalletRpc.AddWalletTransactions(l.ctx, &pb.AddWalletTransactionsReq{
UserId: userID,
Type: "topup",
Amount: amount.String(),
BalanceAfter: newBalanceStr,
Description: desc,
CreatedAt: updatedAt,
})
if err != nil {
return nil, err
}
return &types.EmptyResp{}, nil
} }
frozenBalance, err = decimal.NewFromString(walletResp.Wallets.FrozenBalance) if !strings.Contains(uerr.Error(), "conflict") {
if err != nil { return nil, uerr
return nil, errors.New("invalid wallet frozen balance") }
walletResp, err = l.svcCtx.WalletRpc.GetWalletsById(l.ctx, &pb.GetWalletsByIdReq{Id: userID})
if err != nil {
return nil, err
} }
expectedVersion = walletResp.Wallets.Version
} }
newBalance := currentBalance.Add(amount) return nil, errors.New("wallet update conflict, please retry")
newBalanceStr := newBalance.String()
frozenBalanceStr := frozenBalance.String()
updatedAt := time.Now().Unix()
_, err = l.svcCtx.WalletRpc.UpdateWallets(l.ctx, &pb.UpdateWalletsReq{
UserId: userID,
Balance: &newBalanceStr,
FrozenBalance: &frozenBalanceStr,
UpdatedAt: &updatedAt,
Version: &expectedVersion,
})
if err != nil {
return nil, err
}
desc := fmt.Sprintf("topup via %s", req.Method)
_, err = l.svcCtx.WalletRpc.AddWalletTransactions(l.ctx, &pb.AddWalletTransactionsReq{
UserId: userID,
Type: "topup",
Amount: amount.String(),
BalanceAfter: newBalanceStr,
Description: desc,
CreatedAt: updatedAt,
})
if err != nil {
return nil, err
}
return &types.EmptyResp{}, nil
} }
@@ -7,6 +7,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"time" "time"
"juwan-backend/app/wallet/rpc/pb" "juwan-backend/app/wallet/rpc/pb"
@@ -56,46 +57,55 @@ func (l *WithdrawLogic) Withdraw(req *types.TopupReq) (resp *types.EmptyResp, er
return nil, errors.New("wallet not found") return nil, errors.New("wallet not found")
} }
currentBalance, err := decimal.NewFromString(walletResp.Wallets.Balance) const maxRetries = 3
if err != nil { for i := 0; i < maxRetries; i++ {
return nil, errors.New("invalid wallet balance") currentBalance, perr := decimal.NewFromString(walletResp.Wallets.Balance)
} if perr != nil {
frozenBalance, err := decimal.NewFromString(walletResp.Wallets.FrozenBalance) return nil, errors.New("invalid wallet balance")
if err != nil { }
return nil, errors.New("invalid wallet frozen balance") frozenBalance, perr := decimal.NewFromString(walletResp.Wallets.FrozenBalance)
} if perr != nil {
expectedVersion := walletResp.Wallets.Version return nil, errors.New("invalid wallet frozen balance")
if currentBalance.LessThan(amount) { }
return nil, errors.New("insufficient balance") expectedVersion := walletResp.Wallets.Version
if currentBalance.LessThan(amount) {
return nil, errors.New("insufficient balance")
}
newBalance := currentBalance.Sub(amount)
newBalanceStr := newBalance.String()
frozenBalanceStr := frozenBalance.String()
updatedAt := time.Now().Unix()
_, uerr := l.svcCtx.WalletRpc.UpdateWallets(l.ctx, &pb.UpdateWalletsReq{
UserId: userID,
Balance: &newBalanceStr,
FrozenBalance: &frozenBalanceStr,
UpdatedAt: &updatedAt,
Version: &expectedVersion,
})
if uerr == nil {
desc := fmt.Sprintf("withdraw via %s", req.Method)
_, err = l.svcCtx.WalletRpc.AddWalletTransactions(l.ctx, &pb.AddWalletTransactionsReq{
UserId: userID,
Type: "withdrawal",
Amount: amount.String(),
BalanceAfter: newBalanceStr,
Description: desc,
CreatedAt: updatedAt,
})
if err != nil {
return nil, err
}
return &types.EmptyResp{}, nil
}
if !strings.Contains(uerr.Error(), "conflict") {
return nil, uerr
}
walletResp, err = l.svcCtx.WalletRpc.GetWalletsById(l.ctx, &pb.GetWalletsByIdReq{Id: userID})
if err != nil {
return nil, err
}
} }
newBalance := currentBalance.Sub(amount) return nil, errors.New("wallet update conflict, please retry")
newBalanceStr := newBalance.String()
frozenBalanceStr := frozenBalance.String()
updatedAt := time.Now().Unix()
_, err = l.svcCtx.WalletRpc.UpdateWallets(l.ctx, &pb.UpdateWalletsReq{
UserId: userID,
Balance: &newBalanceStr,
FrozenBalance: &frozenBalanceStr,
UpdatedAt: &updatedAt,
Version: &expectedVersion,
})
if err != nil {
return nil, err
}
desc := fmt.Sprintf("withdraw via %s", req.Method)
_, err = l.svcCtx.WalletRpc.AddWalletTransactions(l.ctx, &pb.AddWalletTransactionsReq{
UserId: userID,
Type: "withdrawal",
Amount: amount.String(),
BalanceAfter: newBalanceStr,
Description: desc,
CreatedAt: updatedAt,
})
if err != nil {
return nil, err
}
return &types.EmptyResp{}, nil
} }