441 lines
15 KiB
Python
441 lines
15 KiB
Python
from playwright.sync_api import sync_playwright
|
|
import time
|
|
import os
|
|
from datetime import datetime
|
|
import json
|
|
|
|
URL = "https://pmu.sisponto.com.br/Sispontoweb/open.do?sys=SPW"
|
|
|
|
# CREDENCIAIS FIXAS (TESTE)
|
|
USER = "04348941688"
|
|
PASS = "04348941688"
|
|
|
|
DEBUG_DIR = "/tmp/sisponto_debug"
|
|
|
|
|
|
def ts():
|
|
return datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
|
|
def snap(page, label):
|
|
path = f"{DEBUG_DIR}/{ts()}_{label}.png"
|
|
try:
|
|
page.screenshot(path=path, full_page=True)
|
|
print(f"[SNAP] {path}")
|
|
except Exception as e:
|
|
print(f"[SNAP-FAIL] {label}: {e}")
|
|
|
|
|
|
def dump_frames(page, title):
|
|
print(f"\n==== FRAMES ({title}) ====")
|
|
for i, fr in enumerate(page.frames):
|
|
print(f"[{i}] name={fr.name!r} url={fr.url}")
|
|
print("==== END FRAMES ====\n")
|
|
|
|
|
|
def open_menu_via_barra(inner_frame):
|
|
# Abre o menu lateral sem depender de mouseenter/hover
|
|
print("[DEBUG] Aguardando elemento #barra_me_nu...")
|
|
|
|
# Tentar encontrar o elemento com retry
|
|
max_attempts = 3
|
|
for attempt in range(max_attempts):
|
|
try:
|
|
inner_frame.wait_for_selector("#barra_me_nu", timeout=20000)
|
|
print(f"[OK] Elemento #barra_me_nu encontrado (tentativa {attempt + 1})")
|
|
|
|
result = inner_frame.evaluate(
|
|
"""
|
|
() => {
|
|
const el = document.getElementById("barra_me_nu");
|
|
if (!el) return false;
|
|
el.style.marginLeft = "0px";
|
|
el.style.opacity = "1";
|
|
el.style.visibility = "visible";
|
|
return true;
|
|
}
|
|
"""
|
|
)
|
|
|
|
if result:
|
|
print("[OK] Menu aberto via JS")
|
|
return
|
|
else:
|
|
print(f"[WARN] Elemento não encontrado no DOM (tentativa {attempt + 1})")
|
|
|
|
except Exception as e:
|
|
print(f"[WARN] Erro ao abrir menu (tentativa {attempt + 1}): {e}")
|
|
if attempt < max_attempts - 1:
|
|
time.sleep(2)
|
|
else:
|
|
raise
|
|
|
|
|
|
def parse_grid_to_list(popup_frame):
|
|
"""
|
|
Extrai dados da grid a partir do padrão de IDs:
|
|
*.layout/data (container)
|
|
*.data.item:<row>.item:<col> (células)
|
|
e transforma no formato:
|
|
[
|
|
{"01/12/2025": ["10:05","11:32","12:32"]},
|
|
{"02/12/2025": ["10:26","11:27"]},
|
|
{"03/12/2025": ["folga"]},
|
|
...
|
|
]
|
|
"""
|
|
|
|
# (1) opcional: garantir que o container exista
|
|
container = popup_frame.locator('div[id*=".layout/data"]')
|
|
container.wait_for(state="attached", timeout=20000)
|
|
|
|
# (4) abordagem mais simples: pegar todas as células por padrão *.data.item:*.item:*
|
|
cells = popup_frame.locator('div[id*=".data.item:"][id*=".item:"]')
|
|
|
|
raw_rows = {}
|
|
count = cells.count()
|
|
for i in range(count):
|
|
el = cells.nth(i)
|
|
el_id = el.get_attribute("id") or ""
|
|
text = el.inner_text().strip()
|
|
|
|
# Ex: grid746802.data.item:0.item:1
|
|
try:
|
|
part = el_id.split(".data.item:", 1)[1] # "0.item:1"
|
|
row_str, col_str = part.split(".item:", 1) # "0", "1"
|
|
row_idx = int(row_str)
|
|
col_idx = int(col_str)
|
|
except Exception:
|
|
continue
|
|
|
|
raw_rows.setdefault(row_idx, {})[col_idx] = text
|
|
|
|
# Montagem final ordenada por índice de linha
|
|
result = []
|
|
for row_idx in sorted(raw_rows.keys()):
|
|
row = raw_rows[row_idx]
|
|
date = (row.get(0) or "").strip()
|
|
value = (row.get(1) or "").strip()
|
|
|
|
if not date:
|
|
continue
|
|
|
|
vlow = value.lower().strip()
|
|
|
|
if not value:
|
|
values = []
|
|
elif vlow == "folga":
|
|
values = ["folga"]
|
|
else:
|
|
# Ex: "- 10:05 - 11:32 - 12:32" => ["10:05","11:32","12:32"]
|
|
# Ex: "Observacao qualquer" => ["observacao qualquer"] (mantém como 1 item)
|
|
cleaned = value.replace("-", " ").strip()
|
|
parts = [p.strip() for p in cleaned.split() if p.strip()]
|
|
|
|
# Heurística simples: se tem tokens no formato HH:MM, assume lista de horários
|
|
times = [p for p in parts if len(p) == 5 and p[2] == ":" and p.replace(":", "").isdigit()]
|
|
if times:
|
|
values = times
|
|
else:
|
|
values = [value.strip()]
|
|
|
|
result.append({date: values})
|
|
|
|
return result
|
|
|
|
|
|
def process_timecard_data(data_list):
|
|
"""
|
|
Processa os dados de registro de ponto com as seguintes regras:
|
|
- Batidas pares: subtrai item 0 do item 1, item 2 do item 3, etc.
|
|
- Batidas ímpares e data != hoje: ignora
|
|
- Strings que não são horários: ignora
|
|
- Batidas ímpares e data == hoje: adiciona horário atual como último lançamento
|
|
|
|
Retorna um dicionário com o total de horas trabalhadas por dia.
|
|
"""
|
|
today_str = datetime.now().strftime("%d/%m/%Y")
|
|
current_time = datetime.now().strftime("%H:%M")
|
|
|
|
results = {}
|
|
|
|
for entry in data_list:
|
|
# Cada entry é um dict com uma única chave (a data)
|
|
if not entry:
|
|
continue
|
|
|
|
date = list(entry.keys())[0]
|
|
times = entry[date]
|
|
|
|
# Filtrar apenas horários válidos (formato HH:MM)
|
|
valid_times = []
|
|
for t in times:
|
|
t_str = str(t).strip()
|
|
# Verificar se é horário válido (HH:MM)
|
|
if len(t_str) == 5 and t_str[2] == ':' and t_str[:2].isdigit() and t_str[3:].isdigit():
|
|
valid_times.append(t_str)
|
|
|
|
num_times = len(valid_times)
|
|
|
|
# Se não há horários válidos, pular
|
|
if num_times == 0:
|
|
continue
|
|
|
|
# Se número ímpar de batidas
|
|
if num_times % 2 != 0:
|
|
# Se não é hoje, ignorar
|
|
if date != today_str:
|
|
continue
|
|
# Se é hoje, adicionar horário atual
|
|
valid_times.append(current_time)
|
|
num_times += 1
|
|
|
|
# Calcular total de horas trabalhadas (pares)
|
|
total_minutes = 0
|
|
for i in range(0, num_times, 2):
|
|
time_in = valid_times[i]
|
|
time_out = valid_times[i + 1]
|
|
|
|
# Converter para minutos
|
|
h_in, m_in = map(int, time_in.split(':'))
|
|
h_out, m_out = map(int, time_out.split(':'))
|
|
|
|
minutes_in = h_in * 60 + m_in
|
|
minutes_out = h_out * 60 + m_out
|
|
|
|
# Calcular diferença
|
|
diff = minutes_out - minutes_in
|
|
total_minutes += diff
|
|
|
|
# Converter total de minutos para horas:minutos
|
|
hours = total_minutes // 60
|
|
minutes = total_minutes % 60
|
|
|
|
results[date] = {
|
|
'total_minutes': total_minutes,
|
|
'total_formatted': f"{hours:02d}:{minutes:02d}",
|
|
'times': valid_times
|
|
}
|
|
|
|
return results
|
|
|
|
|
|
def main():
|
|
os.makedirs(DEBUG_DIR, exist_ok=True)
|
|
print(f"[DEBUG] Arquivos em: {DEBUG_DIR}")
|
|
|
|
with sync_playwright() as p:
|
|
browser = p.chromium.launch(
|
|
headless=False, # ver acontecendo
|
|
slow_mo=120 # desacelerar para debug
|
|
)
|
|
|
|
context = browser.new_context(
|
|
viewport={"width": 1500, "height": 950},
|
|
)
|
|
|
|
page = context.new_page()
|
|
|
|
page.on("console", lambda msg: print(f"[BROWSER-CONSOLE] {msg.type}: {msg.text}"))
|
|
page.on("pageerror", lambda exc: print(f"[BROWSER-ERROR] {exc}"))
|
|
|
|
# ============================================================
|
|
# ABERTURA
|
|
# ============================================================
|
|
print("[1] Abrindo URL...")
|
|
page.goto(URL, wait_until="domcontentloaded")
|
|
snap(page, "01_abriu_url")
|
|
dump_frames(page, "apos_open")
|
|
|
|
# ============================================================
|
|
# LOGIN (iframe mainform)
|
|
# ============================================================
|
|
print("[2] Login...")
|
|
login_frame = page.frame(name="mainform")
|
|
if not login_frame:
|
|
raise RuntimeError("Iframe mainform (login) não encontrado")
|
|
|
|
login_frame.locator('input[name="WFRInput744594"]').wait_for(state="visible", timeout=15000)
|
|
login_frame.locator('input[name="WFRInput744594"]').fill(USER)
|
|
login_frame.locator('input[name="WFRInput744595"]').fill(PASS)
|
|
snap(page, "02_login_preenchido")
|
|
|
|
login_frame.locator("div.HTMLButton_Logar").wait_for(state="visible", timeout=15000)
|
|
login_frame.locator("div.HTMLButton_Logar").click()
|
|
snap(page, "03_click_logar")
|
|
|
|
# ============================================================
|
|
# ESPERAR TELA PRINCIPAL (openform.do)
|
|
# ============================================================
|
|
print("[3] Aguardando openform.do pós-login...")
|
|
inner_frame = None
|
|
t0 = time.time()
|
|
timeout = 40 # aumentar timeout para 40 segundos
|
|
|
|
while time.time() - t0 < timeout:
|
|
# após login, o frame mainform muda de URL
|
|
for fr in page.frames:
|
|
url = fr.url or ""
|
|
name = fr.name or ""
|
|
|
|
# Procurar por frame mainform com openform.do OU frame sem nome com openform.do
|
|
if ("openform.do" in url and "formID" in url) and (name == "mainform" or name == ""):
|
|
inner_frame = fr
|
|
print(f"[DEBUG] Frame encontrado: name={name!r} url={url[:80]}...")
|
|
break
|
|
|
|
if inner_frame:
|
|
break
|
|
|
|
time.sleep(0.3)
|
|
|
|
# Debug a cada 5 segundos
|
|
if int(time.time() - t0) % 5 == 0:
|
|
print(f"[DEBUG] Ainda aguardando... ({int(time.time() - t0)}s)")
|
|
|
|
dump_frames(page, "apos_login")
|
|
|
|
if not inner_frame:
|
|
snap(page, "ERRO_openform_nao_apareceu")
|
|
print("[ERRO] Frames disponíveis:")
|
|
for i, fr in enumerate(page.frames):
|
|
print(f" [{i}] name={fr.name!r} url={fr.url}")
|
|
raise RuntimeError("openform.do não apareceu após login")
|
|
|
|
print(f"[OK] inner_frame: name={inner_frame.name!r} url={inner_frame.url}")
|
|
snap(page, "04_openform_ok")
|
|
|
|
# Aguardar estabilização do frame
|
|
print("[DEBUG] Aguardando estabilização do frame...")
|
|
time.sleep(2)
|
|
|
|
# Verificar se o frame ainda está válido
|
|
try:
|
|
test_url = inner_frame.url
|
|
print(f"[DEBUG] Frame URL ainda válida: {test_url[:80]}...")
|
|
except Exception as e:
|
|
print(f"[WARN] Frame pode ter sido destruído, procurando novamente: {e}")
|
|
# Re-procurar o frame
|
|
for fr in page.frames:
|
|
url = fr.url or ""
|
|
if "openform.do" in url and "formID" in url:
|
|
inner_frame = fr
|
|
print(f"[OK] Frame re-encontrado: {url[:80]}...")
|
|
break
|
|
|
|
# ============================================================
|
|
# ABRIR MENU VIA JS (barra_me_nu)
|
|
# ============================================================
|
|
print('[4] Abrindo menu via JS: #barra_me_nu.style.marginLeft="0px"')
|
|
|
|
# Aguardar página estar completamente carregada
|
|
try:
|
|
inner_frame.wait_for_load_state("domcontentloaded", timeout=10000)
|
|
inner_frame.wait_for_load_state("networkidle", timeout=15000)
|
|
except Exception as e:
|
|
print(f"[WARN] Timeout aguardando load_state: {e}")
|
|
|
|
time.sleep(1.5) # Aguardar um pouco mais antes de abrir o menu
|
|
|
|
open_menu_via_barra(inner_frame)
|
|
time.sleep(0.8)
|
|
snap(page, "05_menu_aberto_barra_me_nu")
|
|
|
|
# ============================================================
|
|
# NAVEGAR MENU: Menu do Colaborador -> Consultas -> Consulta do ponto
|
|
# ============================================================
|
|
print("[5] Menu do Colaborador...")
|
|
m1 = inner_frame.locator("text=Menu do Colaborador").first
|
|
m1.wait_for(state="visible", timeout=15000)
|
|
m1.hover()
|
|
time.sleep(0.25)
|
|
snap(page, "06_hover_menu_do_colaborador")
|
|
|
|
print("[6] Consultas...")
|
|
m2 = inner_frame.locator("text=Consultas").first
|
|
m2.wait_for(state="visible", timeout=15000)
|
|
m2.hover()
|
|
time.sleep(0.25)
|
|
snap(page, "07_hover_consultas")
|
|
|
|
print("[7] Clique em Consulta do ponto (popup)...")
|
|
m3 = inner_frame.locator("text=Consulta do ponto").first
|
|
m3.wait_for(state="visible", timeout=15000)
|
|
m3.hover()
|
|
time.sleep(0.15)
|
|
snap(page, "08_hover_consulta_do_ponto")
|
|
|
|
with context.expect_page(timeout=20000) as popup_info:
|
|
m3.click()
|
|
|
|
popup = popup_info.value
|
|
popup.wait_for_load_state("domcontentloaded")
|
|
popup.wait_for_load_state("networkidle")
|
|
time.sleep(0.5)
|
|
snap(popup, "09_popup_aberto")
|
|
|
|
# ============================================================
|
|
# ESCOLHER FRAME DO POPUP (se houver iframe interno)
|
|
# ============================================================
|
|
print("[8] Detectando frame final no popup...")
|
|
for i, fr in enumerate(popup.frames):
|
|
print(f"[POPUP-FRAME {i}] name={fr.name!r} url={fr.url}")
|
|
|
|
popup_frame = popup.main_frame
|
|
for fr in popup.frames:
|
|
if fr != popup.main_frame and fr.url and "about:blank" not in fr.url:
|
|
popup_frame = fr
|
|
break
|
|
|
|
print(f"[OK] popup_frame: name={popup_frame.name!r} url={popup_frame.url}")
|
|
snap(popup, "10_popup_frame_ok")
|
|
|
|
# ============================================================
|
|
# EXTRAÇÃO DA GRID E TRANSFORMAÇÃO
|
|
# ============================================================
|
|
print("[9] Extraindo grid (*.layout/data / *.data.item:*.item:*) ...")
|
|
data_list = parse_grid_to_list(popup_frame)
|
|
|
|
out_json = f"{DEBUG_DIR}/{ts()}_resultado.json"
|
|
with open(out_json, "w", encoding="utf-8") as f:
|
|
json.dump(data_list, f, ensure_ascii=False, indent=2)
|
|
|
|
print(f"[OK] Resultado salvo em: {out_json}")
|
|
print("[OK] Todos os registros extraídos:")
|
|
for item in data_list:
|
|
print(" ", item)
|
|
|
|
# ============================================================
|
|
# PROCESSAMENTO DE HORAS TRABALHADAS
|
|
# ============================================================
|
|
print("\n[10] Processando horas trabalhadas...")
|
|
processed_data = process_timecard_data(data_list)
|
|
|
|
out_processed = f"{DEBUG_DIR}/{ts()}_horas_trabalhadas.json"
|
|
with open(out_processed, "w", encoding="utf-8") as f:
|
|
json.dump(processed_data, f, ensure_ascii=False, indent=2)
|
|
|
|
print(f"[OK] Horas trabalhadas salvas em: {out_processed}")
|
|
print("[OK] Horas trabalhadas:")
|
|
for date, info in processed_data.items():
|
|
print(f" {date}: {info['total_formatted']}")
|
|
|
|
# Também salva o HTML do frame (útil para validar)
|
|
html_path = f"{DEBUG_DIR}/{ts()}_popup_frame.html"
|
|
with open(html_path, "w", encoding="utf-8") as f:
|
|
f.write(popup_frame.content())
|
|
print(f"[OK] HTML do popup_frame salvo em: {html_path}")
|
|
|
|
# ============================================================
|
|
# PAUSA PARA INSPEÇÃO
|
|
# ============================================================
|
|
#print("\n[PAUSE] Inspector aberto. Clique Resume para encerrar.\n")
|
|
#page.pause()
|
|
|
|
context.close()
|
|
browser.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|