streamMessage method

  1. @override
Stream<String> streamMessage({
  1. required String systemPrompt,
  2. required List<Map<String, String>> messages,
  3. int maxTokens = 8192,
})
override

Stream a response from the Claude Messages API. Yields text deltas as they arrive via SSE.

Implementation

@override
Stream<String> streamMessage({
  required String systemPrompt,
  required List<Map<String, String>> messages,
  int maxTokens = 8192,
}) async* {
  final apiKey = await _getApiKey();
  if (apiKey == null || apiKey.isEmpty) {
    throw Exception('Claude API key is not configured.');
  }

  final model = await _getModel();

  dev.log(
    'Claude API stream request: model=$model, '
    'systemPrompt=${systemPrompt.length} chars, '
    'messages=${messages.length}',
    name: 'ClaudeDatasource',
  );

  final response = await _dio.post<ResponseBody>(
    _provider.apiUrl,
    options: Options(
      headers: {
        'x-api-key': apiKey,
        'anthropic-version': AiProviderConfig.claudeApiVersion,
        'content-type': 'application/json',
      },
      responseType: ResponseType.stream,
    ),
    data: {
      'model': model,
      'max_tokens': maxTokens,
      'system': [
        {
          'type': 'text',
          'text': systemPrompt,
          'cache_control': {'type': 'ephemeral'},
        },
      ],
      'messages': messages,
      'stream': true,
    },
  );

  final stream = response.data?.stream;
  if (stream == null) {
    throw Exception('No stream in Claude API response.');
  }

  // Parse SSE events from the byte stream.
  String buffer = '';

  await for (final chunk in stream) {
    buffer += utf8.decode(chunk);

    // SSE events are separated by double newlines.
    while (buffer.contains('\n\n')) {
      final eventEnd = buffer.indexOf('\n\n');
      final eventBlock = buffer.substring(0, eventEnd);
      buffer = buffer.substring(eventEnd + 2);

      final dataLine = _extractSseData(eventBlock);
      if (dataLine == null) continue;

      // Parse the JSON payload.
      try {
        final json = jsonDecode(dataLine) as Map<String, dynamic>;
        final type = json['type'] as String?;

        if (type == 'content_block_delta') {
          final delta = json['delta'] as Map<String, dynamic>?;
          if (delta != null && delta['type'] == 'text_delta') {
            final text = delta['text'] as String?;
            if (text != null) {
              yield text;
            }
          }
        }
        // message_stop and other events are silently ignored.
      } catch (_) {
        // Skip malformed JSON lines (e.g. "[DONE]").
      }
    }
  }
}