Задача: Найти только яблоки (красные круги).
Приходится читать каждую "коробку" (строку) целиком, даже ненужные фрукты.
Открываем только "колонку" с яблоками. Остальное игнорируем.
Демонстрация различий форматов и работы с JSON на Python (pandas).
import pandas as pd
import json
import os
# --- 1. Работа с CSV (Традиционный) ---
# Создаем данные
data = {
'ID': range(1, 10001),
'Name': ['User'] * 10000,
'Age': [25] * 10000
}
df = pd.DataFrame(data)
# Сохраняем в CSV
df.to_csv('data.csv', index=False)
csv_size = os.path.getsize('data.csv')
print(f"CSV Size: {csv_size/1024:.2f} KB")
# --- 2. Работа с Parquet (Современный) ---
# Конвертируем в Parquet (требует pyarrow/fastparquet)
df.to_parquet('data.parquet')
pq_size = os.path.getsize('data.parquet')
print(f"Parquet Size: {pq_size/1024:.2f} KB")
# Результат: Parquet будет весить меньше благодаря сжатию
# --- 3. Parsing JSON (Semi-structured) ---
json_str = '{"users": [{"id": 1, "role": "admin"}, {"id": 2, "role": "guest"}]}'
parsed = json.loads(json_str)
# Доступ по ключам
admin_role = parsed['users'][0]['role']
print(f"First user role: {admin_role}") Лекция #2 | Aiganym Zhandaulet
Сравнение хранения данных в памяти:
Список хранит ссылки на объекты, разбросанные в памяти.
*Overhead: Type info + Ref count for each item
Данные упакованы плотно, тип единый для всех.
*Raw data buffer (C-style)
Пример слайсинга c = a[0:2, :]. Разные заголовки, один буфер данных.
Задача: Умножить каждый элемент на 2.
import numpy as np
# Python List (поведение по умолчанию - дублирование списка)
lst = [1, 2, 3]
print(lst * 2)
# Результат: [1, 2, 3, 1, 2, 3] (Concatenation)
# NumPy Array (векторизованная математика)
arr = np.array([1, 2, 3])
print(arr * 2)
# Результат: [2 4 6] (Multiplication) a = np.array([1, 2, 3, 4])
b = a[0:2] # Срез - это view
b[0] = 99 # Меняем элемент в срезе
print(a)
# Результат: [99, 2, 3, 4] -> Исходный массив изменился! Нажмите на вопрос, чтобы увидеть ответ.
Lecture #3 | Duman Makhanbetov
Как Pandas расширяет возможности NumPy:
| Характеристика | NumPy | Pandas |
|---|---|---|
| Тип данных | Гомогенные (только числа) | Гетерогенные (смешанные) |
| Доступ | По числовому индексу (0, 1...) | По меткам (Label) и индексам |
| Применение | Тяжелые численные расчеты | Анализ реальных данных, ETL |
| Файлы (I/O) | Ограничено (binary, txt) | Мощное (CSV, Excel, SQL, JSON) |
Pandas построен на NumPy, поэтому легко конвертирует массивы.
import numpy as np
import pandas as pd
# 1. Создаем NumPy массив
np_array = np.array([1, 2, 3, 4, 5])
# 2. Создаем Series
p_series = pd.Series(np_array)
print(p_series.values)
# Вывод: [1 2 3 4 5]
# Примечание: изменение исходного массива может повлиять на Series
# (в зависимости от copy=True/False)
np_array[0] = 9
print(p_series.values) # Часто: [9 2 3 4 5] Использование конструктора pd.DataFrame(data, index, ...).
# Данные: словарь списков (наиболее частый способ)
data = {
'Walmart': [485873, 13643.0, 'USA'],
'State Grid': [315199, 9571.3, 'China']
}
# Создание DataFrame с кастомным индексом
df = pd.DataFrame(data, index=['Revenues', 'Profits', 'Country'])
# Транспонирование (строки -> столбцы)
df = df.T
print(df)
# Revenues Profits Country
# Walmart 485873 13643.0 USA
# State Grid 315199 9571.3 China Нажмите на вопрос, чтобы увидеть ответ.
df.describe(), который показывает среднее, станд. отклонение, минимумы/максимумы и квартили.False. Pandas старается не копировать данные без нужды (быть View), но часто создает копию для безопасности типов.Lecture #3 | Duman Makhanbetov
.agg() (сводка/редукция), .transform() (сохранение размерности) и .apply() (гибкая кастомная логика).merge() (SQL-style joins по ключам), join() (слияние по индексам) и concat() (склеивание по осям).pivot) и наоборот (через melt) критически важно для анализа и визуализации.MultiIndex для иерархических данных и метода explode() для разбиения списков внутри ячеек на отдельные строки.| Метод | Размер результата | Типичное использование |
|---|---|---|
.agg() | Сокращенный (1 строка на группу) | Получение статистики (сумма, среднее) |
.transform() | Такой же, как входной (Same size) | Feature engineering (нормализация внутри группы) |
.apply() | Гибкий | Сложная кастомная логика, возвращающая разные формы |
Вычитание среднего значения по группе из каждого элемента этой группы.
# Центрирование данных по регионам
# Результат сохраняет размерность исходного DataFrame
df.groupby("region")["sales"].transform(lambda x: x - x.mean()) Преобразование широкой таблицы в длинную для анализа.
# id_vars: колонки-идентификаторы (остаются как есть)
# value_vars: колонки, которые станут строками
pd.melt(df,
id_vars=["id"],
value_vars=["math", "english"]) Разбиение списков в ячейках на отдельные строки.
df = pd.DataFrame({
"id": [1, 2],
"tags": ["book,stationery", "electronics,home"]
})
# 1. Превращаем строку в список
df["tags"] = df["tags"].str.split(",")
# 2. "Взрываем" список в строки
df_exploded = df.explode("tags") Нажмите на вопрос, чтобы увидеть ответ.
.agg() возвращает одну строку на группу (редуцирует данные), а .transform() возвращает объект той же длины, что и исходный DataFrame, сохраняя выравнивание строк.merge(). Он поддерживает inner, left, right и outer joins.pivot() не поддерживает дубликаты индексов/колонок и просто меняет форму. pivot_table() умеет агрегировать данные (например, суммировать), если для одной пары индекс-колонка есть несколько значений..loc[]. Например: df.loc[("RegionA", "ProductB")].pd.concat() с параметром axis=0.transform).melt для приведения данных в формат "Tidy Data", необходимый для библиотек типа Seaborn или Altair.pivot_table..str) и explode.Lecture Notes | Aiganym Zhandaulet
| Характеристика | Matplotlib | Seaborn |
|---|---|---|
| Уровень | Low-level, гибкий | High-level, простой |
| Входные данные | Списки, NumPy массивы | Pandas DataFrames |
| Стиль по умолчанию | Базовый (Basic) | Современный, привлекательный |
| Сильные стороны | Полный контроль, кастомизация | Статистические графики (box, violin, heatmap) |
| Сценарий | Точная настройка деталей | Быстрый EDA и сторителлинг |
Требует явного указания всех меток и осей.
import matplotlib.pyplot as plt
# Базовый подход
plt.scatter(tips['total_bill'], tips['tip'])
plt.xlabel("Total Bill")
plt.ylabel("Tip")
plt.title("Scatter Plot with Matplotlib")
plt.show() Интеграция с DataFrame и автоматическая легенда (hue).
import seaborn as sns
# Высокоуровневый подход
# hue="day" автоматически раскрашивает точки по дням и добавляет легенду
sns.scatterplot(
x="total_bill",
y="tip",
data=tips,
hue="day"
) Нажмите на вопрос, чтобы увидеть ответ.
Lecture Notes | Duman Makhanbetov
Простая ментальная модель работы API:
| Характеристика | REST | SOAP | GraphQL |
|---|---|---|---|
| Формат данных | JSON / XML | Только XML | Кастомный язык запросов |
| Эндпоинты | Много (под каждый ресурс) | Один на операцию | Единый эндпоинт |
| Гибкость | Высокая | Низкая (жесткая структура) | Очень высокая |
| Сложность | Просто (Easy) | Сложно (Complex) | Мощно, но требует обучения |
| Применение | Веб и мобильные приложения | Enterprise, банкинг, legacy | Сложные UI, минимизация трафика |
Использование библиотеки requests для получения данных (например, ВВП Казахстана от World Bank API).
import requests
# 1. Определение URL (Endpoint)
# Пример API Всемирного банка
url = "http://api.worldbank.org/v2/country/kz/indicator/NY.GDP.MKTP.CD"
# 2. Параметры запроса (Query Parameters)
params = {
"format": "json",
"date": "2020:2022"
}
# 3. Выполнение GET запроса
response = requests.get(url, params=params)
# 4. Проверка статуса (200 = OK)
if response.status_code == 200:
data = response.json() # Парсинг JSON
# Вывод данных (структура зависит от API)
for entry in data[1]:
print(f"Year: {entry['date']}, GDP: ${entry['value']}")
else:
print(f"Error: {response.status_code}") Клиент запрашивает только название, столицу и население для страны с кодом "KZ".
{
country(code: "KZ") {
name
capital
population
}
} Нажмите на вопрос, чтобы увидеть ответ.
Lecture Notes | Duman Makhanbetov
requests для загрузки HTML и BeautifulSoup для парсинга (разбора) и навигации по дереву тегов.robots.txt, не перегружать серверы запросами, идентифицировать своего бота (User-Agent) и не нарушать авторские права.Пример: <p class="text">Hello World</p>
| Тег | Описание | Что извлекаем? |
|---|---|---|
<div>, <span>, <p> | Блоки контента и текст | Текст (.get_text()) |
<a href="..."> | Гиперссылки | Атрибут href (ссылка) |
<img src="..."> | Изображения | Атрибут src (адрес картинки) |
<ul>, <ol>, <li> | Списки | Элементы списка |
<table>, <tr>, <td> | Таблицы | Табличные данные |
import requests
url = "https://quotes.toscrape.com/"
# Важно: используйте User-Agent, чтобы выглядеть как браузер
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)..."
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
html_content = response.text
print("Page downloaded successfully!")
else:
print(f"Failed to retrieve page: {response.status_code}") from bs4 import BeautifulSoup
# Создаем объект супа
soup = BeautifulSoup(html_content, "html.parser")
# Пример 1: Поиск одного элемента
title = soup.find("h1").get_text(strip=True)
# Пример 2: Поиск всех элементов определенного класса
quotes = soup.find_all("span", class_="text")
for quote in quotes:
print(quote.get_text()) Допустим, HTML выглядит так: <div class="product"><h2>Phone X</h2><span class="price">$999</span></div>
# Логика извлечения
product_block = soup.find("div", class_="product")
data = {
"name": product_block.find("h2").get_text(),
"price": product_block.find("span", class_="price").get_text()
}
print(data)
# Output: {'name': 'Phone X', 'price': '$999'} domain.com/robots.txt перед началом работы, чтобы узнать правила.time.sleep(1)) между запросами, чтобы не "уронить" чужой сервер (DoS-атака).Нажмите на вопрос, чтобы увидеть ответ.
.find() возвращает первый найденный элемент (или None), а .find_all() возвращает список всех подходящих элементов./robots.txt.element["href"]..select().Lecture Notes | Duman Makhanbetov
requests скачивают только исходный HTML. На современных сайтах (React, Vue) данные загружаются через JavaScript/AJAX уже после загрузки страницы, поэтому статический подход возвращает пустые блоки.time.sleep().| Характеристика | Selenium | Playwright |
|---|---|---|
| Драйверы | Требуются отдельные (ChromeDriver) | Встроенные (Built-in) |
| Скорость | Средняя | Высокая (асинхронность) |
| Ожидания (Waits) | Ручные (Explicit Waits) | Автоматические (Auto-wait) |
| Надежность | Хорошая, но возможны "flaky" тесты | Отличная (меньше ошибок) |
| Установка | Простая, но нужно следить за версиями | Требует `playwright install` |
Инициализация драйвера и использование WebDriverWait для надежности.
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Инициализация Chrome
driver = webdriver.Chrome()
# Загрузка страницы
driver.get("https://quotes.toscrape.com/js/")
# Явное ожидание загрузки элемента (вместо time.sleep)
element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "quote"))
)
# Извлечение данных
quotes = driver.find_elements(By.CLASS_NAME, "quote")
for q in quotes:
print(q.text)
driver.quit() Использование синхронного API. Браузер запускается в headless-режиме по умолчанию.
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
# Запуск браузера (Chromium)
browser = p.chromium.launch(headless=True)
page = browser.new_page()
# Переход на страницу (авто-ожидание загрузки)
page.goto("https://quotes.toscrape.com/js/")
# Локатор элементов
quotes = page.locator(".quote")
# Получение текста первого элемента
print(quotes.nth(0).inner_text())
browser.close() from selenium.webdriver.chrome.options import Options
chrome_options = Options()
# Запуск без графического интерфейса
chrome_options.add_argument('--headless')
driver = webdriver.Chrome(options=chrome_options) Нажмите на вопрос, чтобы увидеть ответ.
requests скачивает только начальный HTML, но не исполняет JavaScript, который подгружает реальные данные.driver.quit() или browser.close(), чтобы не забить оперативную память "зомби-процессами".Instructor: Duman Makhanbetov
pandas (очистка), re (Regex для строк), pandera (валидация схем) и ydata-profiling (автоматические отчеты).datetime и category) критичны для производительности и корректной сортировки. Дубликаты следует удалять с осторожностью, проверяя, не являются ли они частью временных рядов или транзакций.Pandera позволяет задать жесткие правила (схемы) для данных и ломать пайплайны при их нарушении. Data Profiling — это финальный этап автоматического исследования данных перед моделированием.errors='coerce' превращает ошибочные значения в NaN.
| Метод | Описание | Применение |
|---|---|---|
.fillna(value) | Замена константой (0, "Unknown") | Категории, счетчики |
.fillna(mean/median) | Замена статистикой | Числовые данные (возраст, цена) |
.ffill() / .bfill() | Протягивание значений вперед/назад | Временные ряды (погода, акции) |
Удаление всего, кроме цифр, и валидация email.
# Оставить только цифры в телефоне (\D+ удаляет не-цифры)
df["phone"] = df["phone"].str.replace(r"\D+", "", regex=True)
# Извлечение домена из email
df["domain"] = df["email"].str.extract(r"@([A-Za-z0-9.-]+)") Использование errors='coerce' для превращения ошибок в NaN.
# Если "123a" не число -> превратится в NaN
df["price"] = pd.to_numeric(df["price"], errors="coerce")
# Конвертация в даты
df["date"] = pd.to_datetime(df["date"], errors="coerce") import pandera as pa
from pandera import Column, Check, DataFrameSchema
schema = DataFrameSchema({
# Проверка: возраст от 0 до 120
"age": Column(pa.Int, Check.in_range(0, 120)),
# Проверка: зарплата больше 0
"salary": Column(pa.Float, Check.greater_than(0)),
# Проверка email по regex
"email": Column(pa.String, Check.str_matches(r".+@.+\..+"))
})
# Валидация (вызовет ошибку, если данные плохие)
validated_df = schema.validate(df) Нажмите на вопрос, чтобы увидеть ответ.
df.isna() возвращает True для пропущенных значений, а df.notna() — для существующих (непустых).ProfileReport(df) из библиотеки ydata-profiling.df.dropna(subset=["email"]).int в старых версиях Pandas не поддерживает NaN. Нужно использовать float или специальный Int64.Lecture #10 | Duman Makhanbetov
The flow of data from the web to a data warehouse.
| JOIN Type | Description | Result Logic |
|---|---|---|
| INNER JOIN | Returns records that have matching values in both tables. | Intersection (A ∩ B) |
| LEFT JOIN | Returns all records from the left table, and the matched records from the right table. | Left Side (A) |
| RIGHT JOIN | Returns all records from the right table, and the matched records from the left table. | Right Side (B) |
| FULL OUTER JOIN | Returns all records when there is a match in either left or right table. | Union (A ∪ B) |
Calculating a running total using a CTE for cleaner code.
-- Common Table Expression (CTE)
WITH high_value_orders AS (
SELECT order_id, customer_id, total_amount
FROM orders
WHERE total_amount > 500
)
-- Window Function: Running Total
SELECT
order_id,
customer_id,
total_amount,
SUM(total_amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
) AS running_total
FROM high_value_orders; Connecting to a database and reading/writing data.
import pandas as pd
from sqlalchemy import create_engine
# 1. Create Connection Engine (SQLite example)
engine = create_engine('sqlite:///data.db')
# 2. Writing Data: Pandas -> Database
# Saves dataframe to 'jobs' table
df.to_sql('jobs', engine, index=False, if_exists='replace')
# 3. Reading Data: Database -> Pandas
# Executes SQL query and returns DataFrame
query = "SELECT * FROM jobs WHERE salary > 100000"
df_result = pd.read_sql(query, engine)
print(df_result.head()) Tap to reveal answers.
GROUP BY reduces the number of rows to one per group.Lecture #10 | Duman Makhanbetov
The flow of data from the web to a data warehouse.
| JOIN Type | Description | Result Logic |
|---|---|---|
| INNER JOIN | Returns records that have matching values in both tables. | Intersection (A ∩ B) |
| LEFT JOIN | Returns all records from the left table, and the matched records from the right table. | Left Side (A) |
| RIGHT JOIN | Returns all records from the right table, and the matched records from the left table. | Right Side (B) |
| FULL OUTER JOIN | Returns all records when there is a match in either left or right table. | Union (A ∪ B) |
Calculating a running total using a CTE for cleaner code.
-- Common Table Expression (CTE)
WITH high_value_orders AS (
SELECT order_id, customer_id, total_amount
FROM orders
WHERE total_amount > 500
)
-- Window Function: Running Total
SELECT
order_id,
customer_id,
total_amount,
SUM(total_amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
) AS running_total
FROM high_value_orders; Connecting to a database and reading/writing data.
import pandas as pd
from sqlalchemy import create_engine
# 1. Create Connection Engine (SQLite example)
engine = create_engine('sqlite:///data.db')
# 2. Writing Data: Pandas -> Database
# Saves dataframe to 'jobs' table
df.to_sql('jobs', engine, index=False, if_exists='replace')
# 3. Reading Data: Database -> Pandas
# Executes SQL query and returns DataFrame
query = "SELECT * FROM jobs WHERE salary > 100000"
df_result = pd.read_sql(query, engine)
print(df_result.head()) Tap to reveal answers.
GROUP BY reduces the number of rows to one per group.Instructor: Aiganym Zhandaulet
| Feature | Cron | Apache Airflow |
|---|---|---|
| Complexity | Simple, lightweight | Robust orchestrator |
| Monitoring | None (check logs manually) | Rich UI for status & logs |
| Retries | No | Built-in retry logic |
| Dependencies | Hard to manage | First-class citizen (DAGs) |
| Scalability | Single machine | Distributed executors |
Running a python script every day at 6:00 AM.
# Minute Hour Day Month Weekday Command
0 6 * * * /usr/bin/python3 /home/user/pipeline.py >> /home/user/pipeline.log 2>&1 A basic pipeline defined in Python.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def my_task():
print("Pipeline running!")
with DAG(
dag_id="simple_pipeline",
start_date=datetime(2024, 1, 1),
schedule_interval="@daily",
catchup=False
) as dag:
run_etl = PythonOperator(
task_id="run_etl",
python_callable=my_task
) Tap to reveal answers.
Instructor: Duman Makhanbetov
| Feature | Batch Processing | Streaming Processing |
|---|---|---|
| Latency | High (Minutes/Hours) | Low (Milliseconds/Seconds) |
| Data | Bounded chunks | Continuous flow (unbounded) |
| Execution | Scheduled (Periodic) | Always Running |
| Use Cases | Reports, Billing, ETL | Fraud detection, Monitoring |
| Feature | REST | WebSocket |
|---|---|---|
| Connection | Request-Response (Short) | Persistent (Always open) |
| Direction | Client pulls from Server | Bidirectional (Server pushes) |
| Ideal for | Static/Infrequent data | Real-time/Live updates |
Connecting to a live crypto stream (Binance) using Python.
import websocket
import json
def on_message(ws, message):
# Parse the incoming JSON message
data = json.loads(message)
# Print the live price
print("Trade Price:", data["p"], "USD")
# Connect to Binance WebSocket for Bitcoin/USDT trades
ws = websocket.WebSocketApp(
"wss://stream.binance.com:9443/ws/btcusdt@trade",
on_message=on_message
)
# Keep the connection open forever
ws.run_forever() Tap to reveal answers.