#!/usr/bin/python
# coding: utf-8
"""Shared helpers for the uploader.

Contains the category/language lookup tables for YouTube and PeerTube, NFO
(ini-style metadata file) loading, scheduling of a delayed publication through
the ``at`` command, and Mastodon hashtag formatting.

The module runs on both Python 2 and Python 3.
"""

try:
    from configparser import RawConfigParser, NoOptionError, NoSectionError
except ImportError:  # Python 2 names the module ConfigParser
    from ConfigParser import RawConfigParser, NoOptionError, NoSectionError
from os.path import dirname, splitext, basename, isfile, join
from os import devnull
from subprocess import check_call, CalledProcessError, STDOUT
import os
import sys
import unicodedata
import logging

### CATEGORIES ###
YOUTUBE_CATEGORY = {
    "music": 10,
    "films": 1,
    "vehicles": 2,
    "sport": 17,
    "travels": 19,
    "gaming": 20,
    "people": 22,
    "comedy": 23,
    "entertainment": 24,
    "news": 25,
    "how to": 26,
    "education": 27,
    "activism": 29,
    "science & technology": 28,
    "science": 28,
    "technology": 28,
    "animals": 15
}

PEERTUBE_CATEGORY = {
    "music": 1,
    "films": 2,
    "vehicles": 3,
    "sport": 5,
    "travels": 6,
    "gaming": 7,
    "people": 8,
    "comedy": 9,
    "entertainment": 10,
    "news": 11,
    "how to": 12,
    "education": 13,
    "activism": 14,
    "science & technology": 15,
    "science": 15,
    "technology": 15,
    "animals": 16
}

### LANGUAGES ###
YOUTUBE_LANGUAGE = {
    "arabic": 'ar',
    "english": 'en',
    "french": 'fr',
    "german": 'de',
    "hindi": 'hi',
    "italian": 'it',
    "japanese": 'ja',
    "korean": 'ko',
    "mandarin": 'zh-CN',
    "portuguese": 'pt-PT',
    "punjabi": 'pa',
    "russian": 'ru',
    "spanish": 'es'
}

PEERTUBE_LANGUAGE = {
    "arabic": "ar",
    "english": "en",
    "french": "fr",
    "german": "de",
    "hindi": "hi",
    "italian": "it",
    "japanese": "ja",
    "korean": "ko",
    "mandarin": "zh",
    "portuguese": "pt",
    "punjabi": "pa",
    "russian": "ru",
    "spanish": "es"
}
######################


def getCategory(category, platform):
    """Return the platform-specific id of a category name.

    The lookup is case-insensitive. Any platform other than "youtube" uses
    the PeerTube table.

    Raises:
        KeyError: if the category is not present in the selected table.
    """
    if platform == "youtube":
        return YOUTUBE_CATEGORY[category.lower()]
    return PEERTUBE_CATEGORY[category.lower()]


def getLanguage(language, platform):
    """Return the platform-specific code of a language name.

    The lookup is case-insensitive. Any platform other than "youtube" uses
    the PeerTube table.

    Raises:
        KeyError: if the language is not present in the selected table.
    """
    if platform == "youtube":
        return YOUTUBE_LANGUAGE[language.lower()]
    return PEERTUBE_LANGUAGE[language.lower()]


def _read_nfo(nfo_file, error_prefix):
    """Parse nfo_file into a RawConfigParser, exiting with status 1 on error.

    error_prefix is the text logged in front of the exception message.
    """
    try:
        nfo = RawConfigParser()
        nfo.read(nfo_file)
        return nfo
    except Exception as e:
        logging.error(error_prefix + str(e))
        sys.exit(1)


# return the nfo as a RawConfigParser object
def loadNFO(options):
    """Locate and load the NFO file describing the video.

    Candidates are tried in this order:
      1. the path given by ``--nfo`` (the process exits if it does not exist);
      2. ``<--name>.txt`` next to the video file;
      3. ``<video file name without extension>.txt`` next to the video file.

    Args:
        options: mapping of command line options; reads ``--file``,
            ``--nfo`` and ``--name``.

    Returns:
        The loaded RawConfigParser, or False when no NFO file was found.
        Exits with status 1 when a file is found but cannot be parsed.
    """
    video_path = options.get('--file')
    video_directory = dirname(video_path)

    explicit_nfo = options.get('--nfo')
    if explicit_nfo:
        logging.info("Using " + explicit_nfo + " as NFO, loading...")
        if not isfile(explicit_nfo):
            logging.error("Given NFO file does not exist, please check your path.")
            sys.exit(1)
        return _read_nfo(explicit_nfo, "Problem with NFO file: ")

    # (file stem, error prefix) pairs, most specific candidate first.
    candidates = []
    if options.get('--name'):
        candidates.append((options.get('--name'), "Problem with NFO file: "))
    # if --nfo and --name does not exist, use --file as default
    candidates.append((splitext(basename(video_path))[0], "Problem with nfo file: "))

    for stem, error_prefix in candidates:
        # join() keeps the path relative when the video has no directory
        # part; plain concatenation with "/" would point at the root.
        nfo_file = join(video_directory, stem + ".txt")
        if isfile(nfo_file):
            logging.info("Using " + nfo_file + " as NFO, loading...")
            return _read_nfo(nfo_file, error_prefix)

    logging.info("No suitable NFO found, skipping.")
    return False


def parseNFO(options):
    """Fill unset command line options with values from the NFO file.

    For every option, dashes are removed from its name to get the NFO key
    looked up in the ``[video]`` section. Options whose value is None take
    the NFO string when it is non-empty; options whose value is False are
    set to True when the NFO boolean is true. The result is stored under
    ``'--' + key``. Options already set on the command line are untouched.

    Args:
        options: mapping of command line options, modified in place.

    Returns:
        The same options mapping. Exits with status 1 when the NFO file has
        no ``[video]`` section.
    """
    nfo = loadNFO(options)
    if not nfo:
        return options

    # We need to check all options and replace it with the nfo value if not
    # defined (None or False). Iterate over a snapshot: storing a value under
    # '--' + key may insert a new key, which is illegal during live iteration.
    for key, value in list(options.items()):
        key = key.replace("-", "")
        try:
            if value is None:
                # get string options
                nfo_value = nfo.get('video', key)
                if nfo_value:
                    options['--' + key] = nfo_value
            elif value is False and nfo.getboolean('video', key):
                # get boolean options
                options['--' + key] = True
        except NoOptionError:
            continue
        except NoSectionError:
            logging.error("Given NFO file miss section [video], please check syntax of your NFO.")
            sys.exit(1)
    return options


def upcaseFirstLetter(s):
    """Return s with its first character upper-cased; "" stays ""."""
    # Slicing instead of s[0] so that an empty string does not raise.
    return s[:1].upper() + s[1:]


def _require_command(command, error_message):
    """Exit with status 1 and error_message unless ``command -V`` succeeds."""
    try:
        with open(devnull, 'w') as null_output:
            check_call([command, "-V"], stdout=null_output, stderr=STDOUT)
    except (CalledProcessError, OSError):
        # OSError is what a missing executable raises; CalledProcessError
        # only covers a command that ran and returned a non-zero status.
        logging.error(error_message)
        sys.exit(1)


def _log_exception(error):
    """Log error, preferring its ``message`` attribute when it has one."""
    if hasattr(error, 'message'):
        logging.error("Error: " + str(error.message))
    else:
        logging.error("Error: " + str(error))


def publishAt(publishAt, oauth, url, idvideo, secret):
    """Schedule a PeerTube video to become public at a given date.

    Writes a small shell script that refreshes the OAuth token with curl/jq
    and sets the video privacy to 1, then hands that script to ``at``.

    Args:
        publishAt: date as ``YYYY-MM-DDTHH:MM`` or ``YYYY-MM-DDTHH:MM:SS``.
        oauth: OAuth session; its ``_client`` attribute must carry a
            ``refresh_token``.
        url: base URL of the PeerTube instance.
        idvideo: identifier of the video to publish.
        secret: config parser providing ``client_id`` and ``client_secret``
            in its ``peertube`` section.

    Exits with status 1 when ``at``, ``curl`` or ``jq`` is unavailable or
    when publishAt has no "T" separator. Errors while writing or scheduling
    the job are logged and the function returns.
    """
    _require_command("at", "You need to install the atd daemon to use the publishAt option.")
    _require_command("curl", "You need to install the curl command line to use the publishAt option.")
    _require_command("jq", "You need to install the jq command line to use the publishAt option.")

    date_and_time = publishAt.split("T")
    if len(date_and_time) < 2:
        logging.error("publishAt must look like YYYY-MM-DDTHH:MM:SS, got: " + publishAt)
        sys.exit(1)
    # Remove trailing seconds that atd does not manage
    if date_and_time[1].count(":") == 2:
        date_and_time[1] = date_and_time[1][:-3]
    atTime = date_and_time[1] + " " + date_and_time[0]

    refresh_token = str(oauth.__dict__['_client'].__dict__['refresh_token'])
    atFile = "/tmp/peertube_" + idvideo + "_" + publishAt + ".at"

    # NOTE(review): the values below are pasted unescaped into a shell
    # script; they are expected to come from the user's own configuration.
    token_command = ('token=$(curl -X POST -d "client_id=' + str(secret.get('peertube', 'client_id')) +
                     '&client_secret=' + str(secret.get('peertube', 'client_secret')) +
                     '&grant_type=refresh_token&refresh_token=' + str(refresh_token) +
                     '" "' + url + '/api/v1/users/token" | jq -r .access_token)')
    publish_command = ('curl "' + url + '/api/v1/videos/' + idvideo +
                       '" -X PUT -H "Authorization: Bearer ${token}"' +
                       ' -H "Content-Type: multipart/form-data" -F "privacy=1"')
    # atd needs an empty line at the end of the file to load...
    script = token_command + "\n" + publish_command + "\n" + "\n "

    try:
        # The script holds the client secret and the refresh token: create
        # it readable by the current user only.
        descriptor = os.open(atFile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(descriptor, "w") as job_file:
            job_file.write(script)
    except Exception as e:
        _log_exception(e)
        # Without a complete job file there is nothing valid to schedule.
        return

    try:
        with open(devnull, 'w') as null_output:
            check_call(["at", "-M", "-f", atFile, atTime], stdout=null_output, stderr=STDOUT)
    except Exception as e:
        _log_exception(e)


def mastodonTag(tag):
    """Turn a space-separated tag into a CamelCase ASCII hashtag body.

    Each word is decomposed (NFKD), stripped of every non-ASCII and
    non-alphanumeric character, and gets its first letter upper-cased; the
    words are then concatenated. Words left empty by the stripping are
    skipped. UTF-8 encoded bytes are accepted as well as text.

    Returns:
        The hashtag body as a native ``str``, possibly empty.
    """
    # Decode once at the boundary so the rest works on text.
    if isinstance(tag, bytes):
        tag = tag.decode('utf-8')

    words = []
    for word in tag.split(' '):
        ascii_word = unicodedata.normalize('NFKD', word).encode('ascii', 'ignore').decode('ascii')
        ascii_word = ''.join(char for char in ascii_word if char.isalnum())
        if ascii_word:
            words.append(upcaseFirstLetter(ascii_word))
    # str() keeps the native string type on Python 2 (content is pure ASCII).
    return str(''.join(words))