mirror of
https://github.com/aclindsa/moneygo.git
synced 2025-07-12 07:51:08 -04:00
Use SQL transactions for the entirety of every request
This commit is contained in:
@ -22,48 +22,35 @@ func (od *OFXDownload) Read(json_str string) error {
|
||||
return dec.Decode(od)
|
||||
}
|
||||
|
||||
func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, accountid int64) {
|
||||
func ofxImportHelper(tx *Tx, r io.Reader, user *User, accountid int64) ResponseWriterWriter {
|
||||
itl, err := ImportOFX(r)
|
||||
|
||||
if err != nil {
|
||||
//TODO is this necessarily an invalid request (what if it was an error on our end)?
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
if len(itl.Accounts) != 1 {
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
log.Printf("Found %d accounts when importing OFX, expected 1", len(itl.Accounts))
|
||||
return
|
||||
}
|
||||
|
||||
sqltransaction, err := db.Begin()
|
||||
if err != nil {
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
// Return Account with this Id
|
||||
account, err := GetAccountTx(sqltransaction, accountid, user.UserId)
|
||||
account, err := GetAccountTx(tx, accountid, user.UserId)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
importedAccount := itl.Accounts[0]
|
||||
|
||||
if len(account.ExternalAccountId) > 0 &&
|
||||
account.ExternalAccountId != importedAccount.ExternalAccountId {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
log.Printf("OFX import has \"%s\" as ExternalAccountId, but the account being imported to has\"%s\"",
|
||||
importedAccount.ExternalAccountId,
|
||||
account.ExternalAccountId)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
// Find matching existing securities or create new ones for those
|
||||
@ -74,21 +61,17 @@ func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, acc
|
||||
// save off since ImportGetCreateSecurity overwrites SecurityId on
|
||||
// ofxsecurity
|
||||
oldsecurityid := ofxsecurity.SecurityId
|
||||
security, err := ImportGetCreateSecurity(sqltransaction, user.UserId, &ofxsecurity)
|
||||
security, err := ImportGetCreateSecurity(tx, user.UserId, &ofxsecurity)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
securitymap[oldsecurityid] = *security
|
||||
}
|
||||
|
||||
if account.SecurityId != securitymap[importedAccount.SecurityId].SecurityId {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
log.Printf("OFX import account's SecurityId (%d) does not match this account's (%d)", securitymap[importedAccount.SecurityId].SecurityId, account.SecurityId)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
// TODO Ensure all transactions have at least one split in the account
|
||||
@ -99,10 +82,8 @@ func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, acc
|
||||
transaction.UserId = user.UserId
|
||||
|
||||
if !transaction.Valid() {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print("Unexpected invalid transaction from OFX import")
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
|
||||
// Ensure that either AccountId or SecurityId is set for this split,
|
||||
@ -112,10 +93,8 @@ func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, acc
|
||||
split.Status = Imported
|
||||
if split.AccountId != -1 {
|
||||
if split.AccountId != importedAccount.AccountId {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print("Imported split's AccountId wasn't -1 but also didn't match the account")
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
split.AccountId = account.AccountId
|
||||
} else if split.SecurityId != -1 {
|
||||
@ -123,12 +102,10 @@ func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, acc
|
||||
// TODO try to auto-match splits to existing accounts based on past transactions that look like this one
|
||||
if split.ImportSplitType == TradingAccount {
|
||||
// Find/make trading account if we're that type of split
|
||||
trading_account, err := GetTradingAccount(sqltransaction, user.UserId, sec.SecurityId)
|
||||
trading_account, err := GetTradingAccount(tx, user.UserId, sec.SecurityId)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print("Couldn't find split's SecurityId in map during OFX import")
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
split.AccountId = trading_account.AccountId
|
||||
split.SecurityId = -1
|
||||
@ -140,12 +117,10 @@ func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, acc
|
||||
SecurityId: sec.SecurityId,
|
||||
Type: account.Type,
|
||||
}
|
||||
subaccount, err := GetCreateAccountTx(sqltransaction, *subaccount)
|
||||
subaccount, err := GetCreateAccountTx(tx, *subaccount)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
split.AccountId = subaccount.AccountId
|
||||
split.SecurityId = -1
|
||||
@ -153,49 +128,39 @@ func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, acc
|
||||
split.SecurityId = sec.SecurityId
|
||||
}
|
||||
} else {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print("Couldn't find split's SecurityId in map during OFX import")
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
} else {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print("Neither Split.AccountId Split.SecurityId was set during OFX import")
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
}
|
||||
|
||||
imbalances, err := transaction.GetImbalancesTx(sqltransaction)
|
||||
imbalances, err := transaction.GetImbalancesTx(tx)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
|
||||
// Fixup any imbalances in transactions
|
||||
var zero big.Rat
|
||||
for imbalanced_security, imbalance := range imbalances {
|
||||
if imbalance.Cmp(&zero) != 0 {
|
||||
imbalanced_account, err := GetImbalanceAccount(sqltransaction, user.UserId, imbalanced_security)
|
||||
imbalanced_account, err := GetImbalanceAccount(tx, user.UserId, imbalanced_security)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
|
||||
// Add new split to fixup imbalance
|
||||
split := new(Split)
|
||||
r := new(big.Rat)
|
||||
r.Neg(&imbalance)
|
||||
security, err := GetSecurityTx(sqltransaction, imbalanced_security, user.UserId)
|
||||
security, err := GetSecurityTx(tx, imbalanced_security, user.UserId)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
split.Amount = r.FloatString(security.Precision)
|
||||
split.SecurityId = -1
|
||||
@ -210,24 +175,20 @@ func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, acc
|
||||
var already_imported bool
|
||||
for _, split := range transaction.Splits {
|
||||
if split.SecurityId != -1 || split.AccountId == -1 {
|
||||
imbalanced_account, err := GetImbalanceAccount(sqltransaction, user.UserId, split.SecurityId)
|
||||
imbalanced_account, err := GetImbalanceAccount(tx, user.UserId, split.SecurityId)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
|
||||
split.AccountId = imbalanced_account.AccountId
|
||||
split.SecurityId = -1
|
||||
}
|
||||
|
||||
exists, err := split.AlreadyImportedTx(sqltransaction)
|
||||
exists, err := split.AlreadyImportedTx(tx)
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print("Error checking if split was already imported:", err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
} else if exists {
|
||||
already_imported = true
|
||||
}
|
||||
@ -239,55 +200,38 @@ func ofxImportHelper(db *DB, r io.Reader, w http.ResponseWriter, user *User, acc
|
||||
}
|
||||
|
||||
for _, transaction := range transactions {
|
||||
err := InsertTransactionTx(sqltransaction, &transaction, user)
|
||||
err := InsertTransactionTx(tx, &transaction, user)
|
||||
if err != nil {
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
}
|
||||
|
||||
err = sqltransaction.Commit()
|
||||
if err != nil {
|
||||
sqltransaction.Rollback()
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
return
|
||||
}
|
||||
|
||||
WriteSuccess(w)
|
||||
return SuccessWriter{}
|
||||
}
|
||||
|
||||
func OFXImportHandler(db *DB, w http.ResponseWriter, r *http.Request, user *User, accountid int64) {
|
||||
func OFXImportHandler(tx *Tx, r *http.Request, user *User, accountid int64) ResponseWriterWriter {
|
||||
download_json := r.PostFormValue("ofxdownload")
|
||||
if download_json == "" {
|
||||
log.Print("download_json")
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
var ofxdownload OFXDownload
|
||||
err := ofxdownload.Read(download_json)
|
||||
if err != nil {
|
||||
log.Print("ofxdownload.Read")
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
account, err := GetAccount(db, accountid, user.UserId)
|
||||
account, err := GetAccount(tx, accountid, user.UserId)
|
||||
if err != nil {
|
||||
log.Print("GetAccount")
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
ofxver := ofxgo.OfxVersion203
|
||||
if len(account.OFXVersion) != 0 {
|
||||
ofxver, err = ofxgo.NewOfxVersion(account.OFXVersion)
|
||||
if err != nil {
|
||||
log.Print("NewOfxVersion")
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
}
|
||||
|
||||
@ -308,9 +252,8 @@ func OFXImportHandler(db *DB, w http.ResponseWriter, r *http.Request, user *User
|
||||
|
||||
transactionuid, err := ofxgo.RandomUID()
|
||||
if err != nil {
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
log.Println("Error creating uid for transaction:", err)
|
||||
return
|
||||
return NewError(999 /*Internal Error*/)
|
||||
}
|
||||
|
||||
if account.Type == Investment {
|
||||
@ -343,8 +286,7 @@ func OFXImportHandler(db *DB, w http.ResponseWriter, r *http.Request, user *User
|
||||
// Import generic bank transactions
|
||||
acctTypeEnum, err := ofxgo.NewAcctType(account.OFXAcctType)
|
||||
if err != nil {
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
statementRequest := ofxgo.StatementRequest{
|
||||
TrnUID: *transactionuid,
|
||||
@ -361,49 +303,46 @@ func OFXImportHandler(db *DB, w http.ResponseWriter, r *http.Request, user *User
|
||||
response, err := client.RequestNoParse(&query)
|
||||
if err != nil {
|
||||
// TODO this could be an error talking with the OFX server...
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
log.Print(err)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
ofxImportHelper(db, response.Body, w, user, accountid)
|
||||
return ofxImportHelper(tx, response.Body, user, accountid)
|
||||
}
|
||||
|
||||
func OFXFileImportHandler(db *DB, w http.ResponseWriter, r *http.Request, user *User, accountid int64) {
|
||||
func OFXFileImportHandler(tx *Tx, r *http.Request, user *User, accountid int64) ResponseWriterWriter {
|
||||
multipartReader, err := r.MultipartReader()
|
||||
if err != nil {
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
return
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
|
||||
// assume there is only one 'part'
|
||||
part, err := multipartReader.NextPart()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
log.Print("Encountered unexpected EOF")
|
||||
} else {
|
||||
WriteError(w, 999 /*Internal Error*/)
|
||||
return NewError(999 /*Internal Error*/)
|
||||
log.Print(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
ofxImportHelper(db, part, w, user, accountid)
|
||||
return ofxImportHelper(tx, part, user, accountid)
|
||||
}
|
||||
|
||||
/*
|
||||
* Assumes the User is a valid, signed-in user, but accountid has not yet been validated
|
||||
*/
|
||||
func AccountImportHandler(db *DB, w http.ResponseWriter, r *http.Request, user *User, accountid int64, importtype string) {
|
||||
func AccountImportHandler(tx *Tx, r *http.Request, user *User, accountid int64, importtype string) ResponseWriterWriter {
|
||||
|
||||
switch importtype {
|
||||
case "ofx":
|
||||
OFXImportHandler(db, w, r, user, accountid)
|
||||
return OFXImportHandler(tx, r, user, accountid)
|
||||
case "ofxfile":
|
||||
OFXFileImportHandler(db, w, r, user, accountid)
|
||||
return OFXFileImportHandler(tx, r, user, accountid)
|
||||
default:
|
||||
WriteError(w, 3 /*Invalid Request*/)
|
||||
return NewError(3 /*Invalid Request*/)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user