#!/usr/bin/python
|
|
# coding: utf-8
|
|
|
|
from ConfigParser import RawConfigParser, NoOptionError, NoSectionError
|
|
from os.path import dirname, splitext, basename, isfile
|
|
from os import devnull
|
|
from subprocess import check_call, CalledProcessError, STDOUT
|
|
import unicodedata
|
|
import logging
|
|
|
|
### CATEGORIES ###
|
|
YOUTUBE_CATEGORY = {
|
|
"music": 10,
|
|
"films": 1,
|
|
"vehicles": 2,
|
|
"sport": 17,
|
|
"travels": 19,
|
|
"gaming": 20,
|
|
"people": 22,
|
|
"comedy": 23,
|
|
"entertainment": 24,
|
|
"news": 25,
|
|
"how to": 26,
|
|
"education": 27,
|
|
"activism": 29,
|
|
"science & technology": 28,
|
|
"science": 28,
|
|
"technology": 28,
|
|
"animals": 15
|
|
}
|
|
|
|
PEERTUBE_CATEGORY = {
|
|
"music": 1,
|
|
"films": 2,
|
|
"vehicles": 3,
|
|
"sport": 5,
|
|
"travels": 6,
|
|
"gaming": 7,
|
|
"people": 8,
|
|
"comedy": 9,
|
|
"entertainment": 10,
|
|
"news": 11,
|
|
"how to": 12,
|
|
"education": 13,
|
|
"activism": 14,
|
|
"science & technology": 15,
|
|
"science": 15,
|
|
"technology": 15,
|
|
"animals": 16
|
|
}
|
|
|
|
### LANGUAGES ###
|
|
YOUTUBE_LANGUAGE = {
|
|
"arabic": 'ar',
|
|
"english": 'en',
|
|
"french": 'fr',
|
|
"german": 'de',
|
|
"hindi": 'hi',
|
|
"italian": 'it',
|
|
"japanese": 'ja',
|
|
"korean": 'ko',
|
|
"mandarin": 'zh-CN',
|
|
"portuguese": 'pt-PT',
|
|
"punjabi": 'pa',
|
|
"russian": 'ru',
|
|
"spanish": 'es'
|
|
}
|
|
|
|
PEERTUBE_LANGUAGE = {
|
|
"arabic": "ar",
|
|
"english": "en",
|
|
"french": "fr",
|
|
"german": "de",
|
|
"hindi": "hi",
|
|
"italian": "it",
|
|
"japanese": "ja",
|
|
"korean": "ko",
|
|
"mandarin": "zh",
|
|
"portuguese": "pt",
|
|
"punjabi": "pa",
|
|
"russian": "ru",
|
|
"spanish": "es"
|
|
}
|
|
######################
|
|
|
|
|
|
def getCategory(category, platform):
|
|
if platform == "youtube":
|
|
return YOUTUBE_CATEGORY[category.lower()]
|
|
else:
|
|
return PEERTUBE_CATEGORY[category.lower()]
|
|
|
|
|
|
def getLanguage(language, platform):
|
|
if platform == "youtube":
|
|
return YOUTUBE_LANGUAGE[language.lower()]
|
|
else:
|
|
return PEERTUBE_LANGUAGE[language.lower()]
|
|
|
|
|
|
# return the nfo as a RawConfigParser object
|
|
def loadNFO(options):
|
|
video_directory = dirname(options.get('--file')) + "/"
|
|
if options.get('--nfo'):
|
|
try:
|
|
logging.info("Using " + options.get('--nfo') + " as NFO, loading...")
|
|
if isfile(options.get('--nfo')):
|
|
nfo = RawConfigParser()
|
|
nfo.read(options.get('--nfo'))
|
|
return nfo
|
|
else:
|
|
logging.error("Given NFO file does not exist, please check your path.")
|
|
exit(1)
|
|
except Exception as e:
|
|
logging.error("Problem with NFO file: " + str(e))
|
|
exit(1)
|
|
else:
|
|
if options.get('--name'):
|
|
nfo_file = video_directory + options.get('--name') + ".txt"
|
|
print nfo_file
|
|
if isfile(nfo_file):
|
|
try:
|
|
logging.info("Using " + nfo_file + " as NFO, loading...")
|
|
nfo = RawConfigParser()
|
|
nfo.read(nfo_file)
|
|
return nfo
|
|
except Exception as e:
|
|
logging.error("Problem with NFO file: " + str(e))
|
|
exit(1)
|
|
|
|
# if --nfo and --name does not exist, use --file as default
|
|
video_file = splitext(basename(options.get('--file')))[0]
|
|
nfo_file = video_directory + video_file + ".txt"
|
|
if isfile(nfo_file):
|
|
try:
|
|
logging.info("Using " + nfo_file + " as NFO, loading...")
|
|
nfo = RawConfigParser()
|
|
nfo.read(nfo_file)
|
|
return nfo
|
|
except Exception as e:
|
|
logging.error("Problem with nfo file: " + str(e))
|
|
exit(1)
|
|
logging.info("No suitable NFO found, skipping.")
|
|
return False
|
|
|
|
|
|
def parseNFO(options):
|
|
nfo = loadNFO(options)
|
|
if nfo:
|
|
# We need to check all options and replace it with the nfo value if not defined (None or False)
|
|
for key, value in options.iteritems():
|
|
key = key.replace("-", "")
|
|
try:
|
|
# get string options
|
|
if value is None and nfo.get('video', key):
|
|
options['--' + key] = nfo.get('video', key)
|
|
# get boolean options
|
|
elif value is False and nfo.getboolean('video', key):
|
|
options['--' + key] = nfo.getboolean('video', key)
|
|
except NoOptionError:
|
|
continue
|
|
except NoSectionError:
|
|
logging.error("Given NFO file miss section [video], please check syntax of your NFO.")
|
|
exit(1)
|
|
return options
|
|
|
|
|
|
def upcaseFirstLetter(s):
|
|
return s[0].upper() + s[1:]
|
|
|
|
|
|
def publishAt(publishAt, oauth, url, idvideo, secret):
|
|
try:
|
|
FNULL = open(devnull, 'w')
|
|
check_call(["at", "-V"], stdout=FNULL, stderr=STDOUT)
|
|
except CalledProcessError:
|
|
logging.error("You need to install the atd daemon to use the publishAt option.")
|
|
exit(1)
|
|
try:
|
|
FNULL = open(devnull, 'w')
|
|
check_call(["curl", "-V"], stdout=FNULL, stderr=STDOUT)
|
|
except CalledProcessError:
|
|
logging.error("You need to install the curl command line to use the publishAt option.")
|
|
exit(1)
|
|
try:
|
|
FNULL = open(devnull, 'w')
|
|
check_call(["jq", "-V"], stdout=FNULL, stderr=STDOUT)
|
|
except CalledProcessError:
|
|
logging.error("You need to install the jq command line to use the publishAt option.")
|
|
exit(1)
|
|
time = publishAt.split("T")
|
|
# Remove leading seconds that atd does not manage
|
|
if time[1].count(":") == 2:
|
|
time[1] = time[1][:-3]
|
|
|
|
atTime = time[1] + " " + time[0]
|
|
refresh_token=str(oauth.__dict__['_client'].__dict__['refresh_token'])
|
|
atFile = "/tmp/peertube_" + idvideo + "_" + publishAt + ".at"
|
|
try:
|
|
openfile = open(atFile,"w")
|
|
openfile.write('token=$(curl -X POST -d "client_id=' + str(secret.get('peertube', 'client_id')) +
|
|
'&client_secret=' + str(secret.get('peertube', 'client_secret')) +
|
|
'&grant_type=refresh_token&refresh_token=' + str(refresh_token) +
|
|
'" "' + url + '/api/v1/users/token" | jq -r .access_token)')
|
|
openfile.write("\n")
|
|
openfile.write('curl "' + url + '/api/v1/videos/' + idvideo +
|
|
'" -X PUT -H "Authorization: Bearer ${token}"' +
|
|
' -H "Content-Type: multipart/form-data" -F "privacy=1"')
|
|
openfile.write("\n ") # atd needs an empty line at the end of the file to load...
|
|
openfile.close()
|
|
except Exception as e:
|
|
if hasattr(e, 'message'):
|
|
logging.error("Error: " + str(e.message))
|
|
else:
|
|
logging.error("Error: " + str(e))
|
|
|
|
try:
|
|
FNULL = open(devnull, 'w')
|
|
check_call(["at", "-M", "-f", atFile, atTime], stdout=FNULL, stderr=STDOUT)
|
|
except Exception as e:
|
|
if hasattr(e, 'message'):
|
|
logging.error("Error: " + str(e.message))
|
|
else:
|
|
logging.error("Error: " + str(e))
|
|
|
|
|
|
def mastodonTag(tag):
|
|
tags = tag.split(' ')
|
|
mtag = ''
|
|
for s in tags:
|
|
if s == '':
|
|
continue
|
|
strtag = unicodedata.normalize('NFKD', unicode (s, 'utf-8')).encode('ASCII', 'ignore')
|
|
strtag = ''.join(e for e in strtag if e.isalnum())
|
|
strtag = upcaseFirstLetter(strtag)
|
|
mtag = mtag + strtag
|
|
|
|
return mtag
|