#!/usr/bin/env python
#  subunit: extensions to python unittest to get test results from subprocesses.
#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
#
#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
#  license at the users choice. A copy of both licenses are available in the
#  project source as Apache-2.0 and BSD. You may not use this file except in
#  compliance with one of these two licences.
#  
#  Unless required by applicable law or agreed to in writing, software
#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
#  license you chose for the specific language governing permissions and
#  limitations under that license.
#

### The GTK progress bar __init__ function is derived from the pygtk tutorial:
# The PyGTK Tutorial is Copyright (C) 2001-2005 John Finlay.
# 
# The GTK Tutorial is Copyright (C) 1997 Ian Main.
# 
# Copyright (C) 1998-1999 Tony Gale.
# 
# Permission is granted to make and distribute verbatim copies of this manual
# provided the copyright notice and this permission notice are preserved on all
# copies.
# 
# Permission is granted to copy and distribute modified versions of this
# document under the conditions for verbatim copying, provided that this
# copyright notice is included exactly as in the original, and that the entire
# resulting derived work is distributed under the terms of a permission notice
# identical to this one.
# 
# Permission is granted to copy and distribute translations of this document
# into another language, under the above conditions for modified versions.
# 
# If you are intending to incorporate this document into a published work,
# please contact the maintainer, and we will make an effort to ensure that you
# have the most up to date information available.
# 
# There is no guarantee that this document lives up to its intended purpose.
# This is simply provided as a free resource. As such, the authors and
# maintainers of the information provided within can not make any guarantee
# that the information is even accurate.

"""Display a subunit stream in a gtk progress window."""

import sys
import unittest

import pygtk
pygtk.require('2.0')
import gtk, gtk.gdk, gobject

from subunit import (
    PROGRESS_POP,
    PROGRESS_PUSH,
    PROGRESS_SET,
    TestProtocolServer,
    )
from subunit.progress_model import  ProgressModel


class GTKTestResult(unittest.TestResult):

    def __init__(self):
        super(GTKTestResult, self).__init__()
        # Instance variables (in addition to TestResult)
        self.window = None
        self.run_label = None
        self.ok_label = None
        self.not_ok_label = None
        self.total_tests = None

        self.window = gtk.Window(gtk.WINDOW_TOPLEVEL)
        self.window.set_resizable(True)

        self.window.connect("destroy", gtk.main_quit)
        self.window.set_title("Tests...")
        self.window.set_border_width(0)

        vbox = gtk.VBox(False, 5)
        vbox.set_border_width(10)
        self.window.add(vbox)
        vbox.show()

        # Create a centering alignment object
        align = gtk.Alignment(0.5, 0.5, 0, 0)
        vbox.pack_start(align, False, False, 5)
        align.show()

        # Create the ProgressBar
        self.pbar = gtk.ProgressBar()
        align.add(self.pbar)
        self.pbar.set_text("Running")
        self.pbar.show()
        self.progress_model = ProgressModel()

        separator = gtk.HSeparator()
        vbox.pack_start(separator, False, False, 0)
        separator.show()

        # rows, columns, homogeneous
        table = gtk.Table(2, 3, False)
        vbox.pack_start(table, False, True, 0)
        table.show()
        # Show summary details about the run. Could use an expander.
        label = gtk.Label("Run:")
        table.attach(label, 0, 1, 1, 2, gtk.EXPAND | gtk.FILL,
            gtk.EXPAND | gtk.FILL, 5, 5)
        label.show()
        self.run_label = gtk.Label("N/A")
        table.attach(self.run_label, 1, 2, 1, 2, gtk.EXPAND | gtk.FILL,
            gtk.EXPAND | gtk.FILL, 5, 5)
        self.run_label.show()

        label = gtk.Label("OK:")
        table.attach(label, 0, 1, 2, 3, gtk.EXPAND | gtk.FILL,
            gtk.EXPAND | gtk.FILL, 5, 5)
        label.show()
        self.ok_label = gtk.Label("N/A")
        table.attach(self.ok_label, 1, 2, 2, 3, gtk.EXPAND | gtk.FILL,
            gtk.EXPAND | gtk.FILL, 5, 5)
        self.ok_label.show()

        label = gtk.Label("Not OK:")
        table.attach(label, 0, 1, 3, 4, gtk.EXPAND | gtk.FILL,
            gtk.EXPAND | gtk.FILL, 5, 5)
        label.show()
        self.not_ok_label = gtk.Label("N/A")
        table.attach(self.not_ok_label, 1, 2, 3, 4, gtk.EXPAND | gtk.FILL,
            gtk.EXPAND | gtk.FILL, 5, 5)
        self.not_ok_label.show()

        self.window.show()
        # For the demo.
        self.window.set_keep_above(True)
        self.window.present()

    def stopTest(self, test):
        super(GTKTestResult, self).stopTest(test)
        self.progress_model.advance()
        if self.progress_model.width() == 0:
            self.pbar.pulse()
        else:
            pos = self.progress_model.pos()
            width = self.progress_model.width()
            percentage = (pos / float(width))
            self.pbar.set_fraction(percentage)

    def stopTestRun(self):
        try:
            super(GTKTestResult, self).stopTestRun()
        except AttributeError:
            pass
        self.pbar.set_text('Finished')

    def addError(self, test, err):
        super(GTKTestResult, self).addError(test, err)
        self.update_counts()

    def addFailure(self, test, err):
        super(GTKTestResult, self).addFailure(test, err)
        self.update_counts()

    def addSuccess(self, test):
        super(GTKTestResult, self).addSuccess(test)
        self.update_counts()

    def addSkip(self, test, reason):
        # addSkip is new in Python 2.7/3.1
        addSkip = getattr(super(GTKTestResult, self), 'addSkip', None)
        if callable(addSkip):
            addSkip(test, reason)
        self.update_counts()

    def addExpectedFailure(self, test, err):
        # addExpectedFailure is new in Python 2.7/3.1
        addExpectedFailure = getattr(super(GTKTestResult, self),
            'addExpectedFailure', None)
        if callable(addExpectedFailure):
            addExpectedFailure(test, err)
        self.update_counts()

    def addUnexpectedSuccess(self, test):
        # addUnexpectedSuccess is new in Python 2.7/3.1
        addUnexpectedSuccess = getattr(super(GTKTestResult, self),
            'addUnexpectedSuccess', None)
        if callable(addUnexpectedSuccess):
            addUnexpectedSuccess(test)
        self.update_counts()

    def progress(self, offset, whence):
        if whence == PROGRESS_PUSH:
            self.progress_model.push()
        elif whence == PROGRESS_POP:
            self.progress_model.pop()
        elif whence == PROGRESS_SET:
            self.total_tests = offset
            self.progress_model.set_width(offset)
        else:
            self.total_tests += offset
            self.progress_model.adjust_width(offset)

    def time(self, a_datetime):
        # We don't try to estimate completion yet.
        pass

    def update_counts(self):
        self.run_label.set_text(str(self.testsRun))
        bad = len(self.failures + self.errors)
        self.ok_label.set_text(str(self.testsRun - bad))
        self.not_ok_label.set_text(str(bad))


class GIOProtocolTestCase(object):

    def __init__(self, stream, result, on_finish):
        self.stream = stream
        self.schedule_read()
        self.hup_id = gobject.io_add_watch(stream, gobject.IO_HUP, self.hup)
        self.protocol = TestProtocolServer(result)
        self.on_finish = on_finish

    def read(self, source, condition, all=False):
        #NB: \o/ actually blocks
        line = source.readline()
        if not line:
            self.protocol.lostConnection()
            self.on_finish()
            return False
        self.protocol.lineReceived(line)
        # schedule more IO shortly - if we say we're willing to do it
        # immediately we starve things.
        if not all:
            source_id = gobject.timeout_add(1, self.schedule_read)
            return False
        else:
            return True

    def schedule_read(self):
        self.read_id = gobject.io_add_watch(self.stream, gobject.IO_IN, self.read)

    def hup(self, source, condition):
        while self.read(source, condition, all=True): pass
        self.protocol.lostConnection()
        gobject.source_remove(self.read_id)
        self.on_finish()
        return False


result = GTKTestResult()
test = GIOProtocolTestCase(sys.stdin, result, result.stopTestRun)
gtk.main()
if result.wasSuccessful():
    exit_code = 0
else:
    exit_code = 1
sys.exit(exit_code)