광주인력개발원
파이썬 개인 프로젝트 - ICU Flowsheet 프로그램 만들기 1일차 [2023-06-29 개발일지]
플광
2023. 7. 2. 12:41
import datetime
import random
import sqlite3
import numpy as np
from faker import Faker
from class_employee import Employee
from class_patient import Patient
class DBConnector:
    """Singleton helper around the project's SQLite database file.

    Each data method opens its own connection with ``start_conn``, does its
    work, commits when it wrote anything, and always closes the connection
    again with ``end_conn`` (also when an error interrupts the work).
    """

    _instance = None

    def __new__(cls, test_option=None):
        # Hand out the one shared instance, creating it on first use.
        if not isinstance(cls._instance, cls):
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, test_option=None):
        """Store the test flag and create the Korean-locale fake data source.

        NOTE: Python runs ``__init__`` on every ``DBConnector(...)`` call, so
        constructing the singleton again resets these attributes.
        """
        self.conn = None
        self.test_option = test_option
        self.faker = Faker("ko-KR")

    def start_conn(self):
        """Open the database file and return a cursor on the new connection.

        ``db_test.db`` is used only when ``test_option`` is exactly ``True``;
        any other value selects ``db_flow.db``.
        """
        db_file = 'db_test.db' if self.test_option is True else 'db_flow.db'
        self.conn = sqlite3.connect(db_file)
        return self.conn.cursor()

    def end_conn(self):
        """Close the current connection, if there is one."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def commit_db(self):
        """Commit the open connection.

        Raises:
            RuntimeError: if no connection is currently open.
        """
        if self.conn is None:
            # A bare string cannot be raised; use a real exception type.
            raise RuntimeError(f"cannot commit database! {type(self).__name__}")
        self.conn.commit()

    ## Patients ======================================================================= ##
    def insert_dummy_patient(self):
        """Insert 100 fake patients with unique register ids and SSN suffixes."""
        c = self.start_conn()
        try:
            register_ids = DBConnector.get_random_register_num_list(100)
            back_ssns = DBConnector.get_random_back_ssn_num_list(100)
            for register_id, back_ssn in zip(register_ids, back_ssns):
                name = self.faker.name()
                birth_date = self.faker.date()
                date_obj = datetime.datetime.strptime(birth_date, '%Y-%m-%d')
                # The SSN front half is the birth date as YYMMDD.
                ssn = f"{date_obj.strftime('%y%m%d')}-{back_ssn}"
                c.execute("insert into patients(name, birth_date, ssn, register_id) values (?, ?, ?, ?)",
                          (name, birth_date, ssn, register_id,))
            self.commit_db()
        finally:
            self.end_conn()

    def clear_patients_table(self):
        """Delete every row from the ``patients`` table."""
        c = self.start_conn()
        try:
            c.execute("delete from patients")
            self.commit_db()
        finally:
            self.end_conn()

    def get_patient_by_register_id(self, register_id):
        """Return the ``Patient`` with the given register id, or ``None``.

        ``None`` is returned when no row matches, instead of failing while
        unpacking a missing row.
        """
        c = self.start_conn()
        try:
            patient_data = c.execute("select * from patients where register_id = (?)",
                                     (register_id,)).fetchone()
        finally:
            self.end_conn()
        if patient_data is None:
            return None
        return Patient(*patient_data)

    ## Employees ======================================================================= ##
    def insert_dummy_employees(self):
        """Insert 49 fake employees plus one fixed account.

        About 30% of the fake employees are linked to an existing patient row
        and take that patient's name; linking stops once every patient has
        been used.
        """
        c = self.start_conn()
        try:
            patient_list = c.execute('select id, name from patients').fetchall()
            random.shuffle(patient_list)
            for _ in range(49):
                name = self.faker.name()
                cell_num = self.faker.phone_number()
                username = self.faker.simple_profile()['username']
                pw = "12345678"
                # Only link while unlinked patients remain; popping from an
                # empty list would abort the whole run.
                if random.random() > 0.7 and patient_list:
                    patient_id, name = patient_list.pop()
                    print(name, patient_id, cell_num, username, pw)
                    c.execute(
                        "insert into employees(name, patient_id, cell_phone_num, login_id, login_password) values(?, ?, ?, ?, ?)",
                        (name, patient_id, cell_num, username, pw,))
                else:
                    print(name, cell_num, username, pw)
                    c.execute("insert into employees(name, cell_phone_num, login_id, login_password) values (?,?,?,?)",
                              (name, cell_num, username, pw,))
            c.execute("insert into employees(name, cell_phone_num, login_id, login_password) values (?,?,?,?)",
                      ('광현', self.faker.phone_number(), 'park', '1234',))
            self.commit_db()
        finally:
            self.end_conn()

    def findAll_employees(self):
        """Return every employee row as an ``Employee`` object.

        When the third column of a row (used here as the patient id) is set,
        it is replaced by the matching ``Patient`` object before the row is
        passed to ``Employee``.
        """
        result_list = []
        c = self.start_conn()
        try:
            fetched_data = c.execute("""
            select * from employees
            """).fetchall()
            for row in fetched_data:
                if row[2] is None:
                    result_list.append(Employee(*row))
                    continue
                fetched_patient_data = c.execute("""select * from patients where id = (?)""",
                                                 (row[2],)).fetchone()
                fields = list(row)
                fields[2] = Patient(*fetched_patient_data)
                result_list.append(Employee(*fields))
        finally:
            self.end_conn()
        return result_list

    ## Flowsheet ======================================================================= ##
    def insert_dummy_flowsheet(self):
        """Give every patient one stay of 40-99 consecutive timeline rows.

        Each inserted flowsheet row pairs the patient with the next timeline
        row of the stay and the next unused vital row. The stay window is
        picked from the timeline rows actually present, so it never runs past
        the end of the table. Insertion stops early when the vital rows (or
        the timeline) run out.
        """
        c = self.start_conn()
        try:
            patient_ids = [row[0] for row in c.execute("select id from patients").fetchall()]
            # Ascending ids follow insertion order of the timeline table.
            timeline_ids = sorted(row[0] for row in c.execute("select id from timeline").fetchall())
            vital_ids = [row[0] for row in c.execute("select id from vital").fetchall()]
            offset = 0
            for patient_id in patient_ids:
                hospital_hours = min(random.randrange(40, 100),
                                     len(timeline_ids),
                                     len(vital_ids) - offset)
                if hospital_hours <= 0:
                    break
                start = random.randrange(len(timeline_ids) - hospital_hours + 1)
                for i in range(hospital_hours):
                    # start + i walks the stay one timeline row at a time.
                    c.execute("insert into flowsheet(patient_id, timeline_id, vital_id) values (?, ?, ?)",
                              (patient_id, timeline_ids[start + i], vital_ids[offset],))
                    offset += 1
            self.commit_db()
        finally:
            self.end_conn()

    ## TimeLine ======================================================================= ##
    def clear_time_line_table(self):
        """Delete every row from the ``timeline`` table."""
        c = self.start_conn()
        try:
            c.execute("""delete from timeline""")
            self.commit_db()
        finally:
            self.end_conn()

    def insert_dummy_timeline(self):
        """Insert one timeline row per hour for 365 * 3 days from 2022-01-01 00:00."""
        c = self.start_conn()
        try:
            first_hour = datetime.datetime.strptime("2022-01-01 00:00", "%Y-%m-%d %H:%M")
            rows = (
                ((first_hour + datetime.timedelta(hours=hour)).strftime('%Y-%m-%d %H:%M'),)
                for hour in range(365 * 3 * 24)
            )
            c.executemany('insert into timeline(time_line_information) values (?)', rows)
            self.commit_db()
        finally:
            self.end_conn()

    ## Vital ======================================================================= ##
    def clear_vital_table(self):
        """Delete every row from the ``vital`` table."""
        c = self.start_conn()
        try:
            c.execute("""delete from vital""")
            self.commit_db()
        finally:
            self.end_conn()

    def insert_dummy_vital_data(self):
        """Insert 10000 rows of normally distributed dummy vital signs.

        NOTE(review): ``bt`` is centred on 370, presumably tenths of a degree
        Celsius - confirm against the consumer of this column.
        """
        c = self.start_conn()
        try:
            for _ in range(10000):
                bt = int(np.random.normal(scale=4) + 370)
                hr = int(np.random.normal(scale=10) + 80)
                rr = int(np.random.normal(scale=4) + 16)
                # Redraw until the pulse pressure is between 25 and 60
                # (exclusive) and the diastolic value is below 105.
                while True:
                    sbp = int(np.random.normal(scale=30) + 140)
                    dbp = int(np.random.normal(scale=20) + 100)
                    if 60 > sbp - dbp > 25 and dbp < 105:
                        break
                mbp = (dbp * 2 + sbp) // 3
                c.execute("""
                insert into vital(bt, hr, rr, sbp, dbp, mbp) values (?, ?, ?, ?, ?, ?)
                """, (bt, hr, rr, sbp, dbp, mbp,))
            self.commit_db()
        finally:
            self.end_conn()

    @staticmethod
    def get_random_register_num_list(size):
        """Return ``size`` distinct zero-padded 6-digit strings.

        Raises:
            ValueError: if more than 10 ** 6 values are requested, since that
                many distinct 6-digit strings do not exist.
        """
        if size > 10 ** 6:
            raise ValueError("cannot generate more than 10 ** 6 distinct 6-digit numbers")
        seen = set()
        result_list = []
        while len(result_list) < size:
            rand_num = f"{random.randrange(10 ** 6):06d}"
            if rand_num not in seen:
                seen.add(rand_num)
                result_list.append(rand_num)
        return result_list

    @staticmethod
    def get_random_back_ssn_num_list(size):
        """Return ``size`` distinct 7-digit strings: a leading digit 1-5 plus 6 digits.

        Raises:
            ValueError: if more than 5 * 10 ** 6 values are requested, since
                that many distinct suffixes do not exist.
        """
        if size > 5 * 10 ** 6:
            raise ValueError("cannot generate more than 5 * 10 ** 6 distinct SSN suffixes")
        seen = set()
        result_list = []
        while len(result_list) < size:
            gender = random.randint(1, 5)
            sample_ssn = f'{gender}{random.randrange(10 ** 6):06d}'
            if sample_ssn not in seen:
                seen.add(sample_ssn)
                result_list.append(sample_ssn)
        return result_list
if __name__ == "__main__":
    # Running this module directly builds the connector with the test option enabled.
    connector = DBConnector(test_option=True)
Patient Class (class_patient.py)
import datetime
class Patient:
    """A patient record with a parsed birth date and an optional flowsheet list."""

    def __init__(self, patient_id, name, birth_date, ssn, register_id):
        """Store the given fields; ``birth_date`` is a ``'%Y-%m-%d'`` string."""
        self.patient_id = patient_id
        self.name = name
        self.birth_date = Patient.set_date_time(birth_date)
        self.ssn = ssn
        self.register_id = register_id
        # Stays None until set_flowsheet_list() is called.
        self.flowSheet_list = None

    def __str__(self):
        # Plain rendering of the attribute dictionary.
        return f"{self.__dict__}"

    def __repr__(self):
        # __repr__ must return a str; returning the dict itself makes
        # repr(patient) raise TypeError.
        fields = ", ".join(f"{key}={value!r}" for key, value in self.__dict__.items())
        return f"{type(self).__name__}({fields})"

    def set_flowsheet_list(self, list_):
        """Attach the flowsheet list.

        Raises:
            TypeError: if ``list_`` is not a ``list``. An explicit raise is
                used because ``assert`` is skipped when Python runs with -O.
        """
        if not isinstance(list_, list):
            raise TypeError(f"list_ must be a list, got {type(list_).__name__}")
        self.flowSheet_list = list_

    @staticmethod
    def set_date_time(birth_date_str):
        """Parse a ``'%Y-%m-%d'`` string into a ``datetime.datetime``."""
        return datetime.datetime.strptime(birth_date_str, '%Y-%m-%d')
특별한 개발이슈는 없었습니다.
감사합니다.