Code for How to Extract YouTube Data using YouTube API in Python Tutorial


View on Github

utils.py

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

import urllib.parse as p
import re
import os
import pickle

SCOPES = ["https://www.googleapis.com/auth/youtube.force-ssl"]

def youtube_authenticate():
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
    api_service_name = "youtube"
    api_version = "v3"
    client_secrets_file = "credentials.json"
    creds = None
    # the file token.pickle stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first time
    if os.path.exists("token.pickle"):
        with open("token.pickle", "rb") as token:
            creds = pickle.load(token)
    # if there are no (valid) credentials availablle, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(client_secrets_file, SCOPES)
            creds = flow.run_local_server(port=0)
        # save the credentials for the next run
        with open("token.pickle", "wb") as token:
            pickle.dump(creds, token)

    return build(api_service_name, api_version, credentials=creds)


def get_channel_details(youtube, **kwargs):
    return youtube.channels().list(
        part="statistics,snippet,contentDetails",
        **kwargs
    ).execute()


def search(youtube, **kwargs):
    return youtube.search().list(
        part="snippet",
        **kwargs
    ).execute()


def get_video_details(youtube, **kwargs):
    return youtube.videos().list(
        part="snippet,contentDetails,statistics",
        **kwargs
    ).execute()


def print_video_infos(video_response):
    items = video_response.get("items")[0]
    # get the snippet, statistics & content details from the video response
    snippet         = items["snippet"]
    statistics      = items["statistics"]
    content_details = items["contentDetails"]
    # get infos from the snippet
    channel_title = snippet["channelTitle"]
    title         = snippet["title"]
    description   = snippet["description"]
    publish_time  = snippet["publishedAt"]
    # get stats infos
    comment_count = statistics["commentCount"]
    like_count    = statistics["likeCount"]
    view_count    = statistics["viewCount"]
    # get duration from content details
    duration = content_details["duration"]
    # duration in the form of something like 'PT5H50M15S'
    # parsing it to be something like '5:50:15'
    parsed_duration = re.search(f"PT(\d+H)?(\d+M)?(\d+S)", duration).groups()
    duration_str = ""
    for d in parsed_duration:
        if d:
            duration_str += f"{d[:-1]}:"
    duration_str = duration_str.strip(":")
    print(f"""
    Title: {title}
    Description: {description}
    Channel Title: {channel_title}
    Publish time: {publish_time}
    Duration: {duration_str}
    Number of comments: {comment_count}
    Number of likes: {like_count}
    Number of views: {view_count}
    """)


def parse_channel_url(url):
    """
    This function takes channel `url` to check whether it includes a
    channel ID, user ID or channel name
    """
    path = p.urlparse(url).path
    id = path.split("/")[-1]
    if "/c/" in path:
        return "c", id
    elif "/channel/" in path:
        return "channel", id
    elif "/user/" in path:
        return "user", id


def get_channel_id_by_url(youtube, url):
    """
    Returns channel ID of a given `id` and `method`
    - `method` (str): can be 'c', 'channel', 'user'
    - `id` (str): if method is 'c', then `id` is display name
        if method is 'channel', then it's channel id
        if method is 'user', then it's username
    """
    # parse the channel URL
    method, id = parse_channel_url(url)
    if method == "channel":
        # if it's a channel ID, then just return it
        return id
    elif method == "user":
        # if it's a user ID, make a request to get the channel ID
        response = get_channel_details(youtube, forUsername=id)
        items = response.get("items")
        if items:
            channel_id = items[0].get("id")
            return channel_id
    elif method == "c":
        # if it's a channel name, search for the channel using the name
        # may be inaccurate
        response = search(youtube, q=id, maxResults=1)
        items = response.get("items")
        if items:
            channel_id = items[0]["snippet"]["channelId"]
            return channel_id
    raise Exception(f"Cannot find ID:{id} with {method} method")


def get_video_id_by_url(url):
    """
    Return the Video ID from the video `url`
    """
    # split URL parts
    parsed_url = p.urlparse(url)
    # get the video ID by parsing the query of the URL
    video_id = p.parse_qs(parsed_url.query).get("v")
    if video_id:
        return video_id[0]
    else:
        raise Exception(f"Wasn't able to parse video URL: {url}")

video_details.py

from utils import (
    youtube_authenticate,  
    get_video_id_by_url, 
    get_video_details,
    print_video_infos
)


if __name__ == "__main__":
    # authenticate to YouTube API
    youtube = youtube_authenticate()
    video_url = "https://www.youtube.com/watch?v=jNQXAC9IVRw&ab_channel=jawed"
    # parse video ID from URL
    video_id = get_video_id_by_url(video_url)
    # make API call to get video info
    response = get_video_details(youtube, id=video_id)
    # print extracted video infos
    print_video_infos(response)

search_by_keyword.py

from utils import (
    youtube_authenticate,  
    get_video_details,
    print_video_infos,
    search
)


if __name__ == "__main__":
    # authenticate to YouTube API
    youtube = youtube_authenticate()
    # search for the query 'python' and retrieve 2 items only
    response = search(youtube, q="python", maxResults=2)
    items = response.get("items")
    for item in items:
        # get the video ID
        video_id = item["id"]["videoId"]
        # get the video details
        video_response = get_video_details(youtube, id=video_id)
        # print the video details
        print_video_infos(video_response)
        print("="*50)

channel_details.py

from utils import (
    youtube_authenticate, 
    get_channel_id_by_url, 
    get_channel_details,
    get_video_details,
    print_video_infos
)


def get_channel_videos(youtube, **kwargs):
    return youtube.search().list(
        **kwargs
    ).execute()
        

if __name__ == "__main__":
    # authenticate to YouTube API
    youtube = youtube_authenticate()
    channel_url = "https://www.youtube.com/channel/UC8butISFwT-Wl7EV0hUK0BQ"
    # get the channel ID from the URL
    channel_id = get_channel_id_by_url(youtube, channel_url)
    # get the channel details
    response = get_channel_details(youtube, id=channel_id)
    # extract channel infos
    snippet = response["items"][0]["snippet"]
    statistics = response["items"][0]["statistics"]
    channel_country = snippet["country"]
    channel_description = snippet["description"]
    channel_creation_date = snippet["publishedAt"]
    channel_title = snippet["title"]
    channel_subscriber_count = statistics["subscriberCount"]
    channel_video_count = statistics["videoCount"]
    channel_view_count  = statistics["viewCount"]
    print(f"""
    Title: {channel_title}
    Published At: {channel_creation_date}
    Description: {channel_description}
    Country: {channel_country}
    Number of videos: {channel_video_count}
    Number of subscribers: {channel_subscriber_count}
    Total views: {channel_view_count}
    """)
    # the following is grabbing channel videos
    # number of pages you want to get
    n_pages = 2
    # counting number of videos grabbed
    n_videos = 0
    next_page_token = None
    for i in range(n_pages):
        params = {
            'part': 'snippet',
            'q': '',
            'channelId': channel_id,
            'type': 'video',
        }
        if next_page_token:
            params['pageToken'] = next_page_token
        res = get_channel_videos(youtube, **params)
        channel_videos = res.get("items")
        for video in channel_videos:
            n_videos += 1
            video_id = video["id"]["videoId"]
            # easily construct video URL by its ID
            video_url = f"https://www.youtube.com/watch?v={video_id}"
            video_response = get_video_details(youtube, id=video_id)
            print(f"================Video #{n_videos}================")
            # print the video details
            print_video_infos(video_response)
            print(f"Video URL: {video_url}")
            print("="*40)
        # if there is a next page, then add it to our parameters
        # to proceed to the next page
        if "nextPageToken" in res:
            next_page_token = res["nextPageToken"]

comments.py

from utils import youtube_authenticate, get_video_id_by_url, get_channel_id_by_url


def get_comments(youtube, **kwargs):
    return youtube.commentThreads().list(
        part="snippet",
        **kwargs
    ).execute()

        

if __name__ == "__main__":
    # authenticate to YouTube API
    youtube = youtube_authenticate()
    # URL can be a channel or a video, to extract comments
    url = "https://www.youtube.com/watch?v=jNQXAC9IVRw&ab_channel=jawed"
    if "watch" in url:
        # that's a video
        video_id = get_video_id_by_url(url)
        params = {
            'videoId': video_id, 
            'maxResults': 2,
            'order': 'relevance', # default is 'time' (newest)
        }
    else:
        # should be a channel
        channel_id = get_channel_id_by_url(url)
        params = {
            'allThreadsRelatedToChannelId': channel_id, 
            'maxResults': 2,
            'order': 'relevance', # default is 'time' (newest)
        }
    # get the first 2 pages (2 API requests)
    n_pages = 2
    for i in range(n_pages):
        # make API call to get all comments from the channel (including posts & videos)
        response = get_comments(youtube, **params)
        items = response.get("items")
        # if items is empty, breakout of the loop
        if not items:
            break
        for item in items:
            comment = item["snippet"]["topLevelComment"]["snippet"]["textDisplay"]
            updated_at = item["snippet"]["topLevelComment"]["snippet"]["updatedAt"]
            like_count = item["snippet"]["topLevelComment"]["snippet"]["likeCount"]
            comment_id = item["snippet"]["topLevelComment"]["id"]
            print(f"""\
            Comment: {comment}
            Likes: {like_count}
            Updated At: {updated_at}
            ==================================\
            """)
        if "nextPageToken" in response:
            # if there is a next page
            # add next page token to the params we pass to the function
            params["pageToken"] =  response["nextPageToken"]
        else:
            # must be end of comments!!!!
            break
        print("*"*70)

You can also explore the code in this notebook.