Peter Cai d4e86cab47 client: try to parse upstream response and skip malformed DNS response
* For some reason, some of our upstreams return malformed DNS response
2021-04-06 07:29:57 +08:00

233 lines
9 KiB

use crate::cache::DnsCache;
use crate::r#override::OverrideResolver;
use domain::{
iana::{Opcode, Rcode},
rdata::{ParseRecordData, UnknownRecordData},
Dname, Message, MessageBuilder, ParsedDname, Question, Record, ToDname,
use js_sys::{ArrayBuffer, Uint8Array};
use wasm_bindgen_futures::JsFuture;
use web_sys::{Headers, Request, RequestInit, Response};
// The DNS client implementation
pub struct Client {
upstream_urls: Vec<String>,
cache: DnsCache,
override_resolver: OverrideResolver,
impl Client {
pub fn new(upstream_urls: Vec<String>, override_resolver: OverrideResolver) -> Client {
Client {
cache: DnsCache::new(),
pub async fn query(
questions: Vec<Question<Dname<Vec<u8>>>>,
) -> Result<Vec<Record<Dname<Vec<u8>>, UnknownRecordData<Vec<u8>>>>, String> {
// Attempt to answer locally first
let (mut local_answers, questions) = self.try_answer_from_local(questions).await;
if questions.len() == 0 {
// No remaining questions to be handled. Return directly.
return Ok(local_answers);
let msg = Self::build_query(questions)?;
let upstream = self.select_upstream();
let resp = Self::do_query(&upstream, msg).await?;
match resp.header().rcode() {
Rcode::NoError => {
let mut ret = Self::extract_answers(resp)?;
// Concatenate the cached answers we retrived previously with the newly-fetched answers
ret.append(&mut local_answers);
// NXDOMAIN is not an error we want to retry / panic upon
// It simply means the domain doesn't exist
Rcode::NXDomain => Ok(Vec::new()),
rcode => Err(format!("Server error: {}", rcode)),
pub async fn query_with_retry(
questions: Vec<Question<Dname<Vec<u8>>>>,
retries: usize,
) -> Result<Vec<Record<Dname<Vec<u8>>, UnknownRecordData<Vec<u8>>>>, String> {
let mut last_res = Err("Dummy".to_string());
for _ in 0..retries {
last_res = self.query(questions.clone()).await;
if last_res.is_ok() {
return last_res;
// Select an upstream randomly
fn select_upstream(&self) -> String {
let idx = crate::util::random_range(0, self.upstream_urls.len() as u16);
self.upstream_urls[idx as usize].clone()
// Build UDP wireformat query from a list of questions
// We don't use the client's query directly because we want to validate
// it first, and we also want to be able to do caching and overriding
fn build_query(questions: Vec<Question<Dname<Vec<u8>>>>) -> Result<Message<Vec<u8>>, String> {
let mut builder = MessageBuilder::new_vec();
// Set up the header
let header = builder.header_mut();
// We don't use set_random_id because `getrandom` seems to be
// unreliable on Cloudflare Workers for some reason
header.set_id(crate::util::random_range(0, u16::MAX));
header.set_qr(false); // For queries, QR = false
header.set_rd(true); // Ask for recursive queries
// Set up the questions
let mut question_builder = builder.question();
for q in questions {
.map_err(|_| "Size limit exceeded".to_string())?;
async fn do_query(upstream: &str, msg: Message<Vec<u8>>) -> Result<Message<Vec<u8>>, String> {
let body = Uint8Array::from(msg.as_slice());
let headers = Headers::new().map_err(|_| "Could not create headers".to_string())?;
.append("Accept", "application/dns-message")
.map_err(|_| "Could not append header".to_string())?;
.append("Content-Type", "application/dns-message")
.map_err(|_| "Could not append header".to_string())?;
let mut request_init = RequestInit::new();
let request = Request::new_with_str_and_init(upstream, &request_init)
.map_err(|_| "Failed to create Request object".to_string())?;
let resp: Response = crate::util::fetch_rs(&request)
.map_err(|_| "Upstream request error".to_string())?
if resp.status() != 200 {
return Err(format!("Unknown response status {}", resp.status()));
let resp_body = resp
.map_err(|_| "Cannot get body".to_string())?;
let resp_body: ArrayBuffer = JsFuture::from(resp_body)
.map_err(|_| "Failure receiving response body".to_string())?
fn extract_answers(
msg: Message<Vec<u8>>,
) -> Result<Vec<Record<Dname<Vec<u8>>, UnknownRecordData<Vec<u8>>>>, String> {
let answer_section = msg
.map_err(|_| "Failed to parse DNS answer from upstream".to_string())?;
// Answers can be empty; that is when upstream has no records for the questions
// so we don't need to error out here if answers are empty
// this is different from the server impl
let answers: Vec<_> = answer_section.collect();
let mut ret: Vec<Record<Dname<Vec<u8>>, UnknownRecordData<Vec<u8>>>> = Vec::new();
for a in answers {
let parsed_record = a.map_err(|_| "Failed to parse DNS answer record".to_string())?;
// Use UnknownRecordData here because we don't really care about the actual type of the record
// It saves time and saves sanity (because of the type signature of AllRecordData)
let record: Record<ParsedDname<&Vec<u8>>, UnknownRecordData<&[u8]>> = parsed_record
.map_err(|_| "Cannot parse record".to_string())?
.ok_or("Cannot parse record".to_string())?;
// Convert everything to owned for sanity in type signature...
// We'll need to do a copy before returning outside of the main
// query function anyway
let owned_record = Record::new(
.map_err(|_| "Failed to parse Dname".to_string())?,
// Try to parse the record; if failed, fail this entire query
let parsed_record_data: Result<Option<AllRecordData<&[u8], ParsedDname<&[u8]>>>, _> =
&mut Parser::from_ref(,
if let Err(_) = parsed_record_data {
return Err("Failed to parse response from upstream".to_string());
} else if let Ok(None) = parsed_record_data {
return Err("Upstream did not respond our query".to_string());
// Try to answer the questions as much as we can from the cache / override map
// returns the available answers, and the remaining questions that cannot be
// answered from cache or the override resolver
async fn try_answer_from_local(
questions: Vec<Question<Dname<Vec<u8>>>>,
) -> (
Vec<Record<Dname<Vec<u8>>, UnknownRecordData<Vec<u8>>>>,
) {
let mut answers = Vec::new();
let mut remaining = Vec::new();
for q in questions {
if let Some(ans) = self.override_resolver.try_resolve(&q) {
// Try to resolve from override map first
} else if let Some(mut ans) = self.cache.get_cache(&q).await {
// Then try cache
answers.append(&mut ans);
} else {
// If both failed, resolve via upstream
(answers, remaining)
async fn cache_answers(&self, answers: &[Record<Dname<Vec<u8>>, UnknownRecordData<Vec<u8>>>]) {
for a in answers {
// Ignore error -- we don't really care