# OfpayDataSync.py (5.9 KB)
# -*- coding: utf-8 -*-
import csv
import json
import os
import time
from datetime import datetime
from decimal import Decimal

import pymysql
  8. # 自定义 JSON 编码器
  9. class DecimalEncoder(json.JSONEncoder):
  10. def default(self, o):
  11. if isinstance(o, Decimal):
  12. return str(o);
  13. return super().default(o);
  14. class OfpayDataSyncer:
  15. def __init__(self):
  16. self.db_conn = None;
  17. self.connect_mysql();
  18. def connect_mysql(self):
  19. config = {
  20. 'host':'47.106.225.136',
  21. 'port':3306,
  22. 'user':'root',
  23. 'passwd':'sjojo123456',
  24. 'database':'mitmproxy',
  25. 'charset':'utf8'
  26. };
  27. db_conn = None;
  28. while True:
  29. try:
  30. db_conn = pymysql.connect(**config);
  31. db_conn.ping(reconnect=True);
  32. except pymysql.OperationalError as e:
  33. print(e);
  34. print('连接断开,正在尝试重新连接...');
  35. if db_conn:
  36. db_conn.close();
  37. db_conn = pymysql.connect(**config);
  38. time.sleep(1);
  39. else:
  40. break;
  41. self.db_conn = db_conn;
  42. def sync_account(self, filename='results.csv'):
  43. sql_query = f'''
  44. SELECT * FROM elife_account_data WHERE update_time >= CURDATE() AND update_time < CURDATE() + INTERVAL 1 DAY;
  45. ''';
  46. cursor = self.db_conn.cursor();
  47. cursor.execute(sql_query);
  48. results = cursor.fetchall();
  49. suffix = os.path.splitext(filename)[-1];
  50. if suffix == '.csv' or suffix == '':
  51. filepath = filename if suffix != '' else filename+'.csv';
  52. with open(filepath, 'w', newline='', encoding='utf-8') as f:
  53. writer = csv.writer(f);
  54. if results:
  55. headers = [desc[0] for desc in cursor.description];
  56. writer.writerow(headers);
  57. for row in results:
  58. writer.writerow(row);
  59. if suffix == '.json' or suffix == '':
  60. filepath = filename if suffix != '' else filename+'.json';
  61. data_list = [];
  62. column_names = [desc[0] for desc in cursor.description];
  63. for row in results:
  64. data_dict = dict(zip(column_names, row));
  65. update_time = data_dict['update_time'];
  66. if isinstance(update_time, datetime):
  67. formatted_date = update_time.strftime('%Y-%m-%d %H:%M:%S');
  68. data_dict['update_time'] = formatted_date;
  69. data_list.append(data_dict);
  70. with open(filepath, 'w', newline='', encoding='utf-8') as f:
  71. json_str = json.dumps(data_list, cls=DecimalEncoder, ensure_ascii=False);
  72. f.write(json_str);
  73. def sync_activities(self, filename='results.json'):
  74. sql_query = f'''
  75. SELECT * FROM elife_activities WHERE groupId = '1' GROUP BY sortNum;
  76. ''';
  77. cursor = self.db_conn.cursor();
  78. cursor.execute(sql_query);
  79. results = cursor.fetchall();
  80. activity_list = [];
  81. activity_dict = {};
  82. column_names = [desc[0] for desc in cursor.description];
  83. activity_ids = [];
  84. for row in results:
  85. data_dict = dict(zip(column_names, row));
  86. activity_id = data_dict['activityId'];
  87. update_time = data_dict['updateTime'];
  88. if isinstance(update_time, datetime):
  89. formatted_date = update_time.strftime('%Y-%m-%d %H:%M:%S');
  90. data_dict['updateTime'] = formatted_date;
  91. activity_list.append(data_dict);
  92. activity_ids.append(activity_id);
  93. activity_dict[activity_id] = data_dict;
  94. sql_query = f'''
  95. SELECT * FROM elife_activity_awards WHERE activityId in %s;
  96. ''';
  97. sql_query = sql_query % repr(tuple(activity_ids));
  98. cursor.execute(sql_query);
  99. aw_results = cursor.fetchall();
  100. column_names = [desc[0] for desc in cursor.description];
  101. for row in aw_results:
  102. data_dict = dict(zip(column_names, row));
  103. activity_id = data_dict['activityId'];
  104. update_time = data_dict['updateTime'];
  105. if isinstance(update_time, datetime):
  106. formatted_date = update_time.strftime('%Y-%m-%d %H:%M:%S');
  107. data_dict['updateTime'] = formatted_date;
  108. if activity_id in activity_dict:
  109. entity = activity_dict[activity_id];
  110. if 'awardList' not in entity:
  111. entity['awardList'] = [];
  112. entity['awardList'].append(data_dict);
  113. # print(activity_ids);
  114. # print(sql_query);
  115. # print(activity_list);
  116. with open(filename, 'w', newline='', encoding='utf-8') as f:
  117. json_str = json.dumps(activity_list, cls=DecimalEncoder, ensure_ascii=False);
  118. f.write(json_str);
  119. def read_csv(self, filename='results.csv'):
  120. data = []
  121. try:
  122. with open(filename, mode='r', encoding='utf-8') as csvfile:
  123. reader = csv.DictReader(csvfile);
  124. for row in reader:
  125. data.append(row);
  126. except IOError as e:
  127. print(f"read_csv error occurred: {e}");
  128. # print(data);
  129. return data;
  130. def read_json(self, filename='results.json'):
  131. data = None;
  132. try:
  133. with open(filename, 'r', encoding='utf-8') as file:
  134. data = json.load(file);
  135. except IOError as e:
  136. print(f"read_json error occurred: {e}");
  137. # print(data);
  138. return data;
  139. def main():
  140. syncer = OfpayDataSyncer();
  141. syncer.sync_activities('elife_activities_data.json');
  142. # syncer.read_json('elife_activities_data.json');
  143. syncer.sync_account('elife_accout_data');
  144. # syncer.read_csv('elife_accout_data.csv');
  145. # syncer.read_json('elife_accout_data.json');
  146. if __name__ == "__main__":
  147. main();