Code for the "How to Extract YouTube Comments in Python" Tutorial


View on GitHub

youtube_comment_extractor.py

import requests
import json
import time

# Code is partially grabbed from this repository:
# https://github.com/egbertbouman/youtube-comment-downloader

def search_dict(partial, key):
    """
    Recursively walk `partial` (nested dictionaries / lists) and lazily yield
    every value that is stored under `key`.

    A value found under `key` is yielded as-is and is not searched further.
    Anything that is neither a dictionary nor a list yields nothing.
    """
    if isinstance(partial, list):
        # a list has no keys of its own: look inside each of its elements
        for element in partial:
            yield from search_dict(element, key)
        return

    if not isinstance(partial, dict):
        # scalars (strings, numbers, None, ...) cannot contain the key
        return

    for name, value in partial.items():
        if name == key:
            # direct hit: hand back the stored value
            yield value
            continue
        # not a match, but the value may itself be a dict/list holding the key
        yield from search_dict(value, key)


def find_value(html, key, num_sep_chars=2, separator='"'):
    """
    Extract the text that follows the first occurrence of `key` in `html`.

    The value starts `num_sep_chars` characters after the end of `key`
    (to skip characters such as `":"`) and runs up to, but not including,
    the next occurrence of `separator`.

    Returns an empty string when `key` does not occur in `html`. When
    `separator` does not occur after the value's start, everything up to the
    end of `html` is returned.
    """
    key_pos = html.find(key)
    if key_pos == -1:
        # str.find() signals "not found" with -1; using it in the offset
        # arithmetic below would silently return an unrelated slice
        return ""
    # skip the key itself plus the separator characters that follow it
    start_pos = key_pos + len(key) + num_sep_chars
    # the value ends at the next separator after the start position
    end_pos = html.find(separator, start_pos)
    if end_pos == -1:
        # no closing separator: slicing with -1 would drop the last character
        return html[start_pos:]
    return html[start_pos:end_pos]


def get_comments(url):
    """
    Generator that fetches the page at `url` and yields its comments one by
    one, each as a dictionary with the keys `commentId`, `text`, `time`,
    `isLiked`, `likeCount`, `author`, `channel`, `votes`, `photo` and
    `authorIsChannelOwner`.

    Comments are requested page by page from the `comment_service_ajax`
    endpoint, following the continuation tokens found in each response,
    until no continuation tokens are left.

    Yields nothing when the initial page data holds no continuation data.
    Raises `json.JSONDecodeError` (a `ValueError`) when the initial page data
    or a comments response cannot be parsed as JSON.
    """
    session = requests.Session()
    # make the request
    res = session.get(url)
    # extract the XSRF token
    xsrf_token = find_value(res.text, "XSRF_TOKEN", num_sep_chars=3)
    # parse the YouTube initial data in the <script> tag
    data_str = find_value(res.text, 'window["ytInitialData"] = ', num_sep_chars=0, separator="\n").rstrip(";")
    # convert to Python dictionary instead of plain text string
    initial_data = json.loads(data_str)

    # search for the ctoken & continuation parameter fields
    pagination_data = None
    for renderer in search_dict(initial_data, "itemSectionRenderer"):
        # next() needs a default here: a bare StopIteration raised inside a
        # generator is converted into a RuntimeError
        pagination_data = next(search_dict(renderer, "nextContinuationData"), None)
        if pagination_data:
            # if we got something, break out of the loop,
            # we have the data we need
            break

    if not pagination_data:
        # no continuation data in the page, so there is nothing to request
        return

    continuation_tokens = [(pagination_data['continuation'], pagination_data['clickTrackingParams'])]

    # request headers and POST body (the XSRF token) are the same for every page
    headers = {
        "x-youtube-client-name": "1",
        "x-youtube-client-version": "2.20200731.02.01"
    }
    post_data = {
        "session_token": xsrf_token,
    }

    while continuation_tokens:
        # keep looping until continuation tokens list is empty (no more comments)
        continuation, itct = continuation_tokens.pop()

        # construct params parameter (the ones in the URL)
        params = {
            "action_get_comments": 1,
            "pbj": 1,
            "ctoken": continuation,
            "continuation": continuation,
            "itct": itct,
        }

        # make the POST request to get the comments data
        response = session.post(
            "https://www.youtube.com/comment_service_ajax",
            params=params,
            data=post_data,
            headers=headers,
        )
        # convert to a Python dictionary
        comments_data = json.loads(response.text)

        for comment in search_dict(comments_data, "commentRenderer"):
            # iterate over loaded comments and yield useful info
            yield {
                "commentId": comment["commentId"],
                "text": ''.join([c['text'] for c in comment['contentText']['runs']]),
                "time": comment['publishedTimeText']['runs'][0]['text'],
                "isLiked": comment["isLiked"],
                "likeCount": comment["likeCount"],
                # "replyCount": comment["replyCount"],
                'author': comment.get('authorText', {}).get('simpleText', ''),
                'channel': comment['authorEndpoint']['browseEndpoint']['browseId'],
                'votes': comment.get('voteCount', {}).get('simpleText', '0'),
                'photo': comment['authorThumbnail']['thumbnails'][-1]['url'],
                "authorIsChannelOwner": comment["authorIsChannelOwner"],
            }

        # load continuation tokens for next comments (ctoken & itct);
        # new tokens go in front, pop() above takes from the end
        continuation_tokens = [
            (next_cdata['continuation'], next_cdata['clickTrackingParams'])
            for next_cdata in search_dict(comments_data, 'nextContinuationData')
        ] + continuation_tokens

        # avoid heavy loads with popular videos
        time.sleep(0.1)
    




if __name__ == "__main__":
    # Command-line entry point: extract the comments of the given video URL
    # and either pretty-print them or store them as a JSON list in a file.
    import argparse
    from itertools import islice
    from pprint import pprint

    parser = argparse.ArgumentParser(description="Simple YouTube Comment extractor")
    parser.add_argument("url", help="The YouTube video full URL")
    parser.add_argument("-l", "--limit", type=int, help="Number of maximum comments to extract, helpful for longer videos")
    parser.add_argument("-o", "--output", help="Output JSON file, e.g data.json")

    # parse passed arguments
    args = parser.parse_args()
    limit = args.limit
    output = args.output
    url = args.url

    comments = get_comments(url)
    if limit:
        # stop pulling from the generator once the limit is reached;
        # a missing or zero limit means "no limit", a negative one yields nothing
        comments = islice(comments, max(limit, 0))

    # number of comments actually extracted (stays 0 when there are none)
    count = 0
    if output:
        # open the file once and overwrite it, so stale content from a previous
        # run cannot end up in front of the new JSON list; UTF-8 is needed
        # because the comments are dumped with ensure_ascii=False
        with open(output, "w", encoding="utf-8") as f:
            f.write("[")
            try:
                for comment in comments:
                    if count:
                        # separate items with a comma, no trailing comma to remove
                        f.write(",")
                    f.write(json.dumps(comment, ensure_ascii=False))
                    count += 1
            finally:
                # always close the list so the file stays valid JSON,
                # even when extraction stops early with an error
                print("]", file=f)
    else:
        for comment in comments:
            pprint(comment)
            print("="*50)
            count += 1
    print("total comments extracted:", count)