From d883e3d92dd10b45b6fffe45fe1eb7bdd171ef2b Mon Sep 17 00:00:00 2001 From: Aaron Lindsay Date: Tue, 3 Sep 2013 23:33:36 -0400 Subject: [PATCH] Generalize RPC client code, add basic RPC functionality to client --- asink/client.go | 338 ++++++++++++++++++++++++++ asink/main.go | 299 +++-------------------- asink/rpc_server.go | 34 +++ asinkd/server.go | 2 +- asinkd/user_admin.go | 7 +- asinkd/rpc_client.go => rpc_client.go | 2 +- 6 files changed, 408 insertions(+), 274 deletions(-) create mode 100644 asink/client.go create mode 100644 asink/rpc_server.go rename asinkd/rpc_client.go => rpc_client.go (97%) diff --git a/asink/client.go b/asink/client.go new file mode 100644 index 0000000..82572c0 --- /dev/null +++ b/asink/client.go @@ -0,0 +1,338 @@ +package main + +import ( + "asink" + "asink/util" + "code.google.com/p/goconf/conf" + "flag" + "fmt" + "io/ioutil" + "os" + "os/user" + "path" + "path/filepath" +) + +type AsinkGlobals struct { + configFileName string + syncDir string + cacheDir string + tmpDir string + rpcSock string + db *AsinkDB + storage Storage + server string + port int + username string + password string +} + +var globals AsinkGlobals + +var flags *flag.FlagSet + +func init() { + asink.SetupCleanExitOnSignals() +} + +func StartClient(args []string) { + const config_usage = "Config File to use" + userHomeDir := "~" + + u, err := user.Current() + if err == nil { + userHomeDir = u.HomeDir + } + + flags := flag.NewFlagSet("start", flag.ExitOnError) + flags.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage) + flags.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)") + flags.Parse(args) + + //make sure config file's permissions are read-write only for the current user + if !util.FileExistsAndHasPermissions(globals.configFileName, 384 /*0b110000000*/) { + fmt.Println("Error: Either the file at " + globals.configFileName + " doesn't exist, or it doesn't have permissions such that the current user is the only one allowed to read and write.") + return + } + + config, err := conf.ReadConfigFile(globals.configFileName) + if err != nil { + fmt.Println(err) + fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?") + return + } + + globals.storage, err = GetStorage(config) + if err != nil { + fmt.Println(err) + return + } + + globals.syncDir, err = config.GetString("local", "syncdir") + globals.cacheDir, err = config.GetString("local", "cachedir") + globals.tmpDir, err = config.GetString("local", "tmpdir") + globals.rpcSock, err = config.GetString("local", "socket") //TODO make sure this exists + + //make sure all the necessary directories exist + err = util.EnsureDirExists(globals.syncDir) + if err != nil { + panic(err) + } + err = util.EnsureDirExists(globals.cacheDir) + if err != nil { + panic(err) + } + err = util.EnsureDirExists(globals.tmpDir) + if err != nil { + panic(err) + } + + //TODO check errors on server settings + globals.server, err = config.GetString("server", "host") + globals.port, err = config.GetInt("server", "port") + globals.username, err = config.GetString("server", "username") + globals.password, err = config.GetString("server", "password") + + globals.db, err = GetAndInitDB(config) + if err != nil { + panic(err) + } + + //spawn goroutine to handle locking file paths + go PathLocker(globals.db) + + //spawn goroutines to handle local events + localFileUpdates := make(chan *asink.Event) + go StartWatching(globals.syncDir, localFileUpdates) + + //spawn goroutines to receive remote events + remoteFileUpdates := make(chan *asink.Event) + go GetEvents(globals, remoteFileUpdates) + go ProcessLocalEvents(globals, localFileUpdates) + //TODO ensure remote updates wait until all local changes are saved off? + go ProcessRemoteEvents(globals, remoteFileUpdates) + + rpcTornDown := make(chan int) + go StartRPC(globals.rpcSock, rpcTornDown) + + asink.WaitOnExit() + <-rpcTornDown +} + +func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { + //make the path relative before we save/send it anywhere + var err error + absolutePath := event.Path + event.Path, err = filepath.Rel(globals.syncDir, event.Path) + if err != nil { + panic(err) + } + + latestLocal := LockPath(event.Path, true) + defer UnlockPath(event) + if latestLocal != nil { + event.Predecessor = latestLocal.Hash + } + + if event.IsUpdate() { + //copy to tmp + //TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off + tmpfilename, err := util.CopyToTmp(absolutePath, globals.tmpDir) + if err != nil { + //bail out if the file we are trying to upload already got deleted + if util.ErrorFileNotFound(err) { + event.LocalStatus |= asink.DISCARDED + return + } + panic(err) + } + + //try to collect the file's permissions + fileinfo, err := os.Stat(absolutePath) + if err != nil { + //bail out if the file we are trying to upload already got deleted + if util.ErrorFileNotFound(err) { + event.LocalStatus |= asink.DISCARDED + return + } + panic(err) + } else { + event.Permissions = fileinfo.Mode() + } + + //get the file's hash + hash, err := HashFile(tmpfilename) + if err != nil { + panic(err) + } + event.Hash = hash + + //If the file didn't actually change, squash this event + if latestLocal != nil && event.Hash == latestLocal.Hash { + os.Remove(tmpfilename) + event.LocalStatus |= asink.DISCARDED + return + } + + //rename to local cache w/ filename=hash + cachedFilename := path.Join(globals.cacheDir, event.Hash) + err = os.Rename(tmpfilename, cachedFilename) + if err != nil { + err := os.Remove(tmpfilename) + if err != nil { + panic(err) + } + panic(err) + } + + //upload file to remote storage + err = globals.storage.Put(cachedFilename, event.Hash) + if err != nil { + panic(err) + } + } else { + //if we're trying to delete a file that we thought was already deleted, there's no need to delete it again + if latestLocal != nil && latestLocal.IsDelete() { + event.LocalStatus |= asink.DISCARDED + return + } + } + + //finally, send it off to the server + err = SendEvent(globals, event) + if err != nil { + panic(err) //TODO handle sensibly + } +} + +func ProcessLocalEvents(globals AsinkGlobals, eventChan chan *asink.Event) { + for { + event := <-eventChan + go ProcessLocalEvent(globals, event) + } +} + +func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) { + latestLocal := LockPath(event.Path, true) + defer UnlockPath(event) + + //get the absolute path because we may need it later + absolutePath := path.Join(globals.syncDir, event.Path) + + //if we already have this event, or if it is older than our most recent event, bail out + if latestLocal != nil { + if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) { + event.LocalStatus |= asink.DISCARDED + return + } + + if latestLocal.Hash != event.Predecessor && latestLocal.Hash != event.Hash { + fmt.Printf("conflict:\n") + fmt.Printf("OLD %+v\n", latestLocal) + fmt.Printf("NEW %+v\n", event) + //TODO handle conflict? + } + } + + //Download event + if event.IsUpdate() { + if latestLocal == nil || event.Hash != latestLocal.Hash { + + outfile, err := ioutil.TempFile(globals.tmpDir, "asink") + if err != nil { + panic(err) //TODO handle sensibly + } + tmpfilename := outfile.Name() + outfile.Close() + err = globals.storage.Get(tmpfilename, event.Hash) + if err != nil { + panic(err) //TODO handle sensibly + } + + //rename to local hashed filename + hashedFilename := path.Join(globals.cacheDir, event.Hash) + err = os.Rename(tmpfilename, hashedFilename) + if err != nil { + err := os.Remove(tmpfilename) + if err != nil { + panic(err) + } + panic(err) + } + + //copy hashed file to another tmp, then rename it to the actual file. + tmpfilename, err = util.CopyToTmp(hashedFilename, globals.tmpDir) + if err != nil { + panic(err) + } + + //make sure containing directory exists + err = util.EnsureDirExists(path.Dir(absolutePath)) + if err != nil { + panic(err) + } + + err = os.Rename(tmpfilename, absolutePath) + if err != nil { + err2 := os.Remove(tmpfilename) + if err2 != nil { + panic(err2) + } + panic(err) + } + } + if latestLocal == nil || event.Permissions != latestLocal.Permissions { + err := os.Chmod(absolutePath, event.Permissions) + if err != nil && !util.ErrorFileNotFound(err) { + panic(err) + } + } + } else { + //intentionally ignore errors in case this file has been deleted out from under us + os.Remove(absolutePath) + //delete the directory previously containing this file if its the last file + util.RecursiveRemoveEmptyDirs(path.Dir(absolutePath)) + } + + //TODO make sure file being overwritten is either unchanged or already copied off and hashed +} + +func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) { + for event := range eventChan { + ProcessRemoteEvent(globals, event) + } +} + +func StopClient(args []string) { + const config_usage = "Config File to use" + userHomeDir := "~" + + u, err := user.Current() + if err == nil { + userHomeDir = u.HomeDir + } + + flags := flag.NewFlagSet("stop", flag.ExitOnError) + flags.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage) + flags.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)") + flags.Parse(args) + + config, err := conf.ReadConfigFile(globals.configFileName) + if err != nil { + fmt.Println(err) + fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?") + return + } + + rpcSock, err := config.GetString("local", "socket") //TODO make sure this exists + if err != nil { + fmt.Println("Error reading local.socket from config file at ", globals.configFileName) + } + + i := 99 + returnCode := 0 + err = asink.RPCCall(rpcSock, "ClientStopper.StopClient", &returnCode, &i) + if err != nil { + panic(err) + } +} diff --git a/asink/main.go b/asink/main.go index 0137b3a..96b9a1a 100644 --- a/asink/main.go +++ b/asink/main.go @@ -1,288 +1,49 @@ package main import ( - "asink" - "asink/util" - "code.google.com/p/goconf/conf" - "flag" "fmt" - "io/ioutil" "os" - "os/user" - "path" - "path/filepath" ) -type AsinkGlobals struct { - configFileName string - syncDir string - cacheDir string - tmpDir string - db *AsinkDB - storage Storage - server string - port int - username string - password string +type Command struct { + cmd string + fn func(args []string) + explanation string } -var globals AsinkGlobals - -func init() { - const config_usage = "Config File to use" - userHomeDir := "~" - - u, err := user.Current() - if err == nil { - userHomeDir = u.HomeDir - } - - flag.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage) - flag.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)") +var commands []Command = []Command{ + Command{ + cmd: "start", + fn: StartClient, + explanation: "Start the client daemon", + }, + Command{ + cmd: "stop", + fn: StopClient, + explanation: "Stop the client daemon", + }, + /* Command{ + cmd: "status", + fn: GetStatus, + explanation: "Get a summary of the client's status", + }, + */ } func main() { - flag.Parse() - - //make sure config file's permissions are read-write only for the current user - if !util.FileExistsAndHasPermissions(globals.configFileName, 384 /*0b110000000*/) { - fmt.Println("Error: Either the file at " + globals.configFileName + " doesn't exist, or it doesn't have permissions such that the current user is the only one allowed to read and write.") - return - } - - config, err := conf.ReadConfigFile(globals.configFileName) - if err != nil { - fmt.Println(err) - fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?") - return - } - - globals.storage, err = GetStorage(config) - if err != nil { - fmt.Println(err) - return - } - - globals.syncDir, err = config.GetString("local", "syncdir") - globals.cacheDir, err = config.GetString("local", "cachedir") - globals.tmpDir, err = config.GetString("local", "tmpdir") - - //make sure all the necessary directories exist - err = util.EnsureDirExists(globals.syncDir) - if err != nil { - panic(err) - } - err = util.EnsureDirExists(globals.cacheDir) - if err != nil { - panic(err) - } - err = util.EnsureDirExists(globals.tmpDir) - if err != nil { - panic(err) - } - - //TODO check errors on server settings - globals.server, err = config.GetString("server", "host") - globals.port, err = config.GetInt("server", "port") - globals.username, err = config.GetString("server", "username") - globals.password, err = config.GetString("server", "password") - - globals.db, err = GetAndInitDB(config) - if err != nil { - panic(err) - } - - //spawn goroutine to handle locking file paths - go PathLocker(globals.db) - - //spawn goroutines to handle local events - localFileUpdates := make(chan *asink.Event) - go StartWatching(globals.syncDir, localFileUpdates) - - //spawn goroutines to receive remote events - remoteFileUpdates := make(chan *asink.Event) - go GetEvents(globals, remoteFileUpdates) - go ProcessRemoteEvents(globals, remoteFileUpdates) - - for { - event := <-localFileUpdates - go ProcessLocalEvent(globals, event) - } -} - -func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { - //make the path relative before we save/send it anywhere - var err error - absolutePath := event.Path - event.Path, err = filepath.Rel(globals.syncDir, event.Path) - if err != nil { - panic(err) - } - - latestLocal := LockPath(event.Path, true) - defer UnlockPath(event) - if latestLocal != nil { - event.Predecessor = latestLocal.Hash - } - - if event.IsUpdate() { - //copy to tmp - //TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off - tmpfilename, err := util.CopyToTmp(absolutePath, globals.tmpDir) - if err != nil { - //bail out if the file we are trying to upload already got deleted - if util.ErrorFileNotFound(err) { - event.LocalStatus |= asink.DISCARDED + if len(os.Args) > 1 { + cmd := os.Args[1] + for _, c := range commands { + if c.cmd == cmd { + c.fn(os.Args[2:]) return } - panic(err) - } - - //try to collect the file's permissions - fileinfo, err := os.Stat(absolutePath) - if err != nil { - //bail out if the file we are trying to upload already got deleted - if util.ErrorFileNotFound(err) { - event.LocalStatus |= asink.DISCARDED - return - } - panic(err) - } else { - event.Permissions = fileinfo.Mode() - } - - //get the file's hash - hash, err := HashFile(tmpfilename) - if err != nil { - panic(err) - } - event.Hash = hash - - //If the file didn't actually change, squash this event - if latestLocal != nil && event.Hash == latestLocal.Hash { - os.Remove(tmpfilename) - event.LocalStatus |= asink.DISCARDED - return - } - - //rename to local cache w/ filename=hash - cachedFilename := path.Join(globals.cacheDir, event.Hash) - err = os.Rename(tmpfilename, cachedFilename) - if err != nil { - err := os.Remove(tmpfilename) - if err != nil { - panic(err) - } - panic(err) - } - - //upload file to remote storage - err = globals.storage.Put(cachedFilename, event.Hash) - if err != nil { - panic(err) } + fmt.Println("Invalid subcommand specified, please pick from the following:") } else { - //if we're trying to delete a file that we thought was already deleted, there's no need to delete it again - if latestLocal != nil && latestLocal.IsDelete() { - event.LocalStatus |= asink.DISCARDED - return - } + fmt.Println("No subcommand specified, please pick one from the following:") } - - //finally, send it off to the server - err = SendEvent(globals, event) - if err != nil { - panic(err) //TODO handle sensibly - } -} - -func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) { - latestLocal := LockPath(event.Path, true) - defer UnlockPath(event) - - //get the absolute path because we may need it later - absolutePath := path.Join(globals.syncDir, event.Path) - - //if we already have this event, or if it is older than our most recent event, bail out - if latestLocal != nil { - if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) { - event.LocalStatus |= asink.DISCARDED - return - } - - if latestLocal.Hash != event.Predecessor && latestLocal.Hash != event.Hash { - fmt.Printf("conflict:\n") - fmt.Printf("OLD %+v\n", latestLocal) - fmt.Printf("NEW %+v\n", event) - //TODO handle conflict? - } - } - - //Download event - if event.IsUpdate() { - if latestLocal == nil || event.Hash != latestLocal.Hash { - - outfile, err := ioutil.TempFile(globals.tmpDir, "asink") - if err != nil { - panic(err) //TODO handle sensibly - } - tmpfilename := outfile.Name() - outfile.Close() - err = globals.storage.Get(tmpfilename, event.Hash) - if err != nil { - panic(err) //TODO handle sensibly - } - - //rename to local hashed filename - hashedFilename := path.Join(globals.cacheDir, event.Hash) - err = os.Rename(tmpfilename, hashedFilename) - if err != nil { - err := os.Remove(tmpfilename) - if err != nil { - panic(err) - } - panic(err) - } - - //copy hashed file to another tmp, then rename it to the actual file. - tmpfilename, err = util.CopyToTmp(hashedFilename, globals.tmpDir) - if err != nil { - panic(err) - } - - //make sure containing directory exists - err = util.EnsureDirExists(path.Dir(absolutePath)) - if err != nil { - panic(err) - } - - err = os.Rename(tmpfilename, absolutePath) - if err != nil { - err2 := os.Remove(tmpfilename) - if err2 != nil { - panic(err2) - } - panic(err) - } - } - if latestLocal == nil || event.Permissions != latestLocal.Permissions { - err := os.Chmod(absolutePath, event.Permissions) - if err != nil && !util.ErrorFileNotFound(err) { - panic(err) - } - } - } else { - //intentionally ignore errors in case this file has been deleted out from under us - os.Remove(absolutePath) - //delete the directory previously containing this file if its the last file - util.RecursiveRemoveEmptyDirs(path.Dir(absolutePath)) - } - - //TODO make sure file being overwritten is either unchanged or already copied off and hashed -} - -func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) { - for event := range eventChan { - ProcessRemoteEvent(globals, event) + for _, c := range commands { + fmt.Printf("\t%s\t\t%s\n", c.cmd, c.explanation) } } diff --git a/asink/rpc_server.go b/asink/rpc_server.go new file mode 100644 index 0000000..3e71cf7 --- /dev/null +++ b/asink/rpc_server.go @@ -0,0 +1,34 @@ +package main + +import ( + "asink" + "net" + "net/http" + "net/rpc" +) + +type ClientStopper int + +func (c *ClientStopper) StopClient(code *int, result *int) error { + asink.Exit(*code) + *result = 0 + return nil +} + +func StartRPC(sock string, tornDown chan int) { + defer func() { tornDown <- 0 }() //the main thread waits for this to ensure the socket is closed + + clientstop := new(ClientStopper) + rpc.Register(clientstop) + + rpc.HandleHTTP() + l, err := net.Listen("unix", sock) + if err != nil { + panic(err) + } + defer l.Close() + + go http.Serve(l, nil) + + asink.WaitOnExit() +} diff --git a/asinkd/server.go b/asinkd/server.go index 4c731bf..316f222 100644 --- a/asinkd/server.go +++ b/asinkd/server.go @@ -74,7 +74,7 @@ func StopServer(args []string) { i := 99 returnCode := 0 - err := RPCCall(rpcSock, "ServerStopper.StopServer", &returnCode, &i) + err := asink.RPCCall(rpcSock, "ServerStopper.StopServer", &returnCode, &i) if err != nil { panic(err) } diff --git a/asinkd/user_admin.go b/asinkd/user_admin.go index d159bdb..eec6da4 100644 --- a/asinkd/user_admin.go +++ b/asinkd/user_admin.go @@ -1,6 +1,7 @@ package main import ( + "asink" "code.google.com/p/gopass" "flag" "fmt" @@ -70,7 +71,7 @@ func UserAdd(args []string) { user.PWHash = HashPassword(passwordOne) i := 99 - err = RPCCall(*rpcSocket, "UserModifier.AddUser", user, &i) + err = asink.RPCCall(*rpcSocket, "UserModifier.AddUser", user, &i) if err != nil { if _, ok := err.(rpc.ServerError); ok && err.Error() == DuplicateUsernameErr.Error() { fmt.Println("Error: " + err.Error()) @@ -94,7 +95,7 @@ func UserDel(args []string) { user.Username = args[0] i := 99 - err := RPCCall(*rpcSocket, "UserModifier.RemoveUser", user, &i) + err := asink.RPCCall(*rpcSocket, "UserModifier.RemoveUser", user, &i) if err != nil { if _, ok := err.(rpc.ServerError); ok && err.Error() == NoUserErr.Error() { fmt.Println("Error: " + err.Error()) @@ -162,7 +163,7 @@ func UserMod(args []string) { } i := 99 - err := RPCCall(*rpcSocket, "UserModifier.ModifyUser", rpcargs, &i) + err := asink.RPCCall(*rpcSocket, "UserModifier.ModifyUser", rpcargs, &i) if err != nil { if _, ok := err.(rpc.ServerError); ok && err.Error() == NoUserErr.Error() { fmt.Println("Error: " + err.Error()) diff --git a/asinkd/rpc_client.go b/rpc_client.go similarity index 97% rename from asinkd/rpc_client.go rename to rpc_client.go index 718690d..68272c7 100644 --- a/asinkd/rpc_client.go +++ b/rpc_client.go @@ -1,4 +1,4 @@ -package main +package asink import ( "log"