フォロー外鍵垢のフォロワーを探索する

概要

Twitter でのネトストを趣味とする友人 (何だそれは) から、「フォロー外の鍵垢のフォロワーを自動で探索できないか」との話を振られた。確かにフォロー外の鍵垢であっても、

  • 公開垢
  • 自分がフォローしている鍵垢

にフォローされていた場合は合法的にその情報を得ることが可能である。まず考えられるのは Twitter 上の全アカウントを走査しそのフォローリストをチェックするという方法だが、どう考えても現実的ではない。そこで、今回は「自分がフォローしているアカウント、およびそれらがフォローしているアカウント」を対象にして探索を試みることとする。

Rate Limit との戦い

さて Twitter API を用いて上記を実装したいわけだが、悲しいことに以下の API にはかなり厳しい Rate Limit が存在する。

  1. GET friends/list: フォローしているユーザの一覧を取得する。15 回 / 15 分まで

  2. GET friendships/show: ある2つのアカウントの関係を調べる。180 回 / 15 分まで

フォローしているアカウントおよびそれらがフォローしているアカウント全てについて a を叩くのはあまりにも時間がかかりすぎるため、5 秒に一回 b を叩くくらいが穏当かなあといったところである。しかしこれでも気が遠くなるほどの時間 (アカウントの規模などに依存するが、通常自分が数百フォローしている場合は日単位かかるはずだ) を要し実用的でないため、何か別の対策を講じる必要がある。

などと考えていたところ、ちょうど同じような状況 に陥っている人を発見した。こちらのページによると Rate Limit は access token ごとに定められており、access token を複数用意してセッションを切り替えていくことにより Rate Limit を回避できるらしい。以下のコードでは access token を 6 つ用意し、200 以外のレスポンスが返ってきた時点でセッションを切り替えるという我ながらひどい実装をしている。なぜ 6 つなのかというと、1 秒 1 リクエストを送るとすると 3 分後に Rate Limit (180 回) に達するが、これを 5 回 (つまり 15 分間) 繰り返しているうちに最初のセッション終了後から 15 分が経過し、アクセス制限が解除されるからである。

実装

方針

計算量削減のため以下の工夫あるいは諦めを行っている。

  • フォローしているユーザのフォローに被りが見られる場合があるので、一度探索したものをもう一度探すことのないようキャッシュしておく
  • 時間がかかりすぎるため、フォロー数・フォロワー数のいずれかが 1000 以上のアカウントはいわゆるリア垢ではないと見做して無視する

最終的な方針は以下の通り。以下、フォロワーを特定したい鍵垢を target と表記する。

  1. 自分がフォローしているアカウントについて a を叩き、フォロー一覧を取得する

  2. 上記の中から、

    • アクセスできないアカウント (フォロー外の鍵垢)

    • すでにキャッシュに存在するアカウント

    • フォロー数・フォロワー数が 1000 のいずれかが 1000 以上のアカウント

    を削除したリストを作成する

  3. 2 でできたリスト上のアカウントと target について 1 秒に 1 回 b を叩いていく

  4. 2 のリスト上のアカウントをキャッシュに追加する

実行

下記のスクリプト (explorer_private_account.py) と、access token 6 つ分の情報を入れた config.json を同ディレクトリに配置した上で、

python explorer_private_account.py source_screen_name target_screen_name

とする。起点となるアカウント (source_screen_name) は自分のアカウントでもよいし、ほかのアカウントでもよい。何らかのクラスタに属することはプロフィールから明らかだが中の人を特定しきれないときなどに、その界隈の公開垢を入れてみて試してみる、などの活用ができるかもしれない。

from requests_oauthlib import OAuth1Session
import json
import time
import sys

class private_account_explorer(object):
    def __init__(self, source_screen_name, target_screen_name):
        self.source_screen_name = source_screen_name
        self.target_screen_name = target_screen_name
        self.start_session()
        self.result_store = []
        self.no_need_to_search_name_list = []

    def start_session(self):
        with open('./config.json') as f:
            config_json = json.load(f)
        configs = config_json['configs']
        self.sessions = []
        for config in configs:
            CK = config['CONSUMER_KEY']
            CS = config['CONSUMER_SECRET']
            AT = config['ACCESS_TOKEN']
            ATS = config['ACCESS_TOKEN_SECRET']
            twitter = OAuth1Session(CK, CS, AT, ATS)
            self.sessions.append(twitter)
        self.session_num = 0
        self.twitter = self.sessions[self.session_num]

    def get_friend_list(self, screen_name):
        next_cursor = -1
        friend_obj_list = []
        while (next_cursor != 0):
            url = "https://api.twitter.com/1.1/friends/list.json" # 15 / 15 min
            params = {'screen_name': screen_name, 'count': 200, 'cursor': next_cursor}
            try:
                res = self.twitter.get(url, params=params)
            except:
                self.start_session()
                res = self.twitter.get(url, params=params)
            if res.status_code != 200: # API 制限に引っかかったらセッションを切り替えてやり直し
                print('\nswitching session...')
                time.sleep(5)
                self.session_num = self.session_num + 1 if self.session_num != 5 else 0
                self.twitter = self.sessions[self.session_num]
                return self.get_friend_list(screen_name)
            result = json.loads(res.text)
            friends = result['users']
            next_cursor = result['next_cursor']
            friend_obj_list.extend(friends)
            time.sleep(10)
        return friend_obj_list

    def get_relationship(self, source_screen_name, target_screen_name):
        url = "https://api.twitter.com/1.1/friendships/show.json" # 180 / 15 min
        params = {'source_screen_name': source_screen_name, 'target_screen_name': target_screen_name}
        try:
            res = self.twitter.get(url, params = params)
        except:
            self.start_session()
            res = self.twitter.get(url, params = params)
        if res.status_code != 200: # API 制限に引っかかったらセッションを切り替えてやり直し
            print('\nswitching session...')
            time.sleep(5)
            self.session_num = self.session_num + 1 if self.session_num != 5 else 0
            self.twitter = self.sessions[self.session_num]
            return self.get_relationship(source_screen_name, target_screen_name)
        result = json.loads(res.text)
        return result

    def detect_target_friends_from_name_list(self, screen_name_list):
        for i in range(len(screen_name_list)):
            screen_name = screen_name_list[i]
            print("\r({0}/{1}) found: {2}".format(i + 1, len(screen_name_list), ', '.join(self.result_store)), end="")
            result = self.get_relationship(screen_name, self.target_screen_name)
            screen_name = result['relationship']['source']['screen_name']
            if result['relationship']['source']['following']:
                self.result_store.append(screen_name)
            time.sleep(1)
        print('')

    def process_each_friend(self, friend_name):
        # ある友達の友達リストを取得
        friend_friend_obj_list = self.get_friend_list(friend_name)
        friend_friend_name_list = [friend['screen_name'] for friend in friend_friend_obj_list]
        # 友達の友達リストに target が含まれている場合は、当該友達を結果格納用リストに追加する
        if self.target_screen_name in friend_friend_name_list:
            if not friend_name in self.no_need_to_search_name_list:
                self.result_store.append(friend_name)
                self.no_need_to_search_name_list.append(friend_name)
        # 検索できるアカウント (フォローしているか、あるいは公開されている) もののみ抜き出す
        friend_friend_search_list = [friend['screen_name'] for friend in friend_friend_obj_list if (not friend['protected'] or friend['following'])]
        # no need to search に含まれていないものを検索対象とする
        friend_friend_search_list = list(set(friend_friend_search_list) - set(self.no_need_to_search_name_list))
        # 検索対象から target の友達を見つける
        self.detect_target_friends_from_name_list(friend_friend_search_list)
        # 検索済みリストの更新
        self.no_need_to_search_name_list = set(self.no_need_to_search_name_list) | set(friend_friend_search_list)
        
if __name__ == '__main__':
    argv = sys.argv
    explorer = private_account_explorer(argv[1], argv[2])
    # source user がフォローしている人 (friends) のリストを取得し、検索条件に合致するユーザだけに絞り込む
    friend_obj_list = explorer.get_friend_list(explorer.source_screen_name)
    friend_name_list = [friend['screen_name'] for friend in friend_obj_list]
    if explorer.source_screen_name in friend_name_list:
        print('The source user is following the target user')
        exit()
    # 検索できるアカウント (フォローしているか、あるいは公開されている) であり、かつフォロー数・フォロワー数がともに 1000 以下であるもののみ検索対象とする
    friend_search_list = [friend['screen_name'] for friend in friend_obj_list if ((not friend['protected'] or friend['following']) and (friend['friends_count'] < 1000) and (friend['followers_count'] < 1000))]
    
    for i in range(len(friend_search_list)):
        friend_name = friend_search_list[i]
        print("----- searching {0} ({1}/{2}) -----".format(friend_name, i + 1, len(friend_search_list), end=""))
        explorer.process_each_friend(friend_name)
    
    # 最終結果の出力
    print(', '.join(explorer.result_store))