From 3ec975d130280f202737ea9dfa7e44741ffea821 Mon Sep 17 00:00:00 2001 From: Ian Adam Naval Date: Sun, 27 Jul 2014 20:11:10 -0400 Subject: [PATCH] Initial commit. --- .gitignore | 4 + README.md | 4 + build.sbt | 9 + .../com/ianonavy/netflixsync/Client.scala | 197 ++++++++++++++++++ .../scala/com/ianonavy/netflixsync/Main.scala | 18 ++ .../com/ianonavy/netflixsync/MainPanel.scala | 175 ++++++++++++++++ .../scala/com/ianonavy/netflixsync/Room.scala | 82 ++++++++ .../com/ianonavy/netflixsync/Server.scala | 93 +++++++++ .../netflixsync/SocketEventPublisher.scala | 45 ++++ 9 files changed, 627 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 build.sbt create mode 100644 src/main/scala/com/ianonavy/netflixsync/Client.scala create mode 100644 src/main/scala/com/ianonavy/netflixsync/Main.scala create mode 100644 src/main/scala/com/ianonavy/netflixsync/MainPanel.scala create mode 100644 src/main/scala/com/ianonavy/netflixsync/Room.scala create mode 100644 src/main/scala/com/ianonavy/netflixsync/Server.scala create mode 100644 src/main/scala/com/ianonavy/netflixsync/SocketEventPublisher.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c0fda1e --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +out/ +project/ +target/ +.idea/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..c1a3a60 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +netflix-sync +============ + +Hacked together tool to make long distance Netflix watching a tiny bit easier. diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..e7de47a --- /dev/null +++ b/build.sbt @@ -0,0 +1,9 @@ +name := "netflix-sync" + +version := "1.0" + +libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.1.0" % "test" + +libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-swing" % _) + +libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10" % "2.3.4" diff --git a/src/main/scala/com/ianonavy/netflixsync/Client.scala b/src/main/scala/com/ianonavy/netflixsync/Client.scala new file mode 100644 index 0000000..418745f --- /dev/null +++ b/src/main/scala/com/ianonavy/netflixsync/Client.scala @@ -0,0 +1,197 @@ +package com.ianonavy.netflixsync + +import java.awt.Robot +import java.awt.event.{KeyEvent, InputEvent} +import java.net._ +import java.io._ +import scala.io._ +import scala.util.parsing.json.{JSON, JSONObject} + + +/** + * Client for interfacing with the Netflix sync server + * @param host Hostname of the server + * @param nTimes Number of messages to send to determine delay + */ +class Client(host: String, nTimes: Int) { + + /** + * Average delay to the server. Includes clock differences and network lag. + */ + var delay = 0.0 + + /** + * Runs a block of code and times it. + * @param block The block of code to run + * @tparam R The type that block returns + * @return A tuple of the number of nanoseconds passed and the results + */ + def time[R](block: => R): (Long, R) = { + val t0 = System.nanoTime() + val result = block + val t1 = System.nanoTime() + + (t1 - t0, result) + } + + /** + * Clicks in a particular set of coordinates after waiting a certain number + * of milliseconds. + * @param delay The delay in milliseconds to wait before clicking + * @param x The x coordinate to click + * @param y The y coordinate to click + */ + def synchronizedClick(delay: Long, x: Int, y: Int) { + val robot = new Robot + Thread.sleep(delay) + robot.mouseMove(x, y) + robot.mousePress(InputEvent.BUTTON1_MASK) + robot.mouseRelease(InputEvent.BUTTON1_MASK) + } + + /** + * Simulates a spacebar press after waiting a certain number of milliseconds. + * @param delay The delay in milliseconds to wait before pressing space + */ + def synchronizedSpacebar(delay: Long) { + val robot = new Robot + Thread.sleep(delay) + robot.keyPress(KeyEvent.VK_SPACE) + robot.keyRelease(KeyEvent.VK_SPACE) + } + + /** + * Sends an object to the server and returns the results as a string. + * @param obj Map corresponding to the JSON object to send. Should contain + * the key "command" + * @param close Whether to close the socket after sending the command + * @return The reply from the server + */ + def sendCommand(obj: Map[String, Any], close: Boolean): String = { + val s = new Socket(InetAddress.getByName(host), 9999) + lazy val in = new BufferedSource(s.getInputStream).getLines() + val out = new PrintStream(s.getOutputStream) + out.println(new JSONObject(obj)) + out.flush() + + val results = in.next().toString + s.close() + results + } + + /** + * Sends an object to the server and returns the results as a string. By + * default closes the socket after sending the command. + * @param obj Map corresponding to the JSON object to send. Should contain + * the key "command" + * @return The reply from the server + */ + def sendCommand(obj: Map[String, Any]): String = { + sendCommand(obj, close = true) + } + + /** + * Reports the current system time in milliseconds to the server and + * retrieves the difference from the server's time. Returns a positive integer + * if the server's clock is ahead. + * @return The number of milliseconds that the server's clock is ahead + */ + def getDelayFromServer: String = { + val cmd = Map( + "command" -> "GET DELAY", + "time" -> System.currentTimeMillis.toString) + sendCommand(cmd) + } + + /** + * Gets the number of users registered to a particular room. + * @param roomName The name of the room. + * @return The number of registered users in the given room + */ + def getNumberRegistered(roomName: String): String = { + val cmd = Map( + "command" -> "GET NUM REGISTERED", + "room" -> roomName) + sendCommand(cmd) + } + + /** + * Registers a name to the given room. + * @param roomName The name of the room in which to register + * @param watcherName The name of the watcher to register + * @return Whether the watcher was the first one in the room (aka the master) + */ + def registerName(roomName: String, watcherName: String): Boolean = { + val cmd = Map( + "command" -> "REGISTER", + "room" -> roomName, + "name" -> watcherName) + val res = JSON.parseFull(sendCommand(cmd)) + val isMaster = res match { + case Some(m: Map[String, Any]) => m("isMaster") match { + case isMaster: Boolean => isMaster + } + } + isMaster + } + + /** + * Instructs the server to store the socket for a given room name and + * watcher name. + * @param roomName The name of the relevant room + * @param watcherName The name of the watcher whose socket we're setting + * @return + */ + def setSocket(roomName: String, watcherName: String) = { + val cmd = Map( + "command" -> "SET SOCKET", + "room" -> roomName, + "name" -> watcherName) + sendCommand(cmd, close = false) + } + + /** + * Instructs the server to push a "CLICK" message to all clients with set + * sockets. + * @param roomName The name of the relevant room + */ + def sendClick(roomName: String) { + val cmd = Map( + "command" -> "CLICK", + "room" -> roomName) + sendCommand(cmd) + } + + /** + * Instructs the server to push a "SPACEBAR" message to all clients with set + * sockets. + * @param roomName The name of the relevant room + */ + def sendSpacebar(roomName: String) { + val cmd = Map( + "command" -> "SPACEBAR", + "room" -> roomName) + sendCommand(cmd) + } + + /** + * Sets the 'delay' attribute by polling the server for the offset between + * its clock and the client's and timing how long that roundtrip request + * takes in milliseconds. + */ + def calculateAverageTime() { + val times = new Array[Long](nTimes) + val delays = new Array[Long](nTimes) + + for (i <- 0 to nTimes - 1) { + val (elapsed, delay) = time { + getDelayFromServer + } + times(i) = elapsed + delays(i) = delay.toLong + } + val avgOffset = times.sum / times.length / 1000000.0 + val avgDelay = delays.sum / delays.length / 1000000.0 + delay = avgOffset + avgDelay + } +} diff --git a/src/main/scala/com/ianonavy/netflixsync/Main.scala b/src/main/scala/com/ianonavy/netflixsync/Main.scala new file mode 100644 index 0000000..c748e5e --- /dev/null +++ b/src/main/scala/com/ianonavy/netflixsync/Main.scala @@ -0,0 +1,18 @@ +package com.ianonavy.netflixsync + +import scala.swing._ + + +/** + * Main Swing instance + */ +object Main extends SimpleSwingApplication { + + def top = new MainFrame { + preferredSize = new Dimension(196, 196) + title = "Netflix Sync" + resizable = false + contents = new MainPanel + } + +} \ No newline at end of file diff --git a/src/main/scala/com/ianonavy/netflixsync/MainPanel.scala b/src/main/scala/com/ianonavy/netflixsync/MainPanel.scala new file mode 100644 index 0000000..77edc2f --- /dev/null +++ b/src/main/scala/com/ianonavy/netflixsync/MainPanel.scala @@ -0,0 +1,175 @@ +package com.ianonavy.netflixsync + +import scala.swing.GridBagPanel.Fill +import scala.swing._ +import scala.swing.event.ButtonClicked + + +/** + * Main swing panel for the main frame. + */ +class MainPanel extends FlowPanel { + var socketEventPublisher: SocketEventPublisher = null + var client: Client = null + + val SYNC_LABEL_DEFAULT_TEXT = "Please place me in the middle of your " + + "Netflix window" + + val serverField = new TextField { + text = "localhost" + } + val roomField = new TextField + val nameField = new TextField + + val registerButton = new Button { + text = "Register" + } + val setLocationButton = new Button { + text = "Set Location" + } + val setLocationLabel = new TextArea { + text = SYNC_LABEL_DEFAULT_TEXT + editable = false + opaque = false + focusable = false + cursor = null + lineWrap = true + wordWrap = true + } + + /** + * @return A panel containing the elements relevant to registration + */ + def registerPanel: Panel = new BoxPanel(Orientation.Vertical) { + contents += new Label("Server:") + contents += serverField + contents += new Label("Room:") + contents += roomField + contents += new Label("Name:") + contents += nameField + contents += registerButton + border = Swing.EmptyBorder(30, 30, 30, 30) + } + + /** + * @return A panel used for setting the location of the Netflix window + */ + def setLocationPanel: Panel = new GridBagPanel() { + val c = new Constraints + c.fill = Fill.Vertical + c.gridx = 0 + c.gridy = 0 + layout(setLocationLabel) = c + + c.gridx = 0 + c.gridy = 1 + c.insets = new Insets(32, 0, 0, 0) + layout(setLocationButton) = c + border = Swing.EmptyBorder(30, 30, 30, 30) + } + + /** + * The master control panel for a particular room. Shown only to the first + * user who registeres into a room. + * @param room The room to control + * @return The master control panel + */ + def masterPanel(room: String): Panel = new BoxPanel(Orientation.Vertical) { + val label = new Label { + text = "0 people have joined the room." + } + val clickButton = new Button { + text = "Send Click" + } + val spacebarButton = new Button { + text = "Send Spacebar" + } + val refreshButton = new Button { + text = "Refresh Count" + } + + contents += label + contents += clickButton + contents += spacebarButton + contents += refreshButton + border = Swing.EmptyBorder(30, 30, 30, 30) + + listenTo(clickButton) + listenTo(spacebarButton) + listenTo(refreshButton) + reactions += { + case ButtonClicked(`clickButton`) => + label.text = "Sending..." + client.sendClick(room) + case ButtonClicked(`spacebarButton`) => + label.text = "Sending..." + client.sendSpacebar(room) + case ButtonClicked(`refreshButton`) => + val numPeople = client.getNumberRegistered(room) + label.text = s"$numPeople people have joined the room." + } + } + + /** + * Frame for the master panel + * @param room The room to control + * @return The master frame + */ + def masterFrame(room: String) = new MainFrame { + title = "Netflix Sync" + contents = masterPanel(room) + } + + /** + * Registers a user to a client at a particular server based on the results + * of the form elements usually contained in the registerPanel. Starts the + * socket event publisher, configures the Netflix Sync client and + * optionally spawns the master frame. + */ + def register() { + client = new Client(serverField.text, 10) + client.calculateAverageTime() + + val room = roomField.text + val watcherName = nameField.text + contents.clear() + contents += setLocationPanel + + revalidate() + repaint() + + socketEventPublisher = new SocketEventPublisher(client, room, watcherName) + listenTo(socketEventPublisher) + + val isMaster = client.registerName(roomField.text, nameField.text) + if (isMaster) { + masterFrame(room).open() + } + } + + contents += registerPanel + + listenTo(registerButton) + listenTo(setLocationButton) + + // Netflix location + var (x, y) = (0, 0) + + reactions += { + case ButtonClicked(`registerButton`) => + register() + case ButtonClicked(`setLocationButton`) => + socketEventPublisher.startPublishing() + val myLocation = self.getLocationOnScreen + x = myLocation.getX.toInt - 10 + y = myLocation.getY.toInt - 10 + setLocationLabel.text = "Netflix location set. You may now minimize this window." + case ServerSaidClick(delay) => + client.synchronizedClick(delay.toLong, x, y) + setLocationLabel.text = SYNC_LABEL_DEFAULT_TEXT + case ServerSaidSpacebar(delay) => + client.synchronizedClick(delay.toLong, x, y) + client.synchronizedSpacebar(delay.toLong) + setLocationLabel.text = SYNC_LABEL_DEFAULT_TEXT + } +} diff --git a/src/main/scala/com/ianonavy/netflixsync/Room.scala b/src/main/scala/com/ianonavy/netflixsync/Room.scala new file mode 100644 index 0000000..291b41d --- /dev/null +++ b/src/main/scala/com/ianonavy/netflixsync/Room.scala @@ -0,0 +1,82 @@ +package com.ianonavy.netflixsync + +import java.io._ +import java.net.Socket + +import akka.actor.{ActorSystem, ActorRef, Props, Actor} + + +case class AddWatcher(name: String) + +case class WatcherReady(name: String, socket: Socket) + +case class GetSize() + +case class SendMessage(message: String) + +case class CloseAll() + + +/** + * Simple actor representing a Netflix Synx room. + * @param name + */ +class Room(name: String) extends Actor { + + var watchers: Map[String, Socket] = Map() + + implicit def inputStreamWrapper(in: InputStream) = + new BufferedReader(new InputStreamReader(in)) + + implicit def outputStreamWrapper(out: OutputStream) = + new PrintWriter(new OutputStreamWriter(out)) + + override def receive: Receive = { + case AddWatcher(name) => + watchers += (name -> null) + case WatcherReady(name, socket) => + watchers += (name -> socket) + case GetSize() => + sender() ! watchers.size + case SendMessage(message) => + for ((watcher, socket) <- watchers) { + if (socket != null) { + val out = new PrintStream(socket.getOutputStream()) + out.println(message) + out.flush() + println(s"Server (to $watcher): $message") + } + } + case CloseAll() => + for ((watcher, socket) <- watchers) { + if (socket != null) { + socket.close() + } + } + } +} + +/** + * Singleton object for static methods. Akka actors are meant to be + * instantiated using props on an actor system, so we do that. + */ +object Room { + def props(name: String): Props = Props(new Room(name)) +} + + +/** + * Simple class for storing room actor references by name. + */ +class RoomRegistry(system: ActorSystem) { + var allRooms = Map[String, ActorRef]() + + def getRoom(name: String): ActorRef = { + if (allRooms.contains(name)) { + return allRooms(name) + } + val newRoom = system.actorOf(Room.props(name)) + allRooms += (name -> newRoom) + newRoom + } +} \ No newline at end of file diff --git a/src/main/scala/com/ianonavy/netflixsync/Server.scala b/src/main/scala/com/ianonavy/netflixsync/Server.scala new file mode 100644 index 0000000..dbadce9 --- /dev/null +++ b/src/main/scala/com/ianonavy/netflixsync/Server.scala @@ -0,0 +1,93 @@ +package com.ianonavy.netflixsync + +import java.net._ +import java.io._ +import akka.actor.{ActorRef, ActorSystem} +import akka.pattern.ask +import akka.util.Timeout + +import scala.concurrent.Await +import scala.io._ +import scala.util.parsing.json.{JSONObject, JSON} +import scala.concurrent.duration._ + + +/** + * The Netflix Sync server is responsible for allowing clients to register names + * for a particular room. When a client registers and sets their socket, + * the socket is stored for a particular room and client name. Clients are + * also known as "watchers." The first client to register for a room is + * designated the master. When the master sends a message to the server, + * the message is echoed and propagated to all other clients. The server, + * however, does not check that the master client was indeed the first one to + * register. Rather, it returns whether or not the registering client is the + * first client upon registration. + */ +object Server { + + val system = ActorSystem("system") + val roomRegistry = new RoomRegistry(system) + + def delayFromClient(clientTime: Long): Long = { + System.currentTimeMillis - clientTime + } + + /** + * Main loop for the Netflix sync server. Delegates matched commands to + * other functions. + */ + def main(a: Array[String]) { + val server = new ServerSocket(9999) + println("Listening on TCP :::9999") + implicit val timeout = Timeout(5 seconds) + + while (true) { + val s = server.accept() + val in = new BufferedSource(s.getInputStream).getLines() + val out = new PrintStream(s.getOutputStream) + + var shouldClose = true + try { + val next = in.next().toString + + println("Client: " + next) + + val command = JSON.parseFull(next) + println(roomRegistry.allRooms) + + val res = command match { + case Some(m: Map[String, Any]) => m("command") match { + case "GET DELAY" => + delayFromClient(m("time").toString.toLong) + case "GET NUM REGISTERED" => + val room = roomRegistry.getRoom(m("room").toString) + Await.result(room ? GetSize(), timeout.duration).asInstanceOf[Int] + case "REGISTER" => + val room = roomRegistry.getRoom(m("room").toString) + room ! AddWatcher(m("name").toString) + val isMaster = Await.result(room ? GetSize(), timeout.duration).asInstanceOf[Int] == 1 + JSONObject(Map("isMaster" -> isMaster)) + case "SET SOCKET" => + val room = roomRegistry.getRoom(m("room").toString) + room ! WatcherReady(m("name").toString, s) + shouldClose = false + case s: String => + val room = roomRegistry.getRoom(m("room").toString) + room ! SendMessage(s) + "OK" + } + } + if (shouldClose) { + println("Server: " + res) + out.println(res.toString) + out.flush() + s.close() + } + } catch { + case e: Exception => + e.printStackTrace() + s.close() + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/ianonavy/netflixsync/SocketEventPublisher.scala b/src/main/scala/com/ianonavy/netflixsync/SocketEventPublisher.scala new file mode 100644 index 0000000..fbacbdf --- /dev/null +++ b/src/main/scala/com/ianonavy/netflixsync/SocketEventPublisher.scala @@ -0,0 +1,45 @@ +package com.ianonavy.netflixsync + +import scala.swing.Publisher +import scala.swing.event.Event + + +case class ServerSaidClick(delay: Double) extends Event + +case class ServerSaidSpacebar(delay: Double) extends Event + + +/** + * Simple Swing event publisher that connects to the client and waits on a + * socket. Once the server responds, it publishes the appropriate Swing event. + * @param client The Netflix sync client object + * @param roomName The name of the room we registered for + * @param watcherName The user's registered name + */ +class SocketEventPublisher(client: Client, roomName: String, + watcherName: String) extends Publisher { + /** + * Starts the background thread to publish events. Publishes both a + * ServerSaidClick and ServerSaidSpacebar message when the server pushes a + * SPACEBAR message. Publishes only a ServerSaidClick message when the + * server pushes a CLICK message. In either case, the messages are passed + * the delay in milliseconds to wait. This delay is calculated by the + * client upon registration. + */ + def startPublishing() { + val thread = new Thread(new Runnable { + def run() { + while (true) { + client.setSocket(roomName, watcherName) match { + case "SPACEBAR" => + publish(ServerSaidClick(client.delay)) + publish(ServerSaidSpacebar(client.delay)) + case "CLICK" => + publish(ServerSaidClick(client.delay)) + } + } + } + }) + thread.start() + } +} \ No newline at end of file