For checkouts or to view logs, direct your SVN client to svn://svn.saintamh.org/code/lib-python/trunk/saintamh/twitter.py

#!/usr/bin/env python

"""
$Id: twitter.py 3161 2017-03-03 20:42:29Z herve $
"""

#----------------------------------------------------------------------------------------------------------------------------------
# includes

# standards
import collections
import datetime
import re

# saintamh
from saintamh.http import SimpleHTTPClient, HTTPStatus404
from saintamh.struct import *
from saintamh.util.dates import hours
from saintamh.util.etree import css_all, css_one, css_one_of, remove_all_css, xpath_one
from saintamh.util.iterables import one
from saintamh.util.html import html_etree, translate_named_html_entities_into_character_refs
from saintamh.util.scrapers import extract_text, join_urls, make_all_urls_absolute, parse_json

#----------------------------------------------------------------------------------------------------------------------------------
# data structs

# A Twitter account, identified by nothing but its handle.
class Twit(struct(
        handle = {
            'class': str,
            # Non-empty handles are lowercased; falsy values are passed through untouched
            'coerce_value': lambda s: s and s.lower(),
            },
        )):

    @classmethod
    def get(cls, twit):
        # Accepts either a ready-made instance (returned as is) or anything the constructor takes (wrapped in a new instance)
        return twit if isinstance(twit, cls) else cls(twit)

    def __str__(self):
        # The string form is the bare handle, which is what the URL templates in this module interpolate
        return self.handle

# Public profile data of one Twitter account. Instances are built by _parse_profile() from the 'profile_user' data that
# fetch_profile() reads off the account's homepage; the comment on each field names the key it is filled from.
class TwitProfile(struct(
        twit = Twit,                                           # 'screen_name'
        user_id = long,                                        # 'id'
        full_name = nullable(unicode),                         # 'name'
        description = nullable(unicode),                       # 'description'
        location = nullable(unicode),                          # 'location'
        homepage_url = nullable(absolute_http_url),            # 'url', UTF-8 encoded when present
        account_creation_dt = datetime.datetime,               # 'created_at', parsed by _parse_dt()
        avatar_img_url = absolute_http_url,                    # 'profile_image_url', joined onto https://a0.twimg.com/
        profile_bg_img_url = nullable(absolute_http_url),      # 'profile_background_image_url', same base, may be absent
        num_followers = nonnegative(long),                     # 'followers_count'
        num_listed = nonnegative(long),                        # 'listed_count'
        num_tweets = nonnegative(long),                        # 'statuses_count'
        num_friends = nonnegative(long),                       # 'friends_count'
        num_favourites = nonnegative(long),                    # 'favourites_count'
        )):
    pass

# Field spec for tweet IDs, used for the `id` field of Tweet: a `str` matching the regex below, i.e. a run of 15 or more
# digits. IDs are kept as strings, not converted to numbers.
TweetID = {
    'class': str,
    'regex': r'^\d{15,}$',
    }

# A single tweet, as scraped from an HTML page by _parse_tweet_html().
class Tweet(struct(
        # NB for retweets, `id` and `twit` are those of the original tweet, not of the retweet (see _parse_tweet_html)
        id = TweetID,
        twit = Twit,
        # NOTE(review): _parse_tweet_html fills this in using datetime.fromtimestamp(), which gives a naive datetime in the
        # local timezone -- confirm that this is what consumers expect
        datetime = datetime.datetime,
        body = unicode,

        # 2013-06-14 - we used to hold the parent tweet ID here, but they're no longer in the public data AFAICS
        is_reply = bool,

        # 2013-06-14 - IIUC, retweets are not really tweets themselves. At the data level, AFAICS you know a tweet is a retweet
        # when you see a tweet by Bob in Alice's feed.
        #
        # However, if we ask to fetch all of Alice's conversations, when the Conversation objects are returned they'll contain
        # plenty of tweets not by her -- replies by others in the conversation. So we need this to indicate the context we're
        # viewing the tweet in. It means the same tweet by Alice will be a different object when retweeted by Bob and loaded from
        # his feed than when retweeted by Claire and viewed in her feed.
        #
        retweeted_by = nullable(Twit),

        # URL of the picture attached to the tweet, if one was found in the HTML
        picture_url = nullable(absolute_http_url),

        # The two entries below are callables of `self` rather than type specs -- presumably struct() exposes them as derived
        # attributes (TODO confirm against saintamh.struct). `hashtags` lists every '#' followed by word characters found in the
        # body; `url` is the tweet's status page address.
        hashtags = lambda self: re.findall(r'\#\w+', self.body),
        url = lambda self: 'https://twitter.com/%s/status/%s' % (self.twit, self.id),
        )):
    pass

# A group of related tweets, together with a set of twits taking part in it.
class Conversation(struct(
        tweets = seq_of(
            Tweet,
            # Whatever order the tweets are given in, they get stored sorted by their datetime
            coerce_value = lambda tweets: sorted(tweets, key=lambda t: t.datetime),
            ),
        participants = set_of(Twit),
        # Given the sorting above, the first and last elements are the oldest and the most recent tweet
        earliest_tweet = lambda self: self.tweets[0],
        latest_tweet = lambda self: self.tweets[-1],
        tweets_by_id = lambda self: {t.id: t for t in self.tweets},
        )):

    def __iter__(self):
        # Iterating over a conversation iterates over its tweets
        return iter(self.tweets)

    def merge_with(self, other):
        # Pools the tweets and participants of both conversations, going through set() so that duplicates collapse, and hands
        # the result to self.derive()
        pooled_tweets = set(self.tweets + other.tweets)
        pooled_participants = set(self.participants | other.participants)
        return self.derive(
            tweets = pooled_tweets,
            participants = pooled_participants,
            )

#----------------------------------------------------------------------------------------------------------------------------------
# private utils

# Module-wide HTTP client instance, used as the default value of the `http_client` parameter of the public functions below
_default_http_client = SimpleHTTPClient()

def _http_get(client, *args, **kwargs):
    kwargs.update(
        # Can't use just any old user-agent string, might get redirected to the mobile site. This one works as of 2014-02-28
        user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:27.0) Gecko/20100101 Firefox/27.0',
        retries_upon_error = 3,
        )
    return client.request_str(*args, **kwargs)

def _fetch_html(http_client, url, *args, **kwargs):
    """
    Download `url` and return it as a parsed HTML tree in which all URLs have been made absolute relative to `url`.

    Extra positional and keyword arguments are forwarded to _http_get.
    """
    raw_page = _http_get(http_client, url, *args, **kwargs)
    # The response is decoded as UTF-8, and named entities are rewritten as character references before parsing; the parser
    # itself is told not to decode entities
    page_text = translate_named_html_entities_into_character_refs(unicode(raw_page, 'UTF-8'))
    page_tree = html_etree(page_text, do_decode_entities=False)
    return make_all_urls_absolute(url, page_tree)

#----------------------------------------------------------------------------------------------------------------------------------
# private scraper functions

def _parse_dt(dt_str):
    return datetime.datetime.strptime(
        dt_str,
        '%a %b %d %H:%M:%S +0000 %Y'
        )

def _parse_profile(pdat):
    """
    Build a TwitProfile from `pdat`, the mapping holding a user's profile data.

    All keys read with `pdat[...]` are required; only 'profile_background_image_url' may be missing altogether.
    """
    twimg_base_url = 'https://a0.twimg.com/'
    screen_name = pdat['screen_name']
    user_id = pdat['id']
    full_name = pdat['name']
    description = pdat['description']
    location = pdat['location']
    # A falsy URL is passed through as is, anything else is encoded to a UTF-8 byte string
    homepage_url = pdat['url'] and pdat['url'].encode('UTF-8')
    account_creation_dt = _parse_dt(pdat['created_at'])
    # Image URLs are resolved against the twimg base URL
    avatar_img_url = join_urls(twimg_base_url, pdat['profile_image_url'])
    profile_bg_img_url = pdat.get('profile_background_image_url') \
        and join_urls(twimg_base_url, pdat['profile_background_image_url'])
    num_followers = pdat['followers_count']
    num_listed = pdat['listed_count']
    num_tweets = pdat['statuses_count']
    num_friends = pdat['friends_count']
    num_favourites = pdat['favourites_count']
    return TwitProfile(
        twit = screen_name,
        user_id = user_id,
        full_name = full_name,
        description = description,
        location = location,
        homepage_url = homepage_url,
        account_creation_dt = account_creation_dt,
        avatar_img_url = avatar_img_url,
        profile_bg_img_url = profile_bg_img_url,
        num_followers = num_followers,
        num_listed = num_listed,
        num_tweets = num_tweets,
        num_friends = num_friends,
        num_favourites = num_favourites,
        )

def _parse_tweet_html(tweet_el):
    """
    Build a Tweet from `tweet_el`, the HTML element holding a single tweet.

    The tweet's ID, author, reply flag and retweeter are read from the element's `data-*` attributes, its date from the
    `data-time` attribute of a nested span, and its text from the `.js-tweet-text` element. `picture_url` is None when no
    picture element is found, or when the one found carries no `src`.
    """
    picture_el = css_one_of(
        # NB although this function can parse tweets both from a twit's home page and from the conversation pages, on the home page
        # the thumbnails for embedded media are sometimes missing, sometimes not. Not sure why.
        tweet_el,
        'img.thumbnail',
        '.media img',
        '.OldMedia-photoContainer img',
        '.js-adaptive-photo img',
        allow_mismatch = True,
        allow_multiple_matches_per_path = True,
    )
    picture_url = None
    if picture_el is not None:
        picture_src = picture_el.get('src')
        # An <img> with no `src` would make re.sub raise a TypeError; treat it as "no picture" instead
        if picture_src:
            # Swap the thumbnail size suffix, if there is one, for that of the large version
            picture_url = re.sub(r':thumb$', ':large', picture_src)
    body_el = css_one('.js-tweet-text', tweet_el)
    for link_el in css_all('a.twitter-timeline-link', body_el, allow_mismatch=True):
        # Presumably this keeps the link text from running into the preceding word once the body is flattened to text
        if link_el.text:
            link_el.text = ' ' + link_el.text
    return Tweet(
        # NB for retweets, the id and twit name will be the original tweet's ID, not the retweet's
        id = tweet_el.get('data-item-id'),
        twit = tweet_el.get('data-screen-name'),
        # fromtimestamp() yields a naive datetime expressed in the local timezone
        datetime = datetime.datetime.fromtimestamp(int(xpath_one('.//span/@data-time', tweet_el))),
        body = extract_text(body_el),
        is_reply = (tweet_el.get('data-is-reply-to') == 'true'),
        retweeted_by = tweet_el.get('data-retweeter'),
        picture_url = picture_url,
    )

def _parse_all_tweets_from_page(html, seed_tweet=None):
    """
    Yield a Tweet for every `div.js-actionable-tweet` element found in `html`.

    When a `seed_tweet` with a retweeter is given, the tweet on the page that has the same ID is yielded with that
    retweeter copied onto it.
    """
    for element in css_all('div.js-actionable-tweet', html):
        parsed = _parse_tweet_html(element)
        is_retweeted_seed = seed_tweet \
            and parsed.id == seed_tweet.id \
            and seed_tweet.retweeted_by
        if is_retweeted_seed:
            yield parsed.derive(retweeted_by = seed_tweet.retweeted_by)
        else:
            yield parsed

def _fetch_tweets_in_conversation(http_client, seed_tweet):
    """
    Fetch the status page of `seed_tweet` and return an iterator over all the tweets parsed from it.

    Elements matching `.proxy-tweet-container` are removed from the page before parsing.
    """
    status_url = 'https://twitter.com/%s/status/%s' % (seed_tweet.twit, seed_tweet.id)
    # The cache life handed to the HTTP client is the age of the tweet, so pages of older tweets get longer values
    tweet_age = datetime.datetime.now() - seed_tweet.datetime
    status_page = _fetch_html(http_client, status_url, cache_life = tweet_age)
    cleaned_page = remove_all_css(status_page, '.proxy-tweet-container')
    return _parse_all_tweets_from_page(cleaned_page, seed_tweet)


#----------------------------------------------------------------------------------------------------------------------------------
# public functions

class AccountSuspended(Exception):
    """Raised by fetch_profile when no profile data can be found for the requested account."""

def twitter_homepage_url(twit):
    """Return the URL of the Twitter homepage of `twit`, which may be a Twit or a plain handle."""
    homepage_template = 'https://twitter.com/%s'
    return homepage_template % twit

def fetch_latest_tweets(
        twit,
        http_client = _default_http_client,
        include_replies = True,
        try_harder_to_get_picture_url = True,
        filter_tweet = lambda tweet: True,
        ):
    """
    Yield the tweets found on the homepage of `twit` (a Twit or a handle).

    Replies are skipped unless `include_replies` is set, and so are tweets for which `filter_tweet` returns a falsy
    value. With `try_harder_to_get_picture_url`, every tweet that has no picture URL is fetched again from its own
    status page, at the cost of one extra request per such tweet.
    """
    twit = Twit.get(twit)
    homepage_html = _fetch_html(http_client, twitter_homepage_url(twit), cache_life=hours(6))
    for tweet in _parse_all_tweets_from_page(homepage_html):
        if not include_replies and tweet.is_reply:
            continue
        if not filter_tweet(tweet):
            continue
        if tweet.picture_url is None and try_harder_to_get_picture_url:
            # It could be that the tweet carries no picture, but it could also be that the picture is not displayed on the
            # homepage. Happens sometimes, sometimes not. There might be a logic here, but rather than figure it out, we just
            # fetch the conversation page
            wanted_id = tweet.id
            tweet = one(
                _fetch_tweets_in_conversation(http_client, tweet),
                lambda t: t.id == wanted_id,
                )
        yield tweet

def fetch_profile(twit, http_client=_default_http_client):
    """
    Fetch the homepage of `twit` (a Twit or a handle) and return the TwitProfile parsed from the JSON held in the value
    of its `init-data` input element.

    Raises AccountSuspended when that JSON has no 'profile_user' entry. A 404 on the homepage is handled as if the page
    held no data at all, so it raises AccountSuspended too.
    """
    twit = Twit.get(twit)
    init_data = {}
    try:
        homepage_html = _fetch_html(http_client, twitter_homepage_url(twit), cache_life=hours(6))
    except HTTPStatus404:
        pass
    else:
        init_data = parse_json(xpath_one('//input[@id="init-data"]/@value', homepage_html))
    try:
        profile_data = init_data['profile_user']
    except KeyError:
        # A stricter check, init_data.get('href') == '/account/suspended', used to sit here but is disabled: any page
        # without profile data is reported as a suspended account
        raise AccountSuspended("Account %r has been suspended" % twit)
    return _parse_profile(profile_data)

def fetch_seeded_conversations(iter_seed_tweets, http_client=_default_http_client):
    """
    Fetch the status page of every tweet in `iter_seed_tweets`, group the tweets found there into conversations, and
    yield one Conversation per group.

    Status pages that have a tweet ID in common end up merged into the same conversation. Nothing is yielded until all
    the seed tweets have been consumed.
    """
    # 2013-06-14 - We used to recursively explore conversations exhaustively, starting from the main twit, exploring replies to
    # their recent tweets, then replies to those, etc. However that often created very long conversations involving people I wasn't
    # interested in. They were actual connected threads of replies, it's just that as they got further and further from the person
    # I'm following, they got less and less interesting.
    #
    # So now we only include the close ancestors and descendants of the main twit's posts. We do that by fetching the page for each
    # post by our main twit, parsing the conversation there, and stop there.
    partial_convs_by_tweet_id = collections.defaultdict(set)
    for seed_tweet in iter_seed_tweets:
        # NB don't use the seed tweet itself, use the version from the details page, as it has all media links. This is why we
        # don't need to try_harder_to_get_picture_url, we won't use that Tweet object beyond its ID anyway
        partial_conv = partial_convs_by_tweet_id[seed_tweet.id]
        for related_tweet in _fetch_tweets_in_conversation(http_client, seed_tweet):
            partial_conv.add(related_tweet)
            # Use .get() so that the lookup doesn't create an entry. If this tweet already belongs to a different set, absorb
            # that set. Identity is what matters here: a distinct set with equal contents has nothing to add anyway.
            known_conv = partial_convs_by_tweet_id.get(related_tweet.id)
            if known_conv is not None and known_conv is not partial_conv:
                partial_conv.update(known_conv)
        # Point every tweet ID at the merged set. This also repoints the IDs of any absorbed set, which is thereby dropped.
        for t in partial_conv:
            partial_convs_by_tweet_id[t.id] = partial_conv
    # All the tweet IDs of a conversation map to the very same set object, so the dict's values contain each conversation once
    # per tweet ID. Sets aren't hashable, so we tell them apart by identity in order to yield each conversation only once.
    seen_conv_ids = set()
    for partial_conv in partial_convs_by_tweet_id.itervalues():
        if id(partial_conv) in seen_conv_ids:
            continue
        seen_conv_ids.add(id(partial_conv))
        yield Conversation(
            tweets = partial_conv,
            participants = set(t.twit for t in partial_conv),
            )

def fetch_latest_conversations(twit, http_client=_default_http_client, include_replies=True, filter_seed_tweet=lambda tweet: True):
    """
    Return an iterator over the conversations seeded by the latest tweets of `twit` (a Twit or a handle).

    `include_replies` and `filter_seed_tweet` select which of the homepage tweets get used as seeds.
    """
    seed_tweets = fetch_latest_tweets(
        Twit.get(twit),
        http_client,
        include_replies,
        # No point in chasing picture URLs here, the conversation pages get fetched for every seed anyway
        try_harder_to_get_picture_url = False,
        filter_tweet = filter_seed_tweet,
    )
    return fetch_seeded_conversations(seed_tweets, http_client=http_client)

def fetch_tweet(tweet_url, http_client=_default_http_client, **http_kwargs):
    """
    Fetch the page at `tweet_url` and return the Tweet parsed from its `.js-original-tweet` element.

    Extra keyword arguments are forwarded to the HTTP request.
    """
    page = _fetch_html(http_client, tweet_url, **http_kwargs)
    original_tweet_el = css_one('.js-original-tweet', page)
    return _parse_tweet_html(original_tweet_el)

def fetch_conversation(tweet_url, http_client=_default_http_client, **http_kwargs):
    """
    Fetch the tweet at `tweet_url` and return an iterator over the conversations seeded by it.

    The extra keyword arguments only apply to the request that fetches the seed tweet itself.
    """
    seed_tweet = fetch_tweet(tweet_url, http_client, **http_kwargs)
    return fetch_seeded_conversations([seed_tweet], http_client)

#----------------------------------------------------------------------------------------------------------------------------------