diff --git a/syncplay/client.py b/syncplay/client.py index 6bedfe0..00cb467 100644 --- a/syncplay/client.py +++ b/syncplay/client.py @@ -436,11 +436,10 @@ class SyncplayClient(object): class SyncplayUser(object): - def __init__(self, username=None, room=None, file_=None, position=0): + def __init__(self, username=None, room=None, file_=None): self.username = username self.room = room self.file = file_ - self.lastPosition = position def setFile(self, filename, duration, size): file_ = { @@ -513,11 +512,10 @@ class SyncplayUserlist(object): message = getMessage("en", "file-differences-notification") + ", ".join(differences) self.ui.showMessage(message, not constants.SHOW_OSD_WARNINGS) - def addUser(self, username, room, file_, position=0, noMessage=False): + def addUser(self, username, room, file_, noMessage=False): if(username == self.currentUser.username): - self.currentUser.lastPosition = position return - user = SyncplayUser(username, room, file_, position) + user = SyncplayUser(username, room, file_) self._users[username] = user if(not noMessage): self.__showUserChangeMessage(username, room, file_) diff --git a/syncplay/protocols.py b/syncplay/protocols.py index 6feaef4..4fe8da4 100644 --- a/syncplay/protocols.py +++ b/syncplay/protocols.py @@ -142,8 +142,7 @@ class SyncClientProtocol(JSONCommandProtocol): for user in room[1].iteritems(): userName = user[0] file_ = user[1]['file'] if user[1]['file'] <> {} else None - position = user[1]['position'] - self._client.userlist.addUser(userName, roomName, file_, position, noMessage=True) + self._client.userlist.addUser(userName, roomName, file_, noMessage=True) self._client.userlist.showUserList() def sendList(self): @@ -217,7 +216,6 @@ class SyncClientProtocol(JSONCommandProtocol): def sendError(self, message): self.sendMessage({"Error": {"message": message}}) - class SyncServerProtocol(JSONCommandProtocol): def __init__(self, factory): self._factory = factory @@ -227,6 +225,7 @@ class SyncServerProtocol(JSONCommandProtocol): self._pingService = PingService() self._clientLatencyCalculation = 0 self._clientLatencyCalculationArrivalTime = 0 + self._watcher = None def __hash__(self): return hash('|'.join(( @@ -248,7 +247,7 @@ class SyncServerProtocol(JSONCommandProtocol): self.drop() def connectionLost(self, reason): - self._factory.removeWatcher(self) + self._factory.removeWatcher(self._watcher) def _extractHelloArguments(self, hello): roomName, roomPassword = None, None @@ -286,13 +285,16 @@ class SyncServerProtocol(JSONCommandProtocol): self._logged = True self.sendHello(version) + def setWatcher(self, watcher): + self._watcher = watcher + def sendHello(self, clientVersion): hello = {} - username = self._factory.watcherGetUsername(self) + username = self._watcher.getName() hello["username"] = username userIp = self.transport.getPeer().host - room = self._factory.watcherGetRoom(self) - if(room): hello["room"] = {"name": room} + room = self._watcher.getRoom() + if(room): hello["room"] = {"name": room.getName()} hello["version"] = syncplay.version hello["motd"] = self._factory.getMotd(userIp, username, room, clientVersion) self.sendMessage({"Hello": hello}) @@ -303,18 +305,15 @@ class SyncServerProtocol(JSONCommandProtocol): command = set_[0] if command == "room": roomName = set_[1]["name"] if set_[1].has_key("name") else None - self._factory.watcherSetRoom(self, roomName) + self._factory.setWatcherRoom(self._watcher, roomName) elif command == "file": - self._factory.watcherSetFile(self, set_[1]) + self._watcher.setFile(set_[1]) def sendSet(self, setting): self.sendMessage({"Set": setting}) - def sendRoomSetting(self, roomName): - self.sendSet({"room": {"name": roomName}}) - - def sendUserSetting(self, username, roomName, file_, event): - room = {"name": roomName} + def sendUserSetting(self, username, room, file_, event): + room = {"name": room.getName()} user = {} user[username] = {} user[username]["room"] = room @@ -324,20 +323,19 @@ class SyncServerProtocol(JSONCommandProtocol): user[username]["event"] = event self.sendSet({"user": user}) - def _addUserOnList(self, userlist, roomPositions, watcher): - if (not userlist.has_key(watcher.room)): - userlist[watcher.room] = {} - roomPositions[watcher.room] = watcher.getRoomPosition() - userlist[watcher.room][watcher.name] = { - "file": watcher.file if watcher.file else {}, - "position": roomPositions[watcher.room] if roomPositions[watcher.room] else 0 - } + def _addUserOnList(self, userlist, watcher): + room = watcher.getRoom() + if room: + if room.getName() not in userlist: + userlist[room.getName()] = {} + userFile = { "position": 0, "file": watcher.getFile() if watcher.getFile() else {} } + userlist[room.getName()][watcher.getName()] = userFile + def sendList(self): userlist = {} - roomPositions = {} - watchers = self._factory.getAllWatchers(self) - for watcher in watchers.itervalues(): - self._addUserOnList(userlist, roomPositions, watcher) + watchers = self._factory.getAllWatchersForUser(self._watcher) + for watcher in watchers: + self._addUserOnList(userlist, watcher) self.sendMessage({"List": userlist}) @requireLogged @@ -350,10 +348,10 @@ class SyncServerProtocol(JSONCommandProtocol): else: processingTime = 0 playstate = { - "position": position, + "position": position if position else 0, "paused": paused, "doSeek": doSeek, - "setBy": setBy + "setBy": setBy.getName() } ping = { "latencyCalculation": self._pingService.newTimestamp(), @@ -404,7 +402,7 @@ class SyncServerProtocol(JSONCommandProtocol): self._clientLatencyCalculationArrivalTime = time.time() self._pingService.receiveMessage(latencyCalculation, clientRtt) if(self.serverIgnoringOnTheFly == 0): - self._factory.updateWatcherState(self, position, paused, doSeek, self._pingService.getLastForwardDelay()) + self._watcher.updateState(position, paused, doSeek, self._pingService.getLastForwardDelay()) def handleError(self, error): self.dropWithError(error["message"]) # TODO: more processing and fallbacking @@ -412,7 +410,6 @@ class SyncServerProtocol(JSONCommandProtocol): def sendError(self, message): self.sendMessage({"Error": {"message": message}}) - class PingService(object): def __init__(self): diff --git a/syncplay/server.py b/syncplay/server.py index 723582b..6133b7d 100644 --- a/syncplay/server.py +++ b/syncplay/server.py @@ -1,4 +1,3 @@ -# coding:utf8 import hashlib from twisted.internet import task, reactor from twisted.internet.protocol import Factory @@ -12,79 +11,29 @@ import codecs import os from string import Template import argparse +from pprint import pprint class SyncFactory(Factory): - def __init__(self, password='', motdFilePath=None): + def __init__(self, password='', motdFilePath=None, isolateRooms=False): print getMessage("en", "welcome-server-notification").format(syncplay.version) if(password): password = hashlib.md5(password).hexdigest() self.password = password self._motdFilePath = motdFilePath - self._rooms = {} - self._roomStates = {} - self._roomUpdate = threading.RLock() + if(not isolateRooms): + self._roomManager = RoomManager() + else: + self._roomManager = PublicRoomManager() def buildProtocol(self, addr): return SyncServerProtocol(self) - def _createRoomIfDoesntExist(self, roomName): - if (not self._rooms.has_key(roomName)): - with self._roomUpdate: - self._rooms[roomName] = {} - self._roomStates[roomName] = { - "position": 0.0, - "paused": True, - "setBy": None, - "lastUpdate": time.time() - } - - def addWatcher(self, watcherProtocol, username, roomName, roomPassword): - allnames = [] - for room in self._rooms.itervalues(): - for watcher in room.itervalues(): - allnames.append(watcher.name.lower()) - while username.lower() in allnames: - username += '_' - self._createRoomIfDoesntExist(roomName) - watcher = Watcher(self, watcherProtocol, username, roomName) - with self._roomUpdate: - self._rooms[roomName][watcherProtocol] = watcher - reactor.callLater(0.1, watcher.scheduleSendState) - l = lambda w: w.sendUserSetting(username, roomName, None, {"joined": True}) - self.broadcast(watcherProtocol, l) - - def getWatcher(self, watcherProtocol): - for room in self._rooms.itervalues(): - if(room.has_key(watcherProtocol)): - return room[watcherProtocol] - - def getAllWatchers(self, watcherProtocol): # TODO: Optimize me - watchers = {} - for room in self._rooms.itervalues(): - for watcher in room.itervalues(): - watchers[watcher.watcherProtocol] = watcher - return watchers - - def _removeWatcherFromTheRoom(self, watcherProtocol): - for room in self._rooms.itervalues(): - with self._roomUpdate: - watcher = room.pop(watcherProtocol, None) - if(watcher): - return watcher - - def _deleteRoomIfEmpty(self, room): - if (self._rooms[room] == {}): - with self._roomUpdate: - self._rooms.pop(room) - self._roomStates.pop(room) - - def getRoomPausedAndPosition(self, room): - position = self._roomStates[room]["position"] - paused = self._roomStates[room]["paused"] - if (not paused): - timePassedSinceSet = time.time() - self._roomStates[room]["lastUpdate"] - position += timePassedSinceSet - return paused, position + def sendState(self, watcher, doSeek=False, forcedUpdate=False): + room = watcher.getRoom() + if room: + paused, position = room.isPaused(), room.getPosition() + setBy = room.getSetBy() + watcher.sendState(position, paused, doSeek, setBy, forcedUpdate) def getMotd(self, userIp, username, room, clientVersion): oldClient = False @@ -107,172 +56,258 @@ class SyncFactory(Factory): else: return "" - def sendState(self, watcherProtocol, doSeek=False, forcedUpdate=False): - watcher = self.getWatcher(watcherProtocol) - if(not watcher): - return - room = watcher.room - paused, position = self.getRoomPausedAndPosition(room) - setBy = self._roomStates[room]["setBy"] - watcher.paused = paused - watcher.position = position - watcherProtocol.sendState(position, paused, doSeek, setBy, forcedUpdate) - if(time.time() - watcher.lastUpdate > constants.PROTOCOL_TIMEOUT): - watcherProtocol.drop() - self.removeWatcher(watcherProtocol) + def addWatcher(self, watcherProtocol, username, roomName, roomPassword): + username = self._roomManager.findFreeUsername(username) + watcher = Watcher(self, watcherProtocol, username) + self.setWatcherRoom(watcher, roomName) - def __shouldServerForceUpdateOnRoom(self, pauseChanged, doSeek): - return doSeek or pauseChanged + def setWatcherRoom(self, watcher, roomName): + self._roomManager.moveWatcher(watcher, roomName) + self.sendJoinMessage(watcher) - def __updatePausedState(self, paused, watcher): - watcher.paused = paused - if(self._roomStates[watcher.room]["paused"] <> paused): - self._roomStates[watcher.room]["setBy"] = watcher.name - self._roomStates[watcher.room]["paused"] = paused - self._roomStates[watcher.room]["lastUpdate"] = time.time() - return True + def removeWatcher(self, watcher): + self.sendLeftMessage(watcher) + self._roomManager.removeWatcher(watcher) - def __updatePositionState(self, position, doSeek, watcher): - watcher.position = position - if (doSeek): - self._roomStates[watcher.room]["position"] = position - self._roomStates[watcher.room]["setBy"] = watcher.name - self._roomStates[watcher.room]["lastUpdate"] = time.time() + def sendLeftMessage(self, watcher): + l = lambda w: w.sendSetting(watcher.getName(), watcher.getRoom(), None, {"left": True}) + self._roomManager.broadcast(watcher, l) + + def sendJoinMessage(self, watcher): + l = lambda w: w.sendSetting(watcher.getName(), watcher.getRoom(), None, {"joined": True}) + self._roomManager.broadcast(watcher, l) + + def sendFileUpdate(self, watcher, file_): + l = lambda w: w.sendSetting(watcher.getName(), watcher.getRoom(), watcher.getFile(), None) + self._roomManager.broadcast(watcher, l) + + def forcePositionUpdate(self, room, watcher, doSeek): + room = watcher.getRoom() + paused, position = room.isPaused(), watcher.getPosition() + setBy = watcher + l = lambda w: w.sendState(position, paused, doSeek, setBy, True) + self._roomManager.broadcastRoom(watcher, l) + + def getAllWatchersForUser(self, forUser): + return self._roomManager.getAllWatchersForUser(forUser) + +class RoomManager(object): + def __init__(self): + self._rooms = {} + + def broadcastRoom(self, sender, whatLambda): + room = sender.getRoom() + if room and room.getName() in self._rooms: + for receiver in room.getWatchers(): + whatLambda(receiver) + + def broadcast(self, sender, whatLambda): + for room in self._rooms.itervalues(): + for receiver in room.getWatchers(): + whatLambda(receiver) + + def getAllWatchersForUser(self, watcher): + watchers = [] + for room in self._rooms.itervalues(): + for watcher in room.getWatchers(): + watchers.append(watcher) + return watchers + + def moveWatcher(self, watcher, roomName): + self.removeWatcher(watcher) + room = self._getRoom(roomName) + room.addWatcher(watcher) + l = lambda w: w.sendSetting(watcher.getName(), watcher.getRoom(), None, None) + self.broadcast(watcher, l) + + def removeWatcher(self, watcher): + oldRoom = watcher.getRoom() + if(oldRoom): + oldRoom.removeWatcher(watcher) + self._deleteRoomIfEmpty(oldRoom) + + def _getRoom(self, roomName): + if roomName in self._rooms: + return self._rooms[roomName] else: - setter = min(self._rooms[watcher.room].values()) - self._roomStates[watcher.room]["position"] = setter.position - self._roomStates[watcher.room]["setBy"] = setter.name - self._roomStates[watcher.room]["lastUpdate"] = setter.lastUpdate + room = Room(roomName) + self._rooms[roomName] = room + return room - def updateWatcherState(self, watcherProtocol, position, paused, doSeek, messageAge): - watcher = self.getWatcher(watcherProtocol) - if(not watcher): - return - watcher.lastUpdate = time.time() - if(watcher.file): - oldPosition = self._roomStates[watcher.room]["position"] - pauseChanged = False - if(paused is not None): - pauseChanged = self.__updatePausedState(paused, watcher) - if(position is not None): - if(not paused): - position += messageAge - self.__updatePositionState(position, doSeek or pauseChanged, watcher) - forceUpdate = self.__shouldServerForceUpdateOnRoom(pauseChanged, doSeek) - if(forceUpdate): - l = lambda w: self.sendState(w, doSeek, forceUpdate) - self.broadcastRoom(watcher.watcherProtocol, l) + def _deleteRoomIfEmpty(self, room): + if room.isEmpty() and room.getName() in self._rooms: + del self._rooms[room.getName()] - def removeWatcher(self, watcherProtocol): - watcher = self.getWatcher(watcherProtocol) - if(not watcher): - return - l = lambda w: w.sendUserSetting(watcher.name, watcher.room, None, {"left": True}) - self.broadcast(watcherProtocol, l) - self._removeWatcherFromTheRoom(watcherProtocol) - watcher.deactivate() - self._deleteRoomIfEmpty(watcher.room) + def findFreeUsername(self, username): + allnames = [] + for room in self._rooms.itervalues(): + for watcher in room.getWatchers(): + allnames.append(watcher.getName().lower()) + while username.lower() in allnames: + username += '_' + return username - def watcherGetUsername(self, watcherProtocol): - return self.getWatcher(watcherProtocol).name - def watcherGetRoom(self, watcherProtocol): - return self.getWatcher(watcherProtocol).room - - def watcherSetRoom(self, watcherProtocol, room): - watcher = self._removeWatcherFromTheRoom(watcherProtocol) - if(not watcher): - return - watcher.resetStateTimer() - oldRoom = watcher.room - self._createRoomIfDoesntExist(room) - with self._roomUpdate: - self._rooms[room][watcherProtocol] = watcher - self._deleteRoomIfEmpty(oldRoom) - watcher.room = room - self.sendState(watcherProtocol, True) - l = lambda w: w.sendUserSetting(watcher.name, watcher.room, None, None) - self.broadcast(watcherProtocol, l) - - def watcherSetFile(self, watcherProtocol, file_): - watcher = self.getWatcher(watcherProtocol) - if(not watcher): - return - watcher.file = file_ - l = lambda w: w.sendUserSetting(watcher.name, watcher.room, watcher.file, None) - self.broadcast(watcherProtocol, l) - - def broadcastRoom(self, sender, what): - room = self._rooms[self.watcherGetRoom(sender)] - if(room): - with self._roomUpdate: - for receiver in room: - what(receiver) - - def broadcast(self, sender, what): - with self._roomUpdate: - for room in self._rooms.itervalues(): - for receiver in room: - what(receiver) - -class SyncIsolatedFactory(SyncFactory): +class PublicRoomManager(RoomManager): def broadcast(self, sender, what): self.broadcastRoom(sender, what) - def getAllWatchers(self, watcherProtocol): - room = self.getWatcher(watcherProtocol).room - if(self._rooms.has_key(room)): - return self._rooms[room] - else: - return {} + def getAllWatchersForUser(self, watcher): + room = sender.getRoom().getWatchers() - def watcherSetRoom(self, watcherProtocol, room): - watcher = self.getWatcher(watcherProtocol) + def moveWatcher(self, watcher, room): oldRoom = watcher.room - l = lambda w: w.sendUserSetting(watcher.name, oldRoom, None, {"left": True}) - self.broadcast(watcherProtocol, l) - SyncFactory.watcherSetRoom(self, watcherProtocol, room) - self.watcherSetFile(watcherProtocol, watcher.file) + l = lambda w: w.sendSetting(watcher.getName(), oldRoom, None, {"left": True}) + self.broadcast(watcher, l) + RoomManager.watcherSetRoom(self, watcher, room) + watcher.setFile(watcher.getFile()) + + +class Room(object): + STATE_PAUSED = 0 + STATE_PLAYING = 1 + + def __init__(self, name): + self._name = name + self._watchers = {} + self._playState = self.STATE_PAUSED + self._setBy = None + + def __str__(self, *args, **kwargs): + return self.getName() + + def getName(self): + return self._name + + def getPosition(self): + watcher = min(self._watchers.values()) + self._setBy = watcher + return watcher.getPosition() + + def setPaused(self, paused=STATE_PAUSED, setBy=None): + self._playState = paused + self._setBy = setBy + + def isPlaying(self): + return self._playState == self.STATE_PLAYING + + def isPaused(self): + return self._playState == self.STATE_PAUSED + + def getWatchers(self): + return self._watchers.values() + + def addWatcher(self, watcher): + self._watchers[watcher.getName()] = watcher + watcher.setRoom(self) + + def removeWatcher(self, watcher): + if(watcher.getName() not in self._watchers): + return + del self._watchers[watcher.getName()] + watcher.setRoom(None) + + def isEmpty(self): + return bool(self._watchers) + + def getSetBy(self): + return self._setBy class Watcher(object): - def __init__(self, factory, watcherProtocol, name, room): - self.factory = factory - self.watcherProtocol = watcherProtocol - self.name = name - self.room = room - self.file = None + def __init__(self, server, connector, name): + self._server = server + self._connector = connector + self._name = name + self._room = None + self._file = None + self._position = None + self._lastUpdatedOn = time.time() self._sendStateTimer = None - self.position = None - self.lastUpdate = time.time() + self._connector.setWatcher(self) + reactor.callLater(0.1, self._scheduleSendState) + + def setFile(self, file): + self._file = file + self._server.sendFileUpdate(self, file) + + def setRoom(self, room): + self._room = room + if room is None: + self._deactivateStateTimer() + else: + self._resetStateTimer() + self._askForStateUpdate(True, True) + + def getRoom(self): + return self._room + + def getName(self): + return self._name + + def getFile(self): + return self._file + + def getPosition(self): + if self._position is None: + return None + if self._room.isPlaying(): + timePassedSinceSet = time.time() - self._lastUpdatedOn + else: + timePassedSinceSet = 0 + return self._position + timePassedSinceSet + + def sendSetting(self, username, roomName, file_, event): + self._connector.sendUserSetting(username, roomName, file_, event) def __lt__(self, b): - if(self.position is None): + if self.getPosition() is None: return False - elif(b.position is None): + if b.getPosition is None: return True - else: - return self.position < b.position + return self.getPosition() < b.getPosition() - def getRoomPosition(self): - _, position = self.factory.getRoomPausedAndPosition(self.room) - return position - - def scheduleSendState(self): - self._sendStateTimer = task.LoopingCall(self.sendState) + def _scheduleSendState(self): + self._sendStateTimer = task.LoopingCall(self._askForStateUpdate) self._sendStateTimer.start(constants.SERVER_STATE_INTERVAL, True) - def sendState(self): - self.factory.sendState(self.watcherProtocol) + def _askForStateUpdate(self, doSeek=False, forcedUpdate=False): + self._server.sendState(self, doSeek, forcedUpdate) - def resetStateTimer(self): - if(self._sendStateTimer): + def _resetStateTimer(self): + if self._sendStateTimer: self._sendStateTimer.stop() self._sendStateTimer.start(constants.SERVER_STATE_INTERVAL) - def deactivate(self): + def _deactivateStateTimer(self): if(self._sendStateTimer): self._sendStateTimer.stop() + def sendState(self, position, paused, doSeek, setBy, forcedUpdate): + self._connector.sendState(position, paused, doSeek, setBy, forcedUpdate) + if time.time() - self._lastUpdatedOn > constants.PROTOCOL_TIMEOUT: + self._server.removeWatcher(self) + self._connector.drop() + + def __hasPauseChanged(self, paused): + return self._room.isPaused() and not paused or not self._room.isPaused() and paused + + def updateState(self, position, paused, doSeek, messageAge): + if(self._file): + oldPosition = self.getPosition() + pauseChanged = False + if(paused is not None): + pauseChanged = self.__hasPauseChanged(paused) + if pauseChanged: + self.getRoom().setPaused(Room.STATE_PAUSED if paused else Room.STATE_PLAYING, self) + if(position is not None): + if(not paused): + position += messageAge + self._position = position + self._lastUpdatedOn = time.time() + if(doSeek or pauseChanged): + self._server.forcePositionUpdate(self._room, self, doSeek) + + class ConfigurationGetter(object): def getConfiguration(self): self._prepareArgParser() diff --git a/syncplayServer.py b/syncplayServer.py index d59233a..e441a5f 100755 --- a/syncplayServer.py +++ b/syncplayServer.py @@ -7,13 +7,11 @@ import site from twisted.internet import reactor -from syncplay.server import SyncFactory, SyncIsolatedFactory, ConfigurationGetter +from syncplay.server import SyncFactory, ConfigurationGetter -argsGetter = ConfigurationGetter() -args = argsGetter.getConfiguration() +if __name__ == '__main__': + argsGetter = ConfigurationGetter() + args = argsGetter.getConfiguration() -if(not args.isolate_rooms): - reactor.listenTCP(int(args.port), SyncFactory(args.password, args.motd_file)) -else: - reactor.listenTCP(int(args.port), SyncIsolatedFactory(args.password, args.motd_file)) -reactor.run() + reactor.listenTCP(int(args.port), SyncFactory(args.password, args.motd_file, args.isolate_rooms)) + reactor.run()