Continue reorganization, add asinkd stop cmd, make asinkd socket configurable
This commit is contained in:
164
asink/database.go
Normal file
164
asink/database.go
Normal file
@ -0,0 +1,164 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"asink"
|
||||
"code.google.com/p/goconf/conf"
|
||||
"database/sql"
|
||||
"errors"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type AsinkDB struct {
|
||||
db *sql.DB
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func GetAndInitDB(config *conf.ConfigFile) (*AsinkDB, error) {
|
||||
dbLocation, err := config.GetString("local", "dblocation")
|
||||
if err != nil {
|
||||
return nil, errors.New("Error: database location not specified in config file.")
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite3", "file:"+dbLocation+"?cache=shared&mode=rwc")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//make sure the events table is created
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows, err := tx.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='events';")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !rows.Next() {
|
||||
//if this is false, it means no rows were returned
|
||||
tx.Exec("CREATE TABLE events (id INTEGER, localid INTEGER PRIMARY KEY ASC, type INTEGER, localstatus INTEGER, path TEXT, hash TEXT, predecessor TEXT, timestamp INTEGER, permissions INTEGER);")
|
||||
// tx.Exec("CREATE INDEX IF NOT EXISTS localididx on events (localid)")
|
||||
tx.Exec("CREATE INDEX IF NOT EXISTS ididx on events (id);")
|
||||
tx.Exec("CREATE INDEX IF NOT EXISTS pathidx on events (path);")
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ret := new(AsinkDB)
|
||||
ret.db = db
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (adb *AsinkDB) DatabaseAddEvent(e *asink.Event) (err error) {
|
||||
adb.lock.Lock()
|
||||
tx, err := adb.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//make sure the transaction gets rolled back on error, and the database gets unlocked
|
||||
defer func() {
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
adb.lock.Unlock()
|
||||
}()
|
||||
|
||||
result, err := tx.Exec("INSERT INTO events (id, type, localstatus, path, hash, predecessor, timestamp, permissions) VALUES (?,?,?,?,?,?,?,?);", e.Id, e.Type, e.LocalStatus, e.Path, e.Hash, e.Predecessor, e.Timestamp, e.Permissions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
id, err := result.LastInsertId()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.LocalId = id
|
||||
e.InDB = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (adb *AsinkDB) DatabaseUpdateEvent(e *asink.Event) (err error) {
|
||||
if !e.InDB {
|
||||
return errors.New("Attempting to update an event in the database which hasn't been previously added.")
|
||||
}
|
||||
|
||||
adb.lock.Lock()
|
||||
tx, err := adb.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//make sure the transaction gets rolled back on error, and the database gets unlocked
|
||||
defer func() {
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
adb.lock.Unlock()
|
||||
}()
|
||||
|
||||
result, err := tx.Exec("UPDATE events SET id=?, type=?, localstatus=?, path=?, hash=?, prececessor=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.LocalStatus, e.Path, e.Hash, e.Predecessor, e.Timestamp, e.Permissions, e.LocalId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rows != 1 {
|
||||
return errors.New("Updated " + strconv.Itoa(int(rows)) + " row(s) when intending to update 1 event row.")
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//returns nil if no such event exists
|
||||
func (adb *AsinkDB) DatabaseLatestEventForPath(path string) (event *asink.Event, err error) {
|
||||
adb.lock.Lock()
|
||||
//make sure the database gets unlocked
|
||||
defer adb.lock.Unlock()
|
||||
|
||||
row := adb.db.QueryRow("SELECT id, localid, type, localstatus, path, hash, predecessor, timestamp, permissions FROM events WHERE path == ? ORDER BY timestamp DESC LIMIT 1;", path)
|
||||
|
||||
event = new(asink.Event)
|
||||
err = row.Scan(&event.Id, &event.LocalId, &event.Type, &event.LocalStatus, &event.Path, &event.Hash, &event.Predecessor, &event.Timestamp, &event.Permissions)
|
||||
|
||||
switch {
|
||||
case err == sql.ErrNoRows:
|
||||
return nil, nil
|
||||
case err != nil:
|
||||
return nil, err
|
||||
default:
|
||||
return event, nil
|
||||
}
|
||||
}
|
||||
|
||||
//returns nil if no such event exists
|
||||
func (adb *AsinkDB) DatabaseLatestRemoteEvent() (event *asink.Event, err error) {
|
||||
adb.lock.Lock()
|
||||
//make sure the database gets unlocked
|
||||
defer adb.lock.Unlock()
|
||||
|
||||
row := adb.db.QueryRow("SELECT id, localid, type, localstatus, path, hash, predecessor, timestamp, permissions FROM events WHERE id > 0 ORDER BY id DESC LIMIT 1;")
|
||||
|
||||
event = new(asink.Event)
|
||||
err = row.Scan(&event.Id, &event.LocalId, &event.Type, &event.LocalStatus, &event.Path, &event.Hash, &event.Predecessor, &event.Timestamp, &event.Permissions)
|
||||
|
||||
switch {
|
||||
case err == sql.ErrNoRows:
|
||||
return nil, nil
|
||||
case err != nil:
|
||||
return nil, err
|
||||
default:
|
||||
return event, nil
|
||||
}
|
||||
}
|
26
asink/hash.go
Normal file
26
asink/hash.go
Normal file
@ -0,0 +1,26 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
func HashFile(filename string) (string, error) {
|
||||
//TODO change to sha512?
|
||||
hashfn := sha256.New()
|
||||
|
||||
infile, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer infile.Close()
|
||||
|
||||
_, err = io.Copy(hashfn, infile)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%x", hashfn.Sum(nil)), nil
|
||||
}
|
288
asink/main.go
Normal file
288
asink/main.go
Normal file
@ -0,0 +1,288 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"asink"
|
||||
"asink/util"
|
||||
"code.google.com/p/goconf/conf"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
type AsinkGlobals struct {
|
||||
configFileName string
|
||||
syncDir string
|
||||
cacheDir string
|
||||
tmpDir string
|
||||
db *AsinkDB
|
||||
storage Storage
|
||||
server string
|
||||
port int
|
||||
username string
|
||||
password string
|
||||
}
|
||||
|
||||
var globals AsinkGlobals
|
||||
|
||||
func init() {
|
||||
const config_usage = "Config File to use"
|
||||
userHomeDir := "~"
|
||||
|
||||
u, err := user.Current()
|
||||
if err == nil {
|
||||
userHomeDir = u.HomeDir
|
||||
}
|
||||
|
||||
flag.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage)
|
||||
flag.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)")
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
//make sure config file's permissions are read-write only for the current user
|
||||
if !util.FileExistsAndHasPermissions(globals.configFileName, 384 /*0b110000000*/) {
|
||||
fmt.Println("Error: Either the file at " + globals.configFileName + " doesn't exist, or it doesn't have permissions such that the current user is the only one allowed to read and write.")
|
||||
return
|
||||
}
|
||||
|
||||
config, err := conf.ReadConfigFile(globals.configFileName)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?")
|
||||
return
|
||||
}
|
||||
|
||||
globals.storage, err = GetStorage(config)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
globals.syncDir, err = config.GetString("local", "syncdir")
|
||||
globals.cacheDir, err = config.GetString("local", "cachedir")
|
||||
globals.tmpDir, err = config.GetString("local", "tmpdir")
|
||||
|
||||
//make sure all the necessary directories exist
|
||||
err = util.EnsureDirExists(globals.syncDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = util.EnsureDirExists(globals.cacheDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = util.EnsureDirExists(globals.tmpDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
//TODO check errors on server settings
|
||||
globals.server, err = config.GetString("server", "host")
|
||||
globals.port, err = config.GetInt("server", "port")
|
||||
globals.username, err = config.GetString("server", "username")
|
||||
globals.password, err = config.GetString("server", "password")
|
||||
|
||||
globals.db, err = GetAndInitDB(config)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
//spawn goroutine to handle locking file paths
|
||||
go PathLocker(globals.db)
|
||||
|
||||
//spawn goroutines to handle local events
|
||||
localFileUpdates := make(chan *asink.Event)
|
||||
go StartWatching(globals.syncDir, localFileUpdates)
|
||||
|
||||
//spawn goroutines to receive remote events
|
||||
remoteFileUpdates := make(chan *asink.Event)
|
||||
go GetEvents(globals, remoteFileUpdates)
|
||||
go ProcessRemoteEvents(globals, remoteFileUpdates)
|
||||
|
||||
for {
|
||||
event := <-localFileUpdates
|
||||
go ProcessLocalEvent(globals, event)
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
|
||||
//make the path relative before we save/send it anywhere
|
||||
var err error
|
||||
absolutePath := event.Path
|
||||
event.Path, err = filepath.Rel(globals.syncDir, event.Path)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
latestLocal := LockPath(event.Path, true)
|
||||
defer UnlockPath(event)
|
||||
if latestLocal != nil {
|
||||
event.Predecessor = latestLocal.Hash
|
||||
}
|
||||
|
||||
if event.IsUpdate() {
|
||||
//copy to tmp
|
||||
//TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off
|
||||
tmpfilename, err := util.CopyToTmp(absolutePath, globals.tmpDir)
|
||||
if err != nil {
|
||||
//bail out if the file we are trying to upload already got deleted
|
||||
if util.ErrorFileNotFound(err) {
|
||||
event.LocalStatus |= asink.DISCARDED
|
||||
return
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
|
||||
//try to collect the file's permissions
|
||||
fileinfo, err := os.Stat(absolutePath)
|
||||
if err != nil {
|
||||
//bail out if the file we are trying to upload already got deleted
|
||||
if util.ErrorFileNotFound(err) {
|
||||
event.LocalStatus |= asink.DISCARDED
|
||||
return
|
||||
}
|
||||
panic(err)
|
||||
} else {
|
||||
event.Permissions = fileinfo.Mode()
|
||||
}
|
||||
|
||||
//get the file's hash
|
||||
hash, err := HashFile(tmpfilename)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
event.Hash = hash
|
||||
|
||||
//If the file didn't actually change, squash this event
|
||||
if latestLocal != nil && event.Hash == latestLocal.Hash {
|
||||
os.Remove(tmpfilename)
|
||||
event.LocalStatus |= asink.DISCARDED
|
||||
return
|
||||
}
|
||||
|
||||
//rename to local cache w/ filename=hash
|
||||
cachedFilename := path.Join(globals.cacheDir, event.Hash)
|
||||
err = os.Rename(tmpfilename, cachedFilename)
|
||||
if err != nil {
|
||||
err := os.Remove(tmpfilename)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
|
||||
//upload file to remote storage
|
||||
err = globals.storage.Put(cachedFilename, event.Hash)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
} else {
|
||||
//if we're trying to delete a file that we thought was already deleted, there's no need to delete it again
|
||||
if latestLocal != nil && latestLocal.IsDelete() {
|
||||
event.LocalStatus |= asink.DISCARDED
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
//finally, send it off to the server
|
||||
err = SendEvent(globals, event)
|
||||
if err != nil {
|
||||
panic(err) //TODO handle sensibly
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
|
||||
latestLocal := LockPath(event.Path, true)
|
||||
defer UnlockPath(event)
|
||||
|
||||
//get the absolute path because we may need it later
|
||||
absolutePath := path.Join(globals.syncDir, event.Path)
|
||||
|
||||
//if we already have this event, or if it is older than our most recent event, bail out
|
||||
if latestLocal != nil {
|
||||
if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) {
|
||||
event.LocalStatus |= asink.DISCARDED
|
||||
return
|
||||
}
|
||||
|
||||
if latestLocal.Hash != event.Predecessor && latestLocal.Hash != event.Hash {
|
||||
fmt.Printf("conflict:\n")
|
||||
fmt.Printf("OLD %+v\n", latestLocal)
|
||||
fmt.Printf("NEW %+v\n", event)
|
||||
//TODO handle conflict?
|
||||
}
|
||||
}
|
||||
|
||||
//Download event
|
||||
if event.IsUpdate() {
|
||||
if latestLocal == nil || event.Hash != latestLocal.Hash {
|
||||
|
||||
outfile, err := ioutil.TempFile(globals.tmpDir, "asink")
|
||||
if err != nil {
|
||||
panic(err) //TODO handle sensibly
|
||||
}
|
||||
tmpfilename := outfile.Name()
|
||||
outfile.Close()
|
||||
err = globals.storage.Get(tmpfilename, event.Hash)
|
||||
if err != nil {
|
||||
panic(err) //TODO handle sensibly
|
||||
}
|
||||
|
||||
//rename to local hashed filename
|
||||
hashedFilename := path.Join(globals.cacheDir, event.Hash)
|
||||
err = os.Rename(tmpfilename, hashedFilename)
|
||||
if err != nil {
|
||||
err := os.Remove(tmpfilename)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
|
||||
//copy hashed file to another tmp, then rename it to the actual file.
|
||||
tmpfilename, err = util.CopyToTmp(hashedFilename, globals.tmpDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
//make sure containing directory exists
|
||||
err = util.EnsureDirExists(path.Dir(absolutePath))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = os.Rename(tmpfilename, absolutePath)
|
||||
if err != nil {
|
||||
err2 := os.Remove(tmpfilename)
|
||||
if err2 != nil {
|
||||
panic(err2)
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
if latestLocal == nil || event.Permissions != latestLocal.Permissions {
|
||||
err := os.Chmod(absolutePath, event.Permissions)
|
||||
if err != nil && !util.ErrorFileNotFound(err) {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
//intentionally ignore errors in case this file has been deleted out from under us
|
||||
os.Remove(absolutePath)
|
||||
//delete the directory previously containing this file if its the last file
|
||||
util.RecursiveRemoveEmptyDirs(path.Dir(absolutePath))
|
||||
}
|
||||
|
||||
//TODO make sure file being overwritten is either unchanged or already copied off and hashed
|
||||
}
|
||||
|
||||
func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) {
|
||||
for event := range eventChan {
|
||||
ProcessRemoteEvent(globals, event)
|
||||
}
|
||||
}
|
141
asink/net.go
Normal file
141
asink/net.go
Normal file
@ -0,0 +1,141 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"asink"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const MIN_ERROR_WAIT = 100 // 1/10 of a second
|
||||
const MAX_ERROR_WAIT = 10000 // 10 seconds
|
||||
|
||||
func AuthenticatedRequest(method, url, bodyType string, body io.Reader, username, password string) (*http.Response, error) {
|
||||
client := &http.Client{}
|
||||
req, err := http.NewRequest(method, url, body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if bodyType != "" {
|
||||
req.Header.Set("Content-Type", bodyType)
|
||||
}
|
||||
req.SetBasicAuth(username, password)
|
||||
return client.Do(req)
|
||||
}
|
||||
func AuthenticatedGet(url string, username, password string) (*http.Response, error) {
|
||||
return AuthenticatedRequest("GET", url, "", nil, username, password)
|
||||
}
|
||||
func AuthenticatedPost(url, bodyType string, body io.Reader, username, password string) (*http.Response, error) {
|
||||
return AuthenticatedRequest("POST", url, bodyType, body, username, password)
|
||||
}
|
||||
|
||||
func SendEvent(globals AsinkGlobals, event *asink.Event) error {
|
||||
url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/"
|
||||
|
||||
//construct json payload
|
||||
events := asink.EventList{
|
||||
Events: []*asink.Event{event},
|
||||
}
|
||||
b, err := json.Marshal(events)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//actually make the request
|
||||
resp, err := AuthenticatedPost(url, "application/json", bytes.NewReader(b), globals.username, globals.password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
//check to make sure request succeeded
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var apistatus asink.APIResponse
|
||||
err = json.Unmarshal(body, &apistatus)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if apistatus.Status != asink.SUCCESS {
|
||||
return errors.New("API response was not success: " + apistatus.Explanation)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetEvents(globals AsinkGlobals, events chan *asink.Event) {
|
||||
url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/"
|
||||
var successiveErrors uint = 0
|
||||
|
||||
errorWait := func(err error) {
|
||||
fmt.Println(err)
|
||||
var waitMilliseconds time.Duration = MIN_ERROR_WAIT << successiveErrors
|
||||
if waitMilliseconds > MAX_ERROR_WAIT {
|
||||
waitMilliseconds = MAX_ERROR_WAIT
|
||||
}
|
||||
time.Sleep(waitMilliseconds * time.Millisecond)
|
||||
successiveErrors++
|
||||
}
|
||||
|
||||
//query DB for latest remote event version number that we've seen locally
|
||||
latestEvent, err := globals.db.DatabaseLatestRemoteEvent()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for {
|
||||
//query for events after latest_event
|
||||
var fullUrl string
|
||||
if latestEvent != nil {
|
||||
fullUrl = url + strconv.FormatInt(latestEvent.Id+1, 10)
|
||||
} else {
|
||||
fullUrl = url + "0"
|
||||
}
|
||||
resp, err := AuthenticatedGet(fullUrl, globals.username, globals.password)
|
||||
|
||||
//if error, perform exponential backoff (with maximum timeout)
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
}
|
||||
errorWait(err)
|
||||
continue
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close() //must be done after the last time resp is used
|
||||
if err != nil {
|
||||
errorWait(err)
|
||||
continue
|
||||
}
|
||||
|
||||
var apistatus asink.APIResponse
|
||||
err = json.Unmarshal(body, &apistatus)
|
||||
if err != nil {
|
||||
errorWait(err)
|
||||
continue
|
||||
}
|
||||
if apistatus.Status != asink.SUCCESS {
|
||||
errorWait(err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, event := range apistatus.Events {
|
||||
if latestEvent != nil && event.Id != latestEvent.Id+1 {
|
||||
break
|
||||
}
|
||||
events <- event
|
||||
latestEvent = event
|
||||
}
|
||||
successiveErrors = 0
|
||||
}
|
||||
}
|
96
asink/path_map.go
Normal file
96
asink/path_map.go
Normal file
@ -0,0 +1,96 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"asink"
|
||||
)
|
||||
|
||||
type pathMapRequest struct {
|
||||
path string
|
||||
local bool /*is this event local (true) or remote(false)?*/
|
||||
responseChan chan *asink.Event
|
||||
}
|
||||
|
||||
type pathMapValue struct {
|
||||
latestEvent *asink.Event
|
||||
locked bool
|
||||
localWaiters []chan *asink.Event
|
||||
remoteWaiters []chan *asink.Event
|
||||
}
|
||||
|
||||
var pathLockerChan = make(chan *pathMapRequest)
|
||||
var pathUnlockerChan = make(chan *asink.Event)
|
||||
|
||||
func PathLocker(db *AsinkDB) {
|
||||
var event *asink.Event
|
||||
var request *pathMapRequest
|
||||
var v *pathMapValue
|
||||
var ok bool
|
||||
var c chan *asink.Event
|
||||
m := make(map[string]*pathMapValue)
|
||||
|
||||
for {
|
||||
select {
|
||||
case event = <-pathUnlockerChan:
|
||||
if v, ok = m[event.Path]; ok != false {
|
||||
//only update status in data structures if the event hasn't been discarded
|
||||
if event.LocalStatus&asink.DISCARDED == 0 {
|
||||
if v.latestEvent == nil || !v.latestEvent.IsSameEvent(event) {
|
||||
err := db.DatabaseAddEvent(event)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
//TODO batch database writes instead of doing one at a time
|
||||
}
|
||||
v.latestEvent = event
|
||||
}
|
||||
if len(v.localWaiters) > 0 {
|
||||
c = v.localWaiters[0]
|
||||
v.localWaiters = v.localWaiters[1:]
|
||||
c <- v.latestEvent
|
||||
} else if len(v.remoteWaiters) > 0 {
|
||||
c = v.remoteWaiters[0]
|
||||
v.remoteWaiters = v.remoteWaiters[1:]
|
||||
c <- v.latestEvent
|
||||
} else {
|
||||
v.locked = false
|
||||
}
|
||||
}
|
||||
case request = <-pathLockerChan:
|
||||
v, ok = m[request.path]
|
||||
//allocate pathMapValue object if it doesn't exist
|
||||
if !ok {
|
||||
v = new(pathMapValue)
|
||||
m[request.path] = v
|
||||
}
|
||||
if v.locked {
|
||||
if request.local {
|
||||
v.localWaiters = append(v.localWaiters, request.responseChan)
|
||||
} else {
|
||||
v.remoteWaiters = append(v.remoteWaiters, request.responseChan)
|
||||
}
|
||||
} else {
|
||||
v.locked = true
|
||||
event, err := db.DatabaseLatestEventForPath(request.path)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
request.responseChan <- event
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//locks the path to ensure nothing else inside asink is mucking with that file.
|
||||
//'local' determines the precedence of the lock - all local lock requesters will
|
||||
//be served before any remote requesters.
|
||||
//The previous event for this path is returned, nil is returned if no previous event exists
|
||||
func LockPath(path string, local bool) (currentEvent *asink.Event) {
|
||||
c := make(chan *asink.Event)
|
||||
pathLockerChan <- &pathMapRequest{path, local, c}
|
||||
return <-c
|
||||
}
|
||||
|
||||
//unlocks the path, storing the updated event back to the database
|
||||
func UnlockPath(event *asink.Event) {
|
||||
pathUnlockerChan <- event
|
||||
}
|
37
asink/storage.go
Normal file
37
asink/storage.go
Normal file
@ -0,0 +1,37 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"code.google.com/p/goconf/conf"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type Storage interface {
|
||||
Put(filename string, hash string) error
|
||||
Get(filename string, hash string) error
|
||||
}
|
||||
|
||||
func GetStorage(config *conf.ConfigFile) (Storage, error) {
|
||||
storageMethod, err := config.GetString("storage", "method")
|
||||
if err != nil {
|
||||
return nil, errors.New("Error: storage method not specified in config file.")
|
||||
}
|
||||
|
||||
var storage Storage
|
||||
|
||||
switch storageMethod {
|
||||
case "local":
|
||||
storage, err = NewLocalStorage(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case "ftp":
|
||||
storage, err = NewFTPStorage(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
return nil, errors.New("Error: storage method '" + storageMethod + "' not found.")
|
||||
}
|
||||
|
||||
return storage, nil
|
||||
}
|
121
asink/storage_ftp.go
Normal file
121
asink/storage_ftp.go
Normal file
@ -0,0 +1,121 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"code.google.com/p/goconf/conf"
|
||||
"errors"
|
||||
"github.com/jlaffaye/goftp"
|
||||
"io"
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const FTP_MAX_CONNECTIONS = 10 //should this be configurable?
|
||||
|
||||
type FTPStorage struct {
|
||||
connectionsChan chan int
|
||||
server string
|
||||
port int
|
||||
directory string
|
||||
username string
|
||||
password string
|
||||
}
|
||||
|
||||
func NewFTPStorage(config *conf.ConfigFile) (*FTPStorage, error) {
|
||||
server, err := config.GetString("storage", "server")
|
||||
if err != nil {
|
||||
return nil, errors.New("Error: FTPStorage indicated in config file, but 'server' not specified.")
|
||||
}
|
||||
port, err := config.GetInt("storage", "port")
|
||||
if err != nil {
|
||||
return nil, errors.New("Error: FTPStorage indicated in config file, but 'port' not specified.")
|
||||
}
|
||||
directory, err := config.GetString("storage", "directory")
|
||||
if err != nil {
|
||||
return nil, errors.New("Error: FTPStorage indicated in config file, but 'directory' not specified.")
|
||||
}
|
||||
username, err := config.GetString("storage", "username")
|
||||
if err != nil {
|
||||
return nil, errors.New("Error: FTPStorage indicated in config file, but 'username' not specified.")
|
||||
}
|
||||
password, err := config.GetString("storage", "password")
|
||||
if err != nil {
|
||||
return nil, errors.New("Error: FTPStorage indicated in config file, but 'password' not specified.")
|
||||
}
|
||||
|
||||
fs := new(FTPStorage)
|
||||
fs.server = server
|
||||
fs.port = port
|
||||
fs.directory = directory
|
||||
fs.username = username
|
||||
fs.password = password
|
||||
|
||||
fs.connectionsChan = make(chan int, FTP_MAX_CONNECTIONS)
|
||||
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
func (fs *FTPStorage) Put(filename string, hash string) (e error) {
|
||||
//make sure we don't flood the FTP server
|
||||
fs.connectionsChan <- 0
|
||||
defer func() { <-fs.connectionsChan }()
|
||||
|
||||
infile, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer infile.Close()
|
||||
|
||||
connection, err := ftp.Connect(fs.server + ":" + strconv.Itoa(fs.port))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer connection.Quit()
|
||||
|
||||
err = connection.Login(fs.username, fs.password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = connection.ChangeDir(fs.directory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return connection.Stor(hash, infile)
|
||||
}
|
||||
|
||||
func (fs *FTPStorage) Get(filename string, hash string) error {
|
||||
fs.connectionsChan <- 0
|
||||
defer func() { <-fs.connectionsChan }()
|
||||
|
||||
connection, err := ftp.Connect(fs.server + ":" + strconv.Itoa(fs.port))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer connection.Quit()
|
||||
|
||||
err = connection.Login(fs.username, fs.password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = connection.ChangeDir(fs.directory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
downloadedFile, err := connection.Retr(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer downloadedFile.Close()
|
||||
|
||||
outfile, err := os.Create(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer outfile.Close()
|
||||
|
||||
_, err = io.Copy(outfile, downloadedFile)
|
||||
return err
|
||||
}
|
73
asink/storage_local.go
Normal file
73
asink/storage_local.go
Normal file
@ -0,0 +1,73 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"asink/util"
|
||||
"code.google.com/p/goconf/conf"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
type LocalStorage struct {
|
||||
storageDir string
|
||||
tmpSubdir string
|
||||
}
|
||||
|
||||
func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) {
|
||||
storageDir, err := config.GetString("storage", "dir")
|
||||
if err != nil {
|
||||
return nil, errors.New("Error: LocalStorage indicated in config file, but lacking local storage directory ('dir = some/dir').")
|
||||
}
|
||||
|
||||
ls := new(LocalStorage)
|
||||
ls.storageDir = storageDir
|
||||
ls.tmpSubdir = path.Join(storageDir, ".asink-tmpdir")
|
||||
|
||||
//make sure the base directory and tmp subdir exist
|
||||
err = util.EnsureDirExists(ls.storageDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = util.EnsureDirExists(ls.tmpSubdir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ls, nil
|
||||
}
|
||||
|
||||
func (ls *LocalStorage) Put(filename string, hash string) (e error) {
|
||||
tmpfile, err := util.CopyToTmp(filename, ls.tmpSubdir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = os.Rename(tmpfile, path.Join(ls.storageDir, hash))
|
||||
if err != nil {
|
||||
err := os.Remove(tmpfile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ls *LocalStorage) Get(filename string, hash string) error {
|
||||
infile, err := os.Open(path.Join(ls.storageDir, hash))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer infile.Close()
|
||||
|
||||
outfile, err := os.Create(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer outfile.Close()
|
||||
|
||||
_, err = io.Copy(outfile, infile)
|
||||
|
||||
return err
|
||||
}
|
70
asink/watcher.go
Normal file
70
asink/watcher.go
Normal file
@ -0,0 +1,70 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"asink"
|
||||
"github.com/howeyc/fsnotify"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
func StartWatching(watchDir string, fileUpdates chan *asink.Event) {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
panic("Failed to create fsnotify watcher")
|
||||
}
|
||||
|
||||
//function called by filepath.Walk to start watching a directory and all subdirectories
|
||||
watchDirFn := func(path string, info os.FileInfo, err error) error {
|
||||
if info.IsDir() {
|
||||
err = watcher.Watch(path)
|
||||
if err != nil {
|
||||
panic("Failed to watch " + path)
|
||||
}
|
||||
} else if info.Mode().IsRegular() {
|
||||
event := new(asink.Event)
|
||||
event.Path = path
|
||||
event.Type = asink.UPDATE
|
||||
event.Timestamp = info.ModTime().UnixNano()
|
||||
fileUpdates <- event
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//processes all the fsnotify events into asink events
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case ev := <-watcher.Event:
|
||||
//if a directory was created, begin recursively watching all its subdirectories
|
||||
if fi, err := os.Stat(ev.Name); err == nil && fi.IsDir() {
|
||||
if ev.IsCreate() {
|
||||
filepath.Walk(ev.Name, watchDirFn)
|
||||
//TODO do a scan of this directory so we ensure any file events we missed before starting to watch this directory are caught
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
event := new(asink.Event)
|
||||
if ev.IsCreate() || ev.IsModify() {
|
||||
event.Type = asink.UPDATE
|
||||
} else if ev.IsDelete() || ev.IsRename() {
|
||||
event.Type = asink.DELETE
|
||||
} else {
|
||||
panic("Unknown fsnotify event type")
|
||||
}
|
||||
|
||||
event.Path = ev.Name
|
||||
event.Timestamp = time.Now().UnixNano()
|
||||
|
||||
fileUpdates <- event
|
||||
|
||||
case err := <-watcher.Error:
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
//start watching the directory passed in
|
||||
filepath.Walk(watchDir, watchDirFn)
|
||||
}
|
Reference in New Issue
Block a user