package com.ianonavy.netflixsync

import java.io._
import java.net.Socket

import scala.util.control.NonFatal

import akka.actor.{ActorSystem, ActorRef, Props, Actor}

/** Asks a [[Room]] to register a watcher by name before its socket is available. */
case class AddWatcher(name: String)

/** Tells a [[Room]] which socket belongs to the named watcher. */
case class WatcherReady(name: String, socket: Socket)

/** Asks a [[Room]] to reply to the sender with its number of registered watchers. */
case class GetSize()

/** Asks a [[Room]] to write `message` as one line to every watcher that has a socket. */
case class SendMessage(message: String)

/** Asks a [[Room]] to close the socket of every watcher that has one. */
case class CloseAll()

/**
 * Simple actor representing a Netflix Sync room.
 *
 * The room keeps a map from watcher name to that watcher's socket. A watcher
 * that was added but is not ready yet is stored with a `null` socket.
 *
 * @param name the room name this actor was created for
 */
class Room(name: String) extends Actor {

  /** Watcher name -> socket; the value is `null` until `WatcherReady` arrives. */
  var watchers: Map[String, Socket] = Map()

  // These two conversions are not used by the code visible in this file; they
  // are kept so that any existing reference to them keeps compiling.
  implicit def inputStreamWrapper(in: InputStream): BufferedReader =
    new BufferedReader(new InputStreamReader(in))

  implicit def outputStreamWrapper(out: OutputStream): PrintWriter =
    new PrintWriter(new OutputStreamWriter(out))

  /** Watchers whose socket has been supplied, i.e. entries that are not the `null` placeholder. */
  private def connectedWatchers: Iterable[(String, Socket)] =
    watchers.filter { case (_, socket) => socket != null }

  /**
   * Writes `message` as one line to a single watcher and logs the outcome.
   *
   * Failures are reported on stderr instead of being thrown, so that one
   * broken socket does not stop delivery to the remaining watchers.
   */
  private def sendTo(watcher: String, socket: Socket, message: String): Unit =
    try {
      val out = new PrintStream(socket.getOutputStream())
      out.println(message)
      // PrintStream never throws on write failure; checkError() flushes the
      // stream and reports whether an I/O error happened.
      if (out.checkError()) {
        Console.err.println(s"Server (to $watcher): failed to deliver message")
      } else {
        println(s"Server (to $watcher): $message")
      }
    } catch {
      // getOutputStream() throws when the socket is closed or not connected.
      case NonFatal(e) =>
        Console.err.println(s"Server (to $watcher): failed to deliver message: ${e.getMessage}")
    }

  /** Closes one watcher's socket, reporting (not throwing) any failure. */
  private def closeQuietly(watcher: String, socket: Socket): Unit =
    try {
      socket.close()
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"Server: failed to close socket of $watcher: ${e.getMessage}")
    }

  override def receive: Receive = {
    // Pattern variables are named watcherName so they do not shadow the
    // constructor parameter `name` (the room name).
    case AddWatcher(watcherName) =>
      watchers += (watcherName -> null)

    case WatcherReady(watcherName, socket) =>
      watchers += (watcherName -> socket)

    case GetSize() =>
      // Counts every registered watcher, including those without a socket yet.
      sender() ! watchers.size

    case SendMessage(message) =>
      connectedWatchers.foreach { case (watcher, socket) => sendTo(watcher, socket, message) }

    case CloseAll() =>
      // Entries stay in the map after closing; only the sockets are closed.
      connectedWatchers.foreach { case (watcher, socket) => closeQuietly(watcher, socket) }
  }
}

/**
 * Companion object for static helpers. Akka actors are meant to be
 * instantiated through Props on an actor system, so this provides the Props.
 */
object Room {

  /** Builds the Props used to create a [[Room]] actor with the given name. */
  def props(name: String): Props = Props(new Room(name))
}

/**
 * Simple class for storing room actor references by name.
 *
 * NOTE(review): `allRooms` is a plain unsynchronized var; this looks like it
 * expects to be used from a single thread -- confirm against the callers.
 *
 * @param system actor system on which new room actors are created
 */
class RoomRegistry(system: ActorSystem) {

  /** Room name -> actor reference for every room created through this registry. */
  var allRooms = Map[String, ActorRef]()

  /**
   * Returns the actor for the named room, creating and registering it on
   * first request.
   *
   * @param name the room name
   * @return the existing actor reference, or a newly created one
   */
  def getRoom(name: String): ActorRef =
    allRooms.getOrElse(name, {
      // The default argument is by-name, so the actor is only created on a miss.
      val newRoom = system.actorOf(Room.props(name))
      allRooms += (name -> newRoom)
      newRoom
    })
}