首先,解析m3u8文件,将其密钥和所有视频地址存储下来。
import os
def makedir_video(video_name: str):
if not os.path.exists(video_name):
os.mkdir(video_name)
def get_videos_url(video_name, m3u8_url: str):
res = requests.get(m3u8_url, headers=headers)
m3u8_filename = m3u8_url.split(r'/')[-1]
with open(os.path.join(video_name, m3u8_filename), 'w', encoding='GBK') as f:
f.write(res.text)
all_videos_url = []
for i in res.text.strip().split('\n'):
if 'AES' in i and 'KEY' in i:
key_uri = re.findall('URI="(.*?)"', i)[0]
if "http" not in key_uri:
key_uri = r'/'.join(m3u8_url.split(r'/')[:-1]) + '/' + key_uri
key = requests.get(key_uri, headers=headers).content
with open(os.path.join(video_name, 'enc.key'), 'wb') as f:
f.write(key)
if i.startswith('#') or len(i) == 0:
continue
all_videos_url.append(i)
print(all_videos_url)
return all_videos_url
if __name__ == '__main__':
url = 'https://www.66s.cc/e/DownSys/play/?classid=4&id=20778&pathid1=0&bf=0'
m3u8_url, video_name = get_m3u8_url(url)
makedir_video(video_name)
all_videos = get_videos_url(video_name, m3u8_url)
运行完毕后再对应电影的文件夹下会生成index.m3u8
文件和enc.key
文件
现在我们已经有了所有的视频地址,接下来只需要根据视频地址将所有的视频下载下来即可。
由于总共视频片段较多,如果使用传统的代码运行方式进行运行,将会十分耗时,故在这里我们采用协程异步操作来进行访问和下载。
由于使用协程会对服务器产生较大的访问压力,所有在这里我们引入信号量去控制访问协程的并发数,减轻服务器访问压力。
其次,由于这里的请求采用的是异步操作,所有很有可能出现中间部分视频片段第一次下载失败的情况,在这里我们处理的方式是,先让其重复访问10次,如果10次还没有访问到视频资源,则将其记录到对应的文件中,便于后续对其进行进一步分析,为什么没有访问到资源?
import asyncio
import aiofiles
import aiohttp
async def download_video(filepath, video_url, sem):
async with sem:
for i in range(10):
try:
video_name = video_url.split(r'/')[-1]
async with aiohttp.ClientSession() as session:
async with session.get(video_url, headers=headers) as res:
content = await res.content.read()
async with aiofiles.open(os.path.join(filepath, video_name), 'wb') as f:
await f.write(content)
break
except Exception as e:
if i == 9:
with open(filepath+'_Error.txt', 'a', encoding='utf-8') as f:
f.write(video_url+'\n')
print(f'----- {video_name}下载失败,请求次数达到上限({i+1}次),已写入文件{filepath+"_Error.txt"} -----')
break
print(f'----- {video_name}下载失败,正在重试{i} -----')
print(e)
async def download_all_videos(sem_num, filepath, all_videos_url):
sem = asyncio.Semaphore(sem_num)
tasks = []
for video_url in all_videos_url:
tasks.append(asyncio.create_task(download_video(filepath, video_url, sem)))
await asyncio.wait(tasks)
if __name__ == '__main__':
url = 'https://www.66s.cc/e/DownSys/play/?classid=4&id=20778&pathid1=0&bf=0'
m3u8_url, video_name = get_m3u8_url(url)
makedir_video(video_name)
all_videos = get_videos_url(video_name, m3u8_url)
sem_num = 100
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(download_all_videos(sem_num, video_name, all_videos))
可以看到数据已经下载成功。
并且是有部分片段第一次是无法获取到数据的,并且越往后走这种情况出现得越多。我们也可以在请求过程中加入timeout
参数,控制代码一定时间内无响应的话就中断请求,重新请求,加快效率。否则最后几个包可能需要耗费大量的时间才能抓下来。
但是大部分经过二次访问都可以获取到视频片段。
例如这里的Hda3SBaf.ts
片段,第一次没有获取到,但是第二次获取到了,在文件夹中也能搜索到。
跟我们前面分析的一样,这里的视频文件是无法打开的。
而且前面我们已经将其对应的密钥文存储到了enc.key
文件中。直接使用该文件对视频进行解密即可。
由于解密过程是需要先将视频读取进来,解密完成后还要将其再次存储下来,同样需要大量的IO操作,故也使用的是协程异步的方式去操作。
解密使用的是AES算法,要调用AES进行解密需要先下载pycryptodome
,下载完毕后即可使用from Crypto.Cipher import AES
方式进行调用。
这里为了适配没有加密的视频,做了一个判断,是否有加密文件存在,如果有才是加密的视频。如果没有则直接跳过此步骤。
from Crypto.Cipher import AES
async def parse_video(video_file, video_name, new_video_name, key):
print(os.path.join(video_name, video_file))
print(os.path.join(new_video_name, video_file))
async with aiofiles.open(os.path.join(video_name, video_file), 'rb') as f1, aiofiles.open(os.path.join(new_video_name, video_file), 'wb') as f2:
content = await f1.read()
aes = AES.new(key=key, mode=AES.MODE_CBC, IV=b'0000000000000000')
new_content = aes.decrypt(content)
new_content = PKCS7_unpad(new_content)
await f2.write(new_content)
print(f'------ {video_file}解密成功 ------')
async def parse_all_videos(video_name):
all_file = os.listdir(video_name)
new_video_name = video_name + '_parse'
if 'enc.key' in all_file:
video_files = [i for i in all_file if i.endswith('ts')]
makedir_video(new_video_name)
print('------ 开始解密视频 ------')
with open(os.path.join(video_name, 'enc.key'), 'rb') as f:
key = f.read()
tasks = []
for video_file in video_files:
tasks.append(asyncio.create_task(parse_video(video_file, video_name, new_video_name, key)))
await asyncio.wait(tasks)
print('------ 视频解密完成 ------')
else:
os.rename(video_name, new_video_name)
print('------ 视频无加密 ------')
if __name__ == '__main__':
url = 'https://www.66s.cc/e/DownSys/play/?classid=4&id=20778&pathid1=0&bf=0'
m3u8_url, video_name = get_m3u8_url(url)
makedir_video(video_name)
all_videos = get_videos_url(video_name, m3u8_url)
sem_num = 100
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(download_all_videos(sem_num, video_name, all_videos))
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(parse_all_videos(video_name))
解密成功,在统计目录下会生成一个parse文件夹
视频合并可以采用windows
或者Linux
中自带的命令进行完成。
并且视频的合并应该按照播放顺序进行合并,即m3u8
文件的视频顺序。
def merge(video_name):
new_video_name = video_name + '_parse'
m3u8_file = [i for i in os.listdir(video_name) if i.endswith('m3u8')][0] with open(os.path.join(video_name, m3u8_file)) as f:
video_sort = [i.split('/')[-1].strip() for i in f.readlines() if not i.startswith("#") and len(i) > 0]
n = 1
os.chdir(new_video_name)
tmp = []
for i in range(len(video_sort)):
tmp.append(video_sort[i])
if i != 0 and i % 20 == 0:
cmd = f"copy /b {'+'.join(tmp)} {n}_copy.ts"
os.system(cmd)
tmp = []
n = n + 1
cmd = f"copy /b {'+'.join(tmp)} {n}_copy.ts"
os.system(cmd)
n = n + 1
last_temp = []
for i in range(1, n):
last_temp.append(f"{i}_copy.ts")
cmd = f"copy /b {'+'.join(last_temp)} {video_name}.mp4"
os.system(cmd)
if __name__ == '__main__':
url = 'https://www.66s.cc/e/DownSys/play/?classid=4&id=20778&pathid1=0&bf=0'
m3u8_url, video_name = get_m3u8_url(url)
makedir_video(video_name)
all_videos = get_videos_url(video_name, m3u8_url)
sem_num = 100
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(download_all_videos(sem_num, video_name, all_videos))
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(parse_all_videos(video_name))
merge(video_name)
打开文件夹可以看到文件夹内有一个MP4文件。即为完整的视频资源。
打开后也可以直接播放
由于网吧电影网站已经关闭,经过查询将案例目标修改为6v电影 https://www.66s.cc/
1. 获取m3u8文件地址
2. 根据m3u8文件,获取所有分段视频
3. 观察视频是否经过加密,如果经过加密则需要进行解密
4. 合并所有解密视频
import asyncio
import aiofiles
import aiohttp
import requests
from lxml import etree
import re
import os
from Crypto.Cipher import AES
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
def get_m3u8_url(url: str):
这里m3u8地址有多种情况
1. iframe直接链接到m3u8文件
2. 通过一个中间包
3. 通过两个中间包
res = requests.get(url, headers=headers)
res_html = etree.HTML(res.text)
video_name = res_html.xpath('//title/text()')[0].split('-')[0].strip()
iframe_url = str(res_html.xpath('//iframe/@src')[0])
if iframe_url.endswith('m3u8'):
return iframe_url, video_name
iframe_res = requests.get(iframe_url, headers=headers)
m3u8_url = re.findall("url: '(.*?)',", iframe_res.text, re.S)
if len(m3u8_url) == 0:
m3u8_url = iframe_url.rsplit('/', maxsplit=2)[0] + re.findall('"url":"(.*?)"', iframe_res.text, re.S)[0]
else:
m3u8_url = m3u8_url[0]
return m3u8_url, video_name
def makedir_video(video_name: str):
if not os.path.exists(video_name):
os.mkdir(video_name)
def get_videos_url(video_name, m3u8_url: str):
res = requests.get(m3u8_url, headers=headers)
m3u8_filename = m3u8_url.split(r'/')[-1]
with open(os.path.join(video_name, m3u8_filename), 'w', encoding='GBK') as f:
f.write(res.text)
all_videos_url = []
for i in res.text.strip().split('\n'):
if 'AES' in i and 'KEY' in i:
key_uri = re.findall('URI="(.*?)"', i)[0]
if "http" not in key_uri:
key_uri = r'/'.join(m3u8_url.split(r'/')[:-1]) + '/' + key_uri
key = requests.get(key_uri, headers=headers).content
with open(os.path.join(video_name, 'enc.key'), 'wb') as f:
f.write(key)
if i.startswith('#') or len(i) == 0:
continue
all_videos_url.append(i)
return all_videos_url
async def download_video(filepath, video_url, sem):
async with sem:
for i in range(10):
try:
video_name = video_url.split(r'/')[-1]
async with aiohttp.ClientSession() as session:
async with session.get(video_url, headers=headers, timeout=10) as res:
content = await res.content.read()
async with aiofiles.open(os.path.join(filepath, video_name), 'wb') as f:
await f.write(content)
break
except Exception as e:
if i == 9:
with open(filepath+'_Error.txt', 'a', encoding='utf-8') as f:
f.write(video_url+'\n')
print(f'----- {video_name}下载失败,请求次数达到上限({i+1}次),已写入文件{filepath+"_Error.txt"} -----')
break
print(f'----- {video_name}下载失败,正在重试{i} -----')
print(e)
async def download_all_videos(sem_num, filepath, all_videos_url):
sem = asyncio.Semaphore(sem_num)
tasks = []
for video_url in all_videos_url:
tasks.append(asyncio.create_task(download_video(filepath, video_url, sem)))
await asyncio.wait(tasks)
async def parse_video(video_file, video_name, new_video_name, key):
print(os.path.join(video_name, video_file))
print(os.path.join(new_video_name, video_file))
async with aiofiles.open(os.path.join(video_name, video_file), 'rb') as f1, aiofiles.open(os.path.join(new_video_name, video_file), 'wb') as f2:
content = await f1.read()
aes = AES.new(key=key, mode=AES.MODE_CBC, IV=b'0000000000000000')
new_content = aes.decrypt(content)
await f2.write(new_content)
print(f'------ {video_file}解密成功 ------')
async def parse_all_videos(video_name):
all_file = os.listdir(video_name)
new_video_name = video_name + '_parse'
if 'enc.key' in all_file:
video_files = [i for i in all_file if i.endswith('ts')]
makedir_video(new_video_name)
print('------ 开始解密视频 ------')
with open(os.path.join(video_name, 'enc.key'), 'rb') as f:
key = f.read()
tasks = []
for video_file in video_files:
tasks.append(asyncio.create_task(parse_video(video_file, video_name, new_video_name, key)))
await asyncio.wait(tasks)
print('------ 视频解密完成 ------')
else:
os.rename(video_name, new_video_name)
print('------ 视频无加密 ------')
def merge(video_name):
new_video_name = video_name + '_parse'
m3u8_file = [i for i in os.listdir(video_name) if i.endswith('m3u8')][0]
with open(os.path.join(video_name, m3u8_file)) as f:
video_sort = [i.split('/')[-1].strip() for i in f.readlines() if not i.startswith("#") and len(i) > 0]
n = 1
os.chdir(new_video_name)
tmp = []
for i in range(len(video_sort)):
tmp.append(video_sort[i])
if i != 0 and i % 20 == 0:
cmd = f"copy /b {'+'.join(tmp)} {n}_copy.ts"
os.system(cmd)
tmp = []
n = n + 1
cmd = f"copy /b {'+'.join(tmp)} {n}_copy.ts"
os.system(cmd)
n = n + 1
last_temp = []
for i in range(1, n):
last_temp.append(f"{i}_copy.ts")
cmd = f"copy /b {'+'.join(last_temp)} {video_name}.mp4"
os.system(cmd)
if __name__ == '__main__':
url = 'https://www.66s.cc/e/DownSys/play/?classid=4&id=20778&pathid1=0&bf=0'
m3u8_url, video_name = get_m3u8_url(url)
makedir_video(video_name)
all_videos = get_videos_url(video_name, m3u8_url)
sem_num = 1000
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(download_all_videos(sem_num, video_name, all_videos))
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(parse_all_videos(video_name))
merge(video_name)
情景:女友买的学习视频将在一个月后到期(到期后下载在本地也无法看),让我帮忙把视频下载下来,之前抓过m3u8文件下载过视频切片合成后是一个完整视频,以为这次的任务非常简单~
然鹅,查看一下app信息,已经被加固处理(伪加固)
已经加固了,暂时不考虑脱壳编译
于是开始抓包,我的安卓手机没有root,在抓取某课app时由于 检测到代理导致某课app里面没网络,之前在玩安卓逆向的时候偶然发现
部分APP可以放在容器中,通过抓取容器获得运行APP的抓包数据
也就是用把 xx 安装在 VirtualXposed.
这段时间为了下载网页视频(网课、电影等),接触到了 m3u8 文件。在折腾了一段时间之后,猫君越发喜爱上这个格式的文件。今儿特意将关于 m3u8 格式视频链接获取方式、下载方法,以及相关的进阶操作整理成文。若诸位看官有什么更妙的使用方案,不妨分享一二。
恐怕有些朋友对 m3u8 文件还不甚了解,先作个科普吧——
m3u8 是 HLS 协议的部分内容,而 HLS 又是由苹果公司提出的基于 HTTP 的流媒体网络传输协议。其实现的基本原理是将一个大的媒体文件进行分片,而分片文件资源的路径记录于 m3u8 文
超给力的网站资源分享给你们,娱乐、生活、学习应有尽有,赶紧来看看吧!
千千音乐:http://music.taihe.com/songlist千千音乐是中国音乐门户之一,为你提供海量正版高品质音乐,权威的音乐榜单,快速的新歌速递,契合你的主题电台,人性化的歌曲搜索,让你更快地找到喜爱的音乐。
6V电影网:http://www.hao6v.com/每×××集互联网最新电影和电视剧,为使用迅雷软件的...
《Python网络爬虫技术案例教程》PPT课件(共10单元)七单元爬取APP和PC客户端数据.pdf《Python网络爬虫技术案例教程》PPT课件(共10单元)七单元爬取APP和PC客户端数据.pdf《Python网络爬虫技术案例教程》PPT课件(共10单元)七单元爬取APP和PC客户端数据.pdf《Python网络爬虫技术案例教程》PPT课件(共10单元)七单元爬取APP和PC客户端数据.pdf《Python网络爬虫技术案例教程》PPT课件(共10单元)七单元爬取APP和PC客户端数据.pdf《Python网络爬虫技术案例教程》PPT课件(共10单元)七单元爬取APP和PC客户端数据.pdf
python爬虫相关:
由于很多网站上的视频只提供在线观看,没有下载入口,故有必要进行网络爬虫获取视频资源。
利用requests获取网页源代码中的m3u8链接,对链接进行逐步解析,获取ts列表,下载所有ts文件,将其合并生成mp4文件。做到对视频的爬取。
同名博文相关代码。
不知道大家平常喜不喜欢待在宿舍一个人看电影?
作为一个高龄屌丝,电影对我来说是必不可少的。平常无聊时自己一个人待在宿舍看看电影,看看书。
(人闲下来就会胡思乱想,不能让寂寞侵蚀自己的内心)
其实还是喜欢和朋友一起去看电影的,更有气氛,有感觉点。可惜几个朋友都和我一样是钢铁男,极其尴尬(⊙o⊙)!
哈哈哈。。。
所以呢,我都是自己在网上找些电影资源来消遣时间,像电影天堂、6V电影网等等(你们有什...
对于爬虫反爬机制m3u8视频流的解决方案,有以下几种:
1. 使用代理:使用代理服务器可以隐藏真实IP地址,从而避免被网站识别为爬虫。同时,可以使用多个代理轮流访问目标网站,以降低被封禁的风险。
2. 随机UA:修改User-Agent可以模拟不同的浏览器访问目标网站,从而避免被识别为爬虫。可以使用随机UA的方式来增加访问的随机性。
3. 模拟人类行为:通过模拟人类的浏览行为,如随机点击、滑动、停留等,可以更好地隐藏爬虫的特征,以降低被封禁的风险。
4. 解密m3u8链接:有些网站会对m3u8视频流进行加密,需要先解密才能进行下载。可以使用相关工具对加密的链接进行解密。
希望这些解决方案能够帮助您解决爬虫反爬机制m3u8视频流的问题。