Przeglądaj źródła

优化e生活脚本

shawenguan 1 rok temu
rodzic
commit
a62b6e3d8f
1 zmienionych plików z 117 dodań i 54 usunięć
  1. 117 54
      addons/elife/OfpayGrab.py

+ 117 - 54
addons/elife/OfpayGrab.py

@@ -9,7 +9,11 @@ from datetime import datetime
 from cachetools import TTLCache
 import pickle
 import multiprocessing
-from multiprocessing import Manager, Pool
+from multiprocessing import Manager
+from multiprocessing.dummy import Pool as ThreadPool
+
+from functools import partial
+import time
 
 def read_csv(filename='results.csv'):
     data = []
@@ -41,28 +45,45 @@ class SharedCache:
 
     def init(self):
         cache = None;
-        cache_file_path = SharedCache.cache_file_path;
-        if os.path.exists(cache_file_path):
-            # 从文件中加载缓存对象
-            with open(cache_file_path, 'rb') as file:
-                cache = pickle.load(file);
-        else:
-            cache = TTLCache(maxsize=100, ttl=60*60*24);
-            # 将缓存对象持久化到文件
-            with open(cache_file_path, 'wb') as file:
-                pickle.dump(cache, file);
+        try:
+            cache_file_path = SharedCache.cache_file_path;
+            if os.path.exists(cache_file_path):
+                # 从文件中加载缓存对象
+                with open(cache_file_path, 'rb') as file:
+                    cache = pickle.load(file);
+            else:
+                cache = TTLCache(maxsize=100, ttl=60*60*24);
+                # 将缓存对象持久化到文件
+                with open(cache_file_path, 'wb') as file:
+                    pickle.dump(cache, file);
+        except Exception as e:
+            cache = {};
+            print(e);
+        finally:
+            pass
         self.cache = cache;
 
     def save(self):
-        cache_file_path = SharedCache.cache_file_path;
-        with open(cache_file_path, 'wb') as file:
-            pickle.dump(self.cache, file);
+        try:
+            cache_file_path = SharedCache.cache_file_path;
+            with open(cache_file_path, 'wb') as file:
+                pickle.dump(self.cache, file);
+        except Exception as e:
+            print(e);
+        finally:
+            pass;
 
     def set(self, key, value):
         self.cache[key] = value;
 
     def get(self, key):
-        return self.cache.get(key);
+        ret = None;
+        try:
+            ret = self.cache.get(key);
+        except Exception as e:
+            print(e);
+        finally:
+            return ret;
 
 class OfpayGrabber:
     FastModeEnable = True;
@@ -125,25 +146,27 @@ class OfpayGrabber:
             return;
         if ret_code == 1:
             self.sync_new_token();
-        cate_items = self.get_market_items_from_cache();
-        if not cate_items:
-            cate_items = self.get_market_items_from_svr(self.market_id, self.event_visitor_id);
-        if not cate_items:
+        cate_items_list = self.get_market_items_from_cache();
+        if not cate_items_list:
+            cate_items_list = self.get_market_items_from_svr(self.market_id, self.event_visitor_id);
+        if not cate_items_list:
             return;
-        all_buy_list = self.get_will_market_buy_list_all();
-        will_cate_count = len(all_buy_list);
+        cate_items_dict = {};
+        for activity_data in cate_items_list:
+            out_activity_code = activity_data['outActivityCode'];
+            cate_items_dict[out_activity_code] = activity_data;
+
+        all_buy_dict = self.get_will_market_buy_list_all();
         all_ret_list = [];
-        sort_num = 0;
-        for activity_data in cate_items:
-            if 'sortNum' in activity_data:
-                sort_num = activity_data['sortNum'];
-            if sort_num >= will_cate_count:
+        for cate_type in all_buy_dict:
+            buy_list = all_buy_dict[cate_type];
+            if not buy_list:
+                continue;
+            activity_data = cate_items_dict[cate_type] if cate_type in cate_items_dict else None;
+            if not activity_data:
                 continue;
-            buy_list = all_buy_list[sort_num];
-            print(sort_num, buy_list);
             buy_ret_list = self.check_to_buy_all(buy_list, activity_data);
             all_ret_list.extend(buy_ret_list);
-            sort_num += 1;
         if len(all_ret_list):
             g_shared_cache.save();
         return all_ret_list;
@@ -160,8 +183,14 @@ class OfpayGrabber:
         login_params = self.accout_data['login_params'];
         url = f'https://{self.host}/h5/union/interactiveIGoChoose/index?loginParams={login_params}';
         response = self.get_request(url);
-        # print(response.content);
-        cookie_dict = dict(response.cookies);
+        # content = response.content;
+        # print(content.decode("utf-8"));
+        # cookie_dict = dict(response.cookies);
+        cookie_dict = {}
+        for cookie in response.cookies:
+            if cookie.name:
+                key = cookie.name;
+                cookie_dict[key] = cookie.value;
         authorization = None;
         for key in cookie_dict:
             self.cookies[key] = cookie_dict[key];
@@ -178,21 +207,23 @@ class OfpayGrabber:
         print('sync_new_token');
 
     def get_will_market_buy_list_all(self):
-        def_val = '星巴克|霸王茶姬|百果园|京东E卡|滴滴快车';
-        name_arr = [];
-        name_str = self.accout_data['will_buy_list'] if self.accout_data['will_buy_list'] else def_val;
-        if len(name_str):
-            segments = name_str.strip().split('|');
-            name_arr = [];
-            for vstr in segments:
-                if not len(vstr):
-                    name_arr.append(None);
-                else:
-                    vlist = vstr.strip().split(',');
-                    name_arr.append(vlist);
+        def_val = 'eSupermarket#京东E卡|eTravel#滴滴快车|eCoffee#星巴克|eFood#百果园|eTea#霸王茶姬|eMovie#|eBicycle#|eOffice#';
+        buy_ret_dict = {};
+        buy_conf_str = self.accout_data['will_buy_list'] if self.accout_data['will_buy_list'] else def_val;
+        if len(buy_conf_str):
+            buy_ret_dict = {};
+            items = buy_conf_str.strip().split('|');
+            for vstr in items:
+                if len(vstr):
+                    cate_item = vstr.strip().split('#');
+                    cate_type_key = cate_item[0];
+                    cate_item_val = cate_item[1] if len(cate_item) > 1 else None;
+                    if cate_item_val and len(cate_item_val):
+                        vlist = cate_item_val.strip().split('+');
+                        buy_ret_dict[cate_type_key] = vlist;
         else:
-            name_arr = [];
-        return name_arr;
+            buy_ret_dict = {};
+        return buy_ret_dict;
 
     def get_award_expected_discount(self, price, prize_name):
         if not self.award_want_discount_dict:
@@ -511,15 +542,40 @@ def init_data_pre_excute():
     pass;
 
 def thread_worker_func(index, shared_namespace, account_info, activities_data):
-    print(f'thread_worker_func#{index}');
     global g_shared_cache;
     g_shared_cache = shared_namespace.g_shared_cache;
     cli_options = shared_namespace.cli_options;
-    print('########账号[%s]开始抢券工作########' % account_info['account']);
-    grabber = OfpayGrabber(account_info, activities_data, cli_options);
-    results = grabber.start();
-    print('########################################');
-    return f"账号[{account_info['account']}]任务完成";
+    print('########[%d]账号[%s]开始抢券工作########' % (index, account_info['account']));
+    try:
+        grabber = OfpayGrabber(account_info, activities_data, cli_options);
+        results = grabber.start();
+    except Exception as e:
+        print(e);
+    finally:
+        pass
+    if results:
+        print(results);
+    print(f"账号[{account_info['account']}]任务完成");
+    print('###############################################\n');
+    return {'account_info': account_info, 'results': results};
+
+# 守护进程
+def thread_daemon(func, *args, **kwargs):
+    # print('thread_daemon');
+    timeout = kwargs.get('timeout', None);
+    p = ThreadPool(1);
+    res = p.apply_async(func, args=args);
+    try:
+        out = res.get(timeout=timeout)  # Wait timeout seconds for func to complete.
+        return out;
+    except multiprocessing.TimeoutError:
+        print("Aborting due to timeout={}".format(args[0]));
+        return (args[0], 444);
+
+def collect_result(result):
+    # print("{}".format(result));
+    pass;
+
 
 def main(cli_options):
     global g_shared_cache;
@@ -539,7 +595,7 @@ def main(cli_options):
         shared_namespace = manager.Namespace();
         shared_namespace.g_shared_cache = g_shared_cache;
         shared_namespace.cli_options = cli_options;
-        pool = multiprocessing.Pool(cpu_threads, initializer=init_data_pre_excute);
+        pool = multiprocessing.Pool(maxtasksperchild=1);
         results = [];
         index = 0;
         for item in accout_data:
@@ -547,8 +603,15 @@ def main(cli_options):
             if enable != 1:
                 print('########账号[%s]抢券功能关闭########' % item['account']);
                 continue;
-            result = pool.apply_async(thread_worker_func,(index, shared_namespace, item, activities_data,));
-            results.append(result);
+            # result = pool.apply_async(thread_worker_func,(index, shared_namespace, item, activities_data,));
+            # try:
+            #     print(result.get(timeout=3));
+            # except multiprocessing.TimeoutError:
+            #     # pool.terminate();
+            #     print('########账号[%s]抢券执行超时########' % item['account']);
+            # results.append(result);
+            abortable_func = partial(thread_daemon, thread_worker_func, timeout=60);
+            pool.apply_async(abortable_func, args=[index, shared_namespace, item, activities_data,], callback=collect_result);
             index += 1;
         # 关闭进程池,不再接受新的任务
         pool.close();