Gary's Blog

March 2010 - Posts

  • Using the Gauge Control to Indicate Process Status

         

    In my last post I showed you how to create a process that reads from the Twitter firehose and posts to an MSMQ queue, where another process can read from it and write the tweets to a database. Well, that’s fine in and of itself, but there is a slight wrinkle, in as much as that process will stop from time to time for various reasons – Twitter even give a list of things that will cause them to close the connection from their end, thus stopping the process. Now if you are planning to sit around all day just watching that process then that’s fine but if, like me, you have better things to do with your time, then you really need a small application to watch over the process and restart it if it stops. That’s what we are going to look at today.

    So, let’s create a little GUI app with two buttons, one to start the process and one to end it – we’ll include the DX gauge as well, so we can instantly see the state of our process. The form will look like this:

    image

    As you can see, there’s a little sad face to indicate that the process is not currently running.

    The first thing we have to do now is to define an enum so that we can hide the “magic numbers” for the stateIndex on the gauge:

    public enum State
    {
        Started = 0,
        Starting = 2,
        Stopped = 4
    }

    We’ll also need a backing store for a restart flag and the process that we are watching:

    //GS - Create a flag to show if we want to restart on process stop
    private bool restartFlag = true;
    
    //GS - Backing store for our process
    private Process process = null;

    Now when the “Start Process” button is pressed, we want to run the following code:

    private void button1_Click(object sender, EventArgs e)
    {
        //GS - Allow (re)starting, in case "Stop Process" cleared the flag
        restartFlag = true;

        //GS - Start the process on another thread so the GUI stays responsive
        ThreadPool.QueueUserWorkItem(StartProc);
    }

    Which will start the process thus:

    private void StartProc(object stateInfo)
    {
        //GS - If we are not starting/restarting then we're done
        if (!restartFlag)
            return;
    
        //GS - If we've already got a process just restart it
        if (process != null)
        {
            process.Start();
            ThreadPool.QueueUserWorkItem(WatchProcess); 
            return;
        }
    
        //GS - Create a process to start Powershell
        stateIndicatorComponent1.StateIndex = (int)State.Starting;
        ProcessStartInfo si = new ProcessStartInfo();
        si.UseShellExecute = false;
        si.FileName = "powershell";
        Process p = new Process();
        p.StartInfo = si;
        process = p;
    
        //GS - Start the process 
        process.Start();
    
        //GS - Update the indicator to show the process has started
        stateIndicatorComponent1.StateIndex = (int)State.Started;
    
        //GS - Monitor the process on a separate thread
        ThreadPool.QueueUserWorkItem(WatchProcess);            
    }

    Here, as you can see, we are starting an instance of PowerShell, just as an example. This code is well commented and very similar to that shown in the last post, so we won’t say any more about it. As you can see though, once the process is started another thread is spun off to watch the process:

    private void WatchProcess(object stateInfo)
    {
        //GS - If there is no process to watch then we're done
        if (process == null)
            return;
    
        //GS - Block until the process exits (no busy-wait, so no CPU burn)
        process.WaitForExit();
    
        //GS - Check if I'm supposed to restart it..
        if (restartFlag)
            //GS - If so, start it.
            ThreadPool.QueueUserWorkItem(StartProc);
    }

    As you can see, the watcher does nothing whilst the process is running, but once it has exited it checks to see if the restart flag is set to true, and if it is then the process is restarted:

    image

    And our watcher application shows a smiley face to indicate that the process is running:

    image
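
    If you would rather not tie up a thread pool thread per watched process, the framework can call you back when the process ends. What follows is only a minimal sketch of that idea, not the code from the application above: the ProcessWatcher class and its members are hypothetical names of my own, while Process.EnableRaisingEvents and the Process.Exited event are the real framework pieces it leans on.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper, not part of the original application.
public sealed class ProcessWatcher
{
    private readonly ProcessStartInfo startInfo;
    private readonly object gate = new object();
    private Process process;   // null when nothing is running
    private bool restart;      // plays the same role as restartFlag

    public ProcessWatcher(ProcessStartInfo startInfo)
    {
        this.startInfo = startInfo;
    }

    public bool IsRunning
    {
        get { lock (gate) { return process != null; } }
    }

    // Start the process (if it is not already running) and keep it alive.
    public void Start()
    {
        lock (gate)
        {
            restart = true;
            if (process == null)
                Launch();
        }
    }

    // Stop the process and do not restart it.
    public void Stop()
    {
        lock (gate)
        {
            restart = false;
            if (process == null)
                return;
            try { process.Kill(); }
            catch (InvalidOperationException) { /* it had already exited */ }
        }
    }

    private void Launch()
    {
        Process p = new Process();
        p.StartInfo = startInfo;
        p.EnableRaisingEvents = true;   // without this, Exited never fires
        p.Exited += OnExited;
        p.Start();
        process = p;
    }

    // Raised on a thread pool thread once the process has ended.
    private void OnExited(object sender, EventArgs e)
    {
        lock (gate)
        {
            ((Process)sender).Dispose();
            process = null;
            if (restart)
                Launch();
        }
    }
}
```

    You would create one with something like new ProcessWatcher(new ProcessStartInfo("powershell")) and call Start() and Stop() from the two buttons. Note that a process which dies immediately on launch would be restarted in a tight loop here, so a real version would probably want a short delay before relaunching.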

    Of course at some point we are going to want to stop the process and not have it immediately restart. When the “Stop Process” button is pressed, the following code will achieve that:

    private void button2_Click(object sender, EventArgs e)
    {
        //GS - Set the flag so that we don't restart the process 
        restartFlag = false;
    
        //GS - Stop the process (if there is one) and indicate that it's stopped
        if (process != null && !process.HasExited)
            process.Kill();
        stateIndicatorComponent1.StateIndex = (int)State.Stopped;
    }

    And there you have it, a small application to monitor your processes and have them automatically restart when they stop. Until next time, happy coding! :-)

  • How to Drink at the Twitter Firehose and not Drown

         

    Recently Twitter made the firehose feed available to the public via their API. Well I say that, they’ve made a 5% sample available to everyone, if you want anything above that then you have to apply directly to Twitter. But hey, 1 tweet in 20 is enough to be going on with, it still works out at around 25 tweets per second. :-)

    So let’s say we want to consume this feed how are we going to do it? Well first thing we have to do is to access the feed URL and make the connection, and handle all the HTTP stuff and… Meh, yeah that sounds boring to me, how about you? Thankfully we don’t have to do all that as it’s already been done for us by the nice guys at the cURL project. Go to the site and download it and we’ll have a nice little command line tool that is going to do all that HTTP stuff for us.

    After installing the tool and adding its home directory to your path, open a command window and type:

    curl http://stream.twitter.com/1/statuses/sample.json -uYourTwitterUserName:YourTwitterPassword

    and you’ll see all the tweets passing by in JSON format, like so:

    image

    Why JSON format? Well, we could have asked for the feed in XML but there are two reasons why we didn’t. One, JSON is much more compact and bandwidth costs money, and two, Twitter are considering XML for deprecation from the streaming API, so it’s best that we get used to using JSON from the start.

    Well so far so good, we’ve hooked up to the stream and we can see the tweets flying past but that doesn’t really do us any good. What we need to do is to consume this feed and we need to do it in such a way as we can keep up with the rate at which Twitter sends the information down the line. We really need a two part solution to this problem, firstly we need to read the feed and secondly we need to process the tweets from the feed. These two parts need to be completely decoupled.

    Let’s deal with the reading part first. What we need to do is to wrap the cURL tool and then post the raw JSON onto a queue, what happens after that we don’t need to worry about at the moment. We just need to keep pace with the flow from Twitter or they will cut our connection. We’ll do that with the following code:

    using System;
    using System.Diagnostics;
    using System.IO;
    using System.Messaging;
    
    class Program
    {
        static void Main()
        {
            //GS - Create a curl process so we can piggy back on its capabilities
            ProcessStartInfo curl = new ProcessStartInfo();
            curl.FileName = @"c:\program files\curl\curl.exe";
            curl.Arguments = "http://stream.twitter.com/1/statuses/sample.json -uYourTwitterUID:YourTwitterPassword";
            curl.UseShellExecute = false;
            curl.RedirectStandardOutput = true;
    
            //GS - Get the message queue, create it if required
            MessageQueue q = 
                (MessageQueue.Exists(@".\private$\TwitterQueue")) ?
                new MessageQueue(@".\private$\TwitterQueue") :
                MessageQueue.Create(@".\private$\TwitterQueue");
    
            //GS - Start curl process
            using (Process process = Process.Start(curl))
            {
                using (StreamReader reader = process.StandardOutput)
                {
                    //GS - Post the output from curl to the queue.
                    //One line = one tweet in json format.
                    while (!reader.EndOfStream)
                    {
                        string result = reader.ReadLine();
                        Message message = new Message(result);
                        q.Send(message);
                    }               
                }
            }
        }
    }

    That code is pretty simple and well commented, so we won’t say any more about it. As you can see, it simply takes the output from cURL and posts it onto an MSMQ queue running on my local machine. Let’s run that for 20 seconds and see what happens:

    image

    We can see that running this script for ~20 seconds gets us around half a meg of data from Twitter and results in about 1,000 messages being posted to our queue:

    image

    The next thing we are going to do is to consume this queue. We’ll have two issues in doing that. Firstly, we will have to process quickly or the queue will grow out of hand over time, and secondly, the messages have the tweet information in JSON format, and there is little or no JSON support in the .NET Framework at this time. We’ll solve the first problem by reading the messages in and then processing them on a separate thread for speed, and we’ll solve the second problem by downloading and using JSON.Net, a third-party library that handles all that JSON gloop very well. The code to consume the queue and store the tweet information in a database, for later reporting, looks like this:

    using System;
    using System.Messaging;
    using Newtonsoft.Json.Linq;
    using Newtonsoft.Json;
    using System.Data.SqlClient;
    using System.Data;
    using System.Threading;
    
    namespace Reader
    {
        class Program
        {
            static void Main()
            {
                //GS - Get the queue
                MessageQueue q;
                if(MessageQueue.Exists(@".\private$\TwitterQueue")) 
                {
                    q = new MessageQueue(@".\private$\TwitterQueue");
                }
                else
                {
                    //GS - If there is no queue then we're done here
                    Console.WriteLine("Queue has not been created!");
                    return;
                }
    
                //GS - Consume the queue
                while (true)
                {
                    //GS - Try and pull back the next message
                    Message message;
                    try
                    {
                        message = q.Receive();
                        message.Formatter =
                            new XmlMessageFormatter(new String[] 
                                { "System.String" });
    
                        //GS - Multi-thread for speed. Process the message on the 
                        //next available thread
                        ThreadPool.QueueUserWorkItem(ProcessMessage, message);
                    }
                    //GS - Any errors, skip and go to the next message
                    catch { continue; }
                }
            }
            private static void ProcessMessage(object stateInfo)
            {
                //GS - Try and process the message...
                try
                {
                    //GS - Turn stateInfo back into a message
                    Message message = stateInfo as Message;
    
                    //GS - Then turn the message into a JSON object
                    JObject obj = JObject.Parse(message.Body.ToString());
    
                    //GS - We don't want all the user info just the id and name
                    string userId = obj["user"]["id"].ToString();
                    string userName = obj["user"]["screen_name"].ToString();
                    obj.Remove("user");
    
                    //GS - Turn it into a CLR object. This is not strictly required 
                    //but the serializer will handle all of that nasty string or 
                    //null stuff for us and I'm lazy :-)
                    Tweet tweet =
                        new JsonSerializer().Deserialize(
                        new JTokenReader(obj),
                        typeof(Tweet)) as Tweet;
    
                    //GS - Add back the user id and screen name
                    tweet.User = userId;
                    tweet.ScreenName = userName;
    
                    //GS - Persist the tweet
                    using (SqlCommand command = new SqlCommand())
                    {
                        command.CommandType = CommandType.StoredProcedure;
                        command.Connection = 
                            new SqlConnection("Data Source=.;Initial Catalog=" +
                                "TwitterFirehose;Integrated Security=True");
    
                        command.CommandText = "InsertTweet";
                        command.Parameters.AddWithValue("Contributors", 
                            tweet.Contributors);
    
                        command.Parameters.AddWithValue("Coordinates", 
                            tweet.Coordinates);
    
                        command.Parameters.AddWithValue("CreatedAt", 
                            tweet.Created_at);
    
                        command.Parameters.AddWithValue("Favourited", 
                            tweet.Favorited);
    
                        command.Parameters.AddWithValue("Geo", tweet.Geo);
                        command.Parameters.AddWithValue("Id", tweet.Id);
                        command.Parameters.AddWithValue("InReplyToScreenName", 
                            tweet.In_reply_to_screen_name);
    
                        command.Parameters.AddWithValue("InReplyToStatus", 
                            tweet.In_reply_to_status_id);
    
                        command.Parameters.AddWithValue("InReplyToUserId", 
                            tweet.In_reply_to_user_id);
    
                        command.Parameters.AddWithValue("Place", tweet.Place);
                        command.Parameters.AddWithValue("Source", tweet.Source);
                        command.Parameters.AddWithValue("Text", tweet.Text);
                        command.Parameters.AddWithValue("Truncated", 
                            tweet.Truncated);
    
                        command.Parameters.AddWithValue("User", tweet.User);
                        command.Parameters.AddWithValue("ScreenName", 
                            tweet.ScreenName);
    
                        command.Connection.Open();
                        command.ExecuteNonQuery();
                        command.Connection.Close();
                    }
                }
                //GS - Twitter will throw "keep alive" chars and delete requests 
                //down the wire so we won't be able to process every message. If 
                //there is an error processing a message, then we're done here.
                catch { return; }
            }
        }
    }
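
    The catch-all above quietly swallows the keep-alives and delete requests, but it also hides genuine failures. If you wanted to drop the non-tweet lines before they ever reach the parser, a small filter like the one below might do. Treat it as a sketch under stated assumptions: StreamLine and LooksLikeStatus are hypothetical names, and I am assuming that keep-alives arrive as blank lines and that a delete request is a JSON object whose first key is "delete".

```csharp
using System;

// Hypothetical helper, not part of the original reader.
public static class StreamLine
{
    // Returns true only for lines that look like a tweet's JSON object.
    public static bool LooksLikeStatus(string line)
    {
        // Assumption: keep-alives show up as empty or whitespace-only lines.
        if (string.IsNullOrEmpty(line))
            return false;

        string trimmed = line.Trim();
        if (trimmed.Length == 0 || trimmed[0] != '{')
            return false;

        // Assumption: a delete request looks like {"delete":{...}}.
        return !trimmed.StartsWith("{\"delete\"", StringComparison.Ordinal);
    }
}
```

    You could call this either in the first program, before posting to the queue, or at the top of ProcessMessage, and then narrow the catch so that real database errors are not lost.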

    This seems to work very well, in fact in tests this script consumes the queue faster than the first script can fill it, so we can be confident that we can run these two scripts and not run out of resources. Running these scripts for a couple of minutes garners us around 10,000 tweets in the database. With that information we can start to look at things like the most popular client for Twitter:

    image

    (Sample size: 10,116)

    Now that you know how to do this with the sample feed, you can experiment on your own with the filter feed and start to chart things that are important for your organisation: things like who posts the most about your product, whether the influencers in your market are talking about you and, if so, whether they are talking about you more or less than last month, etc. Also, you will notice that we capture geo information too, so if you combine this post with my last post you can start to get a physical picture of where in the world your most vocal customers are. If you filter for reputation (whether posts are negative or positive) you might be able to see a problem with a particular office, as your map may show a cluster of negative comments around a particular city. The possibilities are endless and I’ll leave you to experiment on your own, but rest assured I am archiving all Twitter mentions of DevExpress and our products, so remember to say nice things about us. :-)

    I’ll leave you with this pre-launch checklist from Twitter that will help you consume the feed and not fall foul of their connection police:

    Pre-Launch Checklist

    1. Not purposefully attempting to circumvent access limits and levels?
    2. Creating the minimal number of connections?
    3. Avoiding duplicate logins?
    4. Backing off from failures: none for first disconnect, seconds for repeated network (TCP/IP) level issues, minutes for repeated HTTP (4XX codes)?
    5. Using long-lived connections?
    6. Tolerant of other objects and newlines in markup stream? (Non <status> objects...)
    7. Tolerant of duplicate messages?
    8. Using JSON if at all possible?
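
    Item 4 is the one most likely to be skipped, so here is a rough sketch of what such a back-off policy might look like. The checklist only says "none", "seconds" and "minutes"; the doubling, the cap, and the names Backoff, DelayFor and FailureKind are all assumptions of mine, not figures published by Twitter.

```csharp
using System;

// Hypothetical types, not part of the original post.
public enum FailureKind { Network, Http }

public static class Backoff
{
    // consecutiveFailures counts disconnects in a row, starting at 1.
    public static TimeSpan DelayFor(FailureKind kind, int consecutiveFailures)
    {
        // First disconnect: reconnect straight away.
        if (consecutiveFailures <= 1)
            return TimeSpan.Zero;

        // Assumed schedule: 1, 2, 4, 8, 16 and then stay at 16.
        int exponent = Math.Min(consecutiveFailures - 2, 4);
        int factor = 1 << exponent;

        // Seconds for network level issues, minutes for HTTP 4XX codes.
        return kind == FailureKind.Network
            ? TimeSpan.FromSeconds(factor)
            : TimeSpan.FromMinutes(factor);
    }
}
```

    A watcher would keep a count of consecutive failures, sleep for DelayFor(kind, count) before reconnecting, and reset the count to zero once a connection has stayed up for a while.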

    Until next time, happy coding!

  • XPO, Bing, Twitter and the TARDIS

         

    As you may know, Twitter recently introduced geocoding into their tweets, so now when you tweet, if you do so from a device with GPS capabilities, your position is recorded along with your post. That got me thinking… since I travel around a lot, I wonder if I can map my position on Bing maps.

    First I thought I could just import my twitter stream as Bing maps says it supports the <geo:point> element, but this didn’t work. I think that may be down to some XSS protection built into Bing maps, though I’m not 100% certain as I didn’t bother to delve into it after I found that it didn’t work. Mainly because it’s not the best solution due to the fact that Twitter ages your posts and eventually they will “fall off the end”. So if you want to be able to record your locations (via your tweets) for a whole year say, then you are going to have to store that information yourself, and that’s what we are going to look at in this post.

    So first thing we need is a class to hold the information we want to store, which is Date (including the time) as well as latitude and longitude, kind of the Time And Relative Dimension In Space. Now, I wonder what we could call such a class… :-) Anyway, the class looks like this:

    using System;
    using DevExpress.Xpo;
    
    namespace sharedModel
    {
        public class Tardis : XPObject
        {
            public Tardis(Session session) : base(session) { }
    
            private DateTime date;
            public DateTime Date
            {
                get
                {
                    return date;
                }
                set
                {
                    SetPropertyValue("Date", ref date, value);
                }
            }
    
            private decimal lat;
            public decimal Lat
            {
                get
                {
                    return lat;
                }
                set
                {
                    SetPropertyValue("Lat", ref lat, value);
                }
            }
    
            private decimal @long;
            public decimal Long
            {
                get
                {
                    return @long;
                }
                set
                {
                    SetPropertyValue("Long", ref @long, value);
                }
            }
    
            private long status_Id;
            public long Status_Id
            {
                get
                {
                    return status_Id;
                }
                set
                {
                    SetPropertyValue("Status_Id", ref status_Id, value);
                }
            }
        }
    }

    There isn’t anything here that you’ve not seen before so we’ll say no more about it, other than we’ll have to put it in a DLL project as we want to share it between a console and a web project that we are going to go ahead and create now.

    Firstly the console project, which is going to fetch and store our most recent tweets:

    using System;
    using System.Linq;
    using System.Xml.Linq;
    using DevExpress.Xpo;
    using sharedModel;
    
    namespace LocationFetcher
    {
        class Program
        {
            static void Main(string[] args)
            {
                //GS - Specify the SQL Server DB we want to use
                XpoDefault.Session.ConnectionString =
                    @"data source=.;integrated security=true;" + 
                        "initial catalog=TweetLocations;";
    
                //GS - Use a linq query to get that last status_id recorded
                XPQuery<Tardis> tardisQuery =
                    new XPQuery<Tardis>(XpoDefault.Session);
    
                var lastStatusId =
                    (from t in tardisQuery select (t.Status_Id)).Max();
    
                //GS - If there are no persisted tardis values, 
                //set lastStatusId = 1 as Twitter can't handle 0
                lastStatusId = (lastStatusId == 0) ? 1 : lastStatusId;
    
                //GS - Fetch Tweets newer than lastStatusId
                XDocument timeline =
                    XDocument.Load(String.Format(
                        @"http://api.twitter.com/1/statuses/user_timeline.xml?"
                            + "screen_name=garyshort&since_id={0}", lastStatusId));
    
                //GS - If there have been no updates since last id then 
                //we're done here
                if (timeline.Descendants("status").Count<XElement>() == 0)
                {
                    Console.WriteLine("No new Tweets to process!");
                    return;
                }
    
                //GS - Add the georss namespaces
                XNamespace nsGeoRSS = "http://www.georss.org/georss";
    
                //GS - Persist the status information
                using (UnitOfWork uow = new UnitOfWork())
                {
                    //GS - Specify the DB we're using
                    uow.ConnectionString =
                        @"data source=.;integrated security=true;" +
                            @"initial catalog=TweetLocations;";
    
                    foreach (var status in timeline.Root.Descendants("status"))
                    {
                        //GS - If there is no geo tag then skip this status
                        if (status.Element("geo").IsEmpty)
                            continue;
    
                        new Tardis(uow)
                        {
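                            //GS - Parse with the invariant culture so that the
                            //English day/month names and the '.' decimal point
                            //work whatever the machine's regional settings
                            Date = DateTime.ParseExact(status.Element("created_at")
                                .Value, "ddd MMM dd HH:mm:ss zz00 yyyy",
                                System.Globalization.CultureInfo.InvariantCulture),
    
                            Status_Id = long.Parse(status.Element("id").Value),
    
                            Lat = Decimal.Parse(status.Element("geo").Element(
                                    nsGeoRSS + "point").Value.Split(' ')[0],
                                    System.Globalization.CultureInfo.InvariantCulture),
    
                            Long = Decimal.Parse(status.Element("geo").Element(
                                    nsGeoRSS + "point").Value.Split(' ')[1],
                                    System.Globalization.CultureInfo.InvariantCulture),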
                        };
                    }
                    uow.CommitChanges();
                }            
            }
        }
    }

    Taking a look at this script, we can see that it’s pretty standard XPO stuff. Up front we tell XPO that we are going to be using a SQL Server database and where to find it. Then we use Linq to XPO to retrieve the latest status id, and we only ask Twitter for tweets that were posted after this id, which should stop us from having duplicates in the database. After that, we simply iterate across the tweets returned by Twitter and store them in the database.
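
    The two fiddly bits in that loop are the date and the point, so it can be handy to pull them out where they can be tried on their own. The sketch below does that; StatusParsing, ParseCreatedAt and TryParsePoint are hypothetical names, the date format string is the one used in the script, and I am assuming the point value is always "latitude, space, longitude" with a '.' decimal separator.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of the original fetcher.
public static class StatusParsing
{
    // Parses a created_at value such as "Wed Mar 03 19:37:35 +0000 2010"
    // and returns it as a UTC DateTime.
    public static DateTime ParseCreatedAt(string createdAt)
    {
        return DateTime.ParseExact(
            createdAt,
            "ddd MMM dd HH:mm:ss zz00 yyyy",
            CultureInfo.InvariantCulture,
            DateTimeStyles.AdjustToUniversal);
    }

    // Parses a point value such as "51.5074 -0.1278" into its two parts.
    public static bool TryParsePoint(string point, out decimal lat, out decimal lon)
    {
        lat = 0m;
        lon = 0m;
        if (string.IsNullOrEmpty(point))
            return false;

        string[] parts = point.Split(
            new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);

        return parts.Length == 2
            && decimal.TryParse(parts[0], NumberStyles.Float,
                CultureInfo.InvariantCulture, out lat)
            && decimal.TryParse(parts[1], NumberStyles.Float,
                CultureInfo.InvariantCulture, out lon);
    }
}
```

    Using the Try form for the point means a malformed geo tag can simply be skipped rather than aborting the whole unit of work.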

    The next thing we need to do is to create a web script that will return our stored TARDIS entries as a GeoRSS feed. The script to do that looks like this:

    using System;
    using System.Linq;
    using System.Xml;
    using System.Text;
    using DevExpress.Xpo;
    using sharedModel;
    
    public partial class GetTwitterLocations : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {
            //GS - Connect to the db and return all the locations ordered 
            //by status id - that's newest tweets last
            XPQuery<Tardis> tardisQuery =
                new XPQuery<Tardis>(XpoDefault.Session);
    
            var tweetLocations = from t in tardisQuery
                                 orderby t.Status_Id ascending
                                 select t;
            
            //GS - Clear any previous response and state we're returning XML
            Response.Clear();
            Response.ContentType = "text/xml";
    
            //GS - Instantiate an XML writer to use
            using (XmlTextWriter writer = new XmlTextWriter(Response.OutputStream, 
                Encoding.UTF8))
            {
                //GS - Set indentation level and start writing the document
                writer.Formatting = Formatting.Indented;
                writer.Indentation = 3;
                writer.Namespaces = true;
                writer.WriteStartDocument();
    
                //GS - Identify this as an RSS document
                writer.WriteStartElement("rss");
                writer.WriteAttributeString("version", "2.0");
    
                //GS - Add the W3C Basic Geo namespace (the geo: prefix used below)
                writer.WriteAttributeString("xmlns:geo", 
                    "http://www.w3.org/2003/01/geo/wgs84_pos#");
    
                //GS - Add in each of the locations
                foreach (var t in tweetLocations)
                {
                    writer.WriteStartElement("item");
                    writer.WriteElementString("title", t.Date.ToLongDateString());
                    writer.WriteElementString("geo:lat", t.Lat.ToString());
                    writer.WriteElementString("geo:long", t.Long.ToString());
                    writer.WriteEndElement();
                }
    
                //GS - Close tags, flush and send
                writer.WriteEndElement();
                writer.WriteEndDocument();
                writer.Flush();
                writer.Close();
            }
            Response.End();
        }
    }

    The first thing to note here is that we use the default session, but how does the default session get set to the SQL Server database that we are using? I mean, you can’t set it in this method, as the session is already established and XPO will throw an exception if you try to change the database after you have established the session. To avoid this you must set the database connection in the Global.asax file, like so:

    void Application_Start(object sender, EventArgs e) 
    {
        DevExpress.Xpo.XpoDefault.Session.ConnectionString =
                @"data source=.;integrated security=true;" +
                    @"initial catalog=TweetLocations;";
    
    }

    Next, you’ll note, we ask XPO for the TARDIS entries in ascending order. This is because I make numerous posts from the same location. In this example I don’t want to filter that list and only map one tweet per location, instead I want to ensure that the most recent one is “on top”. The rest of the script is fairly well commented and I don’t think it needs further explanation. Drop me a note in the comments if you have any questions.
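
    If you prefer Linq to XML over the XmlTextWriter, the same feed can be built as a document in memory, which also makes it easy to check without a web server. This is only a sketch of that alternative: GeoFeed, Build and the Location class are hypothetical stand-ins (Location takes the place of the Tardis object so the sample has no XPO dependency). It mirrors the layout produced by the page above, with items directly under the rss element, and writes the numbers with the invariant culture so the decimal separator is always a '.'.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Xml.Linq;

// Hypothetical stand-in for the Tardis class, to keep the sample XPO-free.
public sealed class Location
{
    public DateTime Date { get; set; }
    public decimal Lat { get; set; }
    public decimal Long { get; set; }
}

// Hypothetical helper, not part of the original page.
public static class GeoFeed
{
    // Same namespace URI that the page above declares for the geo prefix.
    private static readonly XNamespace Geo =
        "http://www.w3.org/2003/01/geo/wgs84_pos#";

    public static XDocument Build(IEnumerable<Location> locations)
    {
        XElement rss = new XElement("rss",
            new XAttribute("version", "2.0"),
            new XAttribute(XNamespace.Xmlns + "geo", Geo.NamespaceName));

        // One item per location, in the order supplied by the caller.
        foreach (Location l in locations)
        {
            rss.Add(new XElement("item",
                new XElement("title",
                    l.Date.ToString("D", CultureInfo.InvariantCulture)),
                new XElement(Geo + "lat",
                    l.Lat.ToString(CultureInfo.InvariantCulture)),
                new XElement(Geo + "long",
                    l.Long.ToString(CultureInfo.InvariantCulture))));
        }

        return new XDocument(new XDeclaration("1.0", "utf-8", null), rss);
    }
}
```

    In the page you would pass in the ordered query results and then write the document to the response, for example with its Save method.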

    When you navigate to this page the feed is generated, like so:

    image

    Which means the only thing left to do is to write a web page which will show where I’ve been tweeting from recently. And that script looks like this:

    <%@ Page Language="C#" AutoEventWireup="true"  CodeFile="Default.aspx.cs" 
        Inherits="_Default" %>
    
    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" 
        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
    <html>
       <head>
          <title>Where's Gary?</title>
          <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
          <script 
            src="http://ecn.dev.virtualearth.net/mapcontrol/mapcontrol.ashx?v=6.2">
          </script>
    
          <script type="text/javascript">
              
              var map = null;
              
              //GS - Initialise the page
              function InitPage() {
                  GetMap();
                  AddMyLayer();
              }
                      
             //GS - Load the map
             function GetMap() {
                map = new VEMap('myMap');
                map.LoadMap();          
             }
    
             //GS - Add the layer and load in our tweet location data
             function AddMyLayer() {
                var l = new VEShapeLayer();
                var veLayerSpec = new VEShapeSourceSpecification(VEDataType.GeoRSS, 
                    "http://localhost:8341/twitterMap/GetTwitterLocations.aspx", l);
                map.ImportShapeLayerData(veLayerSpec);
             }
          </script>
       </head>
       <body onload="InitPage();">
          <div id="myMap" style="position:relative;"></div>
       </body>
    </html>

    Again this script is well commented and so you should be able to see exactly what is going on. Pointing your browser at this page shows the following map:

    image

    Here you can see that I’ve recently been to Germany and if you hover over the pins you can see the latest date that I was there:

    image

    Well, that’s a bit of fun really, but you can see how you could use this to instrument your social network if you so desired. Here I’m just showing the location of my own tweets, but you could just as easily map your followers with the greatest number of retweets of your posts, for example. The possibilities are endless and I’ll leave you to ponder those in your own time, but if you build something cool along these lines, with XPO, then leave a comment or drop me an email and I’ll highlight it on this blog.

    Although we are mapping Time And Relative Dimension In Space and we can vary our location (relative dimension) sadly this application will not yet let us vary time, so I guess we can’t compete with the Doctor just yet. Maybe that’ll be an 11.1 feature for XPO – until that does happen, happy XPOing! :-)
