Добавил:
SuperciliousMe
Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз:
Предмет:
Файл:Лаба 5 / 20_var
.pyimport sys
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import uniform, expon
warnings.filterwarnings("ignore")
def __plot__(figure: int, plotting_T: dict = {}, plotting_Q: dict = {}):
if plotting_Q:
fig = plt.figure(figure, figsize=(15, 5))
# Первый график (plotting_T)
ax1 = fig.add_subplot(121)
ax1.spines['right'].set_color('none')
ax1.spines['top'].set_color('none')
ax1.plot(plotting_T["arr_x"], plotting_T["arr_y1"], label=plotting_T["legend1"], color=plotting_T["color1"])
if "arr_y2" in plotting_T:
ax1.plot(plotting_T["arr_x"], plotting_T["arr_y2"], label=plotting_T["legend2"], color=plotting_T["color2"])
ax1.grid()
ax1.set_xlabel(plotting_T["xlabel"])
ax1.set_ylabel(plotting_T["ylabel"])
ax1.set_title(plotting_T["title"])
if plotting_T["legend1"] != None or plotting_T["legend2"] != None:
ax1.legend()
# Второй график (plotting_Q)
ax2 = fig.add_subplot(122)
ax2.spines['right'].set_color('none')
ax2.spines['top'].set_color('none')
ax2.plot(plotting_Q["arr_x"], plotting_Q["arr_y1"], label=plotting_Q["legend1"], color=plotting_Q["color1"])
if "arr_y2" in plotting_Q:
ax2.plot(plotting_Q["arr_x"], plotting_Q["arr_y2"], label=plotting_Q["legend2"], color=plotting_Q["color2"])
ax2.grid()
ax2.set_xlabel(plotting_Q["xlabel"])
ax2.set_ylabel(plotting_Q["ylabel"])
ax2.set_title(plotting_Q["title"])
if plotting_Q["legend1"] != None and plotting_Q["legend2"] != None:
ax2.legend()
else:
fig = plt.figure(figure)
ax = fig.add_axes((0.1, 0.1, 0.8, 0.8))
ax.spines['right'].set_color('none')
ax.spines['top'].set_color('none')
ax.plot(plotting_T["arr_x"], plotting_T["arr_y"], label=plotting_T["legend"], color=plotting_T["color"])
ax.grid()
ax.set_xlabel(plotting_T["xlabel"])
ax.set_ylabel(plotting_T["ylabel"])
ax.set_title(plotting_T["title"])
if plotting_T["legend"] != None:
ax.legend()
def plot_expon_distribution(lambda_, figure):
# Определение границ графика:
temp = expon.rvs(scale= 1/lambda_, size=10000)
x_min, x_max = min(temp), max(temp)
# Построение графика:
x = np.linspace(x_min, x_max, 1000)
y = expon.pdf(x, scale=1/lambda_)
title = 'Плотность экспоненциального распределения распределения'
legend = fr'$\lambda = {lambda_}$'
# Параметры для построения графика
plot = {}
plot["arr_x"], plot["arr_y"] = x, y
plot["title"], plot["legend"] = title, legend
plot["color"] = "black"
plot["xlabel"], plot["ylabel"] = ("",)*2
__plot__(figure, plot)
def plot_uniform_distribution(a, b, figure):
# Определение границ интервала
x_min, x_max = a - 0.05, b + 0.05
# Построение графика
x = np.linspace(x_min, x_max, 100)
y = uniform.pdf(x, loc=a, scale=b-a)
title = 'Плотность равномерного распределения'
legend = f'{a = :.2f}\n{b = :.2f}'
# Параметры для построения графика
plot = {}
plot["arr_x"], plot["arr_y"] = x, y
plot["title"], plot["legend"] = title, legend
plot["color"] = "black"
plot["xlabel"], plot["ylabel"] = ("",)*2
__plot__(figure, plot)
def model_of_queuing_system(mu, l, N, is_test):
# Основные параметры
a = 1 / mu - 0.05 * mu
b = a + 0.1 * mu
new_t_income = lambda: expon.rvs(scale=1 / l)
new_t_outcome = lambda: expon.rvs(scale= 1 / mu) if is_test else uniform.rvs(loc=a, scale=b-a)
ts = 0
busy = False
t_in = new_t_income()
t_ins = []
t_out = t_in
t_outs = []
n, k, m, fail = 0, 0, 0, 0
q_Old = sys.float_info.max
q_Data = []
t_mean_old = sys.float_info.max
counter = 0
# Процесс моделирования
while True:
counter += 1
# Проверяем наступило ли время освобождения ОУ
if t_in <= t_out:
# Записываем в системное время момент поступления заявки
ts = t_in
n += 1 # Увеличиваем счетчик поступивших заявок
# Сохраняем в список момент поступления заявки (Записываем только в случае когда busy = False, m < N)
if not busy or m < N:
t_ins.append(t_in)
# Момент поступления следующей заявкм
t_in_new = new_t_income()
# Для подсчета производительности
q_Data.append(t_in_new)
# Система занята?
if busy:
# Количество заявок в буфере превышает объем буфера?
if m < N:
m += 1
else:
fail += 1
else:
busy = True
t_out = ts + new_t_outcome()
# Вычисляем следующий момент поступления заявки
t_in = ts + t_in_new
else:
# Записываем в системное время момент освобождения ОУ
ts = t_out
k += 1 # Увеличиваем счетчик обслуженных заявок
# Сохраняем в список момент освобождения ОУ
t_outs.append(t_out)
# В буфере больше 0 заявок?
if m > 0:
m -= 1
t_out = ts + new_t_outcome()
else:
busy = False
t_out = t_in
# Рассматриваем возможность выхода каждые 1000 итераций
if counter % 1000 == 0:
t_mean_new = np.mean([t_outs[i] - t_ins[i] for i in range(len(t_outs))])
p_fail = fail / n
p_success = 1 - p_fail
q_New = (p_success) / np.mean(q_Data)
if (
abs((t_mean_new - t_mean_old) / t_mean_old) < 0.0001
or abs((q_New - q_Old) / q_Old) < 0.0001
):
return t_mean_new, q_New
t_mean_old = t_mean_new
q_Old = q_New
def some_experiments(mu, N, is_test, figure):
# Параметры для тестирования
lambdas = np.linspace(0.1, 1, 10) * mu
plotting_T, plotting_Q = {}, {}
T_theoretical = []
T_experimental = []
Q_theoretical = []
Q_experimental = []
# Сбор практических данных
for l in lambdas:
print(f'{l = :.2}')
t_exp, q_exp = model_of_queuing_system(mu, l, N, is_test)
T_experimental = np.append(T_experimental, t_exp)
Q_experimental = np.append(Q_experimental, q_exp)
if is_test:
# Сбор теоретических данных
for l in lambdas:
ro = l / mu
temp_t = (1 / l) * sum((k * (1 - ro) * ro**k) / (1 - ro ** (N + 1)) for k in range(1, N + 2))
p_fail = ((1 - ro) * (ro ** (N + 1))) / (1 - ro ** (N + 2))
Q_theoretical.append(l * (1 - p_fail))
T_theoretical.append(temp_t)
# Параметры для построения графиков
plotting_T["arr_y2"], plotting_Q["arr_y2"] = T_theoretical[:9], Q_theoretical[:9]
plotting_T["arr_x"], plotting_Q["arr_x"] = lambdas[:9], lambdas[:9]
plotting_T["arr_y1"], plotting_Q["arr_y1"] = T_experimental[:9], Q_experimental[:9]
plotting_T["legend2"], plotting_Q["legend2"] = ('Теоретическое',)*2
plotting_T["legend1"], plotting_Q["legend1"] = ('Экспериментальное',)*2
plotting_T['color1'], plotting_Q['color1'] = 'black', "orange"
plotting_T['color2'], plotting_Q['color2'] = 'red', "purple"
plotting_T["xlabel"], plotting_Q["xlabel"] = ('Интенсивность входного потока $(\lambda)$',)*2
plotting_T["ylabel"], plotting_Q["ylabel"] = 'Ср. время в системе', 'Производительность'
plotting_T["title"], plotting_Q["title"] = 'Моделирование среднего времени', 'Моделирование производительности'
__plot__(figure, plotting_T, plotting_Q)
return T_experimental, Q_experimental
if __name__ == "__main__":
# Параметры по варианту
mu, N = 5, 4
# Определение границ для равномерного распределения
a = np.random.random()
b = a + 0.1 * mu
# Построение графиков функций распределений
plot_expon_distribution(mu, 1)
plot_uniform_distribution(a, b, 2)
# Запуск моделирования на тестовом случае
print('Моделирование на тестовом случае:')
some_experiments(mu, N, is_test=True, figure=3)
# Запуск моделирования на основном случае
print('\nМоделирование на основном случае:')
index = [rf"{i:.1f}mu" for i in np.linspace(0.1, 1, 10)]
arr_T, arr_Q = some_experiments(mu, N, is_test=False, figure=4)
df = pd.DataFrame(arr_T, index=index, columns=["T"])
df["Q"] = arr_Q
print(df.T)
plt.show()