OfpayDataSync.py

# -*- coding: utf-8 -*-
import os
import time
import csv
import json
from datetime import datetime
from decimal import Decimal

import pymysql


# Custom JSON encoder: Decimal and datetime are not JSON-serializable by
# default, so render both as strings.
class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return str(obj)
        elif isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        return super().default(obj)

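
# A minimal usage sketch (values illustrative, not from the source): pass the
# encoder via `cls` so Decimal and datetime values survive serialization.
#
#   >>> json.dumps({'balance': Decimal('9.90')}, cls=CustomEncoder)
#   '{"balance": "9.90"}'
#   >>> json.dumps({'ts': datetime(2024, 1, 1)}, cls=CustomEncoder)
#   '{"ts": "2024-01-01 00:00:00"}'
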

class OfpayDataSyncer:
    def __init__(self):
        self.db_conn = None
        self.connect_mysql()

    def connect_mysql(self):
        config = {
            'host': '47.106.225.136',
            'port': 3306,
            'user': 'root',
            'passwd': 'sjojo123456',
            'database': 'mitmproxy',
            'charset': 'utf8'
        }
        db_conn = None
        while True:
            try:
                db_conn = pymysql.connect(**config)
                db_conn.ping(reconnect=True)
                break  # connected and alive; without this the loop never exits
            except pymysql.OperationalError as e:
                print(e)
                print('Connection lost, trying to reconnect...')
                if db_conn:
                    db_conn.close()
                    db_conn = None
                time.sleep(1)
        self.db_conn = db_conn

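    # Sketch of how a caller might keep the long-lived connection healthy
    # before issuing queries (an assumption, not part of the original flow):
    #
    #   >>> syncer = OfpayDataSyncer()
    #   >>> syncer.db_conn.ping(reconnect=True)  # reconnects if the link dropped
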
    def sync_account(self, filename='results.csv'):
        sql_query = '''
            SELECT * FROM elife_account_data
            WHERE update_time >= CURDATE() AND update_time < CURDATE() + INTERVAL 1 DAY;
        '''
        cursor = self.db_conn.cursor()
        cursor.execute(sql_query)
        results = cursor.fetchall()
        suffix = os.path.splitext(filename)[-1]
        # An empty suffix means "write both formats".
        if suffix == '.csv' or suffix == '':
            filepath = filename if suffix != '' else filename + '.csv'
            with open(filepath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                if results:
                    headers = [desc[0] for desc in cursor.description]
                    writer.writerow(headers)
                    for row in results:
                        writer.writerow(row)
        if suffix == '.json' or suffix == '':
            filepath = filename if suffix != '' else filename + '.json'
            column_names = [desc[0] for desc in cursor.description]
            data_list = []
            for row in results:
                # datetime values are formatted by CustomEncoder at dump time
                data_list.append(dict(zip(column_names, row)))
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(json.dumps(data_list, cls=CustomEncoder, ensure_ascii=False))

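    # Usage sketch (filenames illustrative): the output format follows the
    # file suffix; with no suffix, both a .csv and a .json file are written.
    #
    #   >>> syncer.sync_account('accounts.csv')  # CSV only
    #   >>> syncer.sync_account('accounts')      # accounts.csv and accounts.json
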
    def sync_activities(self, filename='results.json'):
        # One row per sortNum; SELECT * with GROUP BY relies on MySQL running
        # without ONLY_FULL_GROUP_BY.
        sql_query = '''
            SELECT * FROM elife_activities WHERE groupId = '1' GROUP BY sortNum;
        '''
        cursor = self.db_conn.cursor()
        cursor.execute(sql_query)
        results = cursor.fetchall()
        activity_list = []
        activity_dict = {}
        activity_ids = []
        column_names = [desc[0] for desc in cursor.description]
        for row in results:
            data_dict = dict(zip(column_names, row))
            activity_id = data_dict['activityId']
            activity_list.append(data_dict)
            activity_ids.append(activity_id)
            activity_dict[activity_id] = data_dict
        if activity_ids:
            # Let pymysql expand and escape the id tuple itself; formatting it
            # in with repr() is unsafe and breaks on single-element tuples.
            sql_query = 'SELECT * FROM elife_activity_awards WHERE activityId IN %s;'
            cursor.execute(sql_query, (tuple(activity_ids),))
            aw_results = cursor.fetchall()
            column_names = [desc[0] for desc in cursor.description]
            for row in aw_results:
                data_dict = dict(zip(column_names, row))
                activity_id = data_dict['activityId']
                # Attach each award to its parent activity.
                if activity_id in activity_dict:
                    entity = activity_dict[activity_id]
                    if 'awardList' not in entity:
                        entity['awardList'] = []
                    entity['awardList'].append(data_dict)
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(json.dumps(activity_list, cls=CustomEncoder, ensure_ascii=False))

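    # Rough shape of the emitted JSON (activityId/groupId/sortNum come from
    # the queries above; the award fields are assumptions for illustration):
    #
    #   [{"activityId": "a1", "groupId": "1", "sortNum": 1,
    #     "awardList": [{"activityId": "a1", "awardName": "..."}]}, ...]
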
    def read_csv(self, filename='results.csv'):
        # Note: csv.DictReader yields every value as a string.
        data = []
        try:
            with open(filename, mode='r', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    data.append(row)
        except IOError as e:
            print(f"read_csv error occurred: {e}")
        return data

    def read_json(self, filename='results.json'):
        data = None
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except IOError as e:
            print(f"read_json error occurred: {e}")
        return data
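

# Round-trip sketch for the two readers (assuming files produced by the
# sync_* methods above):
#
#   >>> syncer.read_csv('results.csv')    # list of dicts
#   >>> syncer.read_json('results.json')  # parsed Python objects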


def main():
    syncer = OfpayDataSyncer()
    syncer.sync_activities('elife_activities_data.json')
    # syncer.read_json('elife_activities_data.json')
    syncer.sync_account('elife_accout_data')
    # syncer.read_csv('elife_accout_data.csv')
    # syncer.read_json('elife_accout_data.json')


if __name__ == "__main__":
    main()