Generalize RPC client code, add basic RPC functionality to client
This commit is contained in:
		
							
								
								
									
										338
									
								
								asink/client.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										338
									
								
								asink/client.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,338 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"asink"
 | 
			
		||||
	"asink/util"
 | 
			
		||||
	"code.google.com/p/goconf/conf"
 | 
			
		||||
	"flag"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/user"
 | 
			
		||||
	"path"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type AsinkGlobals struct {
 | 
			
		||||
	configFileName string
 | 
			
		||||
	syncDir        string
 | 
			
		||||
	cacheDir       string
 | 
			
		||||
	tmpDir         string
 | 
			
		||||
	rpcSock        string
 | 
			
		||||
	db             *AsinkDB
 | 
			
		||||
	storage        Storage
 | 
			
		||||
	server         string
 | 
			
		||||
	port           int
 | 
			
		||||
	username       string
 | 
			
		||||
	password       string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var globals AsinkGlobals
 | 
			
		||||
 | 
			
		||||
var flags *flag.FlagSet
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	asink.SetupCleanExitOnSignals()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func StartClient(args []string) {
 | 
			
		||||
	const config_usage = "Config File to use"
 | 
			
		||||
	userHomeDir := "~"
 | 
			
		||||
 | 
			
		||||
	u, err := user.Current()
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		userHomeDir = u.HomeDir
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	flags := flag.NewFlagSet("start", flag.ExitOnError)
 | 
			
		||||
	flags.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage)
 | 
			
		||||
	flags.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)")
 | 
			
		||||
	flags.Parse(args)
 | 
			
		||||
 | 
			
		||||
	//make sure config file's permissions are read-write only for the current user
 | 
			
		||||
	if !util.FileExistsAndHasPermissions(globals.configFileName, 384 /*0b110000000*/) {
 | 
			
		||||
		fmt.Println("Error: Either the file at " + globals.configFileName + " doesn't exist, or it doesn't have permissions such that the current user is the only one allowed to read and write.")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	config, err := conf.ReadConfigFile(globals.configFileName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	globals.storage, err = GetStorage(config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	globals.syncDir, err = config.GetString("local", "syncdir")
 | 
			
		||||
	globals.cacheDir, err = config.GetString("local", "cachedir")
 | 
			
		||||
	globals.tmpDir, err = config.GetString("local", "tmpdir")
 | 
			
		||||
	globals.rpcSock, err = config.GetString("local", "socket") //TODO make sure this exists
 | 
			
		||||
 | 
			
		||||
	//make sure all the necessary directories exist
 | 
			
		||||
	err = util.EnsureDirExists(globals.syncDir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	err = util.EnsureDirExists(globals.cacheDir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	err = util.EnsureDirExists(globals.tmpDir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//TODO check errors on server settings
 | 
			
		||||
	globals.server, err = config.GetString("server", "host")
 | 
			
		||||
	globals.port, err = config.GetInt("server", "port")
 | 
			
		||||
	globals.username, err = config.GetString("server", "username")
 | 
			
		||||
	globals.password, err = config.GetString("server", "password")
 | 
			
		||||
 | 
			
		||||
	globals.db, err = GetAndInitDB(config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//spawn goroutine to handle locking file paths
 | 
			
		||||
	go PathLocker(globals.db)
 | 
			
		||||
 | 
			
		||||
	//spawn goroutines to handle local events
 | 
			
		||||
	localFileUpdates := make(chan *asink.Event)
 | 
			
		||||
	go StartWatching(globals.syncDir, localFileUpdates)
 | 
			
		||||
 | 
			
		||||
	//spawn goroutines to receive remote events
 | 
			
		||||
	remoteFileUpdates := make(chan *asink.Event)
 | 
			
		||||
	go GetEvents(globals, remoteFileUpdates)
 | 
			
		||||
	go ProcessLocalEvents(globals, localFileUpdates)
 | 
			
		||||
	//TODO ensure remote updates wait until all local changes are saved off?
 | 
			
		||||
	go ProcessRemoteEvents(globals, remoteFileUpdates)
 | 
			
		||||
 | 
			
		||||
	rpcTornDown := make(chan int)
 | 
			
		||||
	go StartRPC(globals.rpcSock, rpcTornDown)
 | 
			
		||||
 | 
			
		||||
	asink.WaitOnExit()
 | 
			
		||||
	<-rpcTornDown
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
 | 
			
		||||
	//make the path relative before we save/send it anywhere
 | 
			
		||||
	var err error
 | 
			
		||||
	absolutePath := event.Path
 | 
			
		||||
	event.Path, err = filepath.Rel(globals.syncDir, event.Path)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	latestLocal := LockPath(event.Path, true)
 | 
			
		||||
	defer UnlockPath(event)
 | 
			
		||||
	if latestLocal != nil {
 | 
			
		||||
		event.Predecessor = latestLocal.Hash
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if event.IsUpdate() {
 | 
			
		||||
		//copy to tmp
 | 
			
		||||
		//TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off
 | 
			
		||||
		tmpfilename, err := util.CopyToTmp(absolutePath, globals.tmpDir)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			//bail out if the file we are trying to upload already got deleted
 | 
			
		||||
			if util.ErrorFileNotFound(err) {
 | 
			
		||||
				event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		//try to collect the file's permissions
 | 
			
		||||
		fileinfo, err := os.Stat(absolutePath)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			//bail out if the file we are trying to upload already got deleted
 | 
			
		||||
			if util.ErrorFileNotFound(err) {
 | 
			
		||||
				event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			panic(err)
 | 
			
		||||
		} else {
 | 
			
		||||
			event.Permissions = fileinfo.Mode()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		//get the file's hash
 | 
			
		||||
		hash, err := HashFile(tmpfilename)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		event.Hash = hash
 | 
			
		||||
 | 
			
		||||
		//If the file didn't actually change, squash this event
 | 
			
		||||
		if latestLocal != nil && event.Hash == latestLocal.Hash {
 | 
			
		||||
			os.Remove(tmpfilename)
 | 
			
		||||
			event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		//rename to local cache w/ filename=hash
 | 
			
		||||
		cachedFilename := path.Join(globals.cacheDir, event.Hash)
 | 
			
		||||
		err = os.Rename(tmpfilename, cachedFilename)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			err := os.Remove(tmpfilename)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		//upload file to remote storage
 | 
			
		||||
		err = globals.storage.Put(cachedFilename, event.Hash)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		//if we're trying to delete a file that we thought was already deleted, there's no need to delete it again
 | 
			
		||||
		if latestLocal != nil && latestLocal.IsDelete() {
 | 
			
		||||
			event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//finally, send it off to the server
 | 
			
		||||
	err = SendEvent(globals, event)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err) //TODO handle sensibly
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ProcessLocalEvents(globals AsinkGlobals, eventChan chan *asink.Event) {
 | 
			
		||||
	for {
 | 
			
		||||
		event := <-eventChan
 | 
			
		||||
		go ProcessLocalEvent(globals, event)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
 | 
			
		||||
	latestLocal := LockPath(event.Path, true)
 | 
			
		||||
	defer UnlockPath(event)
 | 
			
		||||
 | 
			
		||||
	//get the absolute path because we may need it later
 | 
			
		||||
	absolutePath := path.Join(globals.syncDir, event.Path)
 | 
			
		||||
 | 
			
		||||
	//if we already have this event, or if it is older than our most recent event, bail out
 | 
			
		||||
	if latestLocal != nil {
 | 
			
		||||
		if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) {
 | 
			
		||||
			event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if latestLocal.Hash != event.Predecessor && latestLocal.Hash != event.Hash {
 | 
			
		||||
			fmt.Printf("conflict:\n")
 | 
			
		||||
			fmt.Printf("OLD %+v\n", latestLocal)
 | 
			
		||||
			fmt.Printf("NEW %+v\n", event)
 | 
			
		||||
			//TODO handle conflict?
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//Download event
 | 
			
		||||
	if event.IsUpdate() {
 | 
			
		||||
		if latestLocal == nil || event.Hash != latestLocal.Hash {
 | 
			
		||||
 | 
			
		||||
			outfile, err := ioutil.TempFile(globals.tmpDir, "asink")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err) //TODO handle sensibly
 | 
			
		||||
			}
 | 
			
		||||
			tmpfilename := outfile.Name()
 | 
			
		||||
			outfile.Close()
 | 
			
		||||
			err = globals.storage.Get(tmpfilename, event.Hash)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err) //TODO handle sensibly
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			//rename to local hashed filename
 | 
			
		||||
			hashedFilename := path.Join(globals.cacheDir, event.Hash)
 | 
			
		||||
			err = os.Rename(tmpfilename, hashedFilename)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				err := os.Remove(tmpfilename)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					panic(err)
 | 
			
		||||
				}
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			//copy hashed file to another tmp, then rename it to the actual file.
 | 
			
		||||
			tmpfilename, err = util.CopyToTmp(hashedFilename, globals.tmpDir)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			//make sure containing directory exists
 | 
			
		||||
			err = util.EnsureDirExists(path.Dir(absolutePath))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			err = os.Rename(tmpfilename, absolutePath)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				err2 := os.Remove(tmpfilename)
 | 
			
		||||
				if err2 != nil {
 | 
			
		||||
					panic(err2)
 | 
			
		||||
				}
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if latestLocal == nil || event.Permissions != latestLocal.Permissions {
 | 
			
		||||
			err := os.Chmod(absolutePath, event.Permissions)
 | 
			
		||||
			if err != nil && !util.ErrorFileNotFound(err) {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		//intentionally ignore errors in case this file has been deleted out from under us
 | 
			
		||||
		os.Remove(absolutePath)
 | 
			
		||||
		//delete the directory previously containing this file if its the last file
 | 
			
		||||
		util.RecursiveRemoveEmptyDirs(path.Dir(absolutePath))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//TODO make sure file being overwritten is either unchanged or already copied off and hashed
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) {
 | 
			
		||||
	for event := range eventChan {
 | 
			
		||||
		ProcessRemoteEvent(globals, event)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func StopClient(args []string) {
 | 
			
		||||
	const config_usage = "Config File to use"
 | 
			
		||||
	userHomeDir := "~"
 | 
			
		||||
 | 
			
		||||
	u, err := user.Current()
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		userHomeDir = u.HomeDir
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	flags := flag.NewFlagSet("stop", flag.ExitOnError)
 | 
			
		||||
	flags.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage)
 | 
			
		||||
	flags.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)")
 | 
			
		||||
	flags.Parse(args)
 | 
			
		||||
 | 
			
		||||
	config, err := conf.ReadConfigFile(globals.configFileName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rpcSock, err := config.GetString("local", "socket") //TODO make sure this exists
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println("Error reading local.socket from config file at ", globals.configFileName)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	i := 99
 | 
			
		||||
	returnCode := 0
 | 
			
		||||
	err = asink.RPCCall(rpcSock, "ClientStopper.StopClient", &returnCode, &i)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										299
									
								
								asink/main.go
									
									
									
									
									
								
							
							
						
						
									
										299
									
								
								asink/main.go
									
									
									
									
									
								
							@@ -1,288 +1,49 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"asink"
 | 
			
		||||
	"asink/util"
 | 
			
		||||
	"code.google.com/p/goconf/conf"
 | 
			
		||||
	"flag"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/user"
 | 
			
		||||
	"path"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type AsinkGlobals struct {
 | 
			
		||||
	configFileName string
 | 
			
		||||
	syncDir        string
 | 
			
		||||
	cacheDir       string
 | 
			
		||||
	tmpDir         string
 | 
			
		||||
	db             *AsinkDB
 | 
			
		||||
	storage        Storage
 | 
			
		||||
	server         string
 | 
			
		||||
	port           int
 | 
			
		||||
	username       string
 | 
			
		||||
	password       string
 | 
			
		||||
type Command struct {
 | 
			
		||||
	cmd         string
 | 
			
		||||
	fn          func(args []string)
 | 
			
		||||
	explanation string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var globals AsinkGlobals
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	const config_usage = "Config File to use"
 | 
			
		||||
	userHomeDir := "~"
 | 
			
		||||
 | 
			
		||||
	u, err := user.Current()
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		userHomeDir = u.HomeDir
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	flag.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage)
 | 
			
		||||
	flag.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)")
 | 
			
		||||
var commands []Command = []Command{
 | 
			
		||||
	Command{
 | 
			
		||||
		cmd:         "start",
 | 
			
		||||
		fn:          StartClient,
 | 
			
		||||
		explanation: "Start the client daemon",
 | 
			
		||||
	},
 | 
			
		||||
	Command{
 | 
			
		||||
		cmd:         "stop",
 | 
			
		||||
		fn:          StopClient,
 | 
			
		||||
		explanation: "Stop the client daemon",
 | 
			
		||||
	},
 | 
			
		||||
	/*	Command{
 | 
			
		||||
			cmd:         "status",
 | 
			
		||||
			fn:          GetStatus,
 | 
			
		||||
			explanation: "Get a summary of the client's status",
 | 
			
		||||
		},
 | 
			
		||||
	*/
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
 | 
			
		||||
	//make sure config file's permissions are read-write only for the current user
 | 
			
		||||
	if !util.FileExistsAndHasPermissions(globals.configFileName, 384 /*0b110000000*/) {
 | 
			
		||||
		fmt.Println("Error: Either the file at " + globals.configFileName + " doesn't exist, or it doesn't have permissions such that the current user is the only one allowed to read and write.")
 | 
			
		||||
	if len(os.Args) > 1 {
 | 
			
		||||
		cmd := os.Args[1]
 | 
			
		||||
		for _, c := range commands {
 | 
			
		||||
			if c.cmd == cmd {
 | 
			
		||||
				c.fn(os.Args[2:])
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
	config, err := conf.ReadConfigFile(globals.configFileName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?")
 | 
			
		||||
		return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	globals.storage, err = GetStorage(config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		fmt.Println(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	globals.syncDir, err = config.GetString("local", "syncdir")
 | 
			
		||||
	globals.cacheDir, err = config.GetString("local", "cachedir")
 | 
			
		||||
	globals.tmpDir, err = config.GetString("local", "tmpdir")
 | 
			
		||||
 | 
			
		||||
	//make sure all the necessary directories exist
 | 
			
		||||
	err = util.EnsureDirExists(globals.syncDir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	err = util.EnsureDirExists(globals.cacheDir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	err = util.EnsureDirExists(globals.tmpDir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//TODO check errors on server settings
 | 
			
		||||
	globals.server, err = config.GetString("server", "host")
 | 
			
		||||
	globals.port, err = config.GetInt("server", "port")
 | 
			
		||||
	globals.username, err = config.GetString("server", "username")
 | 
			
		||||
	globals.password, err = config.GetString("server", "password")
 | 
			
		||||
 | 
			
		||||
	globals.db, err = GetAndInitDB(config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//spawn goroutine to handle locking file paths
 | 
			
		||||
	go PathLocker(globals.db)
 | 
			
		||||
 | 
			
		||||
	//spawn goroutines to handle local events
 | 
			
		||||
	localFileUpdates := make(chan *asink.Event)
 | 
			
		||||
	go StartWatching(globals.syncDir, localFileUpdates)
 | 
			
		||||
 | 
			
		||||
	//spawn goroutines to receive remote events
 | 
			
		||||
	remoteFileUpdates := make(chan *asink.Event)
 | 
			
		||||
	go GetEvents(globals, remoteFileUpdates)
 | 
			
		||||
	go ProcessRemoteEvents(globals, remoteFileUpdates)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		event := <-localFileUpdates
 | 
			
		||||
		go ProcessLocalEvent(globals, event)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
 | 
			
		||||
	//make the path relative before we save/send it anywhere
 | 
			
		||||
	var err error
 | 
			
		||||
	absolutePath := event.Path
 | 
			
		||||
	event.Path, err = filepath.Rel(globals.syncDir, event.Path)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	latestLocal := LockPath(event.Path, true)
 | 
			
		||||
	defer UnlockPath(event)
 | 
			
		||||
	if latestLocal != nil {
 | 
			
		||||
		event.Predecessor = latestLocal.Hash
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if event.IsUpdate() {
 | 
			
		||||
		//copy to tmp
 | 
			
		||||
		//TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off
 | 
			
		||||
		tmpfilename, err := util.CopyToTmp(absolutePath, globals.tmpDir)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			//bail out if the file we are trying to upload already got deleted
 | 
			
		||||
			if util.ErrorFileNotFound(err) {
 | 
			
		||||
				event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		//try to collect the file's permissions
 | 
			
		||||
		fileinfo, err := os.Stat(absolutePath)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			//bail out if the file we are trying to upload already got deleted
 | 
			
		||||
			if util.ErrorFileNotFound(err) {
 | 
			
		||||
				event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			panic(err)
 | 
			
		||||
		fmt.Println("Invalid subcommand specified, please pick from the following:")
 | 
			
		||||
	} else {
 | 
			
		||||
			event.Permissions = fileinfo.Mode()
 | 
			
		||||
		fmt.Println("No subcommand specified, please pick one from the following:")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
		//get the file's hash
 | 
			
		||||
		hash, err := HashFile(tmpfilename)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
		event.Hash = hash
 | 
			
		||||
 | 
			
		||||
		//If the file didn't actually change, squash this event
 | 
			
		||||
		if latestLocal != nil && event.Hash == latestLocal.Hash {
 | 
			
		||||
			os.Remove(tmpfilename)
 | 
			
		||||
			event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		//rename to local cache w/ filename=hash
 | 
			
		||||
		cachedFilename := path.Join(globals.cacheDir, event.Hash)
 | 
			
		||||
		err = os.Rename(tmpfilename, cachedFilename)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			err := os.Remove(tmpfilename)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		//upload file to remote storage
 | 
			
		||||
		err = globals.storage.Put(cachedFilename, event.Hash)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		//if we're trying to delete a file that we thought was already deleted, there's no need to delete it again
 | 
			
		||||
		if latestLocal != nil && latestLocal.IsDelete() {
 | 
			
		||||
			event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//finally, send it off to the server
 | 
			
		||||
	err = SendEvent(globals, event)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err) //TODO handle sensibly
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
 | 
			
		||||
	latestLocal := LockPath(event.Path, true)
 | 
			
		||||
	defer UnlockPath(event)
 | 
			
		||||
 | 
			
		||||
	//get the absolute path because we may need it later
 | 
			
		||||
	absolutePath := path.Join(globals.syncDir, event.Path)
 | 
			
		||||
 | 
			
		||||
	//if we already have this event, or if it is older than our most recent event, bail out
 | 
			
		||||
	if latestLocal != nil {
 | 
			
		||||
		if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) {
 | 
			
		||||
			event.LocalStatus |= asink.DISCARDED
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if latestLocal.Hash != event.Predecessor && latestLocal.Hash != event.Hash {
 | 
			
		||||
			fmt.Printf("conflict:\n")
 | 
			
		||||
			fmt.Printf("OLD %+v\n", latestLocal)
 | 
			
		||||
			fmt.Printf("NEW %+v\n", event)
 | 
			
		||||
			//TODO handle conflict?
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//Download event
 | 
			
		||||
	if event.IsUpdate() {
 | 
			
		||||
		if latestLocal == nil || event.Hash != latestLocal.Hash {
 | 
			
		||||
 | 
			
		||||
			outfile, err := ioutil.TempFile(globals.tmpDir, "asink")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err) //TODO handle sensibly
 | 
			
		||||
			}
 | 
			
		||||
			tmpfilename := outfile.Name()
 | 
			
		||||
			outfile.Close()
 | 
			
		||||
			err = globals.storage.Get(tmpfilename, event.Hash)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err) //TODO handle sensibly
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			//rename to local hashed filename
 | 
			
		||||
			hashedFilename := path.Join(globals.cacheDir, event.Hash)
 | 
			
		||||
			err = os.Rename(tmpfilename, hashedFilename)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				err := os.Remove(tmpfilename)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					panic(err)
 | 
			
		||||
				}
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			//copy hashed file to another tmp, then rename it to the actual file.
 | 
			
		||||
			tmpfilename, err = util.CopyToTmp(hashedFilename, globals.tmpDir)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			//make sure containing directory exists
 | 
			
		||||
			err = util.EnsureDirExists(path.Dir(absolutePath))
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			err = os.Rename(tmpfilename, absolutePath)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				err2 := os.Remove(tmpfilename)
 | 
			
		||||
				if err2 != nil {
 | 
			
		||||
					panic(err2)
 | 
			
		||||
				}
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if latestLocal == nil || event.Permissions != latestLocal.Permissions {
 | 
			
		||||
			err := os.Chmod(absolutePath, event.Permissions)
 | 
			
		||||
			if err != nil && !util.ErrorFileNotFound(err) {
 | 
			
		||||
				panic(err)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		//intentionally ignore errors in case this file has been deleted out from under us
 | 
			
		||||
		os.Remove(absolutePath)
 | 
			
		||||
		//delete the directory previously containing this file if its the last file
 | 
			
		||||
		util.RecursiveRemoveEmptyDirs(path.Dir(absolutePath))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//TODO make sure file being overwritten is either unchanged or already copied off and hashed
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) {
 | 
			
		||||
	for event := range eventChan {
 | 
			
		||||
		ProcessRemoteEvent(globals, event)
 | 
			
		||||
	for _, c := range commands {
 | 
			
		||||
		fmt.Printf("\t%s\t\t%s\n", c.cmd, c.explanation)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										34
									
								
								asink/rpc_server.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										34
									
								
								asink/rpc_server.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,34 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"asink"
 | 
			
		||||
	"net"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/rpc"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ClientStopper int
 | 
			
		||||
 | 
			
		||||
func (c *ClientStopper) StopClient(code *int, result *int) error {
 | 
			
		||||
	asink.Exit(*code)
 | 
			
		||||
	*result = 0
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func StartRPC(sock string, tornDown chan int) {
 | 
			
		||||
	defer func() { tornDown <- 0 }() //the main thread waits for this to ensure the socket is closed
 | 
			
		||||
 | 
			
		||||
	clientstop := new(ClientStopper)
 | 
			
		||||
	rpc.Register(clientstop)
 | 
			
		||||
 | 
			
		||||
	rpc.HandleHTTP()
 | 
			
		||||
	l, err := net.Listen("unix", sock)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	defer l.Close()
 | 
			
		||||
 | 
			
		||||
	go http.Serve(l, nil)
 | 
			
		||||
 | 
			
		||||
	asink.WaitOnExit()
 | 
			
		||||
}
 | 
			
		||||
@@ -74,7 +74,7 @@ func StopServer(args []string) {
 | 
			
		||||
 | 
			
		||||
	i := 99
 | 
			
		||||
	returnCode := 0
 | 
			
		||||
	err := RPCCall(rpcSock, "ServerStopper.StopServer", &returnCode, &i)
 | 
			
		||||
	err := asink.RPCCall(rpcSock, "ServerStopper.StopServer", &returnCode, &i)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,6 +1,7 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"asink"
 | 
			
		||||
	"code.google.com/p/gopass"
 | 
			
		||||
	"flag"
 | 
			
		||||
	"fmt"
 | 
			
		||||
@@ -70,7 +71,7 @@ func UserAdd(args []string) {
 | 
			
		||||
	user.PWHash = HashPassword(passwordOne)
 | 
			
		||||
 | 
			
		||||
	i := 99
 | 
			
		||||
	err = RPCCall(*rpcSocket, "UserModifier.AddUser", user, &i)
 | 
			
		||||
	err = asink.RPCCall(*rpcSocket, "UserModifier.AddUser", user, &i)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if _, ok := err.(rpc.ServerError); ok && err.Error() == DuplicateUsernameErr.Error() {
 | 
			
		||||
			fmt.Println("Error: " + err.Error())
 | 
			
		||||
@@ -94,7 +95,7 @@ func UserDel(args []string) {
 | 
			
		||||
	user.Username = args[0]
 | 
			
		||||
 | 
			
		||||
	i := 99
 | 
			
		||||
	err := RPCCall(*rpcSocket, "UserModifier.RemoveUser", user, &i)
 | 
			
		||||
	err := asink.RPCCall(*rpcSocket, "UserModifier.RemoveUser", user, &i)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if _, ok := err.(rpc.ServerError); ok && err.Error() == NoUserErr.Error() {
 | 
			
		||||
			fmt.Println("Error: " + err.Error())
 | 
			
		||||
@@ -162,7 +163,7 @@ func UserMod(args []string) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	i := 99
 | 
			
		||||
	err := RPCCall(*rpcSocket, "UserModifier.ModifyUser", rpcargs, &i)
 | 
			
		||||
	err := asink.RPCCall(*rpcSocket, "UserModifier.ModifyUser", rpcargs, &i)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if _, ok := err.(rpc.ServerError); ok && err.Error() == NoUserErr.Error() {
 | 
			
		||||
			fmt.Println("Error: " + err.Error())
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,4 @@
 | 
			
		||||
package main
 | 
			
		||||
package asink
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"log"
 | 
			
		||||
		Reference in New Issue
	
	Block a user