From f2f0a14b873f8b55ba6880b248a271d1de881b39 Mon Sep 17 00:00:00 2001 From: Ian Adam Naval Date: Fri, 31 Oct 2014 16:06:10 -0400 Subject: [PATCH] Initial commit --- python/jsh.py | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++ ruby/jsh.rb | 145 +++++++++++++++++++++++++++++++++++++ 2 files changed, 339 insertions(+) create mode 100644 python/jsh.py create mode 100644 ruby/jsh.rb diff --git a/python/jsh.py b/python/jsh.py new file mode 100644 index 0000000..e127c03 --- /dev/null +++ b/python/jsh.py @@ -0,0 +1,194 @@ +import json +import sys + + +class ComplexEncoder(json.JSONEncoder): + """Special JSON encoder that detects objects that implement the + JSONSerializable interface and invokes their to_json method.""" + + def default(self, obj): + if isinstance(obj, JSONSerializable): + return obj.to_json() + # Let the base class default method raise the TypeError + return json.JSONEncoder.default(self, obj) + + +class JSONSerializable(object): + """Interface for JSON-serializable classes""" + + def to_json(self): + raise NotImplementedError("Must implement 'to_json'") + + +class Counter(object): + """Wrapper for a count to be shared among substreams""" + + def __init__(self): + self.count = 0 + + def increment(self): + self.count += 1 + + def clear(self): + self.count = 0 + + +class Stream(JSONSerializable): + """A Stream manages printing serialized objects in a fixed-number + of chunks. Streams can have substreams that are keyed on a special + name. Substreams share a global counter for the number of items + that are printed out at once.""" + + def __init__(self, out, buf_size=10, counter=None, root=None): + if not counter: + counter = Counter() + if not root: + root = self + self.buf_size = buf_size + self.out = out + self.counter = counter + self.root = root + self.data = {} + self.substreams = {} + self.first = True + + def get_start(self): + return '[' + + def start(self): + """Marks the start of the Stream + + :param should_print: Whether it should write to self.out + :return: A value to be printed""" + self.out.write(self.get_start()) + + def flush(self): + """Serializes the current contents of the stream for output and + recursively flushes everything. + + Does nothing if it's not the root stream. + + :param should_print: Whether it should write to self.out + :return: A value to be printed""" + if self.root == self: + self.out.write(self.to_json()) + self.clear() + self.out.flush() + + def get_stop(self): + return ']' + + def stop(self): + """Marks the start of the Stream + + :param should_print: Whether it should write to self.out + :return: A value to be printed""" + self.flush() + self.out.write(self.get_stop()) + + def clear(self): + """Clears the shared counter and recursively clears all + substreams.""" + self.counter.clear() + self.data = {} + for substream in self.substreams.values(): + substream.clear() + + def output(self, stream_name, obj): + """Stores an object in the buffer. If the number of things + stored exceeds the global buffer size, automatically flush + the streams. + + :param stream_name: The key under which to store the stream + :param obj: The object""" + if stream_name not in self.data: + self.data[stream_name] = [] + self.data[stream_name].append(obj) + self.counter.increment() + if self.counter.count >= self.buf_size: + self.root.flush() + + def new_stream(self, name): + """Creates a substream with the same 'out' and 'buf_size'. + Copies the references to the counter and root node to the + substream, and stores a reference in the substreams dict. Also + invokes the "start" method without any side-effect printing. + """ + s = Stream(self.out, self.buf_size) + s.counter = self.counter + s.root = self.root + self.substreams[name] = s + return s + + def is_empty(self): + """Determine whether this substream and all substreams are + empty. + + :return: Whether this stream is empty""" + empty = self.data == {} + for substream in self.substreams.values(): + empty = empty and substream.is_empty() + return empty + + def add_comma_if_not_first(self, s): + """Adds a comma to the provided list 's' if it is not the first + thing to be printed. Otherwise, we set the flag. + + :param s: The list in which to add the comma""" + if self.first: + self.first = False + else: + s.append(',') + + def to_json(self): + """Converts the stream to JSON. Does not include the opening + and closing square brackets. If printing, you should include + those first. + + :return: JSON representation of the stream.""" + s = [] + # serialize streams + if self.data: + self.add_comma_if_not_first(s) + s.append(json.dumps(self.data, cls=ComplexEncoder)) + # recursively serialize substreams + for key, stream in self.substreams.items(): + if stream.is_empty(): + continue + self.add_comma_if_not_first(s) + s.append('{{"{}":'.format(key)) + s.append(stream.get_start()) + s.append(stream.to_json()) + s.append(stream.get_stop()) + s.append("}") + return ''.join(s) + + def __str__(self): + return self.to_json() + + def __repr__(self): + return str(self) + + +def main(): + s = Stream(sys.stdout, 4) + s.start() + + for i in range(7): + proc = {"pid": i + 1, "name": "init"} + s.output("things", proc) + q = s.new_stream('q') + q.output("test", 'potato') + + for i in range(10): + proc = {"pid": i + 1, "name": "init"} + s.output("processes", proc) + + q.output("test", 'salad') + q.output("test", 'rocks') + s.flush() + s.stop() + + +if __name__ == '__main__': + main() diff --git a/ruby/jsh.rb b/ruby/jsh.rb new file mode 100644 index 0000000..676dfa1 --- /dev/null +++ b/ruby/jsh.rb @@ -0,0 +1,145 @@ +require 'json' + +class Counter + def initialize + @count = 0 + end + + def increment + @count += 1 + end + + def clear + @count = 0 + end + + def count + @count + end +end + +class Stream + def initialize(out, buf_size = 10, counter = nil, root = self) + counter = Counter.new if counter.nil? + @buf_size = buf_size + @out = out + @counter = counter + @root = root + @data = {} + @substreams = {} + @first = true + end + + def start + '[' + end + + def start! + @out.write(start) + end + + def flush + if @root == self + r = to_json + clear + @out.write(r) + @out.flush + return r + end + end + + def stop + ']' + end + + def stop! + flush + @out.write(stop) + end + + def clear + @counter.clear + @data = {} + @substreams.values.each do | substream | + substream.clear + end + end + + def output(stream_name, obj) + if not @data.keys.include? stream_name + @data[stream_name] = [] + end + @data[stream_name].push(obj) + @counter.increment + if @counter.count >= @buf_size + @root.flush + end + end + + def new_stream(name) + s = Stream.new(@out, @buf_size, @counter, @root) + @substreams[name] = s + s + end + + def empty? + empty = @data == {} + @substreams.values.each do |substream| + empty = empty and substream.empty? + end + empty + end + + def add_comma_if_not_first(s) + if @first + @first = false + else + s.push(',') + end + end + + def to_json + s = [] + if not @data.empty? + add_comma_if_not_first(s) + s.push(@data.to_json) + end + @substreams.each do |key, stream| + if not stream.empty? + add_comma_if_not_first(s) + s.push("{#{key}:") + s.push(stream.start) + s.push(stream.to_json) + s.push(stream.stop) + s.push('}') + end + end + s.join '' + end +end + + +def main + s = Stream.new $stdout, 4 + s.start! + 7.times do |i| + proc = {:pid => i + 1, :name => "init"} + s.output(:things, proc) + end + q = s.new_stream 'q' + q.output(:test, 'potato') + + 10.times do |i| + proc = {:pid => i + 1, :name => "init"} + s.output(:processes, proc) + end + + q.output(:test, "salad") + q.output(:test, "rocks") + + s.flush + s.stop! +end + + +main \ No newline at end of file