use crate::handler::blockers::intercept_manager::NetworkInterceptManager;
use hashbrown::{HashMap, HashSet};
use std::pin::Pin;
use std::time::{Duration, Instant};
use fnv::FnvHashMap;
use futures::channel::mpsc::Receiver;
use futures::channel::oneshot::Sender as OneshotSender;
use futures::stream::{Fuse, Stream, StreamExt};
use futures::task::{Context, Poll};
use crate::listeners::{EventListenerRequest, EventListeners};
use chromiumoxide_cdp::cdp::browser_protocol::browser::*;
use chromiumoxide_cdp::cdp::browser_protocol::target::*;
use chromiumoxide_cdp::cdp::events::CdpEvent;
use chromiumoxide_cdp::cdp::events::CdpEventMessage;
use chromiumoxide_types::{CallId, Message, Method, Response};
use chromiumoxide_types::{MethodId, Request as CdpRequest};
pub(crate) use page::PageInner;
use crate::cmd::{to_command_response, CommandMessage};
use crate::conn::Connection;
use crate::error::{CdpError, Result};
use crate::handler::browser::BrowserContext;
use crate::handler::frame::FrameRequestedNavigation;
use crate::handler::frame::{NavigationError, NavigationId, NavigationOk};
use crate::handler::job::PeriodicJob;
use crate::handler::session::Session;
use crate::handler::target::TargetEvent;
use crate::handler::target::{Target, TargetConfig};
use crate::handler::viewport::Viewport;
use crate::page::Page;
pub const REQUEST_TIMEOUT: u64 = 60_000;
pub mod blockers;
pub mod browser;
pub mod commandfuture;
pub mod domworld;
pub mod emulation;
pub mod frame;
pub mod http;
pub mod httpfuture;
mod job;
pub mod network;
mod page;
mod session;
pub mod target;
pub mod target_message_future;
pub mod viewport;
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Handler {
pending_commands: FnvHashMap<CallId, (PendingRequest, MethodId, Instant)>,
from_browser: Fuse<Receiver<HandlerMessage>>,
pub default_browser_context: BrowserContext,
pub browser_contexts: HashSet<BrowserContext>,
target_ids: Vec<TargetId>,
targets: HashMap<TargetId, Target>,
navigations: FnvHashMap<NavigationId, NavigationRequest>,
sessions: HashMap<SessionId, Session>,
conn: Connection<CdpEventMessage>,
evict_command_timeout: PeriodicJob,
next_navigation_id: usize,
config: HandlerConfig,
event_listeners: EventListeners,
closing: bool,
}
impl Handler {
pub(crate) fn new(
mut conn: Connection<CdpEventMessage>,
rx: Receiver<HandlerMessage>,
config: HandlerConfig,
) -> Self {
let discover = SetDiscoverTargetsParams::new(true);
let discover_id = discover.identifier();
if let Ok(params) = serde_json::to_value(discover) {
let _ = conn.submit_command(discover_id, None, params);
}
let browser_contexts = config
.context_ids
.iter()
.map(|id| BrowserContext::from(id.clone()))
.collect();
Self {
pending_commands: Default::default(),
from_browser: rx.fuse(),
default_browser_context: Default::default(),
browser_contexts,
target_ids: Default::default(),
targets: Default::default(),
navigations: Default::default(),
sessions: Default::default(),
conn,
evict_command_timeout: PeriodicJob::new(config.request_timeout),
next_navigation_id: 0,
config,
event_listeners: Default::default(),
closing: false,
}
}
pub fn get_target(&self, target_id: &TargetId) -> Option<&Target> {
self.targets.get(target_id)
}
pub fn targets(&self) -> impl Iterator<Item = &Target> + '_ {
self.targets.values()
}
pub fn default_browser_context(&self) -> &BrowserContext {
&self.default_browser_context
}
pub fn set_default_browser_context(&mut self, context_id: BrowserContextId) -> &BrowserContext {
let browser_context = BrowserContext {
id: Some(context_id),
};
self.browser_contexts.insert(browser_context.clone());
self.default_browser_context = browser_context;
&self.default_browser_context
}
pub fn browser_contexts(&self) -> impl Iterator<Item = &BrowserContext> + '_ {
self.browser_contexts.iter()
}
fn on_navigation_response(&mut self, id: NavigationId, resp: Response) {
if let Some(nav) = self.navigations.remove(&id) {
match nav {
NavigationRequest::Navigate(mut nav) => {
if nav.navigated {
let _ = nav.tx.send(Ok(resp));
} else {
nav.set_response(resp);
self.navigations
.insert(id, NavigationRequest::Navigate(nav));
}
}
}
}
}
fn on_navigation_lifecycle_completed(&mut self, res: Result<NavigationOk, NavigationError>) {
match res {
Ok(ok) => {
let id = *ok.navigation_id();
if let Some(nav) = self.navigations.remove(&id) {
match nav {
NavigationRequest::Navigate(mut nav) => {
if let Some(resp) = nav.response.take() {
let _ = nav.tx.send(Ok(resp));
} else {
nav.set_navigated();
self.navigations
.insert(id, NavigationRequest::Navigate(nav));
}
}
}
}
}
Err(err) => {
if let Some(nav) = self.navigations.remove(err.navigation_id()) {
match nav {
NavigationRequest::Navigate(nav) => {
let _ = nav.tx.send(Err(err.into()));
}
}
}
}
}
}
fn on_response(&mut self, resp: Response) {
if let Some((req, method, _)) = self.pending_commands.remove(&resp.id) {
match req {
PendingRequest::CreateTarget(tx) => {
match to_command_response::<CreateTargetParams>(resp, method) {
Ok(resp) => {
if let Some(target) = self.targets.get_mut(&resp.target_id) {
target.set_initiator(tx);
}
}
Err(err) => {
let _ = tx.send(Err(err)).ok();
}
}
}
PendingRequest::GetTargets(tx) => {
match to_command_response::<GetTargetsParams>(resp, method) {
Ok(resp) => {
let targets: Vec<TargetInfo> = resp.result.target_infos;
let results = targets.clone();
for target_info in targets {
let target_id = target_info.target_id.clone();
let event: EventTargetCreated = EventTargetCreated { target_info };
self.on_target_created(event);
let attach = AttachToTargetParams::new(target_id);
let _ = self.conn.submit_command(
attach.identifier(),
None,
serde_json::to_value(attach).unwrap(),
);
}
let _ = tx.send(Ok(results)).ok();
}
Err(err) => {
let _ = tx.send(Err(err)).ok();
}
}
}
PendingRequest::Navigate(id) => {
self.on_navigation_response(id, resp);
if self.config.only_html && !self.config.created_first_target {
self.config.created_first_target = true;
}
}
PendingRequest::ExternalCommand(tx) => {
let _ = tx.send(Ok(resp)).ok();
}
PendingRequest::InternalCommand(target_id) => {
if let Some(target) = self.targets.get_mut(&target_id) {
target.on_response(resp, method.as_ref());
}
}
PendingRequest::CloseBrowser(tx) => {
self.closing = true;
let _ = tx.send(Ok(CloseReturns {})).ok();
}
}
}
}
pub(crate) fn submit_external_command(
&mut self,
msg: CommandMessage,
now: Instant,
) -> Result<()> {
let call_id = self
.conn
.submit_command(msg.method.clone(), msg.session_id, msg.params)?;
self.pending_commands.insert(
call_id,
(PendingRequest::ExternalCommand(msg.sender), msg.method, now),
);
Ok(())
}
pub(crate) fn submit_internal_command(
&mut self,
target_id: TargetId,
req: CdpRequest,
now: Instant,
) -> Result<()> {
let call_id = self.conn.submit_command(
req.method.clone(),
req.session_id.map(Into::into),
req.params,
)?;
self.pending_commands.insert(
call_id,
(PendingRequest::InternalCommand(target_id), req.method, now),
);
Ok(())
}
fn submit_fetch_targets(&mut self, tx: OneshotSender<Result<Vec<TargetInfo>>>, now: Instant) {
let msg = GetTargetsParams { filter: None };
let method = msg.identifier();
if let Ok(params) = serde_json::to_value(msg) {
if let Ok(call_id) = self.conn.submit_command(method.clone(), None, params) {
self.pending_commands
.insert(call_id, (PendingRequest::GetTargets(tx), method, now));
}
}
}
fn submit_navigation(&mut self, id: NavigationId, req: CdpRequest, now: Instant) {
if let Ok(call_id) = self.conn.submit_command(
req.method.clone(),
req.session_id.map(Into::into),
req.params,
) {
self.pending_commands
.insert(call_id, (PendingRequest::Navigate(id), req.method, now));
}
}
fn submit_close(&mut self, tx: OneshotSender<Result<CloseReturns>>, now: Instant) {
let close_msg = CloseParams::default();
let method = close_msg.identifier();
if let Ok(call_id) = self.conn.submit_command(
method.clone(),
None,
serde_json::to_value(close_msg).unwrap(),
) {
self.pending_commands
.insert(call_id, (PendingRequest::CloseBrowser(tx), method, now));
}
}
fn on_target_message(&mut self, target: &mut Target, msg: CommandMessage, now: Instant) {
if msg.is_navigation() {
let (req, tx) = msg.split();
let id = self.next_navigation_id();
target.goto(FrameRequestedNavigation::new(id, req));
self.navigations.insert(
id,
NavigationRequest::Navigate(NavigationInProgress::new(tx)),
);
} else {
let _ = self.submit_external_command(msg, now);
}
}
fn next_navigation_id(&mut self) -> NavigationId {
let id = NavigationId(self.next_navigation_id);
self.next_navigation_id = self.next_navigation_id.wrapping_add(1);
id
}
fn create_page(&mut self, params: CreateTargetParams, tx: OneshotSender<Result<Page>>) {
match url::Url::parse(¶ms.url) {
Ok(_) => {
let method = params.identifier();
match serde_json::to_value(params) {
Ok(params) => match self.conn.submit_command(method.clone(), None, params) {
Ok(call_id) => {
self.pending_commands.insert(
call_id,
(PendingRequest::CreateTarget(tx), method, Instant::now()),
);
}
Err(err) => {
let _ = tx.send(Err(err.into())).ok();
}
},
Err(err) => {
let _ = tx.send(Err(err.into())).ok();
}
}
}
Err(err) => {
let _ = tx.send(Err(err.into())).ok();
}
}
}
fn on_event(&mut self, event: CdpEventMessage) {
if let Some(ref session_id) = event.session_id {
if let Some(session) = self.sessions.get(session_id.as_str()) {
if let Some(target) = self.targets.get_mut(session.target_id()) {
return target.on_event(event);
}
}
}
let CdpEventMessage { params, method, .. } = event;
match params.clone() {
CdpEvent::TargetTargetCreated(ev) => self.on_target_created(ev),
CdpEvent::TargetAttachedToTarget(ev) => self.on_attached_to_target(ev),
CdpEvent::TargetTargetDestroyed(ev) => self.on_target_destroyed(ev),
CdpEvent::TargetDetachedFromTarget(ev) => self.on_detached_from_target(ev),
_ => {}
}
chromiumoxide_cdp::consume_event!(match params {
|ev| self.event_listeners.start_send(ev),
|json| { let _ = self.event_listeners.try_send_custom(&method, json);}
});
}
fn on_target_created(&mut self, event: EventTargetCreated) {
let browser_ctx = match event.target_info.browser_context_id {
Some(ref context_id) => {
let browser_context = BrowserContext {
id: Some(context_id.clone()),
};
if self.default_browser_context.id.is_none() {
self.default_browser_context = browser_context.clone();
};
self.browser_contexts.insert(browser_context.clone());
browser_context
}
_ => event
.target_info
.browser_context_id
.clone()
.map(BrowserContext::from)
.filter(|id| self.browser_contexts.contains(id))
.unwrap_or_else(|| self.default_browser_context.clone()),
};
let target = Target::new(
event.target_info,
TargetConfig {
ignore_https_errors: self.config.ignore_https_errors,
request_timeout: self.config.request_timeout,
viewport: self.config.viewport.clone(),
request_intercept: self.config.request_intercept,
cache_enabled: self.config.cache_enabled,
ignore_visuals: self.config.ignore_visuals,
ignore_stylesheets: self.config.ignore_stylesheets,
ignore_javascript: self.config.ignore_javascript,
ignore_analytics: self.config.ignore_analytics,
extra_headers: self.config.extra_headers.clone(),
only_html: self.config.only_html && self.config.created_first_target,
intercept_manager: self.config.intercept_manager,
},
browser_ctx,
);
self.target_ids.push(target.target_id().clone());
self.targets.insert(target.target_id().clone(), target);
}
fn on_attached_to_target(&mut self, event: Box<EventAttachedToTarget>) {
let session = Session::new(event.session_id.clone(), event.target_info.target_id);
if let Some(target) = self.targets.get_mut(session.target_id()) {
target.set_session_id(session.session_id().clone())
}
self.sessions.insert(event.session_id, session);
}
fn on_detached_from_target(&mut self, event: EventDetachedFromTarget) {
if let Some(session) = self.sessions.remove(&event.session_id) {
if let Some(target) = self.targets.get_mut(session.target_id()) {
target.session_id().take();
}
}
}
fn on_target_destroyed(&mut self, event: EventTargetDestroyed) {
if let Some(target) = self.targets.remove(&event.target_id) {
if let Some(session) = target.session_id() {
self.sessions.remove(session);
}
}
}
fn evict_timed_out_commands(&mut self, now: Instant) {
let timed_out = self
.pending_commands
.iter()
.filter(|(_, (_, _, timestamp))| now > (*timestamp + self.config.request_timeout))
.map(|(k, _)| *k)
.collect::<Vec<_>>();
for call in timed_out {
if let Some((req, _, _)) = self.pending_commands.remove(&call) {
match req {
PendingRequest::CreateTarget(tx) => {
let _ = tx.send(Err(CdpError::Timeout));
}
PendingRequest::GetTargets(tx) => {
let _ = tx.send(Err(CdpError::Timeout));
}
PendingRequest::Navigate(nav) => {
if let Some(nav) = self.navigations.remove(&nav) {
match nav {
NavigationRequest::Navigate(nav) => {
let _ = nav.tx.send(Err(CdpError::Timeout));
}
}
}
}
PendingRequest::ExternalCommand(tx) => {
let _ = tx.send(Err(CdpError::Timeout));
}
PendingRequest::InternalCommand(_) => {}
PendingRequest::CloseBrowser(tx) => {
let _ = tx.send(Err(CdpError::Timeout));
}
}
}
}
}
pub fn event_listeners_mut(&mut self) -> &mut EventListeners {
&mut self.event_listeners
}
}
impl Stream for Handler {
type Item = Result<()>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let pin = self.get_mut();
loop {
let now = Instant::now();
while let Poll::Ready(Some(msg)) = Pin::new(&mut pin.from_browser).poll_next(cx) {
match msg {
HandlerMessage::Command(cmd) => {
pin.submit_external_command(cmd, now)?;
}
HandlerMessage::FetchTargets(tx) => {
pin.submit_fetch_targets(tx, now);
}
HandlerMessage::CloseBrowser(tx) => {
pin.submit_close(tx, now);
}
HandlerMessage::CreatePage(params, tx) => {
pin.create_page(params, tx);
}
HandlerMessage::GetPages(tx) => {
let pages: Vec<_> = pin
.targets
.values_mut()
.filter(|p: &&mut Target| p.is_page())
.filter_map(|target| target.get_or_create_page())
.map(|page| Page::from(page.clone()))
.collect();
let _ = tx.send(pages);
}
HandlerMessage::InsertContext(ctx) => {
if pin.default_browser_context.id.is_none() {
pin.default_browser_context = ctx.clone();
}
pin.browser_contexts.insert(ctx);
}
HandlerMessage::DisposeContext(ctx) => {
pin.browser_contexts.remove(&ctx);
}
HandlerMessage::GetPage(target_id, tx) => {
let page = pin
.targets
.get_mut(&target_id)
.and_then(|target| target.get_or_create_page())
.map(|page| Page::from(page.clone()));
let _ = tx.send(page);
}
HandlerMessage::AddEventListener(req) => {
pin.event_listeners.add_listener(req);
}
}
}
for n in (0..pin.target_ids.len()).rev() {
let target_id = pin.target_ids.swap_remove(n);
if let Some((id, mut target)) = pin.targets.remove_entry(&target_id) {
while let Some(event) = target.poll(cx, now) {
match event {
TargetEvent::Request(req) => {
let _ = pin.submit_internal_command(
target.target_id().clone(),
req,
now,
);
}
TargetEvent::Command(msg) => {
pin.on_target_message(&mut target, msg, now);
}
TargetEvent::NavigationRequest(id, req) => {
pin.submit_navigation(id, req, now);
}
TargetEvent::NavigationResult(res) => {
pin.on_navigation_lifecycle_completed(res)
}
}
}
target.event_listeners_mut().poll(cx);
pin.event_listeners_mut().poll(cx);
pin.targets.insert(id, target);
pin.target_ids.push(target_id);
}
}
let mut done = true;
while let Poll::Ready(Some(ev)) = Pin::new(&mut pin.conn).poll_next(cx) {
match ev {
Ok(Message::Response(resp)) => {
pin.on_response(resp);
if pin.closing {
return Poll::Ready(None);
}
}
Ok(Message::Event(ev)) => {
pin.on_event(ev);
}
Err(err) => {
tracing::error!("WS Connection error: {:?}", err);
return Poll::Ready(Some(Err(err)));
}
}
done = false;
}
if pin.evict_command_timeout.poll_ready(cx) {
pin.evict_timed_out_commands(now);
}
if done {
return Poll::Pending;
}
}
}
}
#[derive(Debug, Clone)]
pub struct HandlerConfig {
pub ignore_https_errors: bool,
pub viewport: Option<Viewport>,
pub context_ids: Vec<BrowserContextId>,
pub request_timeout: Duration,
pub request_intercept: bool,
pub cache_enabled: bool,
pub ignore_visuals: bool,
pub ignore_stylesheets: bool,
pub ignore_javascript: bool,
pub ignore_analytics: bool,
pub ignore_ads: bool,
pub extra_headers: Option<std::collections::HashMap<String, String>>,
pub only_html: bool,
pub created_first_target: bool,
pub intercept_manager: NetworkInterceptManager,
}
impl Default for HandlerConfig {
fn default() -> Self {
Self {
ignore_https_errors: true,
viewport: Default::default(),
context_ids: Vec::new(),
request_timeout: Duration::from_millis(REQUEST_TIMEOUT),
request_intercept: false,
cache_enabled: true,
ignore_visuals: false,
ignore_stylesheets: false,
ignore_ads: false,
ignore_javascript: false,
ignore_analytics: true,
only_html: false,
extra_headers: Default::default(),
created_first_target: false,
intercept_manager: NetworkInterceptManager::UNKNOWN,
}
}
}
#[derive(Debug)]
pub struct NavigationInProgress<T> {
navigated: bool,
response: Option<Response>,
tx: OneshotSender<T>,
}
impl<T> NavigationInProgress<T> {
fn new(tx: OneshotSender<T>) -> Self {
Self {
navigated: false,
response: None,
tx,
}
}
fn set_response(&mut self, resp: Response) {
self.response = Some(resp);
}
fn set_navigated(&mut self) {
self.navigated = true;
}
}
#[derive(Debug)]
enum NavigationRequest {
Navigate(NavigationInProgress<Result<Response>>),
}
#[derive(Debug)]
enum PendingRequest {
CreateTarget(OneshotSender<Result<Page>>),
GetTargets(OneshotSender<Result<Vec<TargetInfo>>>),
Navigate(NavigationId),
ExternalCommand(OneshotSender<Result<Response>>),
InternalCommand(TargetId),
CloseBrowser(OneshotSender<Result<CloseReturns>>),
}
#[derive(Debug)]
pub(crate) enum HandlerMessage {
CreatePage(CreateTargetParams, OneshotSender<Result<Page>>),
FetchTargets(OneshotSender<Result<Vec<TargetInfo>>>),
InsertContext(BrowserContext),
DisposeContext(BrowserContext),
GetPages(OneshotSender<Vec<Page>>),
Command(CommandMessage),
GetPage(TargetId, OneshotSender<Option<Page>>),
AddEventListener(EventListenerRequest),
CloseBrowser(OneshotSender<Result<CloseReturns>>),
}