diff --git a/advisors/check_inactive_repo.rb b/advisors/check_inactive_repo.rb deleted file mode 100755 index 6a6792d57f5057e78fbbf698a4544732151134c2..0000000000000000000000000000000000000000 --- a/advisors/check_inactive_repo.rb +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'set' -require 'optparse' - -require './helper/download_spec' -require './gitee/advisor' - -INACTIVE_THRESHOLD = 3 - -options = {} -OptionParser.new do |opts| - opts.banner = "Usage: check_inactive_repo.rb [options]" - opts.on("-p", "--push", "Push the advise to gitee.com/openeuler") do |v| - options[:push] = v - end - opts.on("-r", "--repo REPO_NAME", "Repo to check upstream info") do |n| - puts "Checking #{n}" - options[:repo] = n - end - opts.on("-h", "--help", "Prints this help") do - puts opts - exit - end -end.parse! - -if not options[:repo] then - puts "Missing repo name\n" - exit 1 -end - -cmd = "git ls-remote https://gitee.com/openeuler/#{options[:repo]}/" -refs = %x[#{cmd}] -merge_count = 0 -refs.each_line { |line| - if line.match(/\/pull\/(\d*)\/MERGE/) then - merge_count = merge_count + 1 - end - puts line -} -if merge_count < INACTIVE_THRESHOLD then - if options[:push] then - ad = Advisor.new - ad.new_issue("openeuler", options[:repo], - "Inactive repository", - "Dear #{options[:repo]} developer:\n亲爱的 #{options[:repo]} 开发者:\n\n We found this repository has not fulfill what it prupose to be.\n我们发现这个代码仓并没有承载它被期望的功能。\n\n Long time no progress will discourge other developers to follow and participant this initiative.\n长期没有代码会使得关注这个项目的开发者失望。\n\n Please start submit something as soon as possible.\n建议您尽快向代码仓提交进展。\n\n This is a automatic advise from openEuler-Advisor. If you think the advise is not correct, please fill an issue at https\:\/\/gitee.com\/openeuler\/openEuler-Advisor to help us improve.\n这是一条由 openEuler-Advisor 自动生成的建议。如果您认为这个建议不对,请访问 https\:\/\/gitee.com\/openeuler\/openEuler-Advisor 来帮助我们改进。\n\n Yours openEuler Advisor.") - else - puts "#{options[:repo]} is not active. But we keep it between us" - end -else - puts "#{options[:repo]} is active and good." -end - diff --git a/advisors/check_upgradable.rb b/advisors/check_upgradable.rb deleted file mode 100755 index 05392b248f437d0e857cff20fed8a162a42c27e0..0000000000000000000000000000000000000000 --- a/advisors/check_upgradable.rb +++ /dev/null @@ -1,178 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'json' -require 'date' -require 'optparse' - -require './check_upstream/github' -require './check_upstream/git' -require './check_upstream/hg' -require './check_upstream/svn' -require './check_upstream/metacpan' -require './check_upstream/gnome' -require './check_upstream/pypi' -require './helper/download_spec' -require './helper/rpmparser' -require './gitee/advisor' - -options = {} - -OptionParser.new do |opts| - opts.banner = "Usage: check_upgradable.rb [options]" - opts.on("-p", "--push", "Push the advise to gitee.com/src-openeuler") do |v| - options[:push] = v - end - opts.on("-r", "--repo REPO_NAME", "Repo to check upstream info") do |n| - puts "Checking #{n}" - options[:repo] = n - end - opts.on("-h", "--help", "Prints this help") do - puts opts - exit - end -end.parse! - -if not options[:repo] then - puts "Missing repo name\n" - exit 1 -end - -Prj_name = options[:repo] -specfile=download_spec(Prj_name) -if specfile == "" then - puts "no specfile found for project\n" - exit 1 -end -spec_struct = Specfile.new(specfile) -Cur_ver = spec_struct.get_version - -Prj_info = YAML.load(File.read "upstream-info/"+Prj_name+".yaml") - -def compare_tags (a, b) - arr_a = a.split(".") - arr_b = b.split(".") - len = [arr_a.length, arr_b.length].min - idx = 0 - while idx < len do - res1 = arr_a[idx].to_i <=> arr_b[idx].to_i - return res1 if res1 != 0 - res2 = arr_a[idx].length <=> arr_b[idx].length - return -res2 if res2 != 0 - res3 = arr_a[idx][-1].to_i <=> arr_b[idx][-1].to_i - return res3 if res3 != 0 - idx = idx + 1 - end - return arr_a.length <=> arr_b.length -end - -def sort_tags (tags) - tags.sort! { |a, b| - compare_tags(a,b) - } - return tags -end - -def clean_tags(tags) - new_tags = [] - tags.each{|line| - new_tags = new_tags.append clean_tag(line, Prj_info) - } - return new_tags -end - -def upgrade_recommend(tags_param, cur_tag, policy) - tags = tags_param.reverse - tag1 = cur_tag - tag2 = cur_tag - if policy == "latest" then - return tags[0] - elsif policy == "latest-stable" then - tags.each { |tag| - if tag.split(".").count {|f| f.to_i != 0 } >= 3 then - tag1 = tag - break - end - } - tags.each { |tag| - if tag.split(".").count {|f| f.to_i != 0} >= 2 then - tag2 = tag - break - end - } - if tag2[0].to_i > tag1[0].to_i then - return tag2 - else - return tag1 - end - elsif policy == "perfer-stable" then - tags.each { |tag| - if tag.start_with?(cur_tag) then - return tag - end - } - if cur_tag.split(".").length >= 3 then - search_tag = cur_tag.split(".")[0..1].join(".") - tags.each { |tag| - if tag.start_with?(search_tag) then - return tag - end - } - end - return cur_tag - else - return cur_tag - end - -end - -print Prj_name, ":\n" - -if Prj_info["version_control"] == "svn" then - tags = check_upstream_svn(Prj_info) -elsif Prj_info["version_control"] == "github" then - tags = check_upstream_github_by_api(Prj_info) - if tags == nil or tags == "" then - tags = check_upstream_github_by_git(Prj_info) - end - tags = clean_tags(tags.lines) -elsif Prj_info["version_control"] == "git" then - tags = check_upstream_git(Prj_info) - tags = clean_tags(tags.lines) -elsif Prj_info["version_control"] == "hg" then - tags = check_upstream_hg(Prj_info) - tags = clean_tags(tags.lines) -elsif Prj_info["version_control"] == "metacpan" then - tags = check_upstream_metacpan(Prj_info) - tags = clean_tags(tags.lines) -elsif Prj_info["version_control"] == "gitlab.gnome" then - tags = check_upstream_gnome(Prj_info) - tags = clean_tags(tags.lines) -elsif Prj_info["version_control"] == "pypi" then - tags = check_upstream_pypi(Prj_info) - tags = clean_tags(tags.lines) -end - -tags = sort_tags(tags) -print "Latest upstream is ", tags[-1], "\n" -#print "Recommended is ", upgrade_recommend(tags, Cur_ver, "latest-stable"), "\n" -print "Current version is ", Cur_ver, "\n" - -puts "This package has #{spec_struct.get_diverse} patches" - -if tags.length == 0 or compare_tags(tags[-1], Cur_ver) < 0 then - STDERR.puts "DEBUG #{Prj_name} > tags are #{tags}" - File.delete("upstream-info/"+Prj_name+".yaml") if File.exist?("upstream-info/"+Prj_name+".yaml") - File.open("known-issues/"+Prj_name+".yaml", "w") { |file| file.write(Prj_info.to_yaml) } -else - File.open("upstream-info/"+Prj_name+".yaml", "w") { |file| file.write(Prj_info.to_yaml) } -end -File.delete(specfile) if specfile != "" - -if options[:push] then - puts "Push to gitee\n" - ad = Advisor.new - ad.new_issue("src-openeuler", Prj_name, "Upgrade to Latest Release", "Dear #{Prj_name} maintainer:\n\n We found the latst version of #{Prj_name} is #{tags[-1]}, while the current version in openEuler is #{Cur_ver}.\n\n Please consider upgrading.\n\n\nYours openEuler Advisor.") -else - puts "keep it to us\n" -end diff --git a/advisors/check_upstream.py b/advisors/check_upstream.py new file mode 100755 index 0000000000000000000000000000000000000000..a1c84700ccae4268ffebdf047368b44996aa4c7b --- /dev/null +++ b/advisors/check_upstream.py @@ -0,0 +1,262 @@ +#!/usr/bin/python3 + +import http.cookiejar +import urllib.request +import re +import yaml +import json +import sys +import subprocess +import requests + +from urllib.parse import urljoin +from datetime import datetime + +time_format = "%Y-%m-%dT%H:%M:%S%z" + +def eprint(*args, **kwargs): + print("DEBUG: ", *args, file=sys.stderr, **kwargs) + +def load_last_query_result(info, force_reload=False): + if force_reload: + last_query = info.pop("last_query") + eprint("{repo} > Force reload".format(repo=info["src_repo"])) + return "" + else: + if "last_query" in info.keys(): + last_query = info.pop("last_query") + #age = datetime.now() - datetime.strptime(last_query["time_stamp"], time_format) + age = datetime.now() - last_query["time_stamp"].replace(tzinfo=None) + if age.days < 7: + eprint("{repo} > Reuse Last Query".format(repo=info["src_repo"])) + return last_query["raw_data"] + else: + eprint("{repo} > Last Query Too Old.".format(repo=info["src_repo"])) + return "" + else: + return "" + +def clean_tags(tags, info): + + if info.get("tag_pattern", "") != "" and info.get("tag_pattern", "") is not None: + pattern_regex = re.compile(info["tag_pattern"]) + result_list = [pattern_regex.sub("\\1", x) for x in tags] + elif info.get("tag_prefix", "") != "" and info.get("tag_prefix", "") is not None: + prefix_regex = re.compile(info["tag_prefix"]) + result_list = [prefix_regex.sub("", x) for x in tags] + else: + result_list = tags + + if info.get("seperator", ".") != "." and info.get("seperator", ".") is not None: + seperator_regex = re.compile(info["seperator"]) + result_list = [seperator_regex.sub(".", x) for x in result_list] + + result_list = [x for x in result_list if x[0].isdigit()] + + return result_list + + +def dirty_redirect_tricks(url, resp): + cookie = set() + href = "" + need_trick = False + for line in resp.splitlines(): + line = line.strip() + if line.startswith("Redirecting"): + eprint("Redirecting with document.cookie") + need_trick = True + m = re.search(r"document\.cookie=\"(.*)\";", line) + if m: + cookie = cookie | set(m.group(1).split(';')) + m = re.search(r"document\.location\.href=\"(.*)\";", line) + if m: + href = m.group(1) + new_url = urljoin(url, href) + if "" in cookie: cookie.remove("") + return need_trick, new_url, list(cookie) + + +def check_hg(info): + resp = load_last_query_result(info) + if resp == "": + headers = { + 'User-Agent' : 'Mozilla/5.0 (X11; Linux x86_64)' + } + url = urljoin(info["src_repo"] + "/", "json-tags") + resp = requests.get(url, headers=headers) + resp = resp.text + need_trick, url, cookie = dirty_redirect_tricks(url, resp) + if need_trick: + # I dont want to introduce another dependency on requests + # but urllib handling cookie is outragely complex + c_dict = {} + for c in cookie: + k, v = c.split('=') + c_dict[k] = v + resp = requests.get(url, headers=headers, cookies=c_dict) + resp = resp.text + + last_query = {} + last_query["time_stamp"] = datetime.now() + last_query["raw_data"] = resp + info["last_query"] = last_query + # try and except ? + tags_json = json.loads(resp) + sort_tags = tags_json["tags"] + sort_tags.sort(reverse=True, key=lambda x: x['date'][0]) + result_list = [tag['tag'] for tag in sort_tags] + result_list = clean_tags(result_list, info) + return result_list + +def check_metacpan(info): + resp = load_last_query_result(info) + if resp == "": + headers = { + 'User-Agent' : 'Mozilla/5.0 (X11; Linux x86_64)' + } + url = urljoin("https://fastapi.metacpan.org/release/", info["src_repo"]) + resp = requests.get(url, headers=headers) + resp = resp.text + + tags = [] + result_json = json.loads(resp) + if result_json != {}: + if "version" not in result_json.keys(): + eprint("{repo} > ERROR FOUND".format(repo=info["src_repo"])) + sys.exit(1) + else: + tags.append(result_json["version"]) + else: + eprint("{repo} found unsorted on cpan.metacpan.org".format(repo=info["src_repo"])) + sys.exit(1) + + last_query = {} + last_query["time_stamp"] = datetime.now() + last_query["raw_data"] = resp + info["last_query"] = last_query + return tags + +def check_pypi(info): + resp = load_last_query_result(info) + tags = [] + if resp == "": + headers = { + 'User-Agent' : 'Mozilla/5.0 (X11; Linux x86_64)' + } + url = urljoin("https://pypi.org/pypi/", info["src_repo"] + "/json") + resp = requests.get(url, headers=headers) + resp = resp.text + + result_json = json.loads(resp) + if result_json != {}: + tags.append(result_json["info"]["version"]) + else: + eprint("{repo} > No Response or JSON parse failed".format(repo=info["src_repo"])) + sys.exit(1) + return tags + +def __check_subprocess(cmd_list): + subp = subprocess.Popen(cmd_list, stdout=subprocess.PIPE) + resp = subp.stdout.read().decode("utf-8") + if subp.wait() != 0: + eprint("{cmd} > encount errors".format(cmd=" ".join(cmd_list))) + sys.exit(1) + return resp + +def __check_svn_helper(repo_url): + eprint("{repo} > Using svn ls".format(repo=repo_url)) + cmd_list = ["/usr/bin/svn", "ls", "-v", repo_url] + return __check_subprocess(cmd_list) + +def __check_git_helper(repo_url): + eprint("{repo} > Using git ls-remote".format(repo=repo_url)) + cmd_list = ["git", "ls-remote", "--tags", repo_url] + return __check_subprocess(cmd_list) + +def __svn_resp_to_tags(resp): + tags = [] + for line in resp.splitlines(): + items = line.split() + for item in items: + if item[-1] == "/": + tags.append(item[:-1]) + break + return tags + +def __git_resp_to_tags(resp): + tags = [] + pattern = re.compile(r"^([^ \t]*)[ \t]*refs\/tags\/([^ \t]*)") + for line in resp.splitlines(): + m = pattern.match(line) + if m: + tag = m.group(2) + if not tag.endswith("^{}"): + tags.append(tag) + return tags + +def check_git (info): + resp = load_last_query_result(info) + if resp == "": + resp = __check_git_helper(info["src_repo"]) + last_query={} + last_query["time_stamp"] = datetime.now() + last_query["raw_data"] = resp + info["last_query"] = last_query + + tags = __git_resp_to_tags(resp) + tags = clean_tags(tags, info) + + return tags + +def check_github(info): + resp = load_last_query_result(info) + if info.get("query_type", "git-ls") != "git-ls": + resp = "" + + repo_url = "https://github.com/" + info["src_repo"] + ".git" + + if resp == "": + resp = __check_git_helper(repo_url) + last_query = {} + last_query["time_stamp"] = datetime.now() + last_query["raw_data"] = resp + info["last_query"] = last_query + info["query_type"] = "git-ls" + + tags = __git_resp_to_tags(resp) + tags = clean_tags(tags, info) + return tags + +def check_gnome(info): + resp = load_last_query_result(info) + repo_url = "https://gitlab.gnome.org/GNOME/"+info["src_repo"]+".git" + + if resp == "": + resp = __check_git_helper(repo_url) + last_query={} + last_query["time_stamp"] = datetime.now() + last_query["raw_data"] = resp + info["last_query"] = last_query + + tags = __git_resp_to_tags(resp) + tags = clean_tags(tags, info) + return tags + +def check_svn(info): + resp = load_last_query_result(info) + repo_url = info["src_repo"] + "/tags" + if resp == "": + resp = __check_svn_helper(repo_url) + last_query = {} + last_query["time_stamp"] = datetime.now() + last_query["raw_data"] = resp + info["last_query"] = last_query + + tags = __svn_resp_to_tags(resp) + tags = clean_tags(tags, info) + return tags + + +if __name__ == "__main__": + pass + diff --git a/advisors/check_upstream/common.rb b/advisors/check_upstream/common.rb deleted file mode 100755 index 4ff8752d7b0e09652c105654ee2d6e5dffb607bd..0000000000000000000000000000000000000000 --- a/advisors/check_upstream/common.rb +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'json' -require 'date' - -def compare_tags (a, b) - arr_a = a.split(".") - arr_b = b.split(".") - len = [arr_a.length, arr_b.length].min - idx = 0 - while idx < len do - res1 = arr_a[idx].to_i <=> arr_b[idx].to_i - return res1 if res1 != 0 - res2 = arr_a[idx].length <=> arr_b[idx].length - return -res2 if res2 != 0 - res3 = arr_a[idx][-1].to_i <=> arr_b[idx][-1].to_i - return res3 if res3 != 0 - idx = idx + 1 - end - return arr_a.length <=> arr_b.length -end - -def clean_tag(tag, prj_info) - if prj_info.has_key?("tag_pattern") then - tag = tag.gsub(Regexp.new(prj_info["tag_pattern"]), "\\1") - elsif prj_info.has_key?("tag_prefix") then - tag = tag.gsub(Regexp.new(prj_info["tag_prefix"]), "") - end - if prj_info.has_key?("seperator") and prj_info["seperator"] != "." then - tag = tag.gsub(Regexp.new(prj_info["seperator"]), ".") - end - return tag.gsub("\n", "") -end - -def sort_tags (tags) - tags.sort! { |a, b| - compare_tags(a,b) - } - return tags -end - -def upgrade_recommend(tags, cur_tag, policy) - tags.reverse! - tag1 = cur_tag - tag2 = cur_tag - if policy == "latest" then - return tags[0] - elsif policy == "latest-stable" then - tags.each { |tag| - if tag.split(".").count {|f| f.to_i != 0 } >= 3 then - tag1 = tag - break - end - } - tags.each { |tag| - if tag.split(".").count {|f| f.to_i != 0} >= 2 then - tag2 = tag - break - end - } - if tag2[0].to_i > tag1[0].to_i then - return tag2 - else - return tag1 - end - elsif policy == "perfer-stable" then - tags.each { |tag| - if tag.start_with?(cur_tag) then - return tag - end - } - if cur_tag.split(".").length >= 3 then - search_tag = cur_tag.split(".")[0..1].join(".") - tags.each { |tag| - if tag.start_with?(search_tag) then - return tag - end - } - end - return cur_tag - else - return cur_tag - end - -end - -def load_last_query_result(prj_info, force_reload=false) - if force_reload == true then - prj_info.delete("last_query") - STDERR.puts "DEBUG: #{prj_info["src_repo"].gsub("\n", "")} > Force Reload\n" - return "" - else - if prj_info.has_key?("last_query") then - last_query = prj_info["last_query"] - if Time.now - last_query["time_stamp"] < 60*60*24*3 then - STDERR.puts "DEBUG: #{prj_info["src_repo"].gsub("\n", "")} > Reuse Last Query\n" - return last_query["raw_data"].dup - else - prj_info.delete("last_query") - STDERR.puts "DEBUG: #{prj_info["src_repo"].gsub("\n", "")} > Last Query Too Old.\n" - return "" - end - else - return "" - end - end -end - -def resp_to_git_tags(resp) - tags = "" - resp.each_line { |line| - if line.match(/refs\/tags/) then - match = line.scan(/^([^ \t]*)[ \t]*refs\/tags\/([^ \t]*)\n/) - if match != nil then - tags = tags + match[0][1].to_s + "\n" - end - end - } - return tags -end diff --git a/advisors/check_upstream/git.rb b/advisors/check_upstream/git.rb deleted file mode 100755 index 0e663f20ab3bfd4fc969dbdfd4fc74f95ca9cf86..0000000000000000000000000000000000000000 --- a/advisors/check_upstream/git.rb +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'date' -require_relative 'common' - -def check_upstream_git (prj_info) - resp = load_last_query_result(prj_info) - cmd="git ls-remote --tags "+prj_info["src_repo"] - if resp == "" then - resp=%x[#{cmd}] - last_query={} - last_query["time_stamp"] = Time.now - last_query["raw_data"] = resp.dup - prj_info["last_query"] = last_query - end - tags = resp_to_git_tags(resp) - return tags -end - diff --git a/advisors/check_upstream/github.rb b/advisors/check_upstream/github.rb deleted file mode 100755 index c05d40be0301392fe40136c035de4c282d7f1bc6..0000000000000000000000000000000000000000 --- a/advisors/check_upstream/github.rb +++ /dev/null @@ -1,100 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'json' -require 'date' -require_relative 'common' - -def check_upstream_github_by_api (prj_info) - cmd="/usr/bin/curl -m 60 -s https://api.github.com/repos/"+prj_info["src_repo"]+"/releases" - resp = load_last_query_result(prj_info) - if resp == "" then - STDERR.puts "DEBUG #{prj_info["src_repo"]} > Using api.github to get releases" - begin - retries ||= 0 - resp=%x[#{cmd}] - release = JSON.parse(resp) - rescue - STDERR.puts "DEBUG #{prj_info["src_repo"]} > No Response or JSON Parse failed. Retry in 3 seconds.\n" - sleep 3 - retry if (retries+=1) < 10 - end - if release != [] and release != nil then - last_query = {} - last_query["time_stamp"] = Time.now - last_query["raw_data"] = resp.dup - prj_info["last_query"] = last_query - prj_info["query_type"] = "api.github.releases" - else - # fall back to tags - STDERR.puts "DEBUG #{prj_info["src_repo"]} > Using api.github to get tags" - resp="" - cmd="/usr/bin/curl -m 60 -s https://api.github.com/repos/"+prj_info["src_repo"]+"/tags" - tags=[] - begin - retries ||= 0 - resp=%x[#{cmd}] - tags=JSON.parse(resp) - rescue - STDERR.puts "DEBUG #{prj_info["src_repo"]} > No Response or JSON Parse failed. Retry in 3 seconds.\n" - sleep 3 - retry if (retries += 1) < 10 - end - if tags == [] or tags == nil then - print "WARNING: #{prj_info["src_repo"]}'s upstream version not available~" - return "" - else - last_query = {} - last_query["time_stamp"] = Time.now - last_query["raw_data"] = resp.dup - prj_info["last_query"] = last_query - prj_info["query_type"] = "api.github.tags" - end - end - end - - if prj_info["query_type"] == "api.github.releases" then - result = "" - begin - release = JSON.parse(resp) - release.sort_by! { |e| e["created_at"]} - release.each { |r| - result = result + clean_tag(r["tag_name"], prj_info) + "\n" - } - rescue - end - return result - elsif prj_info["query_type"] == "api.github.tags" then - result = "" - begin - tags = JSON.parse(resp) - tags.each { |r| - result = result + clean_tag(r["name"], prj_info) + "\n" - } - rescue - end - return result - else - return "" - end -end - -def check_upstream_github_by_git(prj_info) - resp = load_last_query_result(prj_info) - if prj_info.has_key?("query_type") and prj_info["query_type"] != "git-ls" then - resp = "" - end - cmd="git ls-remote --tags https://github.com/"+prj_info["src_repo"]+".git" - if resp == "" then - STDERR.puts "DEBUG #{prj_info["src_repo"]} > Using git ls-remote" - resp=%x[#{cmd}] - last_query = {} - last_query["time_stamp"] = Time.now - last_query["raw_data"] = resp.dup - prj_info["last_query"] = last_query - prj_info["query_type"] = "git-ls" - end - tags = resp_to_git_tags(resp) - return tags -end - diff --git a/advisors/check_upstream/gnome.rb b/advisors/check_upstream/gnome.rb deleted file mode 100755 index 0a0fd1129dd3b411a6412d354daee2a5e0ba61dc..0000000000000000000000000000000000000000 --- a/advisors/check_upstream/gnome.rb +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'date' -require_relative 'common' - -def check_upstream_gnome (prj_info) - resp = "" - resp = load_last_query_result(prj_info) - if resp == "" then - cmd="git ls-remote --tags https://gitlab.gnome.org/GNOME/"+prj_info["src_repo"]+".git" - resp = %x[#{cmd}] - last_query={} - last_query["time_stamp"] = Time.now - last_query["raw_data"] = resp.dup - prj_info["last_query"] = last_query - end - tags = resp_to_git_tags(resp) - return tags -end - diff --git a/advisors/check_upstream/hg.rb b/advisors/check_upstream/hg.rb deleted file mode 100755 index 23413853e1dc92b4599dc51f5277e9d4247eed41..0000000000000000000000000000000000000000 --- a/advisors/check_upstream/hg.rb +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'date' -require_relative 'common' - -def check_upstream_hg (prj_info) - cookie = "" - cmd="curl -s "+prj_info["src_repo"]+"/raw-tags" - resp = load_last_query_result(prj_info) - if resp == "" then - resp = %x[#{cmd}] - if resp.lines[0].match(/html/) then # we got html response, resend with cookie - resp.each_line { |line| - match = line.scan(/document\.cookie=\"(.*)\";/) - if match != [] then - cookie = cookie + match[0][0] - end - } - cmd="curl -s --cookie \""+cookie+"\" "+prj_info["src_repo"]+"/raw-tags" - resp = %x[#{cmd}] - end - last_query={} - last_query["time_stamp"] = Time.now - last_query["raw_data"] = resp.dup - prj_info["last_query"] = last_query - end - tags = "" - resp.each_line { |line| - if line.match(/^tip/) then - next - end - match = line.scan(/^([\w\d\-\.]*)[ \t]*([\w\d\-\.]*)/) - if match != [] then - tags = tags + match[0][0].to_s + "\n" - end - } - return tags -end diff --git a/advisors/check_upstream/metacpan.rb b/advisors/check_upstream/metacpan.rb deleted file mode 100755 index 937ee618d2cbed74f443181913d5eab0cd4b4f97..0000000000000000000000000000000000000000 --- a/advisors/check_upstream/metacpan.rb +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'json' -require 'date' -require './check_upstream/common' - -def check_upstream_metacpan (prj_info) - resp = "" - info={} - tags = "" - cmd="curl -m 60 -s https://fastapi.metacpan.org/release/"+prj_info["src_repo"] - resp = load_last_query_result(prj_info) - if resp == "" - begin - retries ||= 0 - resp=%x[#{cmd}] - info=JSON.parse(resp) - rescue - STDERR.puts "DEBUG #{prj_info["src_repo"]} > No Respose or JSON parse failed\n" - sleep 3 - retry if (retries += 1) < 10 - end - else - info = JSON.parse(resp) - end - if info != {} then - if ! info.key?("version") then - STDERR.puts "DEBUG #{prj_info["src_repo"]} > ERROR FOUND" - return tags - else - tags = tags +info["version"].to_s+"\n" - end - else - STDERR.puts "DEBUG #{prj_info["src_repo"]} > found unsorted on cpan.metacpan.org\n" - return tags - end - last_query = {} - last_query["time_stamp"] = Time.now - last_query["raw_data"] = resp.dup - prj_info["last_query"] = last_query - return tags -end diff --git a/advisors/check_upstream/pypi.rb b/advisors/check_upstream/pypi.rb deleted file mode 100755 index 34902a40da380d24eba92e6df98fb759161b7183..0000000000000000000000000000000000000000 --- a/advisors/check_upstream/pypi.rb +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'json' -require 'date' -require './check_upstream/common.rb' - -def check_upstream_pypi (prj_info) - resp = "" - info={} - tags = "" - resp = load_last_query_result(prj_info) - if resp == "" then - last_query={} - last_query["time_stamp"] = Time.now - cmd="curl -m 60 -s -L https://pypi.org/pypi/"+prj_info["src_repo"]+"/json" - begin - retries ||= 0 - resp=%x[#{cmd}] - info=JSON.parse(resp) - rescue - STDERR.puts "DEBUG: #{prj_info["src_repo"].gsub("\n", "")} > No Respose or JSON parse failed\n" - sleep 3 - retry if (retries+=1)<10 - end - if info != {} then - last_query["raw_data"] = resp - prj_info["last_query"] = last_query - end - else - info=JSON.parse(resp) - end - if info != {} then - tags = tags + info["info"]["version"].to_s+"\n" - end - return tags -end diff --git a/advisors/check_upstream/svn.rb b/advisors/check_upstream/svn.rb deleted file mode 100755 index cac2e0344fa7a677a7c02f6d005bd79c93486710..0000000000000000000000000000000000000000 --- a/advisors/check_upstream/svn.rb +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'json' -require 'date' -require_relative 'common' - -def check_upstream_svn (prj_info) - cmd="/usr/bin/svn ls -v "+prj_info["src_repo"]+"/tags" - resp = load_last_query_result(prj_info) - if resp == "" then - resp = %x[#{cmd}] - last_query = {} - last_query["time_stamp"] = Time.now - last_query["raw_data"] = resp.dup - prj_info["last_query"] = last_query - else - end - sorted_tags = [] - resp.each_line { |tag_line| - match = tag_line.scan(/([.\w]+)/) - if match != nil then - if match[5][0].include?(prj_info["tag_prefix"]) then - new_tag = Hash.new - new_tag["Date"] = Date.parse(match[2][0]+" "+match[3][0]+" "+match[4][0]) - tag = match[5][0] - new_tag["Tag"] = tag.gsub(prj_info["tag_prefix"], "").gsub(prj_info["seperator"], ".") - sorted_tags.append(new_tag) - end - end - } - sorted_tags.sort_by! {|t| t["Date"] } - result = [] - sorted_tags.each { |t| - result.append(t["Tag"]) - } - return result -end - diff --git a/advisors/gitee.py b/advisors/gitee.py index 5e24a7c5f68ed4cf30e0e33ed1b2328c24301f0b..6925c2eaa62f17ea61edb2709cd41871de7fceb2 100755 --- a/advisors/gitee.py +++ b/advisors/gitee.py @@ -33,7 +33,7 @@ class Gitee(object): #self.advisor_url_template = "https://gitee.com/openeuler/openEuler-Advisor/raw/master/upstream-info/{package}.yaml" self.advisor_url_template = self.advisor_url + "upstream-info/{package}.yaml" #self.specfile_exception_url = "https://gitee.com/openeuler/openEuler-Advisor/raw/master/helper/specfile_exceptions.yaml" - self.specfile_exception_url = self.advisor_url + "helper/specfile_exceptions.yaml" + self.specfile_exception_url = self.advisor_url + "advisors/helper/specfile_exceptions.yaml" self.time_format = "%Y-%m-%dT%H:%M:%S%z" def post_gitee(self, url, values, headers=None): @@ -116,32 +116,42 @@ Yours openEuler-Advisor. """ get openeuler spec file for specific package """ + specurl = self.specfile_url_template.format(package=pkg, specfile=pkg + ".spec") exp = self.get_spec_exception() if pkg in exp: dir_name = exp[pkg]["dir"] file_name = exp[pkg]["file"] - specurl = self.specfile_url_template.format(package=pkg, specfile=dir_name + "/" + file_name) - else: - specurl = self.specfile_url_template.format(package=pkg, specfile=pkg + ".spec") + specurl = urllib.parse.urljoin(specurl, os.path.join(dir_name, file_name)) + + try: + resp = self.get_gitee(specurl) + except urllib.error.HTTPError: + resp = "" - return self.get_gitee(specurl) + return resp def get_yaml(self, pkg): """ get upstream yaml metadata for specific package """ yamlurl = self.advisor_url_template.format(package=pkg) - resp = self.get_gitee(yamlurl) + try: + resp = self.get_gitee(yamlurl) + except urllib.error.HTTPError: + resp = "Not found" if re.match("Not found", resp): yamlurl = self.yamlfile_url_template.format(package=pkg) - resp = self.get_gitee(yamlurl) + try: + resp = self.get_gitee(yamlurl) + except urllib.error.HTTPError: + resp = "Not found" if re.match("Not found", resp): print("Cannot find upstream metadata") return False else: return resp else: - return False + return resp def get_issues(self, pkg, prj="src-openeuler"): """ @@ -152,7 +162,7 @@ Yours openEuler-Advisor. parameters = "state=open&sort=created&direction=desc&page=1&per_page=20" return self.get_gitee_json(issues_url + parameters) - def get_issue_comments(self, pkg, number, prj="src-openeuler"): + def get_issue_comments(self, pkg, prj="src-openeuler"): """ Get comments of specific issue """ diff --git a/advisors/gitee/advisor.rb b/advisors/gitee/advisor.rb deleted file mode 100755 index a4a71f030458cb9ca991691c8c315f5a56b109a9..0000000000000000000000000000000000000000 --- a/advisors/gitee/advisor.rb +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/ruby - -require 'json' - -class Advisor - def initialize - @token = JSON.parse(File.read (File.expand_path "~/.gitee_token.json")) - @cmd = "curl -s -X POST --header 'Content-Type: application/json;charset=UTF-8'" - @param = {} - end - - def new_issue(owner, repo, title, body) - @param["access_token"] = @token["access_token"] - @param["repo"] = repo - @param["title"] = title - @param["body"] = body - @cmd += " 'https://gitee.com/api/v5/repos/#{owner}/issues'" - @cmd += " -d '" + @param.to_json + "'" - #puts @cmd - resp = %x[#{@cmd}] - #puts resp - end -end - -#ad = Advisor.new -#ad.new_issue("Shinwell_Hu", "openEuler-Toolbox") diff --git a/advisors/helper/download_spec.rb b/advisors/helper/download_spec.rb deleted file mode 100755 index 0c3403d9598de441183f4db836c91335c78948ab..0000000000000000000000000000000000000000 --- a/advisors/helper/download_spec.rb +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' - -def download_spec(name) - output_dir = "." - exception_load = YAML.load(File.read(File.dirname(__FILE__)+"/specfile_exceptions.yaml")) - if exception_load.has_key?(name) then - output_file = "#{output_dir}/#{exception_load[name]["file"]}" - cmd = "curl -s https://gitee.com/src-openeuler/#{name}/raw/master/#{exception_load[name]["dir"]}/#{exception_load[name]["file"]} -o #{output_file}" - else - output_file = "#{output_dir}/#{name}.spec" - cmd = "curl -s https://gitee.com/src-openeuler/#{name}/raw/master/#{name}.spec -o #{output_file}" - - end - %x[#{cmd}] if ! File.exists?(output_file) - s = File.size(output_file) - if s == 52 then - STDERR.puts "> No SPEC file found for #{name}" - File.delete output_file - return "" - end - return output_file -end - diff --git a/advisors/helper/rpmparser.rb b/advisors/helper/rpmparser.rb deleted file mode 100755 index 8e2908cc0dde6444f474adaa377de221c28ba92f..0000000000000000000000000000000000000000 --- a/advisors/helper/rpmparser.rb +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/ruby - -require 'yaml' -require 'set' - -def rpmspec_split_file (line, prefix) - m = line.scan (/#{prefix}\s*(.*)/) - if m != [] then - return m[0][0] - else - return nil - end -end - -def rpmspec_split_tags (line, prefix) - m = line.scan (/#{prefix}\s*(.*)/) - if m != [] then - br = m[0][0] - if br.index(',') then - bra = br.split(',').map(&:strip) - return bra - elsif br =~ /\w\s+\w/ then - bra = br.split(/\s+/) - return bra - end - end - return nil -end - -def rpmspec_clean_tag (oset, mac) - - new_set = Set.new - - oset.each { |br| - if br[0] =~ /[\d<=>!]/ then - oset.delete(br) - elsif br =~ /[<=>!]/ then - bra = br.split("\s").map(&:strip) - oset.delete(br) - new_set << bra[0] - elsif br.match(/%{/) then - m = br.scan(/%{(.*?)}/) - i = 0 - nbr = br - while i < m.length do - if mac[m[i][0]] then - nbr = nbr.gsub(/%{#{m[i][0]}}/, mac[m[i][0]]) - else - # some strange RPM macro needs shell expand, I dont know ohw to handle this - end - i = i + 1 - end - oset.delete(br) - new_set << nbr - end - } - oset += new_set - return oset -end - -def rpmspec_macro_expand(tag, macro) - ##This needs a fix - if tag.match(/%{/) then - m = tag.scan(/%{(.*)}/) - if m != [] then - if macro[m[0][0]] then - tag = tag.gsub(/%{#{m[0][0]}}/, macro[m[0][0]]) - end - end - end - return tag -end - -class Specfile - def initialize(filepath) - spec = File.open("#{filepath}") - @macros = {} - @macros["epoch"] = "1" - @macros["?_isa"] = "aarch64" - @name = "" - @version = "" - @release = "" - - @build_requires = Set.new - @requires = Set.new - @provides = Set.new - - @sources = Set.new - @patches = Set.new - - spec.each_line { |line| - m = line.scan (/^[Nn]ame\s*:\s*([^\s]*)\s*/) - if m != [] then - @name = m[0][0] - end - m = line.scan (/^[Vv]ersion\s*:\s*([^\s]*)\s*/) - if m != [] then - @version = m[0][0] - end - m = line.scan (/^[Rr]elease\s*:\s*([^\s]*)\s*/) - if m != [] then - @release = m[0][0] - end - m = line.scan (/%global\s*([^\s]*)\s*(.*)/) - if m != [] then - @macros[m[0][0]] = m[0][1] - end - m = line.scan (/%define\s*([^\s]*)\s*(.*)/) - if m != [] then - @macros[m[0][0]] = m[0][1] - end - bra = rpmspec_split_tags(line, "BuildRequires:") - if bra != nil then - @build_requires += bra - end - ra = rpmspec_split_tags(line, "Requires:") - if ra != nil then - @requires += ra - end - po = rpmspec_split_tags(line, "Provides:") - if po != nil then - @provides += po - end - src = rpmspec_split_file(line, "Source\\d*:") - if src != nil then - @sources << src - end - pa = rpmspec_split_file(line, "Patch\\d*:") - if pa != nil then - @patches << pa - end - } - @name = rpmspec_macro_expand(@name, @macros) - @macros["name"] = @name - - @version = rpmspec_macro_expand(@version, @macros) - @macros["version"] = @version - - @release = rpmspec_macro_expand(@release, @macros) - @macros["release"] = @release - - @build_requires = rpmspec_clean_tag(@build_requires, @macros) - @requires = rpmspec_clean_tag(@requires, @macros) - @provides = rpmspec_clean_tag(@provides, @macros) - end - - def get_name - return @name - end - - def get_version - return @version - end - - def get_diverse - return @patches.length - end - - def get_sources - return @sources - end - - def expand_macros(s) - return rpmspec_clean_tag(s, @macros) - end -#newspec = {} -#newspec["name"] = name -#newspec["release"] = release -#newspec["version"] = version -#newspec["build_requires"] = build_requires -#newspec["provides"] = provides -#newspec["requires"] = requires - -end - diff --git a/advisors/oa_upgradable.py b/advisors/oa_upgradable.py new file mode 100755 index 0000000000000000000000000000000000000000..949f55b17227195a43c2c18e4c8db87ac43c0bf9 --- /dev/null +++ b/advisors/oa_upgradable.py @@ -0,0 +1,82 @@ +#!/usr/bin/python3 +""" +This is a script to check upgradable information against upstream +""" +from pyrpm.spec import Spec, replace_macros + +import yaml +import json +import datetime +import sys +import os +import argparse + +import urllib.error +import gitee +import check_upstream +import version_recommend + +if __name__ == "__main__": + parameters = argparse.ArgumentParser() + parameters.add_argument("-p", "--push", action="store_true", + help="Push the version bump as an issue to src-openeuler repository") + parameters.add_argument("-d", "--default", type=str, default=os.getcwd(), + help="The fallback place to look for YAML information") + parameters.add_argument("repo", type=str, + help="Repository to be checked for upstream version info") + + args = parameters.parse_args() + + gitee = gitee.Gitee() + prj_name = args.repo + spec_string = gitee.get_spec(prj_name) + if not spec_string: + print("{repo} seems to be an empty repository".format(repo=args.repo)) + sys.exit(1) + + s_spec = Spec.from_string(spec_string) + + current_version = replace_macros(s_spec.version, s_spec) + + print("Checking ", prj_name) + print("current version is ", current_version) + + try: + prj_info_string = open(os.path.join(args.default, prj_name + ".yaml")).read() + except FileNotFoundError: + prj_info_string = "" + + if not prj_info_string: + print("Get YAML info from gitee") + try: + prj_info_string = gitee.get_yaml(prj_name) + except urllib.error.HTTPError: + print("Failed to get YAML info for {pkg}".format(pkg=prj_name)) + sys.exit(1) + + + prj_info = yaml.load(prj_info_string, Loader=yaml.Loader) + + vc_type = prj_info["version_control"] + if vc_type == "hg": + tags = check_upstream.check_hg(prj_info) + elif vc_type == "github": + tags = check_upstream.check_github(prj_info) + elif vc_type == "git": + tags = check_upstream.check_git(prj_info) + elif vc_type == "gitlab.gnome": + tags = check_upstream.check_gnome(prj_info) + elif vc_type == "svn": + tags = check_upstream.check_svn(prj_info) + elif vc_type == "metacpan": + tags = check_upstream.check_metacpan(prj_info) + elif vc_type == "pypi": + tags = check_upstream.check_pypi(prj_info) + else: + print("Unsupport version control method {vc}".format(vc=vc_type)) + sys.exit(1) + + print("known release tags :", tags) + v = version_recommend.VersionRecommend(tags, current_version, 0) + print("Latest version is ", v.latest_version) + print("Maintain version is", v.maintain_version) diff --git a/advisors/tc_statistic.py b/advisors/tc_statistic.py new file mode 100755 index 0000000000000000000000000000000000000000..89ec1854742b8726261faa8c34ead8f6147decb5 --- /dev/null +++ b/advisors/tc_statistic.py @@ -0,0 +1,144 @@ +#!/usr/bin/python3 +""" +This is a command line tool to create reminder list for TC member +""" + +import urllib +import urllib.request +import urllib.parse +import argparse +import json +import sys +import os +import yaml +from pprint import pprint +from datetime import datetime + +class Advisor(object): + """ + This is a object abstract TC robot + """ + def __init__(self): + self.secret = open(os.path.expanduser("~/.gitee_personal_token.json"), "r") + self.token = json.load(self.secret) + self.header = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64; rv:50.0) Gecko/20100101 Firefox/50.0"} + self.tc_members = None + self.time_format = "%Y-%m-%dT%H:%M:%S%z" + + def get_json(self, url): + """ + Return object parsed from remote json + """ + headers = self.header.copy() + headers["Content-Type"] = "application/json;charset=UTF-8" + req = urllib.request.Request(url = url, + headers = headers, + method = "GET") + + with urllib.request.urlopen(req) as u: + resp = json.loads(u.read().decode("utf-8")) + return resp + + def get_file(self, repo, path): + """ + Get remote raw file + """ + url = "https://gitee.com/{repo}/raw/master/{path}".format(repo=repo, path=path) + req = urllib.request.Request(url = url, + headers = self.header, + method = "GET") + + with urllib.request.urlopen(req) as u: + resp = u.read() + + return resp + + def get_prs(self): + """ + Get list of PRs + """ + pulls_url = "https://gitee.com/api/v5/repos/openeuler/community/pulls" + list_url = pulls_url + "?access_token={token}&state=open&sort=created&direction=desc&page=1&per_page=100" + url = list_url.format(token=self.token["access_token"]) + return self.get_json(url) + + def get_recent_prs(self, num): + """ + Get list of _recent_ PRs + """ + pulls_url = "https://gitee.com/api/v5/repos/openeuler/community/pulls" + list_all_url = pulls_url + "?access_token={token}&state=all&sort=created&direction=desc&" + #list_all_url = pulls_url + "?access_token={token}&state=all&sort=created&direction=desc&per_page=100&page=" + list_all_url = list_all_url.format(token=self.token["access_token"]) + + result = [] + page = 1 + + if num <= 100: + list_all_url = list_all_url + "per_page={num}&page=1".format(num=num) + return self.get_json(url) + + + list_all_url = list_all_url + "per_page=100&page=" + while num > 100: + url = list_all_url + str(page) + num -= 100 + page += 1 + result += self.get_json(url) + url = list_all_url + str(page) + result += self.get_json(url) + return result + + def get_pr_comments(self, number): + """ + Get Comments for a specific PR + """ + pulls_url = "https://gitee.com/api/v5/repos/openeuler/community/pulls" + desc_url = pulls_url + "/{number}/comments?access_token={token}&page=1&per_page=100" + url = desc_url.format(number=number, token=self.token["access_token"]) + return self.get_json(url) + + def get_tc_members(self): + """ + Get list of current TC members + """ + m = yaml.load(adv.get_file("openeuler/community", "sig/TC/OWNERS"), Loader=yaml.Loader) + self.tc_members = m["maintainers"] + return m["maintainers"] + + def filter_out_tc(self, users): + """ + Pick TC members from users + """ + if not self.tc_members: + self.get_tc_members() + return [x for x in self.tc_members if x in users] + + +if __name__ == "__main__": + par = argparse.ArgumentParser() + par.add_argument("-n", "--number", help="Number of recent PRs to be processed", default="100") + + args = par.parse_args() + + adv = Advisor() + tc_members = adv.get_tc_members() + print("Current TC members :", tc_members) + tc_statistic = {} + for t in tc_members: + tc_statistic[t] = 0 + PRs = adv.get_recent_prs(int(args.number)) + print("Statistic of recent {num} PRs".format(len(PRs)) + for pr in PRs: + commenter = pr["user"]["login"] + if commenter in tc_members: + tc_statistic[commenter] += 1 + comments = adv.get_pr_comments(pr["number"]) + for comment in comments: + commenter = comment["user"]["login"] + if commenter in tc_members: + tc_statistic[commenter] += 1 + + for tc in tc_statistic.keys(): + print("{tc} mades {num} comments".format(tc=tc, num=tc_statistic[tc])) + diff --git a/advisors/version_recommend.py b/advisors/version_recommend.py new file mode 100755 index 0000000000000000000000000000000000000000..c3f23414af3d133cea18b06fc2154d3e62993a50 --- /dev/null +++ b/advisors/version_recommend.py @@ -0,0 +1,767 @@ +#!-*- coding:utf-8 -*- + +import re +import datetime +import time +from typing import List + +__ALL__ = ["VersionRecommend"] + +class VersionType(object): + + def __init__(self): + self._version_type = None + + def version_match(self, pkg_version): + pass + + def latest_version(self, version_entry): + version_entry.sort(reverse=True) + return version_entry[0] + + def maintain_version(self, version_entry, current_version, pkg_type): + _ = version_entry, pkg_type + return current_version + + def _max(self, z1, z2): + d1 = tuple(self._split(z1)) # 第一个参数版本号拆分,获取里面的数字/字母,得到序列 + d2 = tuple(self._split(z2)) # 第二个参数版本号拆分,获取里面的数字/字母,得到序列 + len1 = len(d1) + len2 = len(d2) + length = min(len1, len2) + for index in range(length): + if d1[index].isdigit() and d1[index].isdigit(): + if int(d1[index]) > int(d2[index]): + return 1 + elif int(d1[index]) < int(d2[index]): + return -1 + else: + if d1[index] > d2[index]: + return 1 + elif d1[index] < d2[index]: + return -1 + if len1 > len2: + return 1 + else: + return -1 + + def get_version_mode(self): + return self._version_type + + def _split(self, x): + for f, s in re.findall(r'([\d.]+)|([^\d.]+)', x): + if f: + float(f) + yield f + else: + yield s + + +class VersionTypeXYZW(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 4: # 通过 '.'分割后,应该剩下4位 + return False + if len(digital_list[0]) > 2: # 第一位版本号不应该大于2位 + return False + if len(digital_list[1]) > 2: # 第二位版本号不应该大于2位 + return False + if len(digital_list[2]) > 3: # 第三位版本号不应该大于3位 + return False + if len(digital_list[3]) > 1: # 第四位版本号不应该大于1位 + return False + return True + + def maintain_version(self, version_entry, current_version, pkg_type): + if len(version_entry) == 1: # 仅一个版本,当前即为最新版本 + return version_entry[0] + + version_candidate = [] + version_digital = re.split(r'[._-]', current_version) # 将版本号做拆分 + xyz = version_digital[0:3] + for version in version_entry: + version_temp = re.split(r'[._-]', version) + if version_digital[0:3] == version_temp[0:3]: # 如果版本号与当前版本前三位一致,说明是维护分支版本 + version_candidate.append(version_temp) # 将同特性版本的子版本挑选出来 + + if len(version_candidate) == 1: + return '.'.join(version_candidate[0]) + + w = '0' + for version in version_candidate: + if len(version) <= 3: + continue + + if self._max(version[3], w) > 0: + w = version[3] + + xyz.append(w) + return '.'.join(xyz) + + def latest_version(self, version_entry): + if len(version_entry) == 1: # 仅一个版本,当前即为最新版本 + return version_entry[0] + version_list = [] + for version in version_entry: + version_list.append(re.split(r'[._-]', version)) # 将 version 拆分为列表,方便后续比较 + x = '0' + for version in version_list: # 第一轮比较取出最大的第一位 + if self._max(x, version[0]) < 0: + x = version[0] + + version_candidate = [] + for version in version_list: # 将第一位最大的列入候选列表,准备第二位比较 + if x == version[0]: + version_candidate.append(version) + + if len(version_candidate) == 1: # 仅一个版本,候选即为最新版本 + return '.'.join(version_candidate[0]) + + version_list = version_candidate[:] + y = '0' + for version in version_list: # 第二轮比较取出最大的第二位 + if len(version) <= 1: # 过滤仅一位的版本号 + continue + if self._max(y, version[1]) < 0: + y = version[1] + + version_candidate.clear() + for version in version_list: # 将第二位最大的列入候选列表,准备第三位比较 + if y == version[1]: + version_candidate.append(version) + + if len(version_candidate) == 1: # 仅一个版本,候选即为最新版本 + return '.'.join(version_candidate[0]) + + z = '0' + version_list = version_candidate[:] + for version in version_list: # 第三轮比较取出最大的第三位 + if len(version) <= 2: # 过滤仅二位的版本号 + continue + if self._max(z, version[2]) < 0: + z = version[2] + + version_candidate.clear() + for version in version_list: # 最后一位最大版本必须惟一,直接返回结果 + if len(version) <= 2: # 过滤仅二位的版本号 + continue + if z == version[2]: + version_candidate.append(version) + + if len(version_candidate) == 1: # 仅一个版本,候选即为最新版本 + return '.'.join(version_candidate[0]) + + w = '0' + version_list = version_candidate[:] + for version in version_list: # 最后一位最大版本必须惟一,直接返回结果 + if len(version) <= 3: # 过滤仅三位的版本号 + continue + if self._max(w, version[3]) < 0: + w = version[3] + + for version in version_list: # 最后一位最大版本必须惟一,直接返回结果 + if len(version) <= 3: # 过滤仅三位的版本号 + continue + if w == version[3]: + return '.'.join(version) + + return '' + + def __init__(self): + super().__init__() + self._version_type = 'x.y.z.w' + + +class VersionTypeXYZ(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 3: # 通过 '.'分割后,应该剩下3位 + return False + if len(digital_list[0]) > 2: # 第一位版本号不应该大于2位 + return False + if len(digital_list[1]) > 3: # 第二位版本号不应该大于3位 + return False + if len(digital_list[2]) > 3: # 第三位版本号不应该大于3位 + return False + return True + + def maintain_version(self, version_entry, current_version, pkg_type): + if len(version_entry) == 1: # 仅一个版本,当前即为最新版本 + return version_entry[0] + + version_candidate = [] + version_digital = re.split(r'[._-]', current_version) # 将版本号做拆分 + xy = version_digital[0:2] + for version in version_entry: + version_temp = re.split(r'[._-]', version) + if version_digital[0:2] == version_temp[0:2]: # 如果版本号与当前版本前两位一致,说明是维护分支版本 + version_candidate.append(version_temp) # 将同特性版本的子版本挑选出来 + + if len(version_candidate) == 1: + return '.'.join(version_candidate[0]) + + z = '0' + for version in version_candidate: + if len(version) <= 2: + continue + + if self._max(version[2], z) > 0: + z = version[2] + + xy.append(z) + return '.'.join(xy) + + def latest_version(self, version_entry): + if len(version_entry) == 1: # 仅一个版本,当前即为最新版本 + return version_entry[0] + version_list = [] + for version in version_entry: + version_list.append(re.split(r'[._-]', version)) # 将 version 拆分为列表,方便后续比较 + x = '0' + for version in version_list: # 第一轮比较取出最大的第一位 + if self._max(x, version[0]) < 0: + x = version[0] + + version_candidate = [] + for version in version_list: # 将第一位最大的列入候选列表,准备第二位比较 + if x == version[0]: + version_candidate.append(version) + + if len(version_candidate) == 1: # 仅一个版本,候选即为最新版本 + return '.'.join(version_candidate[0]) + + version_list = version_candidate[:] + y = '0' + for version in version_list: # 第二轮比较取出最大的第二位 + if len(version) <= 1: # 过滤仅一位的版本号 + continue + if self._max(y, version[1]) < 0: + y = version[1] + + version_candidate.clear() + for version in version_list: # 将第二位最大的列入候选列表,准备第三位比较 + if y == version[1]: + version_candidate.append(version) + + if len(version_candidate) == 1: # 仅一个版本,候选即为最新版本 + return '.'.join(version_candidate[0]) + + z = '0' + version_list = version_candidate[:] + for version in version_list: # 第三轮比较取出最大的第三位 + if len(version) <= 2: # 过滤仅二位的版本号 + continue + if self._max(z, version[2]) < 0: + z = version[2] + + for version in version_list: # 最后一位最大版本必须惟一,直接返回结果 + if len(version) <= 2: # 过滤仅二位的版本号 + continue + if z == version[2]: + return '.'.join(version) + + return '' + + def __init__(self): + super().__init__() + self._version_type = 'x.y.z' + + +class VersionTypeXY(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 2: # 通过 '.'分割后,应该剩下2位 + return False + if len(digital_list[0]) > 2: # 第一位版本号不应该大于2位 + return False + if len(digital_list[1]) > 3: # 第二位版本号不应该大于2位 + return False + return True + + def __init__(self): + super().__init__() + self._version_type = 'x.y' + + def latest_version(self, version_entry): + if len(version_entry) == 1: # 仅一个版本,当前即为最新版本 + return version_entry[0] + version_list = [] + for version in version_entry: + version_list.append(re.split(r'[._-]', version)) # 将 version 拆分为列表,方便后续比较 + x = '0' + for version in version_list: # 第一轮比较取出最大的第一位 + if self._max(x, version[0]) < 0: + x = version[0] + + version_candidate = [] + for version in version_list: # 将第一位最大的列入候选列表,准备第二位比较 + if x == version[0]: + version_candidate.append(version) + + if len(version_candidate) == 1: # 仅一个版本,候选即为最新版本 + return '.'.join(version_candidate[0]) + + version_list = version_candidate[:] + y = '0' + for version in version_list: # 第二轮比较取出最大的第二位 + if len(version) <= 1: # 过滤仅一位的版本号 + continue + if self._max(y, version[1]) < 0: + y = version[1] + + version_candidate.clear() + for version in version_list: # x.y 版本类型中会小概率出现三位版本号,需要将第二位最大的列入候选列表,准备第三位比较 + if len(version) <= 1: # 过滤仅一位的版本号 + continue + if y == version[1]: + version_candidate.append(version) + + if len(version_candidate) == 1: # 仅一个版本,候选即为最新版本 + return '.'.join(version_candidate[0]) + + z = '0' + version_list = version_candidate[:] + for version in version_list: # 第三轮比较取出最大的第三位 + if len(version) <= 2: # 过滤仅二位的版本号 + continue + if self._max(z, version[2]) < 0: + z = version[2] + + for version in version_list: # 最后一位最大版本必须惟一,直接返回结果 + if len(version) <= 2: # 过滤仅二位的版本号 + continue + if z == version[2]: + return '.'.join(version) + + return '' + + def maintain_version(self, version_entry, current_version, pkg_type): + version_candidate = [] + version_digital = re.split(r'[._-]', current_version) # 将版本号做拆分 + x = [version_digital[0]] + for version in version_entry: + version_temp = re.split(r'[._-]', version) + if version_digital[0] == version_temp[0]: # 如果版本号与当前版本前两位一致,说明是维护分支版本 + version_candidate.append(version_temp) # 将同特性版本的子版本挑选出来 + + if len(version_candidate) == 1: + return '.'.join(version_candidate[0]) + + y = '0' + for version in version_candidate[0:]: + if len(version) <= 1: + continue + + if self._max(version[1], y) > 0: + y = version[1] + x.append(y) + return '.'.join(x) + + +class VersionTypeX(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 1: # 通过 '.'分割后,应该剩下1位 + return False + if len(digital_list[0]) > 3: # 第一位版本号不应该大于3位 + return False + return True + + def latest_version(self, version_entry): + if 1 == len(version_entry): # 仅一个版本,当前即为最新版本 + return version_entry[0] + version_list: List[List[str]] = [] + for version in version_entry: + version_list.append(re.split(r'[._-]', version)) # 将 version 拆分为列表,方便后续比较 + x = '0' + for version in version_list: # 第一轮比较取出最大的第一位 + if self._max(x, version[0]) < 0: + x = version[0] + + version_candidate = [] + for version in version_list: # 将第一位最大的列入候选列表,准备第二位比较 + if x == version[0]: + version_candidate.append(version) + + if len(version_candidate) == 1: # 仅一个版本,候选即为最新版本 + return '.'.join(version_candidate[0]) + + version_list = version_candidate[:] + y = '0' + for version in version_list: # 第二轮比较取出最大的第二位 + if len(version) <= 1: # 过滤仅一位的版本号 + continue + if self._max(y, version[1]) < 0: + y = version[1] + + x.append(y) + return '.'.join(x) + + def maintain_version(self, version_entry, current_version, pkg_type): + version_candidate = [] + version_digital = re.split(r'[._-]', current_version) # 将版本号做拆分 + x = [version_digital[0]] + for version in version_entry: + version_temp = re.split(r'[._-]', version) + if version_digital[0] == version_temp[0]: # 如果版本号与当前版本前两位一致,说明是维护分支版本 + version_candidate.append(version_temp) # 将同特性版本的子版本挑选出来 + + if len(version_candidate) == 1: + return '.'.join(version_candidate[0]) + + y = '0' + for version in version_candidate[0:]: + if len(version) <= 1: + continue + + if self._max(version[1], y) > 0: + y = version[1] + x.append(y) + return '.'.join(x) + + def __init__(self): + super().__init__() + self._version_type = 'x' + + +class VersionTypeYyyyXY(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 3: # 通过 '.'分割后,应该剩下3位 + return False + + if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位 + return False + + year = int(digital_list[0]) + if year < 2000 or year > datetime.datetime.now().year: # 软件发布时间应该大于2000 年,小于当前时间 + return False + + if len(digital_list[1]) > 2: # 第二位版本号不应该大于2位 + return False + + if str(int(digital_list[1])) != digital_list[1]: # 版本类型为数字,且非0 开头 + return False + + if len(digital_list[2]) > 2: # 第三位版本号不应该大于2位 + return False + + if str(int(digital_list[2])) != digital_list[2]: # 版本类型为数字,且非0 开头 + return False + + return True + + def __init__(self): + super().__init__() + self._version_type = 'yyyy.x.y' + + +class VersionTypeYyyyX(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 2: # 通过 '.'分割后,应该剩下2位 + return False + + if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位 + return False + year = int(digital_list[0]) + if year < 2000 or year > datetime.datetime.now().year: # 软件发布时间应该大于2000 年,小于当前时间 + return False + + if len(digital_list[1]) > 2: # 第二位版本号不应该大于2位 + return False + return True + + def __init__(self): + super().__init__() + self._version_type = 'yyyy.x' + + +class VersionTypeYyyyW(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + if len(version) != 5: # 共5 位 + return False + if not str(version[0:4]).isdigit(): # 前四位为年份数字 + return False + + year = int(version[0:4]) + if year < 2000 or year > datetime.datetime.now().year: # 软件发布时间应该大于2000 年,小于当前时间 + return False + + return True + + def __init__(self): + super().__init__() + self._version_type = 'yyyyw' + + +class VersionTypeYyyyMmDd(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 3: # 通过 '.'分割后,应该剩下3位 + return False + + if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位 + return False + + if int(digital_list[1]) > 12 or int(digital_list[1]) == 0: # 第二位为发布月份,小于12 + return False + + if int(digital_list[2]) > 31 or int(digital_list[2]) == 0: # 第三位为发布日期,小于31 + return False + + # 判断日期是否为合法日期 + try: + if '_' in version: + d_time = time.mktime(time.strptime(version, "%Y_%m_%d")) + elif '-' in version: + d_time = time.mktime(time.strptime(version, "%Y-%m-%d")) + elif '.' in version: + d_time = time.mktime(time.strptime(version, "%Y.%m.%d")) + else: + return False + + now_str = datetime.datetime.now().strftime('%Y-%m-%d') + end_time = time.mktime(time.strptime(now_str, '%Y-%m-%d')) + if d_time > end_time: # 判断日期是否大于当前日期 + return False + else: + return True + except ValueError as e: # 时间格式非法 + print('Time foramt failed %s.', version) + return False + + def __init__(self): + super().__init__() + self._version_type = 'yyyy.mm.dd' + + +class VersionTypeYyyyMm(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 2: # 通过 '.'分割后,应该剩下2位 + return False + + if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位 + return False + year = int(digital_list[0]) + if year < 2000 or year > datetime.datetime.now().year: # 软件发布时间应该大于2000 年,小于当前时间 + return False + + if int(digital_list[1]) > 12 or int(digital_list[1]) == 0: # 第二位为发布月份,小于12 + return False + + if year == datetime.datetime.now().year and \ + int(digital_list[1]) > datetime.datetime.now().month: + return False + + return True + + def __init__(self): + super().__init__() + self._version_type = 'yyyy.mm' + + +class VersionTypeYyyyMm(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + if len(version) != 6: # 长度为6 + return False + + if not version.isdigit(): # 时间格式为数字 + return False + + digital_list = [version[0:4], version[4:]] + + if len(digital_list[0]) != 4: # 第一位为发布年份,位数为4位 + return False + year = int(digital_list[0]) + if year < 2000 or year > datetime.datetime.now().year: # 软件发布时间应该大于2000 年,小于当前时间 + return False + + if int(digital_list[1]) > 12 or int(digital_list[1]) == 0: # 第二位为发布月份,小于12 + return False + + if year == datetime.datetime.now().year and \ + int(digital_list[1]) > datetime.datetime.now().month: + return False + + return True + + def __init__(self): + super().__init__() + self._version_type = 'yyyymm' + + +class VersionTypeXYymmZ(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + digital_list = re.split(r'[._-]', version) + if len(digital_list) != 3: # 通过 '.'分割后,应该剩下3位 + return False + + if len(digital_list[0]) > 2: # 第一位为主版本号,小于2位 + return False + # 将年月拆分后分别判断 + if len(digital_list[1]) != 4: # 年月等于4位 + return False + year = str(digital_list[1][:2]) + month = str(digital_list[1][-2:]) + if year > datetime.datetime.now().year[-2:]: # 年份不能大于当前年份,不用考虑20000 年前的情况 + return False + if month > '12' or month == '0': + return False + if len(digital_list[2]) > 2: # 迭代号不大于2位 + return False + return True + + def __init__(self): + super().__init__() + self._version_type = 'x.yymm.z' + + +class VersionTypeYyyymmdd(VersionType): + + def version_match(self, pkg_version): + version = pkg_version.strip() + if len(version) != 8: # 日期长度满足 8 位要求 + return False + if not version.isdigit(): # 连续数字,没有其他分别符号 + return False + digital_list = [version[:4], version[4:6], version[6:]] + + if int(digital_list[1]) > 12 or int(digital_list[1]) == 0: # 第二位为发布月份,小于12 + return False + + if int(digital_list[2]) > 31 or int(digital_list[2]) == 0: # 第三位为发布日期,小于31 + return False + + # 判断日期是否为合法日期 + try: + d_time = time.mktime(time.strptime(version, "%Y%m%d")) + now_str = datetime.datetime.now().strftime('%Y-%m-%d') + end_time = time.mktime(time.strptime(now_str, '%Y-%m-%d')) + if d_time > end_time: # 判断日期是否大于当前日期 + return False + else: + return True + except ValueError as e: # 时间格式非法 + print('Time format failed %s,', version) + return False + + def __init__(self): + super().__init__() + self._version_type = 'yyyymmdd' + + +class VersionRecommend(object): + + def __init__(self, version_entry, current_version, pkg_type): + self.latest_version = current_version # 提供初值,避免 current_version 为空导致后面出现异常 + self.maintain_version = current_version + self.version_type = self._version_match(version_entry) + if self.version_type is None: + print('version type is None:', current_version) + return + + print('version type = ', self.version_type.get_version_mode()) + self.latest_version = self._get_latest_version(version_entry) + self.maintain_version = self._get_maintain_version(version_entry, current_version, pkg_type) + + def _version_match(self, version_entry): + + version_method = {VersionTypeXYZW(): 0, + VersionTypeXYZ(): 0, + VersionTypeXY(): 0, + VersionTypeX(): 0, + VersionTypeYyyyXY(): 0, + VersionTypeYyyyX(): 0, + VersionTypeYyyyW(): 0, + VersionTypeYyyyMmDd(): 0, + VersionTypeYyyyMm(): 0, + VersionTypeYyyyMm(): 0, + VersionTypeXYymmZ(): 0, + VersionTypeYyyymmdd(): 0} + if not version_entry: + return None + for version in version_entry[:]: + if not self._version_valid(version): + version_entry.remove(version) # 删除非法版本号 + continue + for method, count in version_method.items(): + if method.version_match(version): + version_method[method] = count + 1 + + # 解决多版本类型问题,选取类型最多的作为匹配,这个处理不是最优方案,需要改进 + method = max(version_method, key=lambda x: version_method[x]) + + if version_method[method] == 0: + return None + else: + return method + + def _version_valid(self, version): + m = re.match("^[0-9a-zA-Z._-]*$", version) + if m is None: # 版本号应该是 数字/小写字母/下划线/. 组成 + return False + + m = re.match('^[0-9].*', version) + if m is None: # 版本号应该是数字开头 + return False + + if 'rc' in version \ + or 'RC' in version \ + or 'dev' in version \ + or 'beta' in version \ + or 'Beta' in version \ + or 'BETA' in version \ + or 'alpha' in version \ + or 'pl' in version \ + or 'pre' in version \ + or 'PRE' in version \ + or 'bp' in version: # 仅获取正式版本 + return False + + if 'ubuntu' in version or 'fedora' in version: # 去掉厂家专用版本号 + return False + + return True + + def _get_latest_version(self, version_entry): + if self.version_type is None: + return '' + else: + return self.version_type.latest_version(version_entry) + + def _get_maintain_version(self, version_entry, current_version, pkg_type): + if self.version_type is None: + return '' + else: + return self.version_type.maintain_version(version_entry, current_version, pkg_type) + + +if __name__ == '__main__': + version_recommend = VersionRecommend( + ['0.1', '0.2', '1.2.3', '1.2.4', '1.2.6', '1.3.0', '1.5-rc', '2.0-rc'], '1.2.3', 0) + print('latest_version', version_recommend.latest_version) + print('maintain_version', version_recommend.maintain_version)